"""Host-side protocol helper for talking to the checker MCU over serial or TCP.

Frames sent by the host start with ``59 6d``; frames received from the MCU
start with ``59 6d 43``.  Both carry a little-endian 16-bit length, a command
byte, a 16-bit command number, the payload and a CRC-16 (low byte first).

NOTE(review): the copy of this file that was reviewed had every line break
collapsed and three spans deleted (apparently by an HTML-tag stripper that
removed everything between a ``<`` and the next ``>``).  The layout below is a
reconstruction.  Code that could not be recovered is NOT guessed; it raises
``NotImplementedError`` and is marked ``NOTE(review)`` so it can be restored
from version control:

* the loop body of ``crc32``
* the tail of ``scheme_get_task_range``
* the whole of ``scheme_to_host`` (and anything else defined between
  ``scheme_get_task_range`` and ``class handle``, e.g. ``int2str``)
"""

import json
import os
import socket
import threading
import time

import serial
import serial.tools.list_ports
from PyQt5.QtCore import *


def crc16(data: bytearray, offset: int, len: int):
    """Return the CRC-16 of ``data[offset:len]`` as ``(low_byte, high_byte)``.

    Uses the reflected polynomial 0xA001 with initial value 0xFFFF.

    Note that ``len`` is used as the exclusive END index, not as a byte
    count: the loop runs ``len - offset`` times starting at ``offset``.
    The parameter name shadows the builtin but is kept so existing callers
    keep working.

    Returns ``(0, 0)`` when ``len`` is not positive.
    """
    if len <= 0:
        return 0, 0
    crc = 0xFFFF
    for i in range(len - offset):
        crc = (crc ^ data[i + offset]) & 0xFFFF
        for _ in range(8):
            if crc & 1:
                crc = ((crc >> 1) ^ 0xA001) & 0xFFFF
            else:
                crc = (crc >> 1) & 0xFFFF
    return crc & 0xFF, (crc >> 8) & 0xFF


def crc32(data: bytearray):
    """Return the 32-bit checksum of ``data`` (length must be a multiple of 4).

    Returns 0 when ``len(data)`` is not a multiple of 4.

    NOTE(review): only the prologue of this function survived in the reviewed
    source (``temp=0``, ``crc=0xffffffff``, ``i=0``, the length check and the
    start of ``while(i<...``).  The polynomial and word order are unknown, so
    the loop is deliberately not reconstructed: a wrong guess would silently
    produce scheme images the MCU rejects.  Restore the body from version
    control.
    """
    if len(data) % 4 != 0:
        return 0
    raise NotImplementedError(
        "crc32: loop body missing from the reviewed source; "
        "restore it from version control"
    )


class utcp:
    """TCP server that exposes the subset of the serial-port API ``protu`` uses.

    Listens on ``port``, waits (up to 10 s) for one client, and then offers
    ``read`` / ``write`` / ``close`` / ``is_open`` on that connection.

    NOTE(review): the ``class`` line and the ``__init__`` signature were lost
    in the reviewed source; they are reconstructed from ``utcp(int(s[1]))`` in
    ``protu.init`` and from the use of ``port`` in the body.  Any class-level
    attributes that may have existed are unknown.
    """

    def __init__(self, port) -> None:
        self.is_open = False
        self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.ser.bind(("", port))
            self.ser.settimeout(10)
            self.ser.listen(128)
            print("wait for mcu connect.")
            self.client, self.client_addr = self.ser.accept()
        except Exception:
            # Do not leak the listening socket (and keep the port bound)
            # when bind fails or no client connects within the timeout.
            self.ser.close()
            raise
        print("client:", self.client_addr)
        self.is_open = True

    def read(self, len: int):
        """Receive up to ``len`` bytes from the connected client."""
        return self.client.recv(len)

    def write(self, data: bytearray):
        """Send ``data`` to the connected client; returns the byte count sent."""
        return self.client.send(data)

    def close(self):
        """Close the client connection and the listening socket."""
        self.client.close()
        self.ser.close()
        self.is_open = False


def scheme_task_to_byte(j: dict):
    """Serialise the parameters of one task.

    Layout: TaskID, TaskIndex, RetryCount, ErrJumpTo, one byte holding
    ParamCount (low nibble) and ReturnCount (high nibble), then every
    ``ParamVal`` entry as a little-endian 16-bit value.

    Raises ``ValueError`` (from ``bytearray.append``) if a field does not fit
    in its byte.
    """
    data = bytearray()
    data.append(j["TaskID"])
    data.append(j["TaskIndex"])
    data.append(j["RetryCount"])
    data.append(j["ErrJumpTo"])
    data.append((j["ParamCount"] & 0x0F) | (j["ReturnCount"] << 4))
    for val in j["ParamVal"]:
        data.append(val & 0xFF)
        data.append(val >> 8)
    return data


def scheme_tasks_to_byte(j: dict):
    """Serialise every task in ``j["TaskArray"]`` and pad with 0xFF.

    The result is padded up to ``0x700 - 4`` bytes; longer data is returned
    unpadded and untruncated.
    """
    data = bytearray()
    for task in j["TaskArray"]:
        data += scheme_task_to_byte(task)
    padding = (0x700 - 4) - len(data)
    if padding > 0:
        data += b"\xff" * padding
    return data


def scheme_taskids_to_byte(j: dict):
    """Serialise the plan id (little-endian 32-bit) followed by all task ids.

    The result is padded with 0xFF up to 0x100 bytes.
    """
    plan_id = j["PlanID"]
    t = bytearray(
        [
            plan_id & 0xFF,
            (plan_id >> 8) & 0xFF,
            (plan_id >> 16) & 0xFF,
            (plan_id >> 24) & 0xFF,
        ]
    )
    for task in j["TaskArray"]:
        t.append(task["TaskID"])
    padding = 0x100 - len(t)
    if padding > 0:
        t += b"\xff" * padding
    return t


def scheme_to_byte(j: dict):
    """Build the byte image of a scheme for the small board.

    Task-id table, then task parameters, then the little-endian ``crc32`` of
    everything before it.
    """
    t = bytearray()
    t += scheme_taskids_to_byte(j)
    t += scheme_tasks_to_byte(j)
    crc = crc32(t)
    t += bytearray(
        [crc & 0xFF, (crc >> 8) & 0xFF, (crc >> 16) & 0xFF, (crc >> 24) & 0xFF]
    )
    return t


def arr_from_int(num: int):
    """Return the low 32 bits of ``num`` as 4 little-endian bytes."""
    return bytearray((num & 0xFFFFFFFF).to_bytes(4, "little"))


def scheme_get_task_range(j: dict):
    """Extract the range data of one task (Max first, then Min).

    NOTE(review): the reviewed source is truncated inside the loop, right
    after ``if (index``.  The statements below are the part that survived;
    whatever followed (the ``index`` comparison, padding, the return value)
    is unknown, so the function raises instead of returning partial data.
    """
    t = bytearray()
    t += arr_from_int(j["TaskID"])
    t += arr_from_int(j["TaskIndex"])
    t += arr_from_int(j["ReturnCount"])
    t += arr_from_int(j["ExecuteErrCode"])
    for std in j["TestStandard"]:
        t += arr_from_int(std["Max"])
        t += arr_from_int(std["Min"])
    raise NotImplementedError(
        "scheme_get_task_range: tail missing from the reviewed source; "
        "restore it from version control"
    )


def scheme_to_host(j: dict):
    """Build the host-side part of a scheme image.

    NOTE(review): ``handle.set_json`` calls this function, but its definition
    was entirely lost from the reviewed source.  This stub replaces the
    ``NameError`` with an explicit error; restore the real implementation
    from version control.
    """
    raise NotImplementedError(
        "scheme_to_host: definition missing from the reviewed source; "
        "restore it from version control"
    )


class handle:
    """Splits a file (or scheme image) into the packets of a transfer.

    The first call to ``get_data`` yields a header packet, later calls yield
    data packets of at most ``PACKET_BYTES`` payload bytes, and an empty
    ``bytearray`` signals the end.

    NOTE(review): the ``class`` line was lost in the reviewed source; the
    name comes from ``hand=handle()`` in ``protu``.  Any class-level
    attributes that may have existed are unknown.
    """

    PACKET_BYTES = 200

    def __init__(self) -> None:
        # Initialise everything the other methods read, so that calling them
        # before set_file()/set_json() does not raise AttributeError.
        self.data = bytearray()
        self.name = ""
        self._reset()

    def _reset(self):
        """Reset the transfer state for the currently loaded ``self.data``."""
        self.stat = 0
        self.packet_all = (len(self.data) + 199) // 200
        self.sent_bytes = 0
        self.packet_now = 0
        self.packet_bytes = 200

    def get_rate(self):
        """Return the progress in percent (0 when there is nothing to send)."""
        if self.packet_all == 0:
            return 0
        return self.packet_now * 100 // self.packet_all

    def get_data(self):
        """Return the next packet to send, or an empty ``bytearray`` when done.

        Header packet (first call): ``1`` (1 = write, 0 = read), the total
        length as little-endian 32-bit, then the file name.

        Data packet: ``2``, current packet number (LE16), total packet count
        (LE16), then up to 200 bytes of data.
        """
        if self.stat == 0:
            length = len(self.data)
            t = bytearray(
                [
                    1,
                    length & 0xFF,
                    (length >> 8) & 0xFF,
                    (length >> 16) & 0xFF,
                    (length >> 24) & 0xFF,
                ]
            )
            t += self.name.encode()
            self.stat = 1
            return t
        elif self.stat == 1:
            # NOTE(review): the clamp of packet_bytes was lost in the reviewed
            # source (between `len(self.data)` and `0):`); it is reconstructed
            # as "at most 200 bytes, at most what is left".
            packet_bytes = min(200, len(self.data) - self.sent_bytes)
            if packet_bytes > 0:
                self.packet_now += 1
                t = bytearray(
                    [
                        2,
                        self.packet_now & 0xFF,
                        (self.packet_now >> 8) & 0xFF,
                        self.packet_all & 0xFF,
                        (self.packet_all >> 8) & 0xFF,
                    ]
                )
                t += self.data[self.sent_bytes:self.sent_bytes + packet_bytes]
                self.sent_bytes += packet_bytes
                return t
            print("send done.")
        return bytearray()

    def set_file(self, name: str):
        """Load the file ``name`` as the data to send."""
        with open(name, "rb") as f:
            self.data = f.read()
        # basename also strips Windows-style directories, unlike split('/').
        self.name = os.path.basename(name)
        self._reset()

    def set_json(self, name: str):
        """Load the scheme JSON file ``name`` and convert it to the data to send.

        The data is the small-board image followed by the host image.
        """
        with open(name, "rb") as f:
            json_obj = json.loads(f.read())
        d = scheme_to_byte(json_obj)
        d += scheme_to_host(json_obj)
        self.data = d
        self.name = os.path.basename(name)
        self._reset()


class protu(QObject):
    """Frame encoder/decoder plus receive loop for one serial or TCP link."""

    # Progress signal: percentage, 1~100.  (The original comment also lists
    # an "ip" field, but the signal carries a single int.)
    rate_signal = pyqtSignal([int])
    # End signal: success flag, description.
    end_signal = pyqtSignal([bool, str])
    # Data-received signal: command, payload, error string ("ok" on success).
    recv_signal = pyqtSignal([int, bytearray, str])
    # Kept for callers that read protu.hand on the class; every instance
    # gets its own handle in __init__.
    hand = handle()

    def __init__(self) -> None:
        QObject.__init__(self)
        self.cmd = 0
        self.cmd_no = 0
        self.str_err = ""
        self.is_big_data = False
        self.num_to_recv = 0
        self.recv_data = bytearray()
        self.ser = None
        self.thread_ = None
        # A class-level handle would be shared (and its transfer state
        # clobbered) by every protu instance.
        self.hand = handle()

    def init(self, com: str):
        """Open the link described by ``com``; return True on success.

        ``"utcp:<port>"`` starts a TCP server on ``<port>``; anything else is
        ``"<serial port>:<baudrate>"``.
        """
        s = com.split(":")
        try:
            if s[0] == "utcp":
                self.ser = utcp(int(s[1]))
            else:
                bsp = int(s[1])
                self.ser = serial.Serial(
                    port=s[0],
                    baudrate=bsp,
                    bytesize=serial.EIGHTBITS,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    timeout=None,
                )
        except Exception as e:
            print(str(e))
            return False
        return True

    def encode(self, data: bytearray):
        """Wrap ``data`` in a host frame using ``self.cmd`` / ``self.cmd_no``.

        Layout: ``59 6d``, length (LE16, payload + 3), cmd, cmd_no (LE16),
        payload, CRC-16 over everything after the 2-byte header.

        Raises ``ValueError`` if the payload does not fit the 16-bit length
        field.
        """
        length = len(data) + 3
        if length > 0xFFFF:
            raise ValueError("payload too large for a 16-bit length field")
        t = bytearray(
            [
                0x59,
                0x6D,
                length & 0xFF,
                length >> 8,
                self.cmd,
                self.cmd_no & 0xFF,
                # Masked so a command number above 0xFFFF cannot raise here.
                (self.cmd_no >> 8) & 0xFF,
            ]
        )
        t += data
        a, b = crc16(t, 2, length + 4)
        t.append(a)
        t.append(b)
        return t

    def decode(self, data: bytearray):
        """Validate one received frame and return its payload.

        Sets ``self.str_err`` ("ok" on success), ``self.cmd``,
        ``self.cmd_no`` and ``self.is_big_data``.  Returns an empty
        ``bytearray`` on a length or header error.  On a CRC error
        ``str_err`` is set but the payload is still returned, as before.

        A 16-bit length of 65535 marks a big frame whose real length is the
        little-endian 32-bit value in bytes 8..11, with the payload starting
        at byte 12.
        """
        self.str_err = "ok"
        if len(data) < 10:
            print("recv data len too less.")
            self.str_err = "recv data len too less."
            return bytearray()
        if data[0] != 0x59 or data[1] != 0x6D or data[2] != 0x43:
            self.str_err = "frame head not 0x59 0x6d."
            return bytearray()
        length = data[3] | (data[4] << 8)
        if length == 65535:
            self.is_big_data = True
            if len(data) < 12:
                print("recv data have lossed")
                self.str_err = "recv data have lossed"
                return bytearray()
            # Bytes 8..11, matching recv() and the payload offset below
            # (the old indices 7..10 overlapped the cmd_no high byte).
            length = (
                data[8] | (data[9] << 8) | (data[10] << 16) | (data[11] << 24)
            )
        else:
            self.is_big_data = False
        if length + 7 != len(data):
            print("recv data have lossed")
            self.str_err = "recv data have lossed"
            return bytearray()
        a, b = crc16(data, 3, length + 5)
        if a != data[-2] or b != data[-1]:
            print(
                "recv data check error.h_crc=%02x %02x,crc=%02x %02x"
                % (a, b, data[-2], data[-1])
            )
            self.str_err = "recv data check error."
        self.cmd_no = data[6] | (data[7] << 8)
        self.cmd = data[5]
        if not self.is_big_data:
            return bytearray(data[8:-2])
        return bytearray(data[12:-2])

    def recv_(self):
        """Receive loop without frame re-assembly.

        Each read of up to 500 bytes is treated as one frame; anything that
        does not decode is printed as text.
        """
        while self.ser.is_open:
            try:
                data = self.ser.read(500)
            except Exception:
                print("port closed")
                return
            if len(data) > 0:
                t = self.decode(data)
                if self.str_err == "ok":
                    self.recv_data += t
                    self.recv_signal.emit(self.cmd, t, self.str_err)
                else:
                    # errors="replace": binary noise must not kill the thread.
                    print(data.decode("utf-8", errors="replace"))

    def recv(self):
        """Receive loop with frame re-assembly.

        Reads one byte at a time, resynchronises on the ``59 6d 43`` header,
        and decodes a frame once the announced number of bytes has arrived.
        """
        data = bytearray()
        while self.ser.is_open:
            try:
                d = self.ser.read(1)
            except Exception:
                print("port closed")
                return
            if not d:
                if isinstance(self.ser, utcp):
                    # socket.recv() returns b"" once the peer has closed;
                    # without this the loop would spin forever.
                    print("port closed")
                    return
                # Nothing read: do not re-run the length checks below.
                continue
            data += d
            if len(data) == 3:
                if data[0] == 0x59 and data[1] == 0x6D and data[2] == 0x43:
                    self.num_to_recv = 5
                else:
                    # Slide the window by one byte and keep looking.
                    data = data[1:]
                    self.num_to_recv = 0
            elif len(data) == 5:
                length = data[3] | (data[4] << 8)
                if length < 65535:
                    self.num_to_recv += length + 2
                    self.is_big_data = False
                else:
                    self.num_to_recv = 12
                    self.is_big_data = True
            elif len(data) == 12:
                if self.is_big_data:
                    length = (
                        data[8]
                        | (data[9] << 8)
                        | (data[10] << 16)
                        | (data[11] << 24)
                    )
                    self.num_to_recv = 5 + length + 2
            if self.num_to_recv > 0 and self.num_to_recv == len(data):
                t = self.decode(data)
                if self.str_err == "ok":
                    # Same rule as recv_(): never accumulate bad frames.
                    self.recv_data += t
                self.recv_signal.emit(self.cmd, t, self.str_err)
                data.clear()
                self.num_to_recv = 0

    def send(self, cmd: int, data: bytearray):
        """Send ``data`` as command ``cmd`` with the next command number."""
        self.cmd = cmd
        # The command number is a 16-bit field on the wire, so wrap it.
        self.cmd_no = (self.cmd_no + 1) & 0xFFFF
        d = self.encode(data)
        print("send", d.hex(" "))
        self.ser.write(d)

    def send_str(self, txt: str):
        """Send ``txt`` as raw UTF-8 bytes, without framing."""
        self.ser.write(txt.encode("utf-8"))

    def start_recv(self):
        """Start ``recv`` in a background thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the receive thread has finished."""
        if self.thread_ is not None:
            self.thread_.join()

    def close(self):
        """Close the link if it is open; errors are printed, not raised."""
        if self.ser is None:
            return
        try:
            if self.ser.is_open:
                self.ser.close()
        except Exception as e:
            print(str(e))

    def send_file_next(self, cmd: int, data: bytearray, err: str):
        """Send the next packet of the current transfer, or close when done.

        Has the signature of a ``recv_signal`` slot; the received ``data`` and
        ``err`` are not inspected.
        """
        print("send_file_next")
        data = self.hand.get_data()
        if len(data) > 0:
            self.send(cmd, data)
        else:
            self.close()

    def send_file(self, cmd: int, name: str):
        """Start receiving, load ``name`` and kick off the transfer.

        The first packet is triggered by emitting ``recv_signal``; blocks
        until the receive thread ends.
        """
        self.start_recv()
        self.hand.set_file(name)
        self.recv_signal.emit(cmd, bytearray([0]), "ok")
        self.wait()


# Command codes:
# 0x30 start check
# 0x31 check report
# 0x32 read/write scheme
# 0x33 connection keep-alive
# 0x34 self-test data
# 0x35 self-test report
# 0x36 upgrade script
# 0xed main board upgrade
# 0xee small board upgrade
# 0x40 motor calibration
# 0x41 small-board resistor calibration
# 0x42 small-board resistance measurement
# 0x43 small-board hardware version

# with open("file/judge.lua","rb") as f:
#     send_file("COM5",0x36,f.name.split('/')[-1],f.read())
# with open("file/JQ_JCXB_V54.bin","rb") as f:
#     send_file("COM5",0xee,f.name.split('/')[-1],f.read())
# with open("file/checker_ye_cfg.json","rb") as f:
#     json_obj=json.loads(f.read())
#     d=scheme_to_byte(json_obj)
#     print("len=",len(d),d.hex(","))
#     d=scheme_to_host(json_obj)
#     print("len=",len(d),d.hex(","))
#     send_file("COM5",0x32,f.name.split('/')[-1],f.read())


if __name__ == "__main__":
    u = protu()
    # u.init("utcp:7777")
    # u.send_file(0xee,"file/JQ_JCXB_V54.bin")
    # u.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin")

    # Set resistor calibration values
    # u.cmd=0x41
    # data=bytearray([1,100,0x00,2,0,0x00,3,0,0x00,4,0,0x00,5,0,0x00,6,0,0x00,7,0,0x00,8,0,0x00,9,0,0x00,10,0,0x00,11,0,0x00,12,0,0x00,13,0,0x00,14,0,0x00,15,0,0x00,16,0,0x00,17,0,0x00,18,0,0x00,19,0,0x00,20,0,0x00])

    # Measure resistance
    # u.cmd=0x42
    # data=bytearray([5])

    # Set hardware version numbers
    # u.cmd=0x43
    # data=bytearray([1,50,0x00,2,51,0x00,3,52,0x00,4,53,0x00,5,54,0x00,6,55,0x00,7,56,0x00,8,57,0x00,9,58,0x00,10,59,0x00,11,60,0x00,12,61,0x00,13,62,0x00,14,63,0x00,15,64,0x00,16,65,0x00,17,66,0x00,18,67,0x00,19,68,0x00,20,69,0x00])

    # Set motor calibration value
    # u.cmd=0x40
    # data=bytearray([0x01,100,0])
    # data=bytearray([0x02]) # up
    # data=bytearray([0x03]) # down

    # Direct measurement
    # u.cmd=0x44
    # data=bytearray([5])

    # Coder: new check command
    # u.cmd=0x10
    # data=bytearray([0xff,0x03,0,0,0,0])

    # Batch checker: new check command, write serial number
    # u.cmd=0x48
    # data=bytearray([2,1,2,3,0, 3,5,6,7,0, 9,8,9,10,0])

    u.cmd = 0x50
    data = bytearray([6])
    print(u.encode(data).hex(' '))

    # with open("file/EX_Coder_Test_2023-07-6.json","rb") as f:
    #     json_obj=json.loads(f.read())
    #     d=scheme_to_byte(json_obj)
    #     print("len=",len(d),d.hex(","))
    #     d+=scheme_to_host(json_obj)
    # print(int2str(20))
    # s="{d:03d}".format(d=2)
    # print(s)
    # with open("file/7-15.json","rb") as f:
    #     u.cmd=0x22
    #     p=u.encode(f.read())
    #     print(p.hex(' '))

    # Example frames:
    # start check
    #   59 6d 03 00 30 00 00 60 0f
    # end acknowledgement
    #   59 6D 03 00 31 00 00 31 CF
    # direct check
    #   59 6d 04 00 44 00 00 05 d5 6c
    # motor home
    #   59 6d 04 00 50 00 00 01 d1 5f
    # motor down 20000 steps
    #   59 6d 06 00 50 00 00 02 20 4e f4 45
    # motor up 20000 steps
    #   59 6d 06 00 50 00 00 03 20 4e a5 85
    # measure bridge-wire resistance
    #   59 6d 04 00 50 00 00 04 11 5c
    # set bus to 20 V
    #   59 6d 06 00 50 00 00 05 c8 00 8b b0
    # bus off
    #   59 6d 04 00 50 00 00 06 90 9d
    # read bus current
    #   59 6d 04 00 50 00 00 07 51 5d