Files
python_tools/coder_2ch/prottcp.py

516 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import serial.tools.list_ports
import threading
import time
import json
import socket
from PyQt5.QtCore import *
def crc16(data:bytearray,offset:int,len:int):
    """Compute a 16-bit CRC over ``data[offset:len]``.

    The register starts at 0xFFFF and is shifted right one bit at a time,
    XOR-ing in 0xA001 whenever the bit shifted out is set.

    Args:
        data: Buffer to checksum.
        offset: Index of the first byte included in the checksum.
        len: Exclusive end index into ``data`` (not a byte count).

    Returns:
        ``(low_byte, high_byte)`` of the checksum, or ``(0, 0)`` when
        ``len`` is not positive.
    """
    if len <= 0:
        return 0, 0
    reg = 0xffff
    for pos in range(offset, len):
        reg = (reg ^ data[pos]) & 0xffff
        for _ in range(8):
            carry = reg & 1
            reg >>= 1
            if carry:
                reg ^= 0xa001
    return reg & 0xff, (reg >> 8) & 0xff
def crc32(data:bytearray):
    """Compute a word-wise 32-bit CRC over ``data``.

    The buffer is consumed as little-endian 32-bit words. Each word is fed
    most-significant bit first into a register that starts at 0xFFFFFFFF and
    uses the polynomial 0x04C11DB7.

    Args:
        data: Buffer whose length must be a multiple of four bytes.

    Returns:
        The 32-bit checksum, or ``0`` when the length is not a multiple of 4.
    """
    if len(data) % 4 != 0:
        return 0
    polynomial = 0x04c11db7
    top_bit = 0x80000000
    reg = 0xffffffff
    for start in range(0, len(data), 4):
        word = int.from_bytes(data[start:start + 4], "little")
        for _ in range(32):
            feedback = (reg ^ word) & top_bit
            reg <<= 1
            if feedback:
                reg ^= polynomial
            word <<= 1
        # Keep the register at 32 bits after each word.
        reg &= 0xffffffff
    return reg
# Wrap a TCP connection so that it can be used like a serial port
class utcp:
    """TCP server endpoint exposing a serial-port-like interface.

    The constructor listens on ``port`` and blocks until one client connects
    (the listening socket uses a 10 second timeout). Afterwards ``read``,
    ``write``, ``close`` and ``is_open`` can be used in place of the
    corresponding serial port members.
    """
    is_open=False

    def __init__(self,port:int)->None:
        """Listen on ``port`` (all interfaces) and accept a single client.

        Raises:
            OSError: If binding, listening or accepting fails (including the
                accept timeout). The listening socket is closed before the
                exception propagates so it is not leaked.
        """
        self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.ser.bind(("",port))
            self.ser.settimeout(10)
            self.ser.listen(128)
            print("wait for mcu connect.")
            self.client,self.client_addr=self.ser.accept()
        except Exception:
            # Do not leak the listening socket when no client shows up.
            self.ser.close()
            raise
        print("client:",self.client_addr)
        self.is_open=True

    def read(self,len:int):
        """Receive up to ``len`` bytes from the client.

        An empty result means the client has closed the connection.
        """
        return self.client.recv(len)

    def write(self,data:bytearray):
        """Send all of ``data`` to the client and return the number of bytes sent."""
        # sendall() keeps sending until the whole buffer is transmitted,
        # whereas send() may transmit only part of it.
        self.client.sendall(data)
        return data.__len__()

    def close(self):
        """Close the client connection and the listening socket."""
        try:
            self.client.close()
        finally:
            # Always release the listening socket and mark the port closed.
            self.ser.close()
            self.is_open=False
# Serialize the parameters of a single task
def scheme_task_to_byte(j:json):
    """Serialize one task description into its byte record.

    Layout: ``TaskID``, ``TaskIndex``, ``RetryCount``, ``ErrJumpTo``, one byte
    holding ``ParamCount`` in the low nibble and ``ReturnCount`` shifted into
    the high bits, then every ``ParamVal`` entry as two bytes (low byte first).

    Args:
        j: Task dictionary with the keys listed above.

    Returns:
        The record as a ``bytearray``.
    """
    record = bytearray([
        j["TaskID"],
        j["TaskIndex"],
        j["RetryCount"],
        j["ErrJumpTo"],
        (j["ParamCount"] & 0x0f) | (j["ReturnCount"] << 4),
    ])
    for value in j["ParamVal"]:
        record.extend((value & 0xff, value >> 8))
    return record
# Serialize the parameter records of all tasks
def scheme_tasks_to_byte(j:json):
    """Concatenate the byte records of every task in ``j["TaskArray"]``.

    The result is padded with 0xFF up to ``0x700 - 4`` bytes; longer results
    are returned unpadded.

    Args:
        j: Scheme dictionary containing a ``TaskArray`` list.

    Returns:
        The concatenated (and padded) records as a ``bytearray``.
    """
    records = bytearray()
    for task in j["TaskArray"]:
        records += scheme_task_to_byte(task)
    area_size = 0x700 - 4
    shortfall = area_size - len(records)
    if shortfall > 0:
        records.extend(b"\xff" * shortfall)
    return records
# Serialize the task id sequence
def scheme_taskids_to_byte(j:json):
    """Build the task-id table of a scheme.

    Layout: ``PlanID`` as four bytes (low byte first) followed by the
    ``TaskID`` of every entry in ``TaskArray``. The table is padded with 0xFF
    up to 0x100 bytes; longer tables are returned unpadded.

    Args:
        j: Scheme dictionary with ``PlanID`` and ``TaskArray``.

    Returns:
        The table as a ``bytearray``.
    """
    table = bytearray((j["PlanID"] >> shift) & 0xff for shift in (0, 8, 16, 24))
    for task in j["TaskArray"]:
        table.append(task["TaskID"])
    shortfall = 0x100 - len(table)
    if shortfall > 0:
        table.extend(b"\xff" * shortfall)
    return table
# Build the byte data used by the small board from a scheme
def scheme_to_byte(j:json):
    """Serialize a scheme into the byte image for the small board.

    The image is the task-id table followed by the task parameter records,
    with the ``crc32`` of those bytes appended as four bytes (low byte first).

    Args:
        j: Scheme dictionary.

    Returns:
        The complete image as a ``bytearray``.
    """
    image = bytearray()
    image += scheme_taskids_to_byte(j)
    image += scheme_tasks_to_byte(j)
    checksum = crc32(image)
    image.extend((checksum >> shift) & 0xff for shift in (0, 8, 16, 24))
    return image
# Convert an int to a byte array
def arr_from_int(num:int):
    """Return the low 32 bits of ``num`` as four bytes, low byte first."""
    return bytearray((num >> shift) & 0xff for shift in range(0, 32, 8))
# Extract the range data of a task from the scheme: max first, then min
def scheme_get_task_range(j:json):
    """Serialize the limits of one task as 32-bit fields.

    Layout: ``TaskID``, ``TaskIndex``, ``ReturnCount``, ``ExecuteErrCode``,
    then one (``Max``, ``Min``, error code) triple per ``TestStandard`` entry.
    The error code comes from ``ResultErrCode`` at the same position, or 0
    when that list is shorter. If there are fewer than 16 entries, filler
    triples (65535, 0, 0) are appended to bring the count up to 16.

    Args:
        j: Task dictionary.

    Returns:
        The serialized fields as a ``bytearray``.
    """
    out = bytearray()
    for key in ("TaskID", "TaskIndex", "ReturnCount", "ExecuteErrCode"):
        out += arr_from_int(j[key])
    for position, limits in enumerate(j["TestStandard"]):
        out += arr_from_int(limits["Max"])
        out += arr_from_int(limits["Min"])
        if position < len(j["ResultErrCode"]):
            out += arr_from_int(j["ResultErrCode"][position])
        else:
            out += arr_from_int(0)
    # Fill the unused slots so that 16 triples are emitted
    missing = 16 - len(j["TestStandard"])
    for _ in range(missing):
        out += arr_from_int(65535)
        out += arr_from_int(0)
        out += arr_from_int(0)
    return out
# Build the byte data used by the main board from a scheme
def scheme_to_host(j:json):
    """Serialize a scheme into the byte data for the main board.

    Layout: ``PlanID``, ``TimeOutM`` and the number of tasks as 32-bit fields,
    followed by the range data of every task in ``TaskArray``.

    Args:
        j: Scheme dictionary.

    Returns:
        The serialized data as a ``bytearray``.
    """
    out = bytearray()
    for field in (j["PlanID"], j["TimeOutM"], len(j["TaskArray"])):
        out += arr_from_int(field)
    for task in j["TaskArray"]:
        out += scheme_get_task_range(task)
    return out
# File sender: splits a file or a scheme into transfer packets
class handle:
    """Packetizer that feeds a payload to the link one packet at a time.

    After ``set_file`` or ``set_json`` has loaded a payload, every call to
    ``get_data`` returns the next packet: first a header packet, then the
    data packets, and finally an empty ``bytearray`` once everything has
    been handed out.
    """
    # Number of payload bytes carried by one data packet.
    PACKET_SIZE=200
    stat=0
    name=""

    def __init__(self) -> None:
        # Start with an empty payload so get_data()/get_rate() can be called
        # safely before set_file()/set_json().
        self.data=bytearray()
        self.packet_all=0
        self.sent_bytes=0
        self.packet_now=0
        self.packet_bytes=self.PACKET_SIZE

    def get_rate(self):
        """Return the progress as an integer percentage.

        An empty payload (no data packets at all) is reported as 100.
        """
        if(self.packet_all<=0):
            return 100
        return self.packet_now*100//self.packet_all

    def get_data(self):
        """Return the next packet to send.

        Returns:
            * first call: ``[1, length (4 bytes, low byte first)]`` followed
              by the encoded file name;
            * following calls: ``[2, packet number (2 bytes), packet count
              (2 bytes)]`` followed by up to ``PACKET_SIZE`` payload bytes;
            * an empty ``bytearray`` when nothing is left to send.
        """
        if(self.stat==0):
            length=len(self.data)
            # 1 means write, 0 means read
            t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff])
            t+=self.name.encode()
            self.stat=1
            return t
        if(self.stat==1):
            # Clamp the last packet to the bytes that are actually left, so a
            # payload that is empty or an exact multiple of the packet size
            # does not produce a trailing empty packet.
            remaining=len(self.data)-self.sent_bytes
            packet_bytes=min(self.PACKET_SIZE,remaining)
            if(packet_bytes>0):
                self.packet_now+=1
                t=bytearray([
                    2,
                    self.packet_now&0xff,
                    (self.packet_now>>8)&0xff,
                    self.packet_all&0xff,
                    (self.packet_all>>8)&0xff,
                ])
                t+=self.data[self.sent_bytes:self.sent_bytes+packet_bytes]
                self.sent_bytes+=packet_bytes
                return t
            print("send done.")
        return bytearray()

    def _set_payload(self,data,path:str):
        """Install ``data`` as the payload and reset the transfer state."""
        import os
        self.data=data
        # basename() matches the old split('/') on POSIX and additionally
        # understands backslash separators on Windows.
        self.name=os.path.basename(path)
        self.stat=0
        self.packet_all=(len(data)+self.PACKET_SIZE-1)//self.PACKET_SIZE
        self.sent_bytes=0
        self.packet_now=0
        self.packet_bytes=self.PACKET_SIZE

    # Select the file to send
    def set_file(self,name:str):
        """Load the raw content of the file ``name`` as the payload.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        with open(name,"rb") as f:
            self._set_payload(f.read(),f.name)

    # Select the scheme to send
    def set_json(self,name:str):
        """Load the JSON scheme ``name`` and use its compiled form as payload.

        The payload is the output of ``scheme_to_byte`` followed by the
        output of ``scheme_to_host``.

        Raises:
            OSError: If the file cannot be opened or read.
            ValueError: If the file does not contain valid JSON.
        """
        with open(name,"rb") as f:
            json_obj=json.loads(f.read())
            d=scheme_to_byte(json_obj)
            d+=scheme_to_host(json_obj)
            self._set_payload(d,f.name)
class protu(QObject):
    """Framing protocol endpoint over a serial port or a ``utcp`` connection.

    Outgoing frames (see ``encode``)::

        0x59 0x6d | length (2, low first) | cmd | cmd_no (2) | data | crc16 (2)

    Incoming frames (see ``decode``)::

        0x59 0x6d 0x43 | length (2) | cmd | cmd_no (2) | data | crc16 (2)

    An incoming frame whose 16-bit length field is 65535 carries its real
    length as a 32-bit value in bytes 8..11, with the data starting at
    byte 12.
    """
    # Progress signal (percentage; presumably 1~100)
    rate_signal =pyqtSignal([int])
    # End signal: success flag and description
    end_signal = pyqtSignal([bool,str])
    # Emitted for every received frame: command, payload, status text
    recv_signal =pyqtSignal([int,bytearray,str])
    # NOTE: class attribute, so this packetizer is shared by all instances.
    hand=handle()

    def __init__(self) -> None:
        QObject.__init__(self)
        self.cmd=0
        self.cmd_no=0
        self.str_err=""
        self.is_big_data=False
        self.num_to_recv=0
        self.recv_data=bytearray()

    def init(self,com:str):
        """Open the transport described by ``com``.

        Args:
            com: ``"utcp:<port>"`` to wait for a TCP client on ``<port>``, or
                ``"<serial port>:<baudrate>"`` to open a serial port.

        Returns:
            ``True`` on success, ``False`` if opening failed (the error is
            printed).
        """
        s=com.split(":")
        try:
            if(s[0]=="utcp"):
                self.ser = utcp(int(s[1]))
            else:
                bsp=int(s[1])
                self.ser = serial.Serial(port=s[0], baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,timeout=None)
        except Exception as e:
            print(str(e))
            return False
        return True

    def encode(self,data:bytearray):
        """Wrap ``data`` into an outgoing frame using ``self.cmd``/``self.cmd_no``.

        The length field counts the command byte, the two command-number
        bytes and the data. The CRC covers everything after the two header
        bytes.
        """
        t=bytearray()
        length=len(data)+3
        t.append(0x59)
        t.append(0x6d)
        t.append(length&0xff)
        t.append(length>>8)
        t.append(self.cmd)
        t.append(self.cmd_no&0xff)
        t.append(self.cmd_no>>8)
        t+=data
        a,b=crc16(t,2,length+4)
        t.append(a)
        t.append(b)
        # print("encode:",t.hex(","))
        return t

    def decode(self,data:bytearray):
        """Validate an incoming frame and return its payload.

        ``self.str_err`` is set to ``"ok"`` or to a description of the
        problem. Short frames, a bad header and a length mismatch return an
        empty ``bytearray``. A CRC mismatch only sets ``self.str_err``; the
        command, command number and payload are still extracted.
        """
        self.str_err="ok"
        if(len(data)<10):
            print("recv data len too less.")
            self.str_err="recv data len too less."
            return bytearray()
        if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
            # print("frame head not 0x59 0x6d.")
            self.str_err="frame head not 0x59 0x6d."
            return bytearray()
        length=data[3]|(data[4]<<8)
        if(length==65535):
            if(len(data)<12):
                # Too short to even hold the 32-bit length field.
                print("recv data have lossed")
                self.str_err="recv data have lossed"
                return bytearray()
            # Extended frame: the real length is the 32-bit field in bytes
            # 8..11, the same bytes recv() reads, directly in front of the
            # payload at byte 12.
            length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
            self.is_big_data=True
        else:
            self.is_big_data=False
        if(length+7!=len(data)):
            print("recv data have lossed")
            self.str_err="recv data have lossed"
            return bytearray()
        a,b=crc16(data,3,length+5)
        if(a!=data[-2] or b!=data[-1]):
            print("recv data check error.h_crc=%02x %02x,crc=%02x %02x"%(a,b,data[-2],data[-1]))
            self.str_err="recv data check error."
        self.cmd_no=data[6]|(data[7]<<8)
        self.cmd=data[5]
        if(self.is_big_data==False):
            return bytearray(data[8:-2])
        else:
            return bytearray(data[12:-2])

    # Receive loop without frame synchronisation
    def recv_(self):
        """Read chunks of up to 500 bytes and treat each chunk as one frame.

        Chunks that decode cleanly are appended to ``self.recv_data`` and
        reported through ``recv_signal``; anything else is printed as text.
        """
        while(self.ser.is_open):
            try:
                data=self.ser.read(500)
            except Exception as a:
                # print("err:",str(a))
                print("port closed")
                return
            if(len(data)>0):
                t=self.decode(data)
                if(self.str_err=="ok"):
                    self.recv_data+=t
                    # print("recv",t.hex(","))
                    self.recv_signal.emit(self.cmd,t,self.str_err)
                else:
                    print(data.decode("utf-8"))
            elif(isinstance(self.ser,utcp)):
                # A TCP read that returns no bytes means the peer has
                # disconnected; stop instead of spinning on empty reads.
                print("port closed")
                return

    # Receive loop with frame synchronisation
    def recv(self):
        """Read byte by byte, reassemble frames and emit ``recv_signal``.

        The loop looks for the three header bytes, reads the length field(s)
        to work out the total frame size, and hands the frame to ``decode``
        once that many bytes have arrived.
        """
        # self.recv_signal.connect(self.send_file_next)
        data=bytearray()
        while(self.ser.is_open):
            d=bytes()
            try:
                d=self.ser.read(1)
            except Exception as a:
                # print("err:",str(a))
                print("port closed")
                return
            if(len(d)==0):
                if(isinstance(self.ser,utcp)):
                    # A TCP read that returns no bytes means the peer has
                    # disconnected; stop instead of spinning on empty reads.
                    print("port closed")
                    return
                continue
            data+=d
            if(len(data)==3):
                if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
                    self.num_to_recv=5
                else:
                    # Not a header: drop the oldest byte and keep searching.
                    data=data[1:]
                    self.num_to_recv=0
            elif(len(data)==5):
                length=data[3]|(data[4]<<8)
                if(length<65535):
                    self.num_to_recv+=length+2
                    self.is_big_data=False
                else:
                    # Extended frame: wait for the 32-bit length field.
                    self.num_to_recv=12
                    self.is_big_data=True
            elif(len(data)==12):
                if(self.is_big_data==True):
                    length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
                    self.num_to_recv=5+length+2
            if(self.num_to_recv>0 and self.num_to_recv==len(data)):
                # print("recv",data.hex(","))
                t=self.decode(data)
                self.recv_data+=t
                self.recv_signal.emit(self.cmd,t,self.str_err)
                # self.send_file_next(self.cmd,t,self.str_err)
                data.clear()

    def send(self,cmd:int,data:bytearray):
        """Send ``data`` under command ``cmd`` with the next command number."""
        self.cmd=cmd
        self.cmd_no+=1
        d=self.encode(data)
        # print("send",d.hex(","))
        self.ser.write(d)

    def send_str(self,txt:str):
        """Send ``txt`` UTF-8 encoded, without any framing."""
        self.ser.write(txt.encode("utf-8"))

    def start_recv(self):
        """Start ``recv`` in a background thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the receive thread has finished."""
        self.thread_.join()

    def close(self):
        """Close the transport if it is open; errors are printed."""
        try:
            if(self.ser.is_open):
                self.ser.close()
        except Exception as e:
            print(str(e))

    def send_file_next(self,cmd:int,data:bytearray,err:str):
        """Send the next packet of the current transfer, or close when done.

        The ``data`` and ``err`` arguments are accepted so the method matches
        the ``recv_signal`` signature; their values are not used.
        """
        print("send_file_next")
        data=self.hand.get_data()
        if(len(data)>0):
            self.send(cmd,data)
        else:
            self.close()

    def send_file(self,cmd:int,name:str):
        """Load file ``name``, emit an initial ``recv_signal`` and wait.

        The receive thread is started first; the call then blocks until that
        thread ends.
        """
        self.start_recv()
        self.hand.set_file(name)
        # self.send_file_next(cmd,bytearray([0]),"ok")
        self.recv_signal.emit(cmd,bytearray([0]),"ok")
        self.wait()
# 0x30 开始检测
# 0x31 检测上报
# 0x32 读写方案
# 0x33 连接维护
# 0x34 自检数据
# 0x35 自检上报
# 0x36 升级脚本
# 0xed 主板升级
# 0xee 小板升级
# 0x40 电机矫正
# 0x41 小板电阻矫正
# 0x42 小板电阻测量
# 0x43 小板硬件版本
# with open("file/judge.lua","rb") as f:
# send_file("COM5",0x36,f.name.split('/')[-1],f.read())
# with open("file/JQ_JCXB_V54.bin","rb") as f:
# send_file("COM5",0xee,f.name.split('/')[-1],f.read())
# with open("file/checker_ye_cfg.json","rb") as f:
# json_obj=json.loads(f.read())
# d=scheme_to_byte(json_obj)
# print("len=",len(d),d.hex(","))
# d=scheme_to_host(json_obj)
# print("len=",len(d),d.hex(","))
# send_file("COM5",0x32,f.name.split('/')[-1],f.read())
if __name__ == "__main__":
    # Manual helper: print the encoded frame for one command so that it can
    # be pasted into a serial tool.
    proto=protu()
    # proto.init("utcp:7777")
    # proto.send_file(0xee,"file/JQ_JCXB_V54.bin")
    # proto.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin")

    # Set the resistor correction values
    # command=0x41
    # payload=bytearray([1,100,0x00,2,0,0x00,3,0,0x00,4,0,0x00,5,0,0x00,6,0,0x00,7,0,0x00,8,0,0x00,9,0,0x00,10,0,0x00,11,0,0x00,12,0,0x00,13,0,0x00,14,0,0x00,15,0,0x00,16,0,0x00,17,0,0x00,18,0,0x00,19,0,0x00,20,0,0x00])

    # Set the hardware version numbers
    # command=0x43
    # payload=bytearray([1,50,0x00,2,51,0x00,3,52,0x00,4,53,0x00,5,54,0x00,6,55,0x00,7,56,0x00,8,57,0x00,9,58,0x00,10,59,0x00,11,60,0x00,12,61,0x00,13,62,0x00,14,63,0x00,15,64,0x00,16,65,0x00,17,66,0x00,18,67,0x00,19,68,0x00,20,69,0x00])

    # Set the motor correction value
    # command=0x40
    # payload=bytearray([0x01,100,0])
    # payload=bytearray([0x02]) # move up
    # payload=bytearray([0x03]) # move down

    # Measure the resistance
    command=0x42
    payload=bytearray([5])

    proto.cmd=command
    frame=proto.encode(payload)
    print(frame.hex(' '))

    # with open("file/EX_Coder_Test_2023-07-6.json","rb") as f:
    #     json_obj=json.loads(f.read())
    #     d=scheme_to_byte(json_obj)
    #     print("len=",len(d),d.hex(","))
    #     d+=scheme_to_host(json_obj)
    # print(int2str(20))
    # s="{d:03d}".format(d=2)
    # print(s)
    # with open("file/7-15.json","rb") as f:
    #     proto.cmd=0x22
    #     p=proto.encode(f.read())
    #     print(p.hex(' '))

# Start detection
# 59 6d 03 00 30 00 00 60 0f
# End acknowledgement
# 59 6D 03 00 31 00 00 31 CF