520 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			520 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import serial | |||
|  | import serial.tools.list_ports | |||
|  | import threading | |||
|  | import time | |||
|  | import json | |||
|  | import socket | |||
|  | from PyQt5.QtCore import * | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
def crc16(data:bytearray,offset:int,len:int):
    """Compute a CRC-16 (initial value 0xFFFF, reflected polynomial 0xA001).

    The checksum covers ``data[offset:len]``: despite its name, ``len`` is
    used as an exclusive end index rather than as a byte count.

    Returns:
        ``(low_byte, high_byte)`` of the checksum, or ``(0, 0)`` when ``len``
        is not positive.
    """
    if len <= 0:
        return 0, 0
    reg = 0xffff
    for pos in range(offset, len):
        reg = (reg ^ data[pos]) & 0xffff
        for _ in range(8):
            carry = reg & 1
            reg >>= 1
            if carry:
                reg ^= 0xa001
    return reg & 0xff, (reg >> 8) & 0xff
|  | 
 | |||
|  | 
 | |||
def crc32(data:bytearray):
    """Compute a word-wise CRC-32 (polynomial 0x04C11DB7, initial 0xFFFFFFFF).

    The input is consumed as little-endian 32-bit words, each fed into the
    register most-significant bit first; no final XOR is applied.

    Returns:
        The 32-bit checksum, or 0 when ``len(data)`` is not a multiple of 4.
    """
    if len(data) % 4:
        return 0
    reg = 0xffffffff
    for base in range(0, len(data), 4):
        word = int.from_bytes(data[base:base + 4], "little")
        for _ in range(32):
            # Only bit 31 decides the step; higher bits are masked off below.
            if (reg ^ word) & 0x80000000:
                reg = (reg << 1) ^ 0x04c11db7
            else:
                reg <<= 1
            word <<= 1
        reg &= 0xffffffff
    return reg
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
# Wrap a TCP server connection so it can be used like a serial port
class utcp:
    """Minimal serial-port look-alike backed by one accepted TCP connection.

    The constructor listens on ``port`` (all interfaces) and blocks until a
    client connects or the 10 second accept timeout expires. The object then
    offers ``read``/``write``/``close`` and an ``is_open`` flag.
    """
    is_open=False
    def __init__(self,port:int)->None:
        """Listen on ``port`` and wait for a single client.

        Raises:
            OSError: if binding fails or no client connects within the
                timeout (``socket.timeout`` is an ``OSError`` subclass).
        """
        self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.ser.bind(("",port))
            self.ser.settimeout(10)
            self.ser.listen(128)
            print("wait for mcu connect.")
            self.client,self.client_addr=self.ser.accept()
        except BaseException:
            # Do not leak the listening socket when setup fails or times out.
            self.ser.close()
            raise
        print("client:",self.client_addr)
        self.is_open=True
    def read(self,len:int):
        """Receive up to ``len`` bytes from the client (b"" once the peer closed)."""
        return self.client.recv(len)
    def write(self,data:bytearray):
        """Send all of ``data`` to the client and return the number of bytes sent."""
        # send() may transmit only part of the buffer; sendall() retries until
        # everything is written or an error is raised.
        self.client.sendall(data)
        return len(data)
    def close(self):
        """Close the client connection and the listening socket."""
        try:
            self.client.close()
        finally:
            # Always release the listening socket and clear the flag, even if
            # closing the client raised.
            self.ser.close()
            self.is_open=False
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
# Serialize the parameters of a single task
def scheme_task_to_byte(j:json):
    """Encode one task dictionary as bytes.

    Layout: TaskID, TaskIndex, RetryCount, ErrJumpTo, one byte holding
    ParamCount in the low nibble and ReturnCount shifted into the high
    nibble, followed by every ParamVal entry as low byte then high byte.
    """
    out = bytearray([
        j["TaskID"],
        j["TaskIndex"],
        j["RetryCount"],
        j["ErrJumpTo"],
        (j["ParamCount"] & 0x0f) | (j["ReturnCount"] << 4),
    ])
    for value in j["ParamVal"]:
        out += bytes((value & 0xff, value >> 8))
    return out
|  | 
 | |||
# Serialize the parameter sequence of all tasks
def scheme_tasks_to_byte(j:json):
    """Concatenate the encoded tasks of ``j["TaskArray"]``.

    The result is padded with 0xFF up to ``0x700 - 4`` bytes; longer data is
    returned unpadded and untruncated.
    """
    out = bytearray()
    for task in j["TaskArray"]:
        out += scheme_task_to_byte(task)
    shortfall = (0x700 - 4) - len(out)
    if shortfall > 0:
        out.extend(b"\xff" * shortfall)
    return out
|  | 
 | |||
# Serialize the task-id sequence
def scheme_taskids_to_byte(j:json):
    """Encode PlanID (4 bytes, little-endian) followed by each task's TaskID.

    The result is padded with 0xFF up to 0x100 bytes; longer data is returned
    unpadded.
    """
    out = bytearray((j["PlanID"] >> shift) & 0xff for shift in (0, 8, 16, 24))
    out.extend(task["TaskID"] for task in j["TaskArray"])
    out.extend(b"\xff" * max(0, 0x100 - len(out)))
    return out
|  | 
 | |||
# Build the byte image of a scheme for the small board
def scheme_to_byte(j:json):
    """Return the task-id table, the task parameter table and a trailing CRC.

    The CRC is ``crc32`` over both tables, appended as 4 little-endian bytes.
    """
    blob = bytearray()
    blob += scheme_taskids_to_byte(j)
    blob += scheme_tasks_to_byte(j)
    blob += crc32(blob).to_bytes(4, "little")
    return blob
|  | 
 | |||
|  | 
 | |||
# Convert an int to a byte array
def arr_from_int(num:int):
    """Return the low 32 bits of ``num`` as a 4-byte little-endian bytearray."""
    return bytearray((num & 0xffffffff).to_bytes(4, "little"))
|  | 
 | |||
|  | 
 | |||
# Extract the range data of one task from the scheme: Max first, then Min
def scheme_get_task_range(j:json):
    """Encode one task's identifiers and test limits as 32-bit fields.

    Layout: TaskID, TaskIndex, ReturnCount, ExecuteErrCode, then for every
    TestStandard entry its Max, Min and the ResultErrCode at the same index
    (0 when ResultErrCode is shorter). Entries missing up to 16 are filled
    with (65535, 0, 0).
    """
    out = bytearray()
    for key in ("TaskID", "TaskIndex", "ReturnCount", "ExecuteErrCode"):
        out += arr_from_int(j[key])
    for slot, limits in enumerate(j["TestStandard"]):
        out += arr_from_int(limits["Max"])
        out += arr_from_int(limits["Min"])
        err_codes = j["ResultErrCode"]
        out += arr_from_int(err_codes[slot] if slot < len(err_codes) else 0)
    # Pad up to 16 entries
    for _ in range(16 - len(j["TestStandard"])):
        out += arr_from_int(65535)
        out += arr_from_int(0)
        out += arr_from_int(0)
    return out
|  | 
 | |||
# Build the byte image of a scheme for the main board
def scheme_to_host(j:json):
    """Encode PlanID, TimeOutM and the task count, then every task's ranges.

    Each header value is a 4-byte little-endian field; the per-task part is
    produced by ``scheme_get_task_range``.
    """
    out = arr_from_int(j["PlanID"]) + arr_from_int(j["TimeOutM"])
    out += arr_from_int(len(j["TaskArray"]))
    for task in j["TaskArray"]:
        out += scheme_get_task_range(task)
    return out
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
# File transfer helper: holds a payload and hands it out packet by packet
class handle:
    """Split a payload into the packets of the file-transfer exchange.

    ``set_file``/``set_json`` load the payload; every ``get_data`` call then
    returns the next packet: first a header packet (stat 0), afterwards data
    packets of at most 200 payload bytes (stat 1), and an empty bytearray
    once everything has been handed out.
    """
    stat=0
    name=""
    def __init__(self) -> None:
        # Initialise all state so get_rate()/get_data() are safe to call
        # before a payload has been loaded.
        self.data=bytearray()
        self.stat=0
        self.name=""
        self.packet_all=0
        self.sent_bytes=0
        self.packet_now=0
        self.packet_bytes=200

    def get_rate(self):
        """Return the progress in percent (100 when there is nothing to send)."""
        if(self.packet_all<=0):
            return 100
        return self.packet_now*100//self.packet_all

    def get_data(self):
        """Return the next packet, or an empty bytearray when finished."""
        if(self.stat==0):
            length=len(self.data)
            # 1 = write, 0 = read
            t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff])
            t+=self.name.encode()
            self.stat=1
            return t
        if(self.stat==1):
            # Clamp to what is left so a payload whose size is a multiple of
            # 200 (or an empty one) does not produce an extra empty packet.
            packet_bytes=min(200,len(self.data)-self.sent_bytes)
            if(packet_bytes>0):
                self.packet_now+=1
                t=bytearray()
                t.append(2)
                t.append(self.packet_now&0xff)
                t.append((self.packet_now>>8)&0xff)
                t.append(self.packet_all&0xff)
                t.append((self.packet_all>>8)&0xff)
                t+=self.data[self.sent_bytes:self.sent_bytes+packet_bytes]
                self.sent_bytes+=packet_bytes
                return t
            print("send done.")
        return bytearray()

    def _load(self,payload,path):
        """Install ``payload`` as the data to send and reset the counters."""
        import os
        self.data=payload
        # basename() also copes with the platform's native path separator.
        self.name=os.path.basename(path)
        self.stat=0
        self.packet_all=(len(payload)+199)//200
        self.sent_bytes=0
        self.packet_now=0
        self.packet_bytes=200

    # Select the file to send
    def set_file(self,name:str):
        """Load the raw contents of file ``name`` as the payload."""
        with open(name,"rb") as f:
            payload=f.read()
        self._load(payload,name)

    # Select the scheme to send
    def set_json(self,name:str):
        """Load JSON scheme ``name`` and use its binary encoding as the payload.

        The payload is the small-board image followed by the main-board image.
        """
        with open(name,"rb") as f:
            json_obj=json.loads(f.read())
        d=scheme_to_byte(json_obj)
        d+=scheme_to_host(json_obj)
        self._load(d,name)
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
class protu(QObject):
    """Frame encoder/decoder and receive loop for the checker protocol.

    Outgoing frame (see ``encode``):
        59 6d | length (2, LE) | cmd | cmd_no (2, LE) | payload | crc16 (lo, hi)
        where length = len(payload) + 3 and the CRC covers everything after
        the two header bytes.

    Incoming frame (see ``decode``/``recv``):
        59 6d 43 | length (2, LE) | cmd | cmd_no (2, LE) | payload | crc16
        A length field of 0xFFFF marks a big frame: the real length is the
        32-bit little-endian value at bytes 8..11 and the payload starts at
        byte 12. The total frame size is always length + 7.
    """
    # Progress signal: percentage
    rate_signal =pyqtSignal([int])
    # Finished signal: success flag, description
    end_signal = pyqtSignal([bool,str])
    # Frame-received signal: command, payload, error string
    recv_signal =pyqtSignal([int,bytearray,str])

    # Class-level default kept for backward compatibility; every instance
    # gets its own handle in __init__ so transfers do not share state.
    hand=handle()
    def __init__(self) -> None:
        QObject.__init__(self)
        self.cmd=0
        self.cmd_no=0
        self.str_err=""
        self.is_big_data=False
        self.num_to_recv=0
        self.recv_data=bytearray()
        self.hand=handle()
    def init(self,com:str):
        """Open the transport described by ``com``.

        ``"utcp:<port>"`` waits for a TCP client on that port; anything else
        is treated as ``"<serial port>:<baudrate>"``.

        Returns:
            True on success, False (after printing the error) on failure.
        """
        s=com.split(":")
        try:
            if(s[0]=="utcp"):
                self.ser = utcp(int(s[1]))
            else:
                bsp=int(s[1])
                self.ser = serial.Serial(port=s[0], baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,timeout=None)
        except Exception as e:
            print(str(e))
            return False
        return True

    def encode(self,data:bytearray):
        """Wrap ``data`` in an outgoing frame using ``self.cmd``/``self.cmd_no``.

        Raises:
            ValueError: if the payload does not fit the 16-bit length field.
        """
        length=len(data)+3
        if(length>0xffff):
            raise ValueError("payload too long for a single frame")
        t=bytearray()
        t.append(0x59)
        t.append(0x6d)
        t.append(length&0xff)
        t.append(length>>8)
        t.append(self.cmd)
        t.append(self.cmd_no&0xff)
        # Mask so a sequence number beyond 16 bits cannot raise here.
        t.append((self.cmd_no>>8)&0xff)
        t+=data
        # CRC covers everything after the two header bytes.
        a,b=crc16(t,2,len(t))
        t.append(a)
        t.append(b)
        return t
    def decode(self,data:bytearray):
        """Validate one complete incoming frame and return its payload.

        ``self.str_err`` is set to "ok" or to an error description. On header
        or length errors an empty bytearray is returned. On a CRC mismatch the
        error is recorded but ``cmd``, ``cmd_no`` and the payload are still
        extracted and returned.
        """
        self.str_err="ok"
        if(len(data)<10):
            print("recv data len too less.")
            self.str_err="recv data len too less."
            return bytearray()
        if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
            self.str_err="frame head not 0x59 0x6d."
            return bytearray()
        length=data[3]|(data[4]<<8)
        if(length==65535):
            if(len(data)<12):
                # Too short to even hold the 32-bit length of a big frame.
                print("recv data len too less.")
                self.str_err="recv data len too less."
                return bytearray()
            # Big frame: the real length follows cmd and cmd_no, at bytes
            # 8..11 (the same offsets recv() uses; the payload starts at 12).
            length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
            self.is_big_data=True
        else:
            self.is_big_data=False
        if(length+7!=len(data)):
            print("recv data have lossed")
            self.str_err="recv data have lossed"
            return bytearray()
        # CRC covers the length field through the end of the payload.
        a,b=crc16(data,3,length+5)
        if(a!=data[-2] or b!=data[-1]):
            print("recv data check error.h_crc=%02x %02x,crc=%02x %02x"%(a,b,data[-2],data[-1]))
            self.str_err="recv data check error."
        self.cmd_no=data[6]|(data[7]<<8)
        self.cmd=data[5]
        if(self.is_big_data==False):
            return bytearray(data[8:-2])
        else:
            return bytearray(data[12:-2])
    # Receive loop without frame reassembly
    def recv_(self):
        """Read chunks of up to 500 bytes and treat each chunk as one frame.

        Valid frames are appended to ``recv_data`` and emitted through
        ``recv_signal``; anything else is printed as text.
        """
        while(self.ser.is_open):
            try:
                data=self.ser.read(500)
            except Exception:
                print("port closed")
                return
            if(len(data)>0):
                t=self.decode(data)
                if(self.str_err=="ok"):
                    self.recv_data+=t
                    self.recv_signal.emit(self.cmd,t,self.str_err)
                else:
                    # Arbitrary bytes may not be valid UTF-8; never let that
                    # kill the receive thread.
                    print(data.decode("utf-8",errors="replace"))
    # Receive loop with frame reassembly
    def recv(self):
        """Reassemble frames byte by byte and emit each via ``recv_signal``.

        The loop synchronises on the 59 6d 43 header, derives the total frame
        size from the length field(s), and hands each complete frame to
        ``decode``. It returns when the port is closed or reading fails.
        """
        data=bytearray()
        while(self.ser.is_open):
            try:
                d=self.ser.read(1)
            except Exception:
                print("port closed")
                return
            if(len(d)==0):
                # A TCP peer that closed the connection yields empty reads
                # forever; stop instead of spinning. Other transports just
                # retry without re-running the state checks below.
                if(isinstance(self.ser,utcp)):
                    print("port closed")
                    return
                continue
            data+=d
            if(len(data)==3):
                if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
                    self.num_to_recv=5
                else:
                    # Not a header: drop the oldest byte and keep searching.
                    data=data[1:]
                    self.num_to_recv=0
            elif(len(data)==5):
                length=data[3]|(data[4]<<8)
                if(length<65535):
                    # header(3) + length field(2) + body + crc(2)
                    self.num_to_recv=5+length+2
                    self.is_big_data=False
                else:
                    # Big frame: read on until the 32-bit length is complete.
                    self.num_to_recv=12
                    self.is_big_data=True
            elif(len(data)==12):
                if(self.is_big_data==True):
                    length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
                    self.num_to_recv=5+length+2
            if(self.num_to_recv>0 and self.num_to_recv==len(data)):
                t=self.decode(data)

                self.recv_data+=t
                self.recv_signal.emit(self.cmd,t,self.str_err)
                data.clear()
    def send(self,cmd:int,data:bytearray):
        """Send ``data`` as a frame with command ``cmd`` and the next sequence number."""
        self.cmd=cmd
        # The sequence number is a 16-bit field on the wire; wrap it.
        self.cmd_no=(self.cmd_no+1)&0xffff
        self.ser.write(self.encode(data))
    def send_str(self,txt:str):
        """Send ``txt`` as raw UTF-8 bytes without any framing."""
        self.ser.write(txt.encode("utf-8"))
    def start_recv(self):
        """Start ``recv`` in a background thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()
    def wait(self):
        """Block until the receive thread started by ``start_recv`` ends."""
        self.thread_.join()
    def close(self):
        """Close the transport if it is open; errors are printed, not raised."""
        try:
            if(self.ser.is_open):
                self.ser.close()
        except Exception as e:
            print(str(e))
    def send_file_next(self,cmd:int,data:bytearray,err:str):
        """Send the next transfer packet, or close the port when none is left.

        The signature matches ``recv_signal``; the received ``data`` and
        ``err`` are not inspected.
        """
        print("send_file_next")
        data=self.hand.get_data()
        if(len(data)>0):
            self.send(cmd,data)
        else:
            self.close()
    def send_file(self,cmd:int,name:str):
        """Start receiving, load file ``name`` and kick off the transfer.

        The first step is triggered by emitting ``recv_signal``; this method
        then blocks until the receive thread ends.
        """
        self.start_recv()
        self.hand.set_file(name)
        self.recv_signal.emit(cmd,bytearray([0]),"ok")
        self.wait()
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
# 0x30 start test
# 0x31 test result report
# 0x32 read/write scheme
# 0x33 connection keep-alive
# 0x34 self-test data
# 0x35 self-test report
# 0x36 upgrade script
# 0xed main-board upgrade
# 0xee small-board upgrade
# 0x40 motor calibration
# 0x41 small-board resistance calibration
# 0x42 small-board resistance measurement
# 0x43 small-board hardware version
|  | 
 | |||
|  | # with open("file/judge.lua","rb") as f: | |||
|  | #     send_file("COM5",0x36,f.name.split('/')[-1],f.read()) | |||
|  | # with open("file/JQ_JCXB_V54.bin","rb") as f: | |||
|  | #     send_file("COM5",0xee,f.name.split('/')[-1],f.read()) | |||
|  | # with open("file/checker_ye_cfg.json","rb") as f: | |||
|  | #     json_obj=json.loads(f.read()) | |||
|  |     # d=scheme_to_byte(json_obj) | |||
|  |     # print("len=",len(d),d.hex(",")) | |||
|  |     # d=scheme_to_host(json_obj) | |||
|  |     # print("len=",len(d),d.hex(",")) | |||
|  |     # send_file("COM5",0x32,f.name.split('/')[-1],f.read()) | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
def int2str(num:int):
    """Format ``num`` as its hundreds part followed by a tens and a ones digit.

    For 0..999 this is the zero-padded three-digit form (20 -> "020"); larger
    values keep all their leading digits.
    """
    hundreds, rest = divmod(num, 100)
    tens, ones = divmod(rest, 10)
    return f"{hundreds}{tens}{ones}"
|  | 
 | |||
|  | 
 | |||
if __name__ == "__main__":
    u=protu()
    # To talk to a device, open a transport first, for example
    #   u.init("utcp:7777")
    # and then transfer a firmware image with
    #   u.send_file(0xee,"file/JQ_JCXB_V54.bin")
    #   u.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin")

    # Command 0x41: set the resistance calibration values.
    # The payload is 20 three-byte groups (n, 0, 0) for n = 1..20.
    u.cmd=0x41
    data=bytearray()
    for n in range(1,21):
        data+=bytes((n,0,0x00))

    # Other payloads that can be encoded the same way:
    #   0x42 measure resistance:   data=bytearray([0])
    #   0x43 set hardware version: groups (n, 49+n, 0) for n = 1..20
    #   0x40 motor calibration:    bytearray([0x01,100,0]) sets the value,
    #                              bytearray([0x02]) moves up,
    #                              bytearray([0x03]) moves down

    # Print the encoded frame as space-separated hex.
    print(u.encode(data).hex(' '))
|  | 
 | |||
|  | 
 | |||
# Start test
# 59 6d 03 00 30 00 00 60 0f
# End-of-test acknowledgement
# 59 6D 03 00 31 00 00 31 CF
|  | 
 | |||
|  | 
 |