import serial import serial.tools.list_ports import threading import time import socket import checker_save import sys def crc16(data:bytearray,offset:int,len:int): if(len>0): crc=0xffff for i in range(len-offset): crc=(crc^data[i+offset])&0xffff for j in range(8): if(crc&1)!=0: crc=((crc>>1)^0xa001)&0xffff else: crc=(crc>>1)&0xffff return crc&0xff,(crc>>8)&0xff return 0,0 # 把tcp封装为串口 class utcp: is_open=False def __init__(self,port:int)->None: self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ser.bind(("",port)) self.ser.settimeout(10) self.ser.listen(128) print("wait for mcu connect.") self.client,self.client_addr=self.ser.accept() print("client:",self.client_addr) self.is_open=True def read(self,len:int): return self.client.recv(len) def write(self,data:bytearray): return self.client.send(data) def close(self): self.client.close() self.ser.close() self.is_open=False class port: def __init__(self) -> None: self.cmd=0 self.cmd_no=0 self.str_err="" self.is_big_data=False self.num_to_recv=0 self.recv_data=bytearray() def open(self,name:str,bsp:int): if(name!="utcp"): self.ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,timeout=None) else: self.ser=utcp(bsp) def start_recv(self): self.thread_ = threading.Thread(target=self.recv, args=()) self.thread_.start() def decode(self,data:bytearray): self.str_err="ok" if(len(data)<10): print("recv data len too less.") self.str_err="recv data len too less." return bytearray() if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43): print("frame head not 0x59 0x6d.") self.str_err="frame head not 0x59 0x6d." return bytearray() length=data[3]|(data[4]<<8) if(length==65535): # 重新设置数据长度 length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24); self.is_big_data=True else: self.is_big_data=False if(length+7!=len(data)): print("recv data have lossed") self.str_err="recv data have lossed" return bytearray() a,b=crc16(data,3,length+5) if(a!=data[-2] or b!=data[-1]): print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1]) self.str_err="recv data check error." self.cmd_no=data[6]|(data[7]<<8) self.cmd=data[5] if(self.is_big_data==False): return data[8:-2] else: return data[12:-2] def recv(self): data=bytearray() while(True): d=bytes() try: d=self.ser.read(1) except Exception as a: # print("err:",str(a)) print("port closed") return data+=d if(len(data)==3): if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43): self.num_to_recv=5 else: data=data[1:] self.num_to_recv=0 elif(len(data)==5): length=data[3]|(data[4]<<8) if(length<65535): self.num_to_recv+=length+2 self.is_big_data=False else: self.num_to_recv=12 self.is_big_data=True elif(len(data)==12): if(self.is_big_data==True): length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24) self.num_to_recv=5+length+2 if(self.num_to_recv>0 and self.num_to_recv==len(data)): # print("recv:",data.hex(" ")) return self.decode(data) def encode(self,data:bytearray): t=bytearray() length=len(data)+3 t.append(0x59) t.append(0x6d) t.append(length&0xff) t.append(length>>8) t.append(self.cmd) t.append(self.cmd_no&0xff) t.append(self.cmd_no>>8) t+=data a,b=crc16(t,2,length+4) t.append(a) t.append(b) # print("encode:",t.hex(",")) return t def send(self,cmd:int,data:bytearray): self.cmd=cmd self.cmd_no+=1 d=self.encode(data) # print("send:",d.hex(",")) self.ser.write(d) # 测试批检仪 def checker_test(self,times:int,scheme:str,script:str): print("批检仪测试:") tick=0 save=checker_save.save(scheme,script) data=bytearray() while(tick