import threading

try:
    import serial
except ImportError:
    # pyserial is only needed to open the port; the frame codec below
    # (crc8 / encode / decode) has no hardware dependency.
    serial = None

# Frame layout (multi-byte fields are little-endian):
#   0x59 0x65 | len_lo len_hi | addr | host_addr | cmd | no_lo no_hi | payload | crc8
# The length field counts the whole frame, header and CRC included.
_FRAME_HEADER = b"\x59\x65"
_FRAME_OVERHEAD = 10  # 9 header bytes + 1 CRC byte; shortest possible frame
_MAX_FRAME_LEN = 0xFFFF  # limit of the 16-bit length field


def crc8(data: bytearray):
    """Return the 8-bit CRC of *data*.

    Bitwise, LSB-first CRC with reflected polynomial 0x8C and an initial
    value of 0 (the parameters of the Dallas/Maxim CRC-8).
    """
    crc = 0
    for byte in data:
        crc = (crc ^ byte) & 0xFF
        for _ in range(8):
            if crc & 0x01:
                crc = (crc >> 1) ^ 0x8C
            else:
                crc >>= 1
    return crc


class protm:
    """Framed command protocol over a serial port.

    The instance doubles as the "current frame" state: ``encode`` reads
    ``addr``, ``host_addr``, ``cmd`` and ``cmd_no`` to build a frame, and
    ``decode`` overwrites them from the frame it parses.
    """

    def __init__(self, com: str) -> None:
        """Store the port name *com*; the port itself is opened by ``init``."""
        self.cmd = 0
        self.cmd_no = 0
        self.addr = 0
        self.host_addr = 0
        self.str_err = ""
        self.is_big_data = False  # not used by any code in this file
        self.num_to_recv = 0
        self.recv_data = bytearray()
        self.com = com
        self.ser = None  # set by init(); lets close() run safely before init()

    def init(self):
        """Open the serial port at 115200 8N1 with blocking reads.

        Raises:
            ImportError: if pyserial is not installed.
        """
        if serial is None:
            raise ImportError("pyserial is required to open a serial port")
        self.ser = serial.Serial(
            port=self.com,
            baudrate=115200,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=None,
        )

    def encode(self, data: bytearray):
        """Build a complete frame carrying payload *data*.

        Uses the current ``addr``, ``host_addr``, ``cmd`` and ``cmd_no``.

        Returns:
            The frame as a new ``bytearray`` (header, payload, CRC).

        Raises:
            ValueError: if the frame would not fit the 16-bit length field,
                or if a header field does not fit its byte(s).
        """
        length = len(data) + _FRAME_OVERHEAD
        if length > _MAX_FRAME_LEN:
            raise ValueError("payload too large for the 16-bit frame length field")
        frame = bytearray(_FRAME_HEADER)
        frame.append(length & 0xFF)
        frame.append(length >> 8)
        frame.append(self.addr)
        frame.append(self.host_addr)
        frame.append(self.cmd)
        frame.append(self.cmd_no & 0xFF)
        frame.append(self.cmd_no >> 8)
        frame += data
        # The CRC covers everything that precedes it.
        frame.append(crc8(frame))
        return frame

    def decode(self, data: bytearray):
        """Parse one complete frame and return its payload.

        ``str_err`` is set to ``"ok"`` or to a description of the problem.
        A frame that is too short or whose length field disagrees with its
        size yields an empty ``bytearray``. On a CRC mismatch the payload is
        still returned and only ``str_err`` reports the failure, so callers
        that care must check ``str_err``.
        """
        self.str_err = "ok"
        if len(data) < _FRAME_OVERHEAD:
            print("recv data len too less.")
            self.str_err = "recv data len too less."
            return bytearray()
        # Swap the two address bytes so a reply built with encode() is
        # addressed back to the sender: respond to commands from any address.
        self.host_addr = data[4]
        self.addr = data[5]
        length = data[2] | (data[3] << 8)
        if length != len(data):
            print("recv data have lossed")
            self.str_err = "recv data have lossed"
            return bytearray()
        self.cmd_no = data[7] | (data[8] << 8)
        self.cmd = data[6]
        crc = crc8(data[:-1])
        if crc != data[-1]:
            print("recv data check error.h_crc=%02x,crc=%02x" % (crc, data[-1]))
            self.str_err = "recv data check error."
        return data[9:-1]

    def recv(self):
        """Read the port byte by byte, reassemble frames and dispatch them.

        Runs until the port is closed or a read raises. Each completed frame
        is decoded, its payload appended to ``recv_data`` and passed to
        ``handle`` together with the decoded command.
        """
        data = bytearray()
        while self.ser.is_open:
            try:
                d = self.ser.read(1)
            except Exception as a:
                # Any read failure (e.g. the port was closed under us) ends
                # the receive loop.
                print("err:", str(a))
                return
            data += d
            if len(data) == 2:
                if data[0] == 0x59 and data[1] == 0x65:
                    self.num_to_recv = 4
                else:
                    # Not a frame start: drop the oldest byte and keep hunting.
                    print("data={d}".format(d=data.hex(",")))
                    data = data[1:]
                    self.num_to_recv = 0
            elif len(data) == 4:
                length = data[2] | (data[3] << 8)
                if length < _FRAME_OVERHEAD:
                    # Such a frame cannot be valid, and a length below 4 would
                    # never match the buffer size, stalling the receiver for
                    # good. Drop what we have and resynchronise.
                    print("recv invalid frame length={d}".format(d=length))
                    data.clear()
                    self.num_to_recv = 0
                    continue
                self.num_to_recv = length
            if self.num_to_recv > 0 and self.num_to_recv == len(data):
                payload = self.decode(data)
                self.recv_data += payload
                self.handle(self.cmd, payload)
                data.clear()

    def send(self, cmd: int, data: bytearray):
        """Send payload *data* as command *cmd* with the next sequence number."""
        self.cmd = cmd
        # The sequence number is a 16-bit field on the wire, so wrap it
        # instead of letting encode() fail once it passes 0xFFFF.
        self.cmd_no = (self.cmd_no + 1) & 0xFFFF
        print("send:", data.hex(","))
        self.ser.write(self.encode(data))

    def start_recv(self):
        """Start ``recv`` on a new thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the thread started by ``start_recv`` finishes."""
        self.thread_.join()

    def close(self):
        """Close the serial port if it has been opened and is still open."""
        if self.ser is not None and self.ser.is_open:
            self.ser.close()

    def handle(self, cmd: int, data: bytearray):
        """Handle one received frame: log it and answer with a test pattern.

        Frames are skipped when ``addr`` is 1 after decoding; otherwise a
        command 0x16 carrying the bytes 1..23 is sent back.
        """
        print("cmd={d1},data={d2}".format(d1=cmd, d2=data.hex(",")))
        if self.addr == 1:
            print("skip addr:1")
            return
        self.send(0x16, bytearray(range(1, 24)))


# Demo: build and print one empty-payload frame without opening the port.
# To talk to real hardware, call u.init() first, then u.start_recv() and
# u.wait().
u = protm("com5")
u.cmd = 1
u.addr = 1
u.host_addr = 0
print(u.encode(bytearray()).hex(","))