按下按键开始检测,批检仪测试脚本

This commit is contained in:
ranchuan
2023-06-26 18:07:08 +08:00
parent cfda1c461b
commit a1a9b53780
13 changed files with 963 additions and 178 deletions

View File

@@ -3,6 +3,26 @@ import serial.tools.list_ports
import threading
import time
import socket
import checker_save
def crc16(data:bytearray,offset:int,len:int):
if(len>0):
crc=0xffff
for i in range(len-offset):
crc=(crc^data[i+offset])&0xffff
for j in range(8):
if(crc&1)!=0:
crc=((crc>>1)^0xa001)&0xffff
else:
crc=(crc>>1)&0xffff
return crc&0xff,(crc>>8)&0xff
return 0,0
# 把tcp封装为串口
class utcp:
@@ -27,7 +47,12 @@ class utcp:
class port:
def __init__(self) -> None:
pass
self.cmd=0
self.cmd_no=0
self.str_err=""
self.is_big_data=False
self.num_to_recv=0
self.recv_data=bytearray()
def open(self,name:str,bsp:int):
if(name!="utcp"):
self.ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
@@ -37,89 +62,116 @@ class port:
def start_recv(self):
self.thread_ = threading.Thread(target=self.recv, args=())
self.thread_.start()
def recv(self,num:int):
d=bytes()
try:
d=self.ser.read(num)
except Exception as a:
print("port closed")
return
# print("recv:",d.hex(" "))
def send(self,data:bytearray):
self.ser.write(data)
# 测试上位机
def ecode_test(self,times:int):
print("检测赋码系统耗时测试:")
def decode(self,data:bytearray):
self.str_err="ok"
if(len(data)<10):
print("recv data len too less.")
self.str_err="recv data len too less."
return bytearray()
if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
print("frame head not 0x59 0x6d.")
self.str_err="frame head not 0x59 0x6d."
return bytearray()
length=data[3]|(data[4]<<8)
if(length==65535):
# 重新设置数据长度
length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24);
self.is_big_data=True
else:
self.is_big_data=False
if(length+7!=len(data)):
print("recv data have lossed")
self.str_err="recv data have lossed"
return bytearray()
a,b=crc16(data,3,length+5)
if(a!=data[-2] or b!=data[-1]):
print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1])
self.str_err="recv data check error."
self.cmd_no=data[6]|(data[7]<<8)
self.cmd=data[5]
if(self.is_big_data==False):
return data[8:-2]
else:
return data[12:-2]
def recv(self):
data=bytearray()
while(True):
d=bytes()
try:
d=self.ser.read(1)
except Exception as a:
# print("err:",str(a))
print("port closed")
return
data+=d
if(len(data)==3):
if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
self.num_to_recv=5
else:
data=data[1:]
self.num_to_recv=0
elif(len(data)==5):
length=data[3]|(data[4]<<8)
if(length<65535):
self.num_to_recv+=length+2
self.is_big_data=False
else:
self.num_to_recv=12
self.is_big_data=True
elif(len(data)==12):
if(self.is_big_data==True):
length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
self.num_to_recv=5+length+2
if(self.num_to_recv>0 and self.num_to_recv==len(data)):
# print("recv:",data.hex(" "))
return self.decode(data)
def encode(self,data:bytearray):
t=bytearray()
length=len(data)+3
t.append(0x59)
t.append(0x6d)
t.append(length&0xff)
t.append(length>>8)
t.append(self.cmd)
t.append(self.cmd_no&0xff)
t.append(self.cmd_no>>8)
t+=data
a,b=crc16(t,2,length+4)
t.append(a)
t.append(b)
# print("encode:",t.hex(","))
return t
def send(self,cmd:int,data:bytearray):
self.cmd=cmd
self.cmd_no+=1
# print("send:",data.hex(","))
self.ser.write(self.encode(data))
# 测试批检仪
def checker_test(self,times:int):
print("批检仪测试:")
tick=0
save=checker_save.save()
data=bytearray()
while(tick<times):
tick+=1
print("当前={d1},总共={d2}".format(d1=tick,d2=times))
# 等待数据库写入
time.sleep(3)
start = time.perf_counter()
# 开始检测
self.send(bytearray([0xB0, 0x00, 0x02, 0x00, 0x60]))
# 接收指令应答
self.recv(5)
# 接收检测结果
self.recv(24)
end = time.perf_counter()
print("检测耗时:",end-start)
start2=time.perf_counter()
# 开始注码
self.send(bytearray([0xF0, 0x00, 0x02, 0xE0, 0x41, 0xB1, 0x00, 0x02, 0x00, 0xFB]))
# 接收指令应答
self.recv(5)
# 接收注码 结果
self.recv(305)
end = time.perf_counter()
print("注码耗时:",end-start2)
# 打标结束,保存数据库
self.send(bytearray([0xF0, 0x00, 0x02, 0xE2, 0x23, 0xB2, 0x00, 0x02, 0x00, 0x67]))
# 接收指令应答
self.recv(5)
end = time.perf_counter()
print("总耗时:",end-start)
# 测试注码仪
def coder_test(self,times:int):
print("赋码仪耗时测试:")
tick=0
while(tick<times):
tick+=1
start = time.perf_counter()
print("当前={d1},总共={d2}".format(d1=tick,d2=times))
# 开始检测
self.send(bytearray([0x59, 0x6D, 0x05, 0x00, 0x01, 0x66, 0x04, 0x05, 0x00, 0x2F, 0xD9]))
# 接收指令应答
self.recv(13)
# 接收检测结果
self.recv(72)
end = time.perf_counter()
print("检测耗时:",end-start)
start2=time.perf_counter()
# 开始注码
self.send(bytearray([0x59,0x6D,0x94,0x00,0x02,0xBE,0x01,0x0A,0x32,0x30,0x32,0x33,0x00,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x39,0x01,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x38,0x02,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x37,0x03,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x36,0x04,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x35,0x05,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x34,0x06,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x33,0x07,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x32,0x08,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x31,0x09,0x35,0x38,0x33,0x30,0x36,0x31,0x39,0x41,0x30,0x34,0x30,0x30,0x30,0x44,0xC1]))
# 接收指令应答
self.recv(12)
# 接收注码 结果
self.recv(401)
end = time.perf_counter()
print("注码耗时:",end-start2)
end = time.perf_counter()
print("总耗时:",end-start)
# 接收请求检测命令
data=self.recv()
if(self.cmd==0x37):
self.send(0x30,bytearray())
# 接收指令应答
self.recv()
# 接收检测结果
data=self.recv()
if(self.cmd==0x31):
self.send(0x31,bytearray())
save.save(data)
end = time.perf_counter()
print("检测耗时:",end-start)
if __name__ == "__main__":
p=port()
# 检测赋码仪系统测试
p.open("com8",115200)
p.ecode_test(5)
# 赋码仪设备串口测试
# p.open("com5",9600)
# p.coder_test(5)
# p.open("com5",115200)
# p.coder_test(5)
# 赋码仪设备网口测试
# p.open("utcp",0)
# p.coder_test(5)
# 批检仪测试
p.open("utcp",0)
p.checker_test(5)