import serial import serial.tools.list_ports import threading import time import json # ports_list = list(serial.tools.list_ports.comports()) # if len(ports_list) <= 0: # print("无串口设备。") # else: # print("可用的串口设备如下:") # for comport in ports_list: # print(list(comport)[0], list(comport)[1]) def crc16(data:bytearray,offset:int,len:int): if(len>0): crc=0xffff for i in range(len-offset): crc=(crc^data[i+offset])&0xffff for j in range(8): if(crc&1)!=0: crc=((crc>>1)^0xa001)&0xffff else: crc=(crc>>1)&0xffff return crc&0xff,(crc>>8)&0xff return 0,0 def crc32(data:bytearray): temp=0 crc=0xffffffff i=0 if(len(data)%4!=0): return 0 while(i None: self.cmd=0 self.cmd_no=0 self.str_err="" self.is_big_data=False self.num_to_recv=0 self.recv_data=bytearray() self.ser = serial.Serial(port=com, baudrate=115200,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,timeout=None) def encode(self,data:bytearray): t=bytearray() length=len(data)+3 t.append(0x59) t.append(0x6d) t.append(length&0xff) t.append(length>>8) t.append(self.cmd) t.append(self.cmd_no&0xff) t.append(self.cmd_no>>8) t+=data a,b=crc16(t,2,length+4) t.append(a) t.append(b) # print("encode:",t.hex(",")) return t def decode(self,data:bytearray): self.str_err="ok" if(len(data)<10): print("recv data len too less.") self.str_err="recv data len too less." return bytearray() if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43): print("frame head not 0x59 0x6d.") self.str_err="frame head not 0x59 0x6d." return bytearray() length=data[3]|(data[4]<<8) if(length==65535): # 重新设置数据长度 length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24); self.is_big_data=True else: self.is_big_data=False if(length+7!=len(data)): print("recv data have lossed") self.str_err="recv data have lossed" return bytearray() a,b=crc16(data,3,length+5) if(a!=data[-2] or b!=data[-1]): print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1]) self.str_err="recv data check error." self.cmd_no=data[6]|(data[7]<<8) self.cmd=data[5] if(self.is_big_data==False): return data[8:-2] else: return data[12:-2] def recv(self,dolater): data=bytearray() while(self.ser.is_open): d=bytes() try: d=self.ser.read(1) except Exception as a: print("err:",str(a)) return data+=d if(len(data)==3): if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43): self.num_to_recv=5 else: data=data[1:] self.num_to_recv=0 elif(len(data)==5): length=data[3]|(data[4]<<8) if(length<65535): self.num_to_recv+=length+2 self.is_big_data=False else: self.num_to_recv=12 self.is_big_data=True elif(len(data)==12): if(self.is_big_data==True): length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24) self.num_to_recv=5+length+2 if(self.num_to_recv>0 and self.num_to_recv==len(data)): t=self.decode(data) self.recv_data+=t # dolater(self.cmd,t,self.str_err) print("recv",t.hex(",")) try: print("send next:") next(dolater) except: print("dolater end.") return data.clear() # else: # print("len(data)={d1},num_ro_recv={d2}".format(d1=len(data),d2=self.num_to_recv)) def send(self,cmd:int,data:bytearray): self.cmd=cmd self.cmd_no+=1 # print("send:",data.hex(",")) self.ser.write(self.encode(data)) def start_recv(self,dolater): self.thread_ = threading.Thread(target=self.recv, args=(dolater,)) self.thread_.start() def wait(self): self.thread_.join() def close(self): if(self.ser.is_open): self.ser.close() def make_dolater(cmd:int,data:bytearray,u:protu): dataa=data def dolater(cmd:int,data_:bytearray,str_err:str): # print("cmd={d1},data={d2},err={d3}".format(d1=cmd,d2=data.hex(","),d3=str_err)) packet_all=(len(dataa)+199)//200 sent_bytes=0 packet_now=0 packet_bytes=200 while True: packet_bytes=200 if(len(dataa)0): packet_now+=1 t=bytearray() t.append(2) t.append(packet_now&0xff) t.append((packet_now>>8)&0xff) t.append(packet_all&0xff) t.append((packet_all>>8)&0xff) t+=dataa[sent_bytes:sent_bytes+packet_bytes] u.send(cmd,t) sent_bytes+=packet_bytes yield else: print("send done.") u.close() break return dolater(cmd,data,u) def scheme_task_to_byte(j:json): data=bytearray() data.append(j["TaskID"]) data.append(j["TaskIndex"]) data.append(j["RetryCount"]) data.append(j["ErrJumpTo"]) data.append((j["ParamCount"]&0x0f)|(j["ReturnCount"]<<4)) for i in j["ParamVal"]: data.append(i&0xff) data.append(i>>8) return data def scheme_tasks_to_byte(j:json): data=bytearray() for i in j["TaskArray"]: data+=scheme_task_to_byte(i) length=len(data) if(length<(0x700-4)): for i in range(0x700-4-length): data.append(0xff) return data def scheme_taskids_to_byte(j:json): t=bytearray() t.append(j["PlanID"]&0xff) t.append((j["PlanID"]>>8)&0xff) t.append((j["PlanID"]>>16)&0xff) t.append((j["PlanID"]>>24)&0xff) for i in j["TaskArray"]: t.append(i["TaskID"]) length=len(t) if(length<(0x100)): for i in range(0x100-length): t.append(0xff) return t def scheme_to_byte(j:json): t=bytearray() t+=scheme_taskids_to_byte(j) t+=scheme_tasks_to_byte(j) crc=crc32(t) t.append(crc&0xff) t.append((crc>>8)&0xff) t.append((crc>>16)&0xff) t.append((crc>>24)&0xff) return t # 1是写,0,是读 def send_file(com:str,cmd:int,name:str,data:bytearray): u=protu(com) u.start_recv(make_dolater(cmd,data,u)) length=len(data) t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff]) t+=name.encode() u.send(cmd,t) u.wait() # 0x30 开始检测 # 0x31 检测上报 # 0x32 读写方案 # 0x33 连接维护 # 0x34 自检数据 # 0x35 自检上报 # 0x36 升级脚本 # 0xed 主板升级 # 0xee 小板升级 # with open("file/judge.lua","rb") as f: # send_file("COM5",0x36,f.name.split('/')[-1],f.read()) with open("file/checker_ye_cfg.json","rb") as f: json_obj=json.loads(f.read()) d=scheme_to_byte(json_obj) print("len=",len(d),d.hex(",")) # send_file("COM5",0x32,f.name.split('/')[-1],f.read())