520 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			520 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import serial
 | ||
| import serial.tools.list_ports
 | ||
| import threading
 | ||
| import time
 | ||
| import json
 | ||
| import socket
 | ||
| from PyQt5.QtCore import *
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
def crc16(data:bytearray,offset:int,len:int):
    """Compute a 16-bit CRC over ``data[offset:len]`` and return it as two bytes.

    The register starts at 0xFFFF and is shifted right one bit at a time with
    the reflected polynomial 0xA001 (the same parameters as CRC-16/MODBUS).

    Args:
        data: Buffer holding the bytes to check.
        offset: Index of the first byte included in the CRC.
        len: Exclusive END index of the checked range (not a byte count).
            Note that this parameter shadows the ``len`` builtin inside
            the function.

    Returns:
        ``(low_byte, high_byte)`` of the CRC, or ``(0, 0)`` when ``len`` is
        not positive.
    """
    if len <= 0:
        return 0, 0

    crc = 0xFFFF
    for pos in range(offset, len):
        crc = (crc ^ data[pos]) & 0xFFFF
        for _ in range(8):
            lsb_set = crc & 1
            crc >>= 1
            if lsb_set:
                crc ^= 0xA001
            crc &= 0xFFFF
    return crc & 0xFF, (crc >> 8) & 0xFF
 | ||
| 
 | ||
| 
 | ||
def crc32(data:bytearray):
    """Compute a 32-bit CRC over ``data``, one little-endian 32-bit word at a time.

    Each group of four bytes is assembled into a word (first byte is the least
    significant) and fed MSB-first into a register that starts at 0xFFFFFFFF
    and uses the polynomial 0x04C11DB7. No final XOR is applied.

    Args:
        data: Buffer whose length must be a multiple of 4.

    Returns:
        The 32-bit CRC, or 0 when the length is not a multiple of 4.
    """
    if len(data) % 4 != 0:
        return 0

    crc = 0xFFFFFFFF
    for base in range(0, len(data), 4):
        word = int.from_bytes(data[base:base + 4], "little")
        for _ in range(32):
            # Compare the top bits of register and input before shifting.
            top_differs = (crc ^ word) & 0x80000000
            crc <<= 1
            if top_differs != 0:
                crc ^= 0x04C11DB7
            word <<= 1
        crc &= 0xFFFFFFFF
    return crc
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
# Wrap a TCP server socket so it can be used like a serial port
class utcp:
    """TCP server endpoint exposing the small serial-port API used in this file.

    The constructor listens on ``port`` and blocks until one client connects
    (the listening socket has a 10 second timeout). ``read``/``write``/``close``
    and the ``is_open`` flag then operate on that client connection.
    """

    is_open=False

    def __init__(self,port:int)->None:
        """Listen on ``port`` on all interfaces and accept a single client.

        Raises:
            OSError: If binding or listening fails, or no client connects
                before the timeout. The listening socket is closed first so
                the port is not leaked.
        """
        self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.ser.bind(("",port))
            self.ser.settimeout(10)
            self.ser.listen(128)
            print("wait for mcu connect.")
            self.client,self.client_addr=self.ser.accept()
        except Exception:
            # Do not leave a half-initialised listening socket behind.
            self.ser.close()
            raise
        print("client:",self.client_addr)
        self.is_open=True

    def read(self,len:int):
        """Receive up to ``len`` bytes from the client.

        Raises:
            ConnectionError: If the peer closed the connection. ``recv`` then
                returns ``b""`` immediately on every call, which would make a
                polling reader spin forever.
        """
        chunk=self.client.recv(len)
        if not chunk and len>0:
            raise ConnectionError("tcp peer closed the connection")
        return chunk

    def write(self,data:bytearray):
        """Send all of ``data`` to the client and return the number of bytes written."""
        # send() may transmit only part of the buffer; sendall() retries until done.
        self.client.sendall(data)
        return len(data)

    def close(self):
        """Close the client connection and the listening socket."""
        try:
            self.client.close()
        finally:
            self.ser.close()
            self.is_open=False
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
# Build the parameter bytes of one task
def scheme_task_to_byte(j:dict):
    """Serialize one task description into its byte record.

    Layout: TaskID, TaskIndex, RetryCount, ErrJumpTo (one byte each), one byte
    holding ParamCount in the low nibble and ReturnCount shifted into the high
    nibble, then every ParamVal entry as a 16-bit little-endian value.

    Args:
        j: Task dict with the keys "TaskID", "TaskIndex", "RetryCount",
            "ErrJumpTo", "ParamCount", "ReturnCount" and "ParamVal".

    Returns:
        The record as a bytearray.

    Raises:
        ValueError: If a single-byte field does not fit in one byte, or a
            ParamVal entry is outside -0x8000..0xFFFF.
    """
    data=bytearray()
    data.append(j["TaskID"])
    data.append(j["TaskIndex"])
    data.append(j["RetryCount"])
    data.append(j["ErrJumpTo"])
    data.append((j["ParamCount"]&0x0f)|(j["ReturnCount"]<<4))
    for value in j["ParamVal"]:
        if not -0x8000<=value<=0xffff:
            raise ValueError("ParamVal %r does not fit in 16 bits"%(value,))
        # Mask both bytes so negative values are stored as two's complement,
        # the same way arr_from_int treats its input.
        data.append(value&0xff)
        data.append((value>>8)&0xff)
    return data
 | ||
| 
 | ||
# Build the task-parameter sequence
def scheme_tasks_to_byte(j:json):
    """Concatenate the byte records of every task and pad the result with 0xFF.

    Args:
        j: Scheme dict whose "TaskArray" entry lists the task dicts.

    Returns:
        The concatenated records, padded with 0xFF up to ``0x700 - 4`` bytes.
        Data that is already that long or longer is returned unpadded.
    """
    blob = bytearray()
    for task in j["TaskArray"]:
        blob += scheme_task_to_byte(task)

    shortfall = (0x700 - 4) - len(blob)
    if shortfall > 0:
        blob.extend(b"\xff" * shortfall)
    return blob
 | ||
| 
 | ||
# Build the task-id sequence
def scheme_taskids_to_byte(j:json):
    """Serialize the plan id followed by the id of every task, padded with 0xFF.

    Args:
        j: Scheme dict with "PlanID" and a "TaskArray" whose entries each
            carry a "TaskID".

    Returns:
        PlanID as 4 little-endian bytes, then one byte per TaskID, padded with
        0xFF up to 0x100 bytes. Longer data is returned unpadded.
    """
    plan_id = j["PlanID"]
    table = bytearray((plan_id >> shift) & 0xff for shift in (0, 8, 16, 24))
    for task in j["TaskArray"]:
        table.append(task["TaskID"])

    shortfall = 0x100 - len(table)
    if shortfall > 0:
        table += b"\xff" * shortfall
    return table
 | ||
| 
 | ||
# Build the byte image of a scheme for the sub-board
def scheme_to_byte(j:json):
    """Assemble the scheme image: task-id table, task parameters, then a CRC32.

    Args:
        j: Scheme dict accepted by scheme_taskids_to_byte and
            scheme_tasks_to_byte.

    Returns:
        The two sections followed by the crc32 of both, appended as 4
        little-endian bytes.
    """
    image = bytearray()
    image += scheme_taskids_to_byte(j)
    image += scheme_tasks_to_byte(j)

    checksum = crc32(image)
    for shift in (0, 8, 16, 24):
        image.append((checksum >> shift) & 0xff)
    return image
 | ||
| 
 | ||
| 
 | ||
# Convert an int to a byte array
def arr_from_int(num:int):
    """Return the low 32 bits of ``num`` as a 4-byte little-endian bytearray.

    Negative values come out in two's-complement form; bits above bit 31 are
    discarded.
    """
    low_word = num & 0xffffffff
    return bytearray(low_word.to_bytes(4, "little"))
 | ||
| 
 | ||
| 
 | ||
# Extract the range data of one task from the scheme, Max before Min
def scheme_get_task_range(j:json):
    """Serialize one task's identity and its pass/fail limits as 32-bit words.

    Layout (every field is 4 little-endian bytes): TaskID, TaskIndex,
    ReturnCount, ExecuteErrCode, then one (Max, Min, error code) triple per
    "TestStandard" entry. The error code comes from "ResultErrCode" at the
    same position, or is 0 when that list is shorter. Fewer than 16 standards
    are padded with (65535, 0, 0) triples; more than 16 are all emitted.

    Args:
        j: Task dict with the keys named above.

    Returns:
        The serialized record as a bytearray.
    """
    record = bytearray()
    for key in ("TaskID", "TaskIndex", "ReturnCount", "ExecuteErrCode"):
        record += arr_from_int(j[key])

    for slot, standard in enumerate(j["TestStandard"]):
        record += arr_from_int(standard["Max"])
        record += arr_from_int(standard["Min"])
        err_codes = j["ResultErrCode"]
        record += arr_from_int(err_codes[slot] if slot < len(err_codes) else 0)

    # Pad the table up to 16 entries
    for _ in range(16 - len(j["TestStandard"])):
        record += arr_from_int(65535)
        record += arr_from_int(0)
        record += arr_from_int(0)
    return record
 | ||
| 
 | ||
# Build the byte data of a scheme for the main board
def scheme_to_host(j:json):
    """Serialize the scheme header and every task's range record.

    Args:
        j: Scheme dict with "PlanID", "TimeOutM" and "TaskArray".

    Returns:
        PlanID, TimeOutM and the task count (4 little-endian bytes each),
        followed by scheme_get_task_range() of every task in order.
    """
    tasks = j["TaskArray"]
    header_fields = (j["PlanID"], j["TimeOutM"], len(tasks))

    out = bytearray()
    for field in header_fields:
        out += arr_from_int(field)
    for task in tasks:
        out += scheme_get_task_range(task)
    return out
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
# File sender: splits a payload into protocol packets
class handle:
    """Iterator-like helper that turns a file or scheme into transfer packets.

    After ``set_file`` or ``set_json`` has loaded a payload, each ``get_data``
    call returns the next packet body: first a header packet (payload size and
    name), then data packets of up to 200 payload bytes, and finally an empty
    bytearray once everything has been handed out.
    """

    stat=0
    name=""
    # Maximum payload bytes carried by one data packet.
    _PACKET_BYTES=200

    def __init__(self) -> None:
        # Start with an empty payload so get_data()/get_rate() are usable
        # before a file has been selected.
        self.data=bytearray()
        self.packet_all=0
        self.sent_bytes=0
        self.packet_now=0
        self.packet_bytes=self._PACKET_BYTES

    def get_rate(self):
        """Return the transfer progress as an integer percentage (0 when there is nothing to send)."""
        if(self.packet_all<=0):
            return 0
        return self.packet_now*100//self.packet_all

    def get_data(self):
        """Return the body of the next packet, or an empty bytearray when done.

        Header packet (state 0): byte 1, the payload length as 4 little-endian
        bytes, then the encoded name. Data packet (state 1): byte 2, the
        1-based packet index and the packet total (2 little-endian bytes
        each), then up to 200 payload bytes.
        """
        if(self.stat==0):
            length=len(self.data)
            # 1 = write, 0 = read
            t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff])
            t+=self.name.encode()
            self.stat=1
            return t
        if(self.stat==1):
            # Size the packet from the bytes actually left; a payload that is
            # an exact multiple of the packet size must not yield a trailing
            # empty packet numbered past packet_all.
            remaining=len(self.data)-self.sent_bytes
            packet_bytes=min(self._PACKET_BYTES,remaining)
            if(packet_bytes>0):
                self.packet_now+=1
                t=bytearray()
                t.append(2)
                t.append(self.packet_now&0xff)
                t.append((self.packet_now>>8)&0xff)
                t.append(self.packet_all&0xff)
                t.append((self.packet_all>>8)&0xff)
                t+=self.data[self.sent_bytes:self.sent_bytes+packet_bytes]
                self.sent_bytes+=packet_bytes
                return t
            print("send done.")
            return bytearray()
        # Unknown state: report "nothing to send" rather than returning None.
        return bytearray()

    @staticmethod
    def _base_name(path):
        """Return the final path component, honouring the platform's separators."""
        import os
        return os.path.basename(path)

    def _reset(self):
        """Rewind the packet counters for the payload currently in ``self.data``."""
        self.stat=0
        self.packet_all=(len(self.data)+self._PACKET_BYTES-1)//self._PACKET_BYTES
        self.sent_bytes=0
        self.packet_now=0
        self.packet_bytes=self._PACKET_BYTES

    # Select the file to send
    def set_file(self,name:str):
        """Load the file ``name`` as the payload and restart the transfer."""
        with open(name,"rb") as f:
            payload=f.read()
            base=self._base_name(f.name)
        self.data=payload
        self.name=base
        self._reset()

    # Select the scheme to send
    def set_json(self,name:str):
        """Load the JSON scheme ``name``, compile it and restart the transfer.

        The payload is scheme_to_byte() followed by scheme_to_host() of the
        parsed JSON object.
        """
        with open(name,"rb") as f:
            json_obj=json.loads(f.read())
            base=self._base_name(f.name)
        d=scheme_to_byte(json_obj)
        d+=scheme_to_host(json_obj)
        self.data=d
        self.name=base
        self._reset()
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
class protu(QObject):
    """Framed command protocol over a serial port or the ``utcp`` TCP wrapper.

    Outgoing frames (``encode``): 0x59 0x6d, length (2 bytes, little-endian),
    cmd, cmd_no (2 bytes, little-endian), payload, crc16 (2 bytes).
    Incoming frames (``decode``) carry an extra 0x43 byte after the two head
    bytes and switch to a 32-bit length field when the 16-bit length reads
    0xffff.
    """

    # Progress signal: percentage, 1~100
    rate_signal =pyqtSignal([int])
    # Finished signal: success flag, description
    end_signal = pyqtSignal([bool,str])
    # Data-received signal: command, payload, error text
    recv_signal =pyqtSignal([int,bytearray,str])

    # Class-level default kept for backward compatibility; every instance gets
    # its own transfer state in __init__.
    hand=handle()

    def __init__(self) -> None:
        QObject.__init__(self)
        self.cmd=0
        self.cmd_no=0
        self.str_err=""
        self.is_big_data=False
        self.num_to_recv=0
        self.recv_data=bytearray()
        # A shared class-level handle would mix the transfer state of several
        # protu instances.
        self.hand=handle()

    def init(self,com:str):
        """Open the transport described by ``com``.

        Args:
            com: ``"utcp:<tcp port>"`` to wait for a TCP client, otherwise
                ``"<serial port>:<baud rate>"``.

        Returns:
            True on success; False (after printing the error) on any failure.
        """
        s=com.split(":")
        try:
            if(s[0]=="utcp"):
                self.ser = utcp(int(s[1]))
            else:
                bsp=int(s[1])
                self.ser = serial.Serial(port=s[0], baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,timeout=None)
        except Exception as e:
            print(str(e))
            return False
        return True

    def encode(self,data:bytearray):
        """Wrap ``data`` in an outgoing frame using ``self.cmd`` and ``self.cmd_no``.

        Raises:
            ValueError: If the payload is too long for the 16-bit length field.
        """
        # The length field counts cmd (1) + cmd_no (2) + payload.
        length=len(data)+3
        if(length>0xffff):
            raise ValueError("payload too long for a 16-bit frame length: %d bytes"%len(data))
        t=bytearray()
        t.append(0x59)
        t.append(0x6d)
        t.append(length&0xff)
        t.append(length>>8)
        t.append(self.cmd)
        t.append(self.cmd_no&0xff)
        t.append((self.cmd_no>>8)&0xff)
        t+=data
        # CRC covers everything after the two head bytes.
        a,b=crc16(t,2,length+4)
        t.append(a)
        t.append(b)
        return t

    def decode(self,data:bytearray):
        """Validate one incoming frame and return its payload.

        Sets ``self.str_err`` to "ok" or to an error description. On success
        (and on a CRC mismatch) ``self.cmd``, ``self.cmd_no`` and
        ``self.is_big_data`` are updated from the frame.

        Returns:
            The payload as a bytearray. It is empty for length/head errors.
            On a CRC mismatch the payload is still returned and only
            ``self.str_err`` reports the failure, so callers must check it.
        """
        self.str_err="ok"
        if(len(data)<10):
            print("recv data len too less.")
            self.str_err="recv data len too less."
            return bytearray()
        if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
            self.str_err="frame head not 0x59 0x6d."
            return bytearray()
        length=data[3]|(data[4]<<8)
        if(length==65535):
            self.is_big_data=True
            if(len(data)<12):
                print("recv data len too less.")
                self.str_err="recv data len too less."
                return bytearray()
            # Big frame: the real length is the 32-bit field after cmd (5) and
            # cmd_no (6..7), i.e. bytes 8..11, matching recv() and the
            # payload slice below.
            length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
        else:
            self.is_big_data=False
        # 3 head bytes + 2 length bytes + body + 2 CRC bytes.
        if(length+7!=len(data)):
            print("recv data have lossed")
            self.str_err="recv data have lossed"
            return bytearray()
        a,b=crc16(data,3,length+5)
        if(a!=data[-2] or b!=data[-1]):
            print("recv data check error.h_crc=%02x %02x,crc=%02x %02x"%(a,b,data[-2],data[-1]))
            self.str_err="recv data check error."
        self.cmd_no=data[6]|(data[7]<<8)
        self.cmd=data[5]
        if(self.is_big_data==False):
            return bytearray(data[8:-2])
        else:
            return bytearray(data[12:-2])

    # Receive loop without frame reassembly
    def recv_(self):
        """Read chunks of up to 500 bytes and decode each one as a whole frame.

        Valid frames are appended to ``self.recv_data`` and emitted through
        ``recv_signal``; anything else is printed as text.
        """
        while(self.ser.is_open):
            try:
                data=self.ser.read(500)
            except Exception as a:
                print("port closed")
                return
            if(len(data)==0):
                # An empty read means the transport is gone (e.g. TCP peer
                # closed); looping again would spin forever.
                print("port closed")
                return
            t=self.decode(data)
            if(self.str_err=="ok"):
                self.recv_data+=t
                self.recv_signal.emit(self.cmd,t,self.str_err)
            else:
                # Not a frame: show it as text without dying on invalid UTF-8.
                print(data.decode("utf-8",errors="replace"))

    # Receive loop with frame reassembly
    def recv(self):
        """Read byte by byte, reassemble frames and emit each one.

        The expected frame size is derived from the head (3 bytes), the 16-bit
        length (bytes 3..4) and, for big frames, the 32-bit length (bytes
        8..11). Each complete frame is decoded, appended to ``self.recv_data``
        and emitted through ``recv_signal`` together with ``self.str_err``.
        """
        data=bytearray()
        while(self.ser.is_open):
            try:
                d=self.ser.read(1)
            except Exception as a:
                print("port closed")
                return
            if(len(d)==0):
                # An empty read means the transport is gone; avoid a busy loop.
                print("port closed")
                return
            data+=d
            if(len(data)==3):
                if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
                    self.num_to_recv=5
                else:
                    # Not a frame head: drop the oldest byte and keep scanning.
                    data=data[1:]
                    self.num_to_recv=0
            elif(len(data)==5):
                length=data[3]|(data[4]<<8)
                if(length<65535):
                    self.num_to_recv+=length+2
                    self.is_big_data=False
                else:
                    self.num_to_recv=12
                    self.is_big_data=True
            elif(len(data)==12):
                if(self.is_big_data==True):
                    length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
                    self.num_to_recv=5+length+2
            if(self.num_to_recv>0 and self.num_to_recv==len(data)):
                t=self.decode(data)
                self.recv_data+=t
                self.recv_signal.emit(self.cmd,t,self.str_err)
                data.clear()
                self.num_to_recv=0

    def send(self,cmd:int,data:bytearray):
        """Send ``data`` as a frame with command ``cmd`` and the next sequence number."""
        self.cmd=cmd
        # cmd_no travels in two bytes, so wrap it instead of overflowing.
        self.cmd_no=(self.cmd_no+1)&0xffff
        self.ser.write(self.encode(data))

    def send_str(self,txt:str):
        """Send ``txt`` as raw UTF-8 bytes without any framing."""
        self.ser.write(txt.encode("utf-8"))

    def start_recv(self):
        """Start ``recv`` on a background thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the receive thread started by ``start_recv`` has finished."""
        self.thread_.join()

    def close(self):
        """Close the transport if it is open; errors are printed, not raised."""
        try:
            if(self.ser.is_open):
                self.ser.close()
        except Exception as e:
            print(str(e))

    def send_file_next(self,cmd:int,data:bytearray,err:str):
        """Send the next packet of the current transfer, or close when done.

        The signature matches ``recv_signal``; the incoming ``data`` and
        ``err`` are not inspected.
        """
        print("send_file_next")
        data=self.hand.get_data()
        if(len(data)>0):
            self.send(cmd,data)
        else:
            self.close()

    def send_file(self,cmd:int,name:str):
        """Transfer the file ``name`` using command ``cmd``; blocks until the receiver stops.

        NOTE(review): the first packet is triggered by emitting
        ``recv_signal``, so presumably a slot such as ``send_file_next`` must
        be connected to it elsewhere - confirm against the caller.
        """
        self.start_recv()
        self.hand.set_file(name)
        self.recv_signal.emit(cmd,bytearray([0]),"ok")
        self.wait()
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
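# Command codes:
# 0x30 start check
# 0x31 check result report
# 0x32 read/write scheme
# 0x33 connection keep-alive
# 0x34 self-test data
# 0x35 self-test report
# 0x36 script upgrade
# 0xed main board upgrade
# 0xee sub-board upgrade
# 0x40 motor calibration
# 0x41 sub-board resistance calibration
# 0x42 sub-board resistance measurement
# 0x43 sub-board hardware version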
 | ||
| 
 | ||
| # with open("file/judge.lua","rb") as f:
 | ||
| #     send_file("COM5",0x36,f.name.split('/')[-1],f.read())
 | ||
| # with open("file/JQ_JCXB_V54.bin","rb") as f:
 | ||
| #     send_file("COM5",0xee,f.name.split('/')[-1],f.read())
 | ||
| # with open("file/checker_ye_cfg.json","rb") as f:
 | ||
| #     json_obj=json.loads(f.read())
 | ||
|     # d=scheme_to_byte(json_obj)
 | ||
|     # print("len=",len(d),d.hex(","))
 | ||
|     # d=scheme_to_host(json_obj)
 | ||
|     # print("len=",len(d),d.hex(","))
 | ||
|     # send_file("COM5",0x32,f.name.split('/')[-1],f.read())
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
def int2str(num:int):
    """Format ``num`` as its hundreds part followed by the tens and units digits.

    For 0..999 this is the number zero-padded to three digits, e.g. 20 gives
    "020". Larger values keep all their leading digits.
    """
    hundreds = num // 100
    tens = num % 100 // 10
    units = num % 10
    return "{}{}{}".format(hundreds, tens, units)
 | ||
| 
 | ||
| 
 | ||
if __name__ == "__main__":
    # Manual smoke test: build one frame and print it as hex.
    u = protu()
    # u.init("utcp:7777")
    # u.send_file(0xee,"file/JQ_JCXB_V54.bin")
    # u.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin")

    # Set the resistance calibration values: channels 1..20, each followed by
    # two zero bytes.
    u.cmd = 0x41
    data = bytearray(
        value
        for channel in range(1, 21)
        for value in (channel, 0, 0x00)
    )
    # Measure resistance
    # u.cmd=0x42
    # data=bytearray([0])
    # Set the hardware version numbers
    # u.cmd=0x43
    # data=bytearray([1,50,0x00,2,51,0x00,3,52,0x00,4,53,0x00,5,54,0x00,6,55,0x00,7,56,0x00,8,57,0x00,9,58,0x00,10,59,0x00,11,60,0x00,12,61,0x00,13,62,0x00,14,63,0x00,15,64,0x00,16,65,0x00,17,66,0x00,18,67,0x00,19,68,0x00,20,69,0x00])
    # Set the motor calibration value
    # u.cmd=0x40
    # data=bytearray([0x01,100,0])
    # data=bytearray([0x02]) # move up
    # data=bytearray([0x03]) # move down

    frame = u.encode(data)
    print(frame.hex(' '))
    # with open("file/EX_Coder_Test_2023-07-6.json","rb") as f:
    #     json_obj=json.loads(f.read())
    #     d=scheme_to_byte(json_obj)
    #     print("len=",len(d),d.hex(","))
    #     d+=scheme_to_host(json_obj)

    # print(int2str(20))
    # s="{d:03d}".format(d=2)
    # print(s)

    # with open("file/7-15.json","rb") as f:
    #     u.cmd=0x22
    #     p=u.encode(f.read())
    #     print(p.hex(' '))
 | ||
| 
 | ||
| 
 | ||
| # 开始检测
 | ||
| # 59 6d 03 00 30 00 00 60 0f
 | ||
| # 结束应答
 | ||
| # 59 6D 03 00 31 00 00 31 CF
 | ||
| 
 | ||
| 
 | 
