Files
checker_slave/python/protmcu.py
ranchuan 17b4ebf188 移植Test命令,验证成功
EJ EX 写工厂信息验证成功
2023-10-12 18:43:17 +08:00

145 lines
3.9 KiB
Python

import serial
import threading
def crc8(data: bytearray) -> int:
    """Return the 8-bit CRC of *data*.

    The CRC is computed bit by bit, least-significant bit first, starting
    from 0 and using the reflected polynomial 0x8C with no final XOR (the
    parameter set commonly catalogued as CRC-8/MAXIM).

    Args:
        data: The byte values to checksum. Any iterable of ints works
            (``bytes``, ``bytearray``, ``memoryview``, ...); only the low
            eight bits of each value contribute to the result.

    Returns:
        The checksum, in the range 0..255. Empty input yields 0.
    """
    crc = 0
    for byte in data:
        # Fold the next input byte into the running remainder.
        crc = (crc ^ byte) & 0xFF
        for _ in range(8):
            # Shift out the low bit; when it was set, apply the polynomial.
            if crc & 0x01:
                crc = (crc >> 1) ^ 0x8C
            else:
                crc >>= 1
    return crc
class protm:
    """Frame encoder/decoder plus a blocking serial receive loop.

    Frame layout, as produced by ``encode``::

        bytes 0-1   header 0x59 0x65
        bytes 2-3   total frame length, little-endian (payload length + 10)
        byte  4     ``addr``
        byte  5     ``host_addr``
        byte  6     command
        bytes 7-8   command number, little-endian
        bytes 9..   payload
        last byte   ``crc8`` of every preceding byte

    ``decode`` reads byte 4 into ``host_addr`` and byte 5 into ``addr``, i.e.
    the two address fields are swapped relative to ``encode``.
    """

    # Smallest possible frame: 9 header bytes + 1 CRC byte, empty payload.
    MIN_FRAME_LEN = 10

    def __init__(self, com: str) -> None:
        """Store the port name; the port itself is opened later by ``init``."""
        self.cmd = 0
        self.cmd_no = 0
        self.addr = 0
        self.host_addr = 0
        self.str_err = ""
        self.is_big_data = False
        self.num_to_recv = 0
        self.recv_data = bytearray()
        self.com = com
        # Created by init() / start_recv(); defined here so that close(),
        # wait() and recv() are safe to call before those methods.
        self.ser = None
        self.thread_ = None

    def init(self):
        """Open the serial port ``self.com`` at 115200 baud, 8N1, blocking reads."""
        self.ser = serial.Serial(
            port=self.com,
            baudrate=115200,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=None,
        )

    def encode(self, data: bytearray):
        """Wrap *data* in a frame using the current addr/host_addr/cmd/cmd_no.

        Args:
            data: Payload bytes (``bytes`` or ``bytearray``).

        Returns:
            The complete frame as a ``bytearray``, CRC included.

        Raises:
            ValueError: If the frame would not fit the 16-bit length field,
                or if ``addr``, ``host_addr`` or ``cmd`` is not in 0..255.
        """
        length = len(data) + self.MIN_FRAME_LEN
        if length > 0xFFFF:
            raise ValueError("payload too long for 16-bit length field")
        # The command number is a 16-bit field; wrap rather than overflow.
        cmd_no = self.cmd_no & 0xFFFF
        t = bytearray()
        t.append(0x59)
        t.append(0x65)
        t.append(length & 0xFF)
        t.append(length >> 8)
        t.append(self.addr)
        t.append(self.host_addr)
        t.append(self.cmd)
        t.append(cmd_no & 0xFF)
        t.append(cmd_no >> 8)
        t += data
        t.append(crc8(t))
        # print("encode:",t.hex(","))
        return t

    def decode(self, data: bytearray):
        """Parse one complete frame and return its payload.

        Updates ``host_addr``, ``addr``, ``cmd`` and ``cmd_no`` from the
        frame and sets ``str_err`` to ``"ok"`` or to an error description.

        Args:
            data: The whole frame, header through CRC byte.

        Returns:
            The payload bytes. An empty ``bytearray`` is returned when the
            frame is shorter than the minimum or its length field does not
            match ``len(data)``. On a CRC mismatch the payload is still
            returned; callers must inspect ``str_err`` to detect it.
        """
        self.str_err = "ok"
        if len(data) < self.MIN_FRAME_LEN:
            print("recv data len too less.")
            self.str_err = "recv data len too less."
            return bytearray()
        self.host_addr = data[4]
        self.addr = data[5]  # respond to commands sent to any address
        length = data[2] | (data[3] << 8)
        crc = crc8(data[:-1])
        if length != len(data):
            print("recv data have lossed")
            self.str_err = "recv data have lossed"
            return bytearray()
        self.cmd_no = data[7] | (data[8] << 8)
        self.cmd = data[6]
        if crc != data[-1]:
            # Apply the format arguments; passing them to print() separately
            # would emit the raw "%02x" placeholders.
            print("recv data check error.h_crc=%02x,crc=%02x" % (crc, data[-1]))
            self.str_err = "recv data check error."
        return data[9:-1]

    def recv(self):
        """Read the port byte by byte, reassemble frames and dispatch them.

        Runs until the port is closed or a read raises. Each completed frame
        is decoded, its payload appended to ``recv_data`` and passed to
        ``handle``. Returns immediately if the port was never opened.
        """
        data = bytearray()
        while self.ser is not None and self.ser.is_open:
            try:
                d = self.ser.read(1)
            except Exception as a:
                # Thread boundary: report the failure and stop the loop.
                print("err:", str(a))
                return
            data += d
            if len(data) == 2:
                if data[0] == 0x59 and data[1] == 0x65:
                    self.num_to_recv = 4
                else:
                    # Not a header: drop the oldest byte and keep scanning.
                    print("data={d}".format(d=data.hex(",")))
                    data = data[1:]
                    self.num_to_recv = 0
            elif len(data) == 4:
                length = data[2] | (data[3] << 8)
                if length < self.MIN_FRAME_LEN:
                    # A length this small can never be a valid frame. Without
                    # this check the buffer would either grow without ever
                    # matching, or dispatch an empty payload; resynchronise.
                    print("recv bad frame length={d}".format(d=length))
                    data.clear()
                    self.num_to_recv = 0
                    continue
                self.num_to_recv = length
            if self.num_to_recv > 0 and self.num_to_recv == len(data):
                t = self.decode(data)
                self.recv_data += t
                # print("recv",t.hex(","))
                # NOTE(review): frames that fail the CRC check are still
                # dispatched (decode only records the error) -- confirm
                # this leniency is intended.
                self.handle(self.cmd, t)
                data.clear()

    def send(self, cmd: int, data: bytearray):
        """Send *data* as a frame with command *cmd*.

        Advances ``cmd_no`` (wrapping at 16 bits, the width of its frame
        field) before encoding and writing to the serial port.
        """
        self.cmd = cmd
        self.cmd_no = (self.cmd_no + 1) & 0xFFFF
        print("send:", data.hex(","))
        self.ser.write(self.encode(data))

    def start_recv(self):
        """Start ``recv`` on a new thread, kept in ``self.thread_``."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the receive thread finishes; no-op if none was started."""
        if self.thread_ is not None:
            self.thread_.join()

    def close(self):
        """Close the serial port if it has been opened and is still open."""
        if self.ser is not None and self.ser.is_open:
            self.ser.close()

    def handle(self, cmd: int, data: bytearray):
        """Handle one received frame.

        Prints the command and payload, then, unless ``self.addr`` is 1,
        replies with command 0x16 carrying the 23 bytes 1..23.
        """
        print("cmd={d1},data={d2}".format(d1=cmd, d2=data.hex(",")))
        if self.addr == 1:
            print("skip addr:1")
            return
        self.send(0x16, bytearray(range(1, 24)))
# Module-level demo: build a single frame for command 0x19 and print it as
# space-separated hex. These statements run whenever the file is loaded.
u = protm("com7")
# The receive loop is left disabled here (it would be u.start_recv()
# followed by u.wait()).
u.addr, u.host_addr, u.cmd = 0, 0, 0x19
data = bytearray.fromhex("11223344")
crc = crc8(data)
# Prefix the payload with the byte 4 (presumably the payload length --
# TODO confirm) and the payload's own CRC.
data = bytes((4, crc)) + data
print(u.encode(data).hex(" "))