添加批检仪和赋码仪联动的脚本 code_with_checker.py
This commit is contained in:
@@ -174,6 +174,6 @@ class port:
|
||||
if __name__ == "__main__":
|
||||
p=port()
|
||||
# 批检仪测试
|
||||
# p.open("com5",115200)
|
||||
p.open("utcp",9527)
|
||||
p.checker_test(5000,"file/EX_Coder_Test_2023-07-4.json","file/judge-xt.lua")
|
||||
p.open("com16",115200)
|
||||
# p.open("utcp",9527)
|
||||
p.checker_test(5000,"file/EX_Coder_Test_2023-07-7.json","file/judge-xt.lua")
|
||||
|
229
python/code_with_checker.py
Normal file
229
python/code_with_checker.py
Normal file
@@ -0,0 +1,229 @@
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
import threading
|
||||
import time
|
||||
import socket
|
||||
import checker_save
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# 使用批检仪和赋码仪联动,
|
||||
# 批检仪控制电机,赋码仪注码
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def crc16(data:bytearray,offset:int,len:int):
|
||||
if(len>0):
|
||||
crc=0xffff
|
||||
for i in range(len-offset):
|
||||
crc=(crc^data[i+offset])&0xffff
|
||||
for j in range(8):
|
||||
if(crc&1)!=0:
|
||||
crc=((crc>>1)^0xa001)&0xffff
|
||||
else:
|
||||
crc=(crc>>1)&0xffff
|
||||
return crc&0xff,(crc>>8)&0xff
|
||||
return 0,0
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# 把tcp封装为串口
|
||||
class utcp:
|
||||
is_open=False
|
||||
def __init__(self,port:int)->None:
|
||||
self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.ser.bind(("",port))
|
||||
self.ser.settimeout(10)
|
||||
self.ser.listen(128)
|
||||
print("wait for mcu connect.")
|
||||
self.client,self.client_addr=self.ser.accept()
|
||||
print("client:",self.client_addr)
|
||||
self.is_open=True
|
||||
def read(self,len:int):
|
||||
return self.client.recv(len)
|
||||
def write(self,data:bytearray):
|
||||
return self.client.send(data)
|
||||
def close(self):
|
||||
self.client.close()
|
||||
self.ser.close()
|
||||
self.is_open=False
|
||||
|
||||
class port:
|
||||
def __init__(self) -> None:
|
||||
self.cmd=0
|
||||
self.cmd_no=0
|
||||
self.str_err=""
|
||||
self.is_big_data=False
|
||||
self.num_to_recv=0
|
||||
self.recv_data=bytearray()
|
||||
def open(self,name:str,bsp:int):
|
||||
if(name!="utcp"):
|
||||
self.ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,timeout=None)
|
||||
else:
|
||||
self.ser=utcp(bsp)
|
||||
# 打开赋码仪端口
|
||||
def open_code(self,name:str,bsp:int):
|
||||
if(name!="utcp"):
|
||||
self.code_ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,timeout=None)
|
||||
else:
|
||||
self.code_ser=utcp(bsp)
|
||||
|
||||
def start_recv(self):
|
||||
self.thread_ = threading.Thread(target=self.recv, args=())
|
||||
self.thread_.start()
|
||||
def decode(self,data:bytearray):
|
||||
self.str_err="ok"
|
||||
if(len(data)<10):
|
||||
print("recv data len too less.")
|
||||
self.str_err="recv data len too less."
|
||||
return bytearray()
|
||||
if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
|
||||
print("frame head not 0x59 0x6d.")
|
||||
self.str_err="frame head not 0x59 0x6d."
|
||||
return bytearray()
|
||||
length=data[3]|(data[4]<<8)
|
||||
if(length==65535):
|
||||
# 重新设置数据长度
|
||||
length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24);
|
||||
self.is_big_data=True
|
||||
else:
|
||||
self.is_big_data=False
|
||||
if(length+7!=len(data)):
|
||||
print("recv data have lossed")
|
||||
self.str_err="recv data have lossed"
|
||||
return bytearray()
|
||||
a,b=crc16(data,3,length+5)
|
||||
if(a!=data[-2] or b!=data[-1]):
|
||||
print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1])
|
||||
self.str_err="recv data check error."
|
||||
self.cmd_no=data[6]|(data[7]<<8)
|
||||
self.cmd=data[5]
|
||||
if(self.is_big_data==False):
|
||||
return data[8:-2]
|
||||
else:
|
||||
return data[12:-2]
|
||||
def recv(self):
|
||||
data=bytearray()
|
||||
while(True):
|
||||
d=bytes()
|
||||
try:
|
||||
d=self.ser.read(1)
|
||||
except Exception as a:
|
||||
# print("err:",str(a))
|
||||
print("port closed")
|
||||
return
|
||||
data+=d
|
||||
if(len(data)==3):
|
||||
if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
|
||||
self.num_to_recv=5
|
||||
else:
|
||||
data=data[1:]
|
||||
self.num_to_recv=0
|
||||
elif(len(data)==5):
|
||||
length=data[3]|(data[4]<<8)
|
||||
if(length<65535):
|
||||
self.num_to_recv+=length+2
|
||||
self.is_big_data=False
|
||||
else:
|
||||
self.num_to_recv=12
|
||||
self.is_big_data=True
|
||||
elif(len(data)==12):
|
||||
if(self.is_big_data==True):
|
||||
length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
|
||||
self.num_to_recv=5+length+2
|
||||
if(self.num_to_recv>0 and self.num_to_recv==len(data)):
|
||||
# print("recv:",data.hex(" "))
|
||||
return self.decode(data)
|
||||
def recv_code(self,num:int):
|
||||
d=bytearray()
|
||||
try:
|
||||
while(num>len(d)):
|
||||
d+=self.code_ser.read(num-len(d))
|
||||
except Exception as a:
|
||||
print("port closed")
|
||||
return 0
|
||||
print("recv code:",d.hex(" "))
|
||||
# return d[0]
|
||||
def send_code(self,data:bytearray):
|
||||
self.code_ser.write(data)
|
||||
print("send code:",data.hex(' '))
|
||||
def encode(self,data:bytearray):
|
||||
t=bytearray()
|
||||
length=len(data)+3
|
||||
t.append(0x59)
|
||||
t.append(0x6d)
|
||||
t.append(length&0xff)
|
||||
t.append(length>>8)
|
||||
t.append(self.cmd)
|
||||
t.append(self.cmd_no&0xff)
|
||||
t.append(self.cmd_no>>8)
|
||||
t+=data
|
||||
a,b=crc16(t,2,length+4)
|
||||
t.append(a)
|
||||
t.append(b)
|
||||
# print("encode:",t.hex(","))
|
||||
return t
|
||||
def send(self,cmd:int,data:bytearray):
|
||||
self.cmd=cmd
|
||||
self.cmd_no+=1
|
||||
d=self.encode(data)
|
||||
# print("send:",d.hex(","))
|
||||
self.ser.write(d)
|
||||
# 测试批检仪
|
||||
def checker_test(self,times:int,scheme:str,script:str):
|
||||
print("批检仪测试:")
|
||||
tick=0
|
||||
save=checker_save.save(scheme,script)
|
||||
data=bytearray()
|
||||
while(tick<times):
|
||||
tick+=1
|
||||
print("当前={d1},总共={d2}".format(d1=tick,d2=times))
|
||||
start = time.perf_counter()
|
||||
# 接收请求检测命令
|
||||
data=self.recv()
|
||||
if(self.cmd==0x37):
|
||||
self.send(0x30,bytearray([0]))
|
||||
# 接收指令应答
|
||||
self.recv()
|
||||
# 延时2秒等电机下压
|
||||
time.sleep(2)
|
||||
# 开始检测
|
||||
self.send_code(bytearray([0xB0, 0x00, 0x02, 0x00, 0x60]))
|
||||
# 接收指令应答
|
||||
self.recv_code(5)
|
||||
# 接收检测结果
|
||||
self.recv_code(24)
|
||||
# 开始注码
|
||||
self.send_code(bytearray([0xF0, 0x00, 0x02, 0xE0, 0x41, 0xB1, 0x00, 0x02, 0x00, 0xFB]))
|
||||
# 接收指令应答
|
||||
self.recv_code(5)
|
||||
# 接收注码 结果
|
||||
self.recv_code(305)
|
||||
# 打标结束,保存数据库
|
||||
self.send_code(bytearray([0xF0, 0x00, 0x02, 0xE2, 0x23, 0xB2, 0x00, 0x02, 0x00, 0x67]))
|
||||
# 接收指令应答
|
||||
self.recv_code(5)
|
||||
|
||||
# 接收检测结果
|
||||
data=self.recv()
|
||||
if(self.cmd==0x31):
|
||||
self.send(0x31,bytearray([0]))
|
||||
# save.save(data)
|
||||
end = time.perf_counter()
|
||||
print("检测耗时:",end-start)
|
||||
|
||||
if __name__ == "__main__":
|
||||
p=port()
|
||||
# 批检仪测试
|
||||
p.open("com5",115200)
|
||||
p.open_code("com8",115200)
|
||||
# p.open("utcp",9527)
|
||||
p.checker_test(5000,"file/EX_Coder_Test_2023-07-4.json","file/judge-xt.lua")
|
BIN
python/file/Ym_Coder_Lock.bin
Normal file
BIN
python/file/Ym_Coder_Lock.bin
Normal file
Binary file not shown.
@@ -424,6 +424,10 @@ class protu(QObject):
|
||||
# 0x36 升级脚本
|
||||
# 0xed 主板升级
|
||||
# 0xee 小板升级
|
||||
# 0x40 电机矫正
|
||||
# 0x41 小板电阻矫正
|
||||
# 0x42 小板电阻测量
|
||||
# 0x43 小板硬件版本
|
||||
|
||||
# with open("file/judge.lua","rb") as f:
|
||||
# send_file("COM5",0x36,f.name.split('/')[-1],f.read())
|
||||
@@ -451,18 +455,32 @@ if __name__ == "__main__":
|
||||
# u.init("utcp:7777")
|
||||
# u.send_file(0xee,"file/JQ_JCXB_V54.bin")
|
||||
# u.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin")
|
||||
u.cmd=0x30
|
||||
u.cmd_no=0x15|(0xaf<<8)
|
||||
print(u.encode(bytearray([0x00])).hex(' '))
|
||||
|
||||
# 设置电阻 矫正值
|
||||
# u.cmd=0x41
|
||||
# data=bytearray([1,50,0x00,2,51,0x00,3,52,0x00,4,53,0x00,5,54,0x00,6,55,0x00,7,56,0x00,8,57,0x00,9,58,0x00,10,59,0x00,11,60,0x00,12,61,0x00,13,62,0x00,14,63,0x00,15,64,0x00,16,65,0x00,17,66,0x00,18,67,0x00,19,68,0x00,20,69,0x00])
|
||||
# 测量电阻
|
||||
# u.cmd=0x42
|
||||
# data=bytearray([0])
|
||||
# 设置硬件版本号
|
||||
# u.cmd=0x43
|
||||
# data=bytearray([1,50,0x00,2,51,0x00,3,52,0x00,4,53,0x00,5,54,0x00,6,55,0x00,7,56,0x00,8,57,0x00,9,58,0x00,10,59,0x00,11,60,0x00,12,61,0x00,13,62,0x00,14,63,0x00,15,64,0x00,16,65,0x00,17,66,0x00,18,67,0x00,19,68,0x00,20,69,0x00])
|
||||
# 设置电机校正值
|
||||
u.cmd=0x40
|
||||
# data=bytearray([0x01,100,0])
|
||||
# data=bytearray([0x02]) # 上升
|
||||
data=bytearray([0x03]) # 下降
|
||||
|
||||
print(u.encode(data).hex(' '))
|
||||
# with open("file/EX_Coder_Test_2023-07-6.json","rb") as f:
|
||||
# json_obj=json.loads(f.read())
|
||||
# d=scheme_to_byte(json_obj)
|
||||
# print("len=",len(d),d.hex(","))
|
||||
# d+=scheme_to_host(json_obj)
|
||||
|
||||
print(int2str(20))
|
||||
s="{d:03d}".format(d=2)
|
||||
print(s)
|
||||
# print(int2str(20))
|
||||
# s="{d:03d}".format(d=2)
|
||||
# print(s)
|
||||
|
||||
|
||||
# 开始检测
|
||||
|
Reference in New Issue
Block a user