293 lines
8.1 KiB
Python
293 lines
8.1 KiB
Python
![]() |
import serial
|
||
|
import serial.tools.list_ports
|
||
|
import threading
|
||
|
import time
|
||
|
import json
|
||
|
|
||
|
|
||
|
# ports_list = list(serial.tools.list_ports.comports())
|
||
|
# if len(ports_list) <= 0:
|
||
|
# print("无串口设备。")
|
||
|
# else:
|
||
|
# print("可用的串口设备如下:")
|
||
|
# for comport in ports_list:
|
||
|
# print(list(comport)[0], list(comport)[1])
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def crc16(data:bytearray,offset:int,len:int):
|
||
|
if(len>0):
|
||
|
crc=0xffff
|
||
|
for i in range(len-offset):
|
||
|
crc=(crc^data[i+offset])&0xffff
|
||
|
for j in range(8):
|
||
|
if(crc&1)!=0:
|
||
|
crc=((crc>>1)^0xa001)&0xffff
|
||
|
else:
|
||
|
crc=(crc>>1)&0xffff
|
||
|
return crc&0xff,(crc>>8)&0xff
|
||
|
return 0,0
|
||
|
|
||
|
|
||
|
def crc32(data:bytearray):
|
||
|
temp=0
|
||
|
crc=0xffffffff
|
||
|
i=0
|
||
|
if(len(data)%4!=0):
|
||
|
return 0
|
||
|
while(i<len(data)):
|
||
|
temp=data[i]|(data[i+1]<<8)|(data[i+2]<<16)|(data[i+3]<<24)
|
||
|
i+=4
|
||
|
for j in range(32):
|
||
|
if((crc^temp)&0x80000000)!=0:
|
||
|
crc=0x04c11db7^(crc<<1)
|
||
|
else:
|
||
|
crc<<=1
|
||
|
temp<<=1
|
||
|
crc&=0xffffffff
|
||
|
return crc
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class protu:
|
||
|
|
||
|
def __init__(self,com:str) -> None:
|
||
|
self.cmd=0
|
||
|
self.cmd_no=0
|
||
|
self.str_err=""
|
||
|
self.is_big_data=False
|
||
|
self.num_to_recv=0
|
||
|
self.recv_data=bytearray()
|
||
|
self.ser = serial.Serial(port=com, baudrate=115200,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
|
||
|
stopbits=serial.STOPBITS_ONE,timeout=None)
|
||
|
def encode(self,data:bytearray):
|
||
|
t=bytearray()
|
||
|
length=len(data)+3
|
||
|
t.append(0x59)
|
||
|
t.append(0x6d)
|
||
|
t.append(length&0xff)
|
||
|
t.append(length>>8)
|
||
|
t.append(self.cmd)
|
||
|
t.append(self.cmd_no&0xff)
|
||
|
t.append(self.cmd_no>>8)
|
||
|
t+=data
|
||
|
a,b=crc16(t,2,length+4)
|
||
|
t.append(a)
|
||
|
t.append(b)
|
||
|
# print("encode:",t.hex(","))
|
||
|
return t
|
||
|
def decode(self,data:bytearray):
|
||
|
self.str_err="ok"
|
||
|
if(len(data)<10):
|
||
|
print("recv data len too less.")
|
||
|
self.str_err="recv data len too less."
|
||
|
return bytearray()
|
||
|
if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
|
||
|
print("frame head not 0x59 0x6d.")
|
||
|
self.str_err="frame head not 0x59 0x6d."
|
||
|
return bytearray()
|
||
|
length=data[3]|(data[4]<<8)
|
||
|
if(length==65535):
|
||
|
# 重新设置数据长度
|
||
|
length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24);
|
||
|
self.is_big_data=True
|
||
|
else:
|
||
|
self.is_big_data=False
|
||
|
if(length+7!=len(data)):
|
||
|
print("recv data have lossed")
|
||
|
self.str_err="recv data have lossed"
|
||
|
return bytearray()
|
||
|
a,b=crc16(data,3,length+5)
|
||
|
if(a!=data[-2] or b!=data[-1]):
|
||
|
print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1])
|
||
|
self.str_err="recv data check error."
|
||
|
self.cmd_no=data[6]|(data[7]<<8)
|
||
|
self.cmd=data[5]
|
||
|
if(self.is_big_data==False):
|
||
|
return data[8:-2]
|
||
|
else:
|
||
|
return data[12:-2]
|
||
|
def recv(self,dolater):
|
||
|
data=bytearray()
|
||
|
while(self.ser.is_open):
|
||
|
d=bytes()
|
||
|
try:
|
||
|
d=self.ser.read(1)
|
||
|
except Exception as a:
|
||
|
print("err:",str(a))
|
||
|
return
|
||
|
data+=d
|
||
|
if(len(data)==3):
|
||
|
if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
|
||
|
self.num_to_recv=5
|
||
|
else:
|
||
|
data=data[1:]
|
||
|
self.num_to_recv=0
|
||
|
elif(len(data)==5):
|
||
|
length=data[3]|(data[4]<<8)
|
||
|
if(length<65535):
|
||
|
self.num_to_recv+=length+2
|
||
|
self.is_big_data=False
|
||
|
else:
|
||
|
self.num_to_recv=12
|
||
|
self.is_big_data=True
|
||
|
elif(len(data)==12):
|
||
|
if(self.is_big_data==True):
|
||
|
length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
|
||
|
self.num_to_recv=5+length+2
|
||
|
if(self.num_to_recv>0 and self.num_to_recv==len(data)):
|
||
|
t=self.decode(data)
|
||
|
|
||
|
self.recv_data+=t
|
||
|
# dolater(self.cmd,t,self.str_err)
|
||
|
print("recv",t.hex(","))
|
||
|
try:
|
||
|
print("send next:")
|
||
|
next(dolater)
|
||
|
except:
|
||
|
print("dolater end.")
|
||
|
return
|
||
|
data.clear()
|
||
|
# else:
|
||
|
# print("len(data)={d1},num_ro_recv={d2}".format(d1=len(data),d2=self.num_to_recv))
|
||
|
def send(self,cmd:int,data:bytearray):
|
||
|
self.cmd=cmd
|
||
|
self.cmd_no+=1
|
||
|
# print("send:",data.hex(","))
|
||
|
self.ser.write(self.encode(data))
|
||
|
def start_recv(self,dolater):
|
||
|
self.thread_ = threading.Thread(target=self.recv, args=(dolater,))
|
||
|
self.thread_.start()
|
||
|
def wait(self):
|
||
|
self.thread_.join()
|
||
|
def close(self):
|
||
|
if(self.ser.is_open):
|
||
|
self.ser.close()
|
||
|
|
||
|
|
||
|
def make_dolater(cmd:int,data:bytearray,u:protu):
|
||
|
dataa=data
|
||
|
def dolater(cmd:int,data_:bytearray,str_err:str):
|
||
|
# print("cmd={d1},data={d2},err={d3}".format(d1=cmd,d2=data.hex(","),d3=str_err))
|
||
|
packet_all=(len(dataa)+199)//200
|
||
|
sent_bytes=0
|
||
|
packet_now=0
|
||
|
packet_bytes=200
|
||
|
while True:
|
||
|
packet_bytes=200
|
||
|
if(len(dataa)<sent_bytes):
|
||
|
packet_bytes=len(dataa)-sent_bytes
|
||
|
if(packet_bytes>0):
|
||
|
packet_now+=1
|
||
|
t=bytearray()
|
||
|
t.append(2)
|
||
|
t.append(packet_now&0xff)
|
||
|
t.append((packet_now>>8)&0xff)
|
||
|
t.append(packet_all&0xff)
|
||
|
t.append((packet_all>>8)&0xff)
|
||
|
t+=dataa[sent_bytes:sent_bytes+packet_bytes]
|
||
|
u.send(cmd,t)
|
||
|
sent_bytes+=packet_bytes
|
||
|
yield
|
||
|
else:
|
||
|
print("send done.")
|
||
|
u.close()
|
||
|
break
|
||
|
return dolater(cmd,data,u)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def scheme_task_to_byte(j:json):
|
||
|
data=bytearray()
|
||
|
data.append(j["TaskID"])
|
||
|
data.append(j["TaskIndex"])
|
||
|
data.append(j["RetryCount"])
|
||
|
data.append(j["ErrJumpTo"])
|
||
|
data.append((j["ParamCount"]&0x0f)|(j["ReturnCount"]<<4))
|
||
|
for i in j["ParamVal"]:
|
||
|
data.append(i&0xff)
|
||
|
data.append(i>>8)
|
||
|
return data
|
||
|
|
||
|
def scheme_tasks_to_byte(j:json):
|
||
|
data=bytearray()
|
||
|
for i in j["TaskArray"]:
|
||
|
data+=scheme_task_to_byte(i)
|
||
|
length=len(data)
|
||
|
if(length<(0x700-4)):
|
||
|
for i in range(0x700-4-length):
|
||
|
data.append(0xff)
|
||
|
return data
|
||
|
|
||
|
|
||
|
def scheme_taskids_to_byte(j:json):
|
||
|
t=bytearray()
|
||
|
t.append(j["PlanID"]&0xff)
|
||
|
t.append((j["PlanID"]>>8)&0xff)
|
||
|
t.append((j["PlanID"]>>16)&0xff)
|
||
|
t.append((j["PlanID"]>>24)&0xff)
|
||
|
for i in j["TaskArray"]:
|
||
|
t.append(i["TaskID"])
|
||
|
length=len(t)
|
||
|
if(length<(0x100)):
|
||
|
for i in range(0x100-length):
|
||
|
t.append(0xff)
|
||
|
return t
|
||
|
|
||
|
|
||
|
def scheme_to_byte(j:json):
|
||
|
t=bytearray()
|
||
|
t+=scheme_taskids_to_byte(j)
|
||
|
t+=scheme_tasks_to_byte(j)
|
||
|
crc=crc32(t)
|
||
|
t.append(crc&0xff)
|
||
|
t.append((crc>>8)&0xff)
|
||
|
t.append((crc>>16)&0xff)
|
||
|
t.append((crc>>24)&0xff)
|
||
|
return t
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
# 1是写,0,是读
|
||
|
def send_file(com:str,cmd:int,name:str,data:bytearray):
|
||
|
u=protu(com)
|
||
|
u.start_recv(make_dolater(cmd,data,u))
|
||
|
length=len(data)
|
||
|
t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff])
|
||
|
t+=name.encode()
|
||
|
u.send(cmd,t)
|
||
|
u.wait()
|
||
|
|
||
|
|
||
|
# 0x30 开始检测
|
||
|
# 0x31 检测上报
|
||
|
# 0x32 读写方案
|
||
|
# 0x33 连接维护
|
||
|
# 0x34 自检数据
|
||
|
# 0x35 自检上报
|
||
|
# 0x36 升级脚本
|
||
|
# 0xed 主板升级
|
||
|
# 0xee 小板升级
|
||
|
|
||
|
# with open("file/judge.lua","rb") as f:
|
||
|
# send_file("COM5",0x36,f.name.split('/')[-1],f.read())
|
||
|
with open("file/checker_ye_cfg.json","rb") as f:
|
||
|
json_obj=json.loads(f.read())
|
||
|
d=scheme_to_byte(json_obj)
|
||
|
print("len=",len(d),d.hex(","))
|
||
|
# send_file("COM5",0x32,f.name.split('/')[-1],f.read())
|
||
|
|
||
|
|