import serial import serial.tools.list_ports import threading import time import json import socket from PyQt5.QtCore import * def crc16(data:bytearray,offset:int,len:int): if(len>0): crc=0xffff for i in range(len-offset): crc=(crc^data[i+offset])&0xffff for j in range(8): if(crc&1)!=0: crc=((crc>>1)^0xa001)&0xffff else: crc=(crc>>1)&0xffff return crc&0xff,(crc>>8)&0xff return 0,0 def crc32(data:bytearray): temp=0 crc=0xffffffff i=0 if(len(data)%4!=0): return 0 while(iNone: self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ser.bind(("",port)) self.ser.settimeout(10) self.ser.listen(128) print("wait for mcu connect.") self.client,self.client_addr=self.ser.accept() print("client:",self.client_addr) self.is_open=True def read(self,len:int): # return self.ser.recv(len) return self.client.recv(len) def write(self,data:bytearray): # return self.ser.send(data) return self.client.send(data) def close(self): self.client.close() self.ser.close() self.is_open=False # 生成一个任务的参数 def scheme_task_to_byte(j:json): data=bytearray() data.append(j["TaskID"]) data.append(j["TaskIndex"]) data.append(j["RetryCount"]) data.append(j["ErrJumpTo"]) data.append((j["ParamCount"]&0x0f)|(j["ReturnCount"]<<4)) for i in j["ParamVal"]: data.append(i&0xff) data.append(i>>8) return data # 生成任务参数序列 def scheme_tasks_to_byte(j:json): data=bytearray() for i in j["TaskArray"]: data+=scheme_task_to_byte(i) length=len(data) if(length<(0x700-4)): for i in range(0x700-4-length): data.append(0xff) return data # 生成任务id序列 def scheme_taskids_to_byte(j:json): t=bytearray() t.append(j["PlanID"]&0xff) t.append((j["PlanID"]>>8)&0xff) t.append((j["PlanID"]>>16)&0xff) t.append((j["PlanID"]>>24)&0xff) for i in j["TaskArray"]: t.append(i["TaskID"]) length=len(t) if(length<(0x100)): for i in range(0x100-length): t.append(0xff) return t # 根据方案生成小板用的字节数据 def scheme_to_byte(j:json): t=bytearray() t+=scheme_taskids_to_byte(j) t+=scheme_tasks_to_byte(j) crc=crc32(t) t.append(crc&0xff) t.append((crc>>8)&0xff) t.append((crc>>16)&0xff) t.append((crc>>24)&0xff) return t # int转数组 def arr_from_int(num:int): return bytearray([num&0xff,(num>>8)&0xff,(num>>16)&0xff,(num>>24)&0xff]) # 提取方案中的范围数据, 先max后min def scheme_get_task_range(j:json): t=bytearray() return_count=j["ReturnCount"] for i in j["TestStandard"]: t+=arr_from_int(i["Max"]) t+=arr_from_int(i["Min"]) n=return_count-len(j["TestStandard"]) for i in range(n): t+=arr_from_int(65535) t+=arr_from_int(0) return t # 根据方案生成主板用的字节数据 def scheme_to_host(j:json): t=bytearray() t+=arr_from_int(j["PlanID"]) t+=arr_from_int(j["TimeOutM"]) b=bytearray() for i in j["TaskArray"]: b+=scheme_get_task_range(i) t+=arr_from_int(len(b)//8) t+=b return t # 发送文件类 class handle: stat=0 name="" def __init__(self) -> None: pass def get_rate(self): return self.packet_now*100//self.packet_all def get_data(self): if(self.stat==0): length=len(self.data) # 1是写,0是读 t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff]) t+=self.name.encode() self.stat=1 return t elif(self.stat==1): packet_bytes=200 if(len(self.data)0): self.packet_now+=1 t=bytearray() t.append(2) t.append(self.packet_now&0xff) t.append((self.packet_now>>8)&0xff) t.append(self.packet_all&0xff) t.append((self.packet_all>>8)&0xff) t+=self.data[self.sent_bytes:self.sent_bytes+packet_bytes] self.sent_bytes+=packet_bytes return t else: print("send done.") return bytearray() # 设置要发送的文件 def set_file(self,name:str): self.data=bytearray() with open(name,"rb") as f: self.data=f.read() self.name=f.name.split('/')[-1] self.stat=0 self.packet_all=(len(self.data)+199)//200 self.sent_bytes=0 self.packet_now=0 self.packet_bytes=200 # 设置要发送的方案 def set_json(self,name:str): self.data=bytearray() with open(name,"rb") as f: json_obj=json.loads(f.read()) d=scheme_to_byte(json_obj) # print("len=",len(d),d.hex(",")) d+=scheme_to_host(json_obj) # print("len=",len(d),d.hex(",")) self.data=d self.name=f.name.split('/')[-1] self.stat=0 self.packet_all=(len(self.data)+199)//200 self.sent_bytes=0 self.packet_now=0 self.packet_bytes=200 class protu(QObject): # 进度信号,ip,1~100 rate_signal =pyqtSignal([int]) # 结束信号,ip,成败,描述 end_signal = pyqtSignal([bool,str]) # 接收到数据信号 recv_signal =pyqtSignal([int,bytearray,str]) hand=handle() def __init__(self) -> None: QObject.__init__(self) self.cmd=0 self.cmd_no=0 self.str_err="" self.is_big_data=False self.num_to_recv=0 self.recv_data=bytearray() def init(self,com:str): s=com.split(":") try: if(s[0]=="utcp"): self.ser = utcp(int(s[1])) else: self.ser = serial.Serial(port=s[0], baudrate=115200,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,timeout=None) except Exception as e: print(str(e)) return False return True def encode(self,data:bytearray): t=bytearray() length=len(data)+3 t.append(0x59) t.append(0x6d) t.append(length&0xff) t.append(length>>8) t.append(self.cmd) t.append(self.cmd_no&0xff) t.append(self.cmd_no>>8) t+=data a,b=crc16(t,2,length+4) t.append(a) t.append(b) # print("encode:",t.hex(",")) return t def decode(self,data:bytearray): self.str_err="ok" if(len(data)<10): print("recv data len too less.") self.str_err="recv data len too less." return bytearray() if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43): print("frame head not 0x59 0x6d.") self.str_err="frame head not 0x59 0x6d." return bytearray() length=data[3]|(data[4]<<8) if(length==65535): # 重新设置数据长度 length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24); self.is_big_data=True else: self.is_big_data=False if(length+7!=len(data)): print("recv data have lossed") self.str_err="recv data have lossed" return bytearray() a,b=crc16(data,3,length+5) if(a!=data[-2] or b!=data[-1]): print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1]) self.str_err="recv data check error." self.cmd_no=data[6]|(data[7]<<8) self.cmd=data[5] if(self.is_big_data==False): return data[8:-2] else: return data[12:-2] def recv(self): # self.recv_signal.connect(self.send_file_next) data=bytearray() while(self.ser.is_open): d=bytes() try: d=self.ser.read(1) except Exception as a: # print("err:",str(a)) print("port closed") return data+=d if(len(data)==3): if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43): self.num_to_recv=5 else: data=data[1:] self.num_to_recv=0 elif(len(data)==5): length=data[3]|(data[4]<<8) if(length<65535): self.num_to_recv+=length+2 self.is_big_data=False else: self.num_to_recv=12 self.is_big_data=True elif(len(data)==12): if(self.is_big_data==True): length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24) self.num_to_recv=5+length+2 if(self.num_to_recv>0 and self.num_to_recv==len(data)): t=self.decode(data) self.recv_data+=t # print("recv",t.hex(",")) self.recv_signal.emit(self.cmd,t,self.str_err) # self.send_file_next(self.cmd,t,self.str_err) # print("sent signal---") data.clear() # else: # print("len(data)={d1},num_ro_recv={d2}".format(d1=len(data),d2=self.num_to_recv)) def send(self,cmd:int,data:bytearray): self.cmd=cmd self.cmd_no+=1 # print("send:",data.hex(",")) self.ser.write(self.encode(data)) def start_recv(self): self.thread_ = threading.Thread(target=self.recv, args=()) self.thread_.start() def wait(self): self.thread_.join() def close(self): try: if(self.ser.is_open): self.ser.close() except Exception as e: print(str(e)) def send_file_next(self,cmd:int,data:bytearray,err:str): print("send_file_next") data=self.hand.get_data() if(len(data)>0): self.send(cmd,data) else: self.close() def send_file(self,cmd:int,name:str): self.start_recv() self.hand.set_file(name) # self.send_file_next(cmd,bytearray([0]),"ok") self.recv_signal.emit(cmd,bytearray([0]),"ok") self.wait() # 0x30 开始检测 # 0x31 检测上报 # 0x32 读写方案 # 0x33 连接维护 # 0x34 自检数据 # 0x35 自检上报 # 0x36 升级脚本 # 0xed 主板升级 # 0xee 小板升级 # with open("file/judge.lua","rb") as f: # send_file("COM5",0x36,f.name.split('/')[-1],f.read()) # with open("file/JQ_JCXB_V54.bin","rb") as f: # send_file("COM5",0xee,f.name.split('/')[-1],f.read()) # with open("file/checker_ye_cfg.json","rb") as f: # json_obj=json.loads(f.read()) # d=scheme_to_byte(json_obj) # print("len=",len(d),d.hex(",")) # d=scheme_to_host(json_obj) # print("len=",len(d),d.hex(",")) # send_file("COM5",0x32,f.name.split('/')[-1],f.read()) if __name__ == "__main__": u=protu() # u.init("utcp:7777") # u.send_file(0xee,"file/JQ_JCXB_V54.bin") # u.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin") u.cmd=0x34 print(u.encode(bytearray()).hex(' '))