436 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			436 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import serial
 | ||
| import serial.tools.list_ports
 | ||
| import threading
 | ||
| import time
 | ||
| import json
 | ||
| import socket
 | ||
| from PyQt5.QtCore import *
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
def crc16(data: bytearray, offset: int, len: int):
    """Compute a CRC-16 using the reflected polynomial 0xA001, seeded with 0xFFFF.

    The checksum covers ``data[offset]`` up to, but not including,
    ``data[len]``: ``len`` is an end index, not a byte count.

    Returns:
        A ``(low_byte, high_byte)`` tuple. ``(0, 0)`` is returned when
        ``len`` is not positive.
    """
    if len <= 0:
        return 0, 0
    crc = 0xFFFF
    for index in range(offset, len):
        crc = (crc ^ data[index]) & 0xFFFF
        for _ in range(8):
            # Shift right one bit; fold in the polynomial when a 1 fell out.
            lsb_set = crc & 1
            crc >>= 1
            if lsb_set:
                crc ^= 0xA001
    return crc & 0xFF, crc >> 8
 | ||
| 
 | ||
| 
 | ||
def crc32(data: bytearray):
    """Compute a 32-bit CRC over ``data`` taken as little-endian 32-bit words.

    Each word is fed most-significant bit first through polynomial
    0x04C11DB7, starting from 0xFFFFFFFF, with no final XOR.

    Returns:
        The 32-bit checksum, or 0 when ``len(data)`` is not a multiple of 4.
    """
    if len(data) % 4:
        return 0
    crc = 0xFFFFFFFF
    for start in range(0, len(data), 4):
        word = (data[start]
                | (data[start + 1] << 8)
                | (data[start + 2] << 16)
                | (data[start + 3] << 24))
        for _ in range(32):
            # Only bit 31 decides whether the polynomial is applied.
            feedback = (crc ^ word) & 0x80000000
            crc <<= 1
            word <<= 1
            if feedback:
                crc ^= 0x04C11DB7
        crc &= 0xFFFFFFFF
    return crc
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
class utcp:
    """Wrap a single accepted TCP connection in a serial-port-like interface.

    The constructor listens on ``port`` on all interfaces and blocks until
    one client connects (or the 10 second listen timeout expires). The
    object then offers ``read``/``write``/``close`` and an ``is_open`` flag
    so it can stand in for a serial port object.
    """

    is_open = False

    def __init__(self, port: int) -> None:
        """Listen on ``port`` and wait for one client.

        Raises:
            OSError: if binding, listening or accepting fails; this includes
                the accept timeout. The listening socket is closed first.
        """
        self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.ser.bind(("", port))
            self.ser.settimeout(10)
            self.ser.listen(128)
            print("wait for mcu connect.")
            self.client, self.client_addr = self.ser.accept()
        except BaseException:
            # Do not leak the listening socket when no client shows up.
            self.ser.close()
            raise
        print("client:", self.client_addr)
        self.is_open = True

    def read(self, len: int):
        """Receive up to ``len`` bytes from the connected client."""
        return self.client.recv(len)

    def write(self, data: bytearray):
        """Send all of ``data`` to the client and return the byte count."""
        # sendall() keeps writing until the whole frame is out; a plain
        # send() is allowed to transmit only part of the buffer.
        self.client.sendall(data)
        return len(data)

    def close(self):
        """Close the client connection and the listening socket."""
        self.is_open = False
        try:
            self.client.close()
        finally:
            self.ser.close()
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
def scheme_task_to_byte(j: json):
    """Serialise the parameters of one task.

    Layout: ``TaskID``, ``TaskIndex``, ``RetryCount``, ``ErrJumpTo``, one
    byte holding ``ParamCount`` in the low nibble and ``ReturnCount`` in the
    bits above it, then every ``ParamVal`` entry as low byte, high byte.
    """
    packed = bytearray()
    for key in ("TaskID", "TaskIndex", "RetryCount", "ErrJumpTo"):
        packed.append(j[key])
    counts = (j["ParamCount"] & 0x0F) | (j["ReturnCount"] << 4)
    packed.append(counts)
    for value in j["ParamVal"]:
        packed.extend((value & 0xFF, value >> 8))
    return packed
 | ||
| 
 | ||
def scheme_tasks_to_byte(j: json):
    """Serialise the parameters of every task in ``j["TaskArray"]``.

    The per-task encodings are concatenated and the result is padded with
    0xFF up to ``0x700 - 4`` bytes. A longer result is returned as is.
    """
    blob = bytearray()
    for task in j["TaskArray"]:
        blob += scheme_task_to_byte(task)
    shortfall = (0x700 - 4) - len(blob)
    if shortfall > 0:
        blob.extend(b"\xff" * shortfall)
    return blob
 | ||
| 
 | ||
def scheme_taskids_to_byte(j: json):
    """Build the task-id table of a scheme.

    The table starts with ``PlanID`` as four little-endian bytes, followed
    by the ``TaskID`` of every entry in ``TaskArray``. It is padded with
    0xFF up to 0x100 bytes; a longer table is returned as is.
    """
    table = bytearray((j["PlanID"] >> shift) & 0xFF for shift in (0, 8, 16, 24))
    for task in j["TaskArray"]:
        table.append(task["TaskID"])
    # A non-positive repeat count yields b"", so oversized tables stay untouched.
    table.extend(b"\xff" * (0x100 - len(table)))
    return table
 | ||
| 
 | ||
def scheme_to_byte(j: json):
    """Build the byte image of a scheme for the small board.

    The image is the task-id table, then the task parameter area, then the
    ``crc32`` of those two parts appended as four little-endian bytes.
    """
    image = bytearray()
    image += scheme_taskids_to_byte(j)
    image += scheme_tasks_to_byte(j)
    checksum = crc32(image)
    image.extend((checksum >> shift) & 0xFF for shift in range(0, 32, 8))
    return image
 | ||
| 
 | ||
| 
 | ||
def arr_from_int(num: int):
    """Return the low 32 bits of ``num`` as a 4-byte little-endian bytearray."""
    return bytearray((num >> shift) & 0xFF for shift in (0, 8, 16, 24))
 | ||
| 
 | ||
| 
 | ||
def scheme_get_task_range(j: json):
    """Extract the limit pairs of one task, max first and then min.

    Every ``TestStandard`` entry contributes ``Max`` and ``Min`` as two
    4-byte little-endian values. If the task declares more return values
    (``ReturnCount``) than it has standards, the missing pairs are filled
    with max 65535 and min 0.
    """
    expected = j["ReturnCount"]
    limits = bytearray()
    for standard in j["TestStandard"]:
        limits += arr_from_int(standard["Max"])
        limits += arr_from_int(standard["Min"])
    missing = expected - len(j["TestStandard"])
    while missing > 0:
        limits += arr_from_int(65535)
        limits += arr_from_int(0)
        missing -= 1
    return limits
 | ||
| 
 | ||
def scheme_to_host(j: json):
    """Build the byte image of a scheme for the main board.

    Layout, every integer as four little-endian bytes: ``PlanID``,
    ``TimeOutM``, the number of limit pairs, then the limit pairs of every
    task in ``TaskArray`` (8 bytes per pair).
    """
    header = arr_from_int(j["PlanID"]) + arr_from_int(j["TimeOutM"])
    ranges = bytearray()
    for task in j["TaskArray"]:
        ranges += scheme_get_task_range(task)
    return header + arr_from_int(len(ranges) // 8) + ranges
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
class handle:
    """Split a file, or a compiled scheme, into packets for transmission.

    Call ``set_file`` or ``set_json`` to load the payload, then call
    ``get_data`` repeatedly. The first call returns a header packet, each
    later call returns the next data packet, and an empty ``bytearray``
    marks the end of the transfer.
    """

    stat = 0
    name = ""

    def __init__(self) -> None:
        # Start with an empty payload so get_rate()/get_data() are usable
        # before anything has been loaded.
        self.data = bytearray()
        self.packet_all = 0
        self.packet_now = 0
        self.sent_bytes = 0
        self.packet_bytes = 200

    def get_rate(self):
        """Return the progress as an integer percentage (0 with no packets)."""
        if self.packet_all <= 0:
            return 0
        return self.packet_now * 100 // self.packet_all

    def get_data(self):
        """Return the next packet to send, or an empty bytearray when done.

        Header packet: ``1`` (1 = write, 0 = read), the payload length as four
        little-endian bytes, then the encoded file name.
        Data packet: ``2``, the packet number and the packet total as two
        little-endian bytes each, then up to ``packet_bytes`` payload bytes.
        """
        if self.stat == 0:
            length = len(self.data)
            head = bytearray([
                1,
                length & 0xff,
                (length >> 8) & 0xff,
                (length >> 16) & 0xff,
                (length >> 24) & 0xff,
            ])
            head += self.name.encode()
            self.stat = 1
            return head
        if self.stat == 1:
            # Size the chunk from what is actually left, so a payload whose
            # length is a multiple of the packet size (or is zero) does not
            # produce a trailing empty packet numbered past packet_all.
            remaining = len(self.data) - self.sent_bytes
            chunk = min(self.packet_bytes, remaining)
            if chunk > 0:
                self.packet_now += 1
                packet = bytearray([
                    2,
                    self.packet_now & 0xff,
                    (self.packet_now >> 8) & 0xff,
                    self.packet_all & 0xff,
                    (self.packet_all >> 8) & 0xff,
                ])
                packet += self.data[self.sent_bytes:self.sent_bytes + chunk]
                self.sent_bytes += chunk
                return packet
            print("send done.")
        return bytearray()

    def _load(self, data, path: str) -> None:
        """Install ``data`` as the payload and rewind the transfer state."""
        import os

        self.data = data
        # basename() also copes with backslash separators on Windows.
        self.name = os.path.basename(path)
        self.stat = 0
        self.packet_bytes = 200
        self.packet_all = (len(data) + self.packet_bytes - 1) // self.packet_bytes
        self.sent_bytes = 0
        self.packet_now = 0

    # Select the file to send.
    def set_file(self, name: str):
        """Load the raw bytes of the file ``name`` as the payload."""
        with open(name, "rb") as f:
            content = f.read()
        self._load(content, name)

    # Select the scheme to send.
    def set_json(self, name: str):
        """Load the JSON scheme ``name`` and use its byte image as the payload.

        The payload is the small-board image followed by the main-board image.
        """
        with open(name, "rb") as f:
            scheme = json.loads(f.read())
        payload = scheme_to_byte(scheme)
        payload += scheme_to_host(scheme)
        self._load(payload, name)
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
class protu(QObject):
    """Framed command protocol over a serial port or a ``utcp`` TCP link.

    Outgoing frame (``encode``): 0x59 0x6d, length (2 bytes, little-endian,
    payload length + 3), command, command number (2 bytes), payload, CRC-16
    (low byte, high byte) over everything after the two head bytes.

    Incoming frame (``decode``): 0x59 0x6d 0x43, length (2 bytes), command,
    command number (2 bytes), payload, CRC-16. A length field of 65535 marks
    a big frame: the real length follows as 4 little-endian bytes at offsets
    8..11 and the payload starts at offset 12.
    """

    # Progress signal: carries one int (the original note says 1~100).
    rate_signal = pyqtSignal([int])
    # End signal: success flag and a description.
    end_signal = pyqtSignal([bool, str])
    # Emitted for every received frame: command, payload, error text.
    recv_signal = pyqtSignal([int, bytearray, str])

    # NOTE: defined on the class, so all protu instances share this object.
    hand = handle()

    def __init__(self) -> None:
        QObject.__init__(self)
        self.cmd = 0
        self.cmd_no = 0
        self.str_err = ""
        self.is_big_data = False
        self.num_to_recv = 0
        self.recv_data = bytearray()

    def init(self, com: str):
        """Open the link described by ``com``.

        ``"utcp:<port>"`` starts a TCP listener through ``utcp``; anything
        else is opened as a serial port at 115200 8N1 with a blocking read.

        Returns:
            True on success; False (after printing the error) on failure.
        """
        s = com.split(":")
        try:
            if s[0] == "utcp":
                self.ser = utcp(int(s[1]))
            else:
                self.ser = serial.Serial(
                    port=s[0], baudrate=115200, bytesize=serial.EIGHTBITS,
                    parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
                    timeout=None)
        except Exception as e:
            print(str(e))
            return False
        return True

    def encode(self, data: bytearray):
        """Wrap ``data`` in an outgoing frame using ``self.cmd``/``self.cmd_no``."""
        length = len(data) + 3
        t = bytearray([
            0x59,
            0x6d,
            length & 0xff,
            length >> 8,
            self.cmd,
            self.cmd_no & 0xff,
            (self.cmd_no >> 8) & 0xff,
        ])
        t += data
        # The CRC covers the frame from the length field to the end of payload.
        a, b = crc16(t, 2, length + 4)
        t.append(a)
        t.append(b)
        return t

    def _reject(self, reason: str):
        """Report a decode failure and return the empty payload."""
        print(reason)
        self.str_err = reason
        return bytearray()

    def decode(self, data: bytearray):
        """Validate one incoming frame and return its payload.

        Sets ``self.str_err`` to ``"ok"`` or an error text, and updates
        ``self.cmd``, ``self.cmd_no`` and ``self.is_big_data``. Length and
        head errors return an empty bytearray.
        """
        self.str_err = "ok"
        if len(data) < 10:
            return self._reject("recv data len too less.")
        if data[0] != 0x59 or data[1] != 0x6d or data[2] != 0x43:
            return self._reject("frame head not 0x59 0x6d.")
        length = data[3] | (data[4] << 8)
        self.is_big_data = (length == 65535)
        if self.is_big_data:
            if len(data) < 12:
                return self._reject("recv data len too less.")
            # The extended length sits directly in front of the payload
            # (offset 12), i.e. at offsets 8..11 -- the same bytes recv()
            # uses to size the frame.
            length = data[8] | (data[9] << 8) | (data[10] << 16) | (data[11] << 24)
        if length + 7 != len(data):
            return self._reject("recv data have lossed")
        a, b = crc16(data, 3, length + 5)
        if a != data[-2] or b != data[-1]:
            print("recv data check error.h_crc=%02x %02x,crc=%02x %02x"
                  % (a, b, data[-2], data[-1]))
            self.str_err = "recv data check error."
            # NOTE(review): unlike the other failures, a CRC mismatch still
            # returns the payload; listeners must check the error text.
        self.cmd_no = data[6] | (data[7] << 8)
        self.cmd = data[5]
        if self.is_big_data:
            return data[12:-2]
        return data[8:-2]

    def recv(self):
        """Receive loop: assemble frames byte by byte and emit ``recv_signal``.

        Runs until the port is closed, a read raises, or a read returns no
        data.
        """
        data = bytearray()
        while self.ser.is_open:
            try:
                d = self.ser.read(1)
            except Exception:
                print("port closed")
                return
            if not d:
                # A zero-length read means the TCP peer closed the link;
                # without this check the loop would spin forever.
                print("port closed")
                return
            data += d
            if len(data) == 3:
                if data[0] == 0x59 and data[1] == 0x6d and data[2] == 0x43:
                    self.num_to_recv = 5
                else:
                    # Not a frame head: drop one byte and keep searching.
                    data = data[1:]
                    self.num_to_recv = 0
            elif len(data) == 5:
                length = data[3] | (data[4] << 8)
                if length < 65535:
                    self.num_to_recv += length + 2
                    self.is_big_data = False
                else:
                    # Big frame: read up to the extended length field first.
                    self.num_to_recv = 12
                    self.is_big_data = True
            elif len(data) == 12:
                if self.is_big_data:
                    length = data[8] | (data[9] << 8) | (data[10] << 16) | (data[11] << 24)
                    self.num_to_recv = 5 + length + 2
            if self.num_to_recv > 0 and self.num_to_recv == len(data):
                t = self.decode(data)
                self.recv_data += t
                self.recv_signal.emit(self.cmd, t, self.str_err)
                data.clear()

    def send(self, cmd: int, data: bytearray):
        """Send ``data`` as the payload of command ``cmd``."""
        self.cmd = cmd
        # The frame carries a 16-bit command number, so wrap instead of
        # letting encode() fail once the counter passes 0xFFFF.
        self.cmd_no = (self.cmd_no + 1) & 0xffff
        self.ser.write(self.encode(data))

    def start_recv(self):
        """Start the receive loop in a background thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the receive thread has finished."""
        self.thread_.join()

    def close(self):
        """Close the link if it is open; errors are printed, not raised."""
        try:
            if self.ser.is_open:
                self.ser.close()
        except Exception as e:
            print(str(e))

    def send_file_next(self, cmd: int, data: bytearray, err: str):
        """Send the next packet from ``hand``, or close the link when done.

        The signature matches ``recv_signal``; the incoming ``data`` and
        ``err`` values are not used.
        """
        print("send_file_next")
        data = self.hand.get_data()
        if len(data) > 0:
            self.send(cmd, data)
        else:
            self.close()

    def send_file(self, cmd: int, name: str):
        """Start sending the file ``name`` with command ``cmd`` and wait.

        This only emits ``recv_signal`` to start the transfer; nothing in
        this class connects the signal to ``send_file_next``.
        """
        # NOTE(review): presumably the caller connects recv_signal to
        # send_file_next; without that connection wait() never returns.
        self.start_recv()
        self.hand.set_file(name)
        self.recv_signal.emit(cmd, bytearray([0]), "ok")
        self.wait()
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
# Command codes:
# 0x30 start test
# 0x31 test result report
# 0x32 read/write scheme
# 0x33 connection keep-alive
# 0x34 self-check data
# 0x35 self-check report
# 0x36 upgrade script
# 0xed main board upgrade
# 0xee small board upgrade
 | ||
| 
 | ||
| # with open("file/judge.lua","rb") as f:
 | ||
| #     send_file("COM5",0x36,f.name.split('/')[-1],f.read())
 | ||
| # with open("file/JQ_JCXB_V54.bin","rb") as f:
 | ||
| #     send_file("COM5",0xee,f.name.split('/')[-1],f.read())
 | ||
| # with open("file/checker_ye_cfg.json","rb") as f:
 | ||
| #     json_obj=json.loads(f.read())
 | ||
|     # d=scheme_to_byte(json_obj)
 | ||
|     # print("len=",len(d),d.hex(","))
 | ||
|     # d=scheme_to_host(json_obj)
 | ||
|     # print("len=",len(d),d.hex(","))
 | ||
|     # send_file("COM5",0x32,f.name.split('/')[-1],f.read())
 | ||
| 
 | ||
if __name__ == "__main__":
    # Manual run: wait for a device on TCP port 7777 and send it a file.
    u = protu()
    # init() reports failure through its return value; stop here rather
    # than calling send_file() on an object that has no open link.
    if not u.init("utcp:7777"):
        raise SystemExit(1)
    # Alternative target:
    # u.send_file(0xee,"file/JQ_JCXB_V54.bin")
    u.send_file(0xed, "../Objects/checker_gen1_app_20230602.bin")
 | 
