From 5ee17b6ce24aa64d4831eae704153177ea437511 Mon Sep 17 00:00:00 2001 From: ranchuan Date: Sun, 8 Oct 2023 18:25:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0jwt=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E5=8D=87=E7=BA=A7=E5=91=BD=E4=BB=A40xec?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- ReadMe.txt | 5 + coder/code_with_checker.py | 233 +++++++++++++++++++++++++++++++++++++ coder/coder_test.py | 32 ++++- updata/prottcp.py | 1 + updata/updata_uart.py | 6 +- 6 files changed, 272 insertions(+), 8 deletions(-) create mode 100644 coder/code_with_checker.py diff --git a/.gitignore b/.gitignore index a113fd4..e1c0386 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ __pycache__/ download/ *.csv quest_info.txt -scheme_export_info.txt \ No newline at end of file +scheme_export_info.txt +device_log.txt diff --git a/ReadMe.txt b/ReadMe.txt index ef61fbe..eeb7dd8 100644 --- a/ReadMe.txt +++ b/ReadMe.txt @@ -67,4 +67,9 @@ 赋码工具在模块接触异常时也保存管码 2023.9.28 修改 daemon.py 检测sd卡目录为 /run/media/sda/updata ,解决sd卡未挂载完成造成的无法打开目录的问题 +2023.10.8 + 添加jwt模块程序升级命令0xec + 添加赋码仪批检仪联动脚本 + 发送文件时打印文件的crc32 + diff --git a/coder/code_with_checker.py b/coder/code_with_checker.py new file mode 100644 index 0000000..4fc684e --- /dev/null +++ b/coder/code_with_checker.py @@ -0,0 +1,233 @@ +import serial +import serial.tools.list_ports +import threading +import time +import socket +import sys +# import checker_save + + + + + +# 使用批检仪和赋码仪联动, +# 批检仪控制电机,赋码仪注码 + + + + + +def crc16(data:bytearray,offset:int,len:int): + if(len>0): + crc=0xffff + for i in range(len-offset): + crc=(crc^data[i+offset])&0xffff + for j in range(8): + if(crc&1)!=0: + crc=((crc>>1)^0xa001)&0xffff + else: + crc=(crc>>1)&0xffff + return crc&0xff,(crc>>8)&0xff + return 0,0 + + + + + +# 把tcp封装为串口 +class utcp: + is_open=False + def __init__(self,port:int)->None: + self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.ser.bind(("",port)) + self.ser.settimeout(10) + self.ser.listen(128) + print("wait for mcu connect.") + self.client,self.client_addr=self.ser.accept() + print("client:",self.client_addr) + self.is_open=True + def read(self,len:int): + return self.client.recv(len) + def write(self,data:bytearray): + return self.client.send(data) + def close(self): + self.client.close() + self.ser.close() + self.is_open=False + +class port: + def __init__(self) -> None: + self.cmd=0 + self.cmd_no=0 + self.str_err="" + self.is_big_data=False + self.num_to_recv=0 + self.recv_data=bytearray() + def open(self,name:str,bsp:int): + if(name!="utcp"): + self.ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE,timeout=None) + else: + self.ser=utcp(bsp) + # 打开赋码仪端口 + def open_code(self,name:str,bsp:int): + if(name!="utcp"): + self.code_ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE,timeout=None) + else: + self.code_ser=utcp(bsp) + + def start_recv(self): + self.thread_ = threading.Thread(target=self.recv, args=()) + self.thread_.start() + def decode(self,data:bytearray): + self.str_err="ok" + if(len(data)<10): + print("recv data len too less.") + self.str_err="recv data len too less." + return bytearray() + if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43): + print("frame head not 0x59 0x6d.") + self.str_err="frame head not 0x59 0x6d." + return bytearray() + length=data[3]|(data[4]<<8) + if(length==65535): + # 重新设置数据长度 + length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24); + self.is_big_data=True + else: + self.is_big_data=False + if(length+7!=len(data)): + print("recv data have lossed") + self.str_err="recv data have lossed" + return bytearray() + a,b=crc16(data,3,length+5) + if(a!=data[-2] or b!=data[-1]): + print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1]) + self.str_err="recv data check error." + self.cmd_no=data[6]|(data[7]<<8) + self.cmd=data[5] + if(self.is_big_data==False): + return data[8:-2] + else: + return data[12:-2] + def recv(self): + data=bytearray() + while(True): + d=bytes() + try: + d=self.ser.read(1) + except Exception as a: + # print("err:",str(a)) + print("port closed") + return + data+=d + if(len(data)==3): + if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43): + self.num_to_recv=5 + else: + data=data[1:] + self.num_to_recv=0 + elif(len(data)==5): + length=data[3]|(data[4]<<8) + if(length<65535): + self.num_to_recv+=length+2 + self.is_big_data=False + else: + self.num_to_recv=12 + self.is_big_data=True + elif(len(data)==12): + if(self.is_big_data==True): + length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24) + self.num_to_recv=5+length+2 + if(self.num_to_recv>0 and self.num_to_recv==len(data)): + # print("recv:",data.hex(" ")) + return self.decode(data) + def recv_code(self,num:int): + d=bytearray() + try: + while(num>len(d)): + d+=self.code_ser.read(num-len(d)) + except Exception as a: + print("port closed") + return 0 + print("recv code:",d.hex(" ")) + # return d[0] + def send_code(self,data:bytearray): + self.code_ser.write(data) + print("send code:",data.hex(' ')) + def encode(self,data:bytearray): + t=bytearray() + length=len(data)+3 + t.append(0x59) + t.append(0x6d) + t.append(length&0xff) + t.append(length>>8) + t.append(self.cmd) + t.append(self.cmd_no&0xff) + t.append(self.cmd_no>>8) + t+=data + a,b=crc16(t,2,length+4) + t.append(a) + t.append(b) + # print("encode:",t.hex(",")) + return t + def send(self,cmd:int,data:bytearray): + self.cmd=cmd + self.cmd_no+=1 + d=self.encode(data) + # print("send:",d.hex(",")) + self.ser.write(d) + # 测试批检仪 + def checker_test(self,times:int,scheme:str,script:str): + print("批检仪测试:") + tick=0 + # save=checker_save.save(scheme,script) + data=bytearray() + while(tick