230 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			230 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import serial | ||
|  | import serial.tools.list_ports | ||
|  | import threading | ||
|  | import time | ||
|  | import socket | ||
|  | import checker_save | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | # 使用批检仪和赋码仪联动, | ||
|  | # 批检仪控制电机,赋码仪注码 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
def crc16(data:bytearray,offset:int,len:int):
    """Compute a 16-bit CRC (seed 0xFFFF, reflected polynomial 0xA001).

    The bytes at indices ``offset`` .. ``len - 1`` of ``data`` are processed,
    i.e. ``len`` acts as an exclusive end index rather than a byte count.

    Returns:
        ``(low_byte, high_byte)`` of the checksum, or ``(0, 0)`` when ``len``
        is not positive.
    """
    if len <= 0:
        return 0, 0
    reg = 0xFFFF
    for pos in range(offset, len):
        reg = (reg ^ data[pos]) & 0xFFFF
        for _ in range(8):
            # Shift one bit out; fold the polynomial in when that bit was set.
            carry = reg & 1
            reg = (reg >> 1) & 0xFFFF
            if carry != 0:
                reg = (reg ^ 0xA001) & 0xFFFF
    return reg & 0xFF, (reg >> 8) & 0xFF
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | # 把tcp封装为串口 | ||
class utcp:
    """Wrap a single accepted TCP client in a serial-port-like interface.

    The constructor listens on ``port`` on all interfaces and blocks until one
    client connects (the listening socket has a 10 second timeout). ``read``,
    ``write`` and ``close`` then operate on that client connection.
    """
    is_open=False
    def __init__(self,port:int)->None:
        """Listen on ``port`` and wait for one client.

        Raises:
            OSError: if binding/listening fails or no client connects before
                the accept timeout; the listening socket is closed first.
        """
        self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.ser.bind(("",port))
            self.ser.settimeout(10)
            self.ser.listen(128)
            print("wait for mcu connect.")
            self.client,self.client_addr=self.ser.accept()
        except Exception:
            # Do not leak the listening socket when setup or accept fails.
            self.ser.close()
            raise
        print("client:",self.client_addr)
        self.is_open=True
    def read(self,len:int):
        """Receive up to ``len`` bytes from the client.

        Returns whatever ``socket.recv`` returns, which is ``b""`` once the
        peer has closed the connection.
        """
        return self.client.recv(len)
    def write(self,data:bytearray):
        """Send all of ``data`` to the client and return the number of bytes sent."""
        # socket.send() may transmit only part of the buffer; sendall() keeps
        # going until everything is written, matching serial-port semantics.
        self.client.sendall(data)
        return len(data)
    def close(self):
        """Close the client connection and the listening socket."""
        try:
            self.client.close()
        finally:
            # Always release the listening socket, even if closing the client fails.
            self.ser.close()
            self.is_open=False
|  | 
 | ||
class port:
    """Framed link to the checker plus a raw byte link to the coding instrument.

    Incoming checker frames (see ``recv``/``decode``) are laid out as::

        0x59 0x6d 0x43 | len_lo len_hi | cmd | no_lo no_hi | payload | crc_lo crc_hi

    with ``len`` + 7 equal to the total frame size. When ``len`` is 0xFFFF the
    frame is a "big" frame: a 32-bit little-endian length follows at bytes
    8..11 and the payload starts at byte 12.

    Outgoing frames (see ``encode``) use the two-byte header ``0x59 0x6d``.
    """
    def __init__(self) -> None:
        self.cmd=0                  # command byte of the last frame sent or decoded
        self.cmd_no=0               # 16-bit sequence number of the last frame sent or decoded
        self.str_err=""             # outcome of the last decode(): "ok" or an error text
        self.is_big_data=False      # True while handling a frame with the 32-bit length field
        self.num_to_recv=0          # total size of the frame currently being received (0 = unknown)
        self.recv_data=bytearray()
    @staticmethod
    def _open_port(name:str,bsp:int):
        """Open serial port ``name`` at baud rate ``bsp``; for ``"utcp"`` use ``bsp`` as a TCP port."""
        if(name!="utcp"):
            return serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
                        stopbits=serial.STOPBITS_ONE,timeout=None)
        return utcp(bsp)
    def open(self,name:str,bsp:int):
        """Open the checker link (``self.ser``)."""
        self.ser=self._open_port(name,bsp)
    # Open the coding-instrument port
    def open_code(self,name:str,bsp:int):
        """Open the coding-instrument link (``self.code_ser``)."""
        self.code_ser=self._open_port(name,bsp)

    def start_recv(self):
        """Run ``recv`` once on a new thread; its return value is discarded."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()
    def decode(self,data:bytearray):
        """Validate one received frame and return its payload.

        Updates ``str_err``, ``is_big_data`` and, for structurally valid
        frames, ``cmd`` and ``cmd_no``.

        Returns:
            The payload bytes. An empty ``bytearray`` is returned when the
            frame is too short, has a wrong header, or its length field does
            not match. A CRC mismatch only sets ``str_err``; the payload is
            still returned.
        """
        self.str_err="ok"
        if(len(data)<10):
            print("recv data len too less.")
            self.str_err="recv data len too less."
            return bytearray()
        if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
            print("frame head not 0x59 0x6d.")
            self.str_err="frame head not 0x59 0x6d."
            return bytearray()
        length=data[3]|(data[4]<<8)
        self.is_big_data=(length==65535)
        if self.is_big_data:
            if(len(data)<12):
                # A big frame needs the full 12-byte header before its
                # 32-bit length can be read.
                print("recv data len too less.")
                self.str_err="recv data len too less."
                return bytearray()
            # The real length sits at bytes 8..11 (same offsets recv() uses);
            # byte 7 is the high byte of the sequence number.
            length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
        if(length+7!=len(data)):
            print("recv data have lossed")
            self.str_err="recv data have lossed"
            return bytearray()
        # The CRC covers everything after the 3-byte header, up to the CRC itself.
        low,high=crc16(data,3,length+5)
        if(low!=data[-2] or high!=data[-1]):
            print("recv data check error.h_crc=%02x %02x,crc=%02x %02x"%(low,high,data[-2],data[-1]))
            self.str_err="recv data check error."
        self.cmd_no=data[6]|(data[7]<<8)
        self.cmd=data[5]
        if self.is_big_data:
            return data[12:-2]
        return data[8:-2]
    def recv(self):
        """Read one frame from the checker link byte by byte and decode it.

        Bytes are discarded until the 3-byte header ``0x59 0x6d 0x43`` is
        seen; the length field(s) then determine how many bytes to read.

        Returns:
            The result of ``decode`` for the received frame, or ``None`` when
            reading raises or yields no data (link closed).
        """
        data=bytearray()
        self.num_to_recv=0
        while(True):
            try:
                d=self.ser.read(1)
            except Exception:
                print("port closed")
                return
            if not d:
                # An empty read means the peer closed the link; without this
                # check the loop would spin forever.
                print("port closed")
                return
            data+=d
            if(len(data)==3):
                if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
                    self.num_to_recv=5
                else:
                    # Not a header yet: drop the oldest byte and keep searching.
                    data=data[1:]
                    self.num_to_recv=0
            elif(len(data)==5):
                length=data[3]|(data[4]<<8)
                if(length<65535):
                    self.num_to_recv+=length+2
                    self.is_big_data=False
                else:
                    # Big frame: read up to the end of the 32-bit length field first.
                    self.num_to_recv=12
                    self.is_big_data=True
            elif(len(data)==12):
                if self.is_big_data:
                    length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
                    self.num_to_recv=5+length+2
            if(self.num_to_recv>0 and self.num_to_recv==len(data)):
                return self.decode(data)
    def recv_code(self,num:int):
        """Read exactly ``num`` bytes from the coding instrument and print them as hex.

        Returns:
            ``None`` after a complete read, or ``0`` when reading raises or
            yields no data (link closed).
        """
        d=bytearray()
        try:
            while(num>len(d)):
                chunk=self.code_ser.read(num-len(d))
                if not chunk:
                    # Empty read: the link is gone, stop instead of looping forever.
                    print("port closed")
                    return 0
                d+=chunk
        except Exception:
            print("port closed")
            return 0
        print("recv code:",d.hex(" "))
    def send_code(self,data:bytearray):
        """Write raw bytes to the coding instrument and print them as hex."""
        self.code_ser.write(data)
        print("send code:",data.hex(' '))
    def encode(self,data:bytearray):
        """Build an outgoing frame around ``data`` using ``self.cmd`` and ``self.cmd_no``.

        Layout: ``0x59 0x6d | len_lo len_hi | cmd | no_lo no_hi | data | crc_lo crc_hi``
        where ``len`` is ``len(data) + 3`` and the CRC covers everything after
        the two header bytes.

        Raises:
            ValueError: if the length or command does not fit its field.
        """
        t=bytearray()
        length=len(data)+3
        t.append(0x59)
        t.append(0x6d)
        t.append(length&0xff)
        t.append(length>>8)
        t.append(self.cmd)
        t.append(self.cmd_no&0xff)
        # Only the low 16 bits of the sequence number are transmitted.
        t.append((self.cmd_no>>8)&0xff)
        t+=data
        a,b=crc16(t,2,length+4)
        t.append(a)
        t.append(b)
        return t
    def send(self,cmd:int,data:bytearray):
        """Send ``data`` to the checker as command ``cmd`` with the next sequence number."""
        self.cmd=cmd
        # Wrap at 16 bits so the counter always fits the two-byte field.
        self.cmd_no=(self.cmd_no+1)&0xffff
        d=self.encode(data)
        self.ser.write(d)
    # Batch-checker test
    def checker_test(self,times:int,scheme:str,script:str):
        """Run up to ``times`` check-and-code cycles.

        Each cycle waits for a frame from the checker; when its command is
        0x37 the method acknowledges with 0x30, drives the coding instrument
        through detection, code injection and the end-of-marking command, then
        waits for the checker's result frame (command 0x31) and acknowledges
        it. The loop stops early when the checker link is closed.

        ``scheme`` and ``script`` are passed to ``checker_save.save``; the
        call that would store each result is disabled in this version.
        """
        print("批检仪测试:")
        save=checker_save.save(scheme,script)
        for tick in range(1,times+1):
            print("当前={d1},总共={d2}".format(d1=tick,d2=times))
            start = time.perf_counter()
            # Receive the request-detection command
            data=self.recv()
            if data is None:
                # The checker link is closed; further iterations cannot succeed.
                break
            if(self.cmd==0x37):
                self.send(0x30,bytearray([0]))
                # Receive the command acknowledgement
                self.recv()
                # Wait 2 seconds for the motor to press down
                time.sleep(2)
                # Start detection
                self.send_code(bytearray([0xB0, 0x00, 0x02, 0x00, 0x60]))
                # Receive the command acknowledgement
                self.recv_code(5)
                # Receive the detection result
                self.recv_code(24)
                # Start code injection
                self.send_code(bytearray([0xF0, 0x00, 0x02, 0xE0, 0x41, 0xB1, 0x00, 0x02, 0x00, 0xFB]))
                # Receive the command acknowledgement
                self.recv_code(5)
                # Receive the code-injection result
                self.recv_code(305)
                # Marking finished, save to database
                self.send_code(bytearray([0xF0, 0x00, 0x02, 0xE2, 0x23, 0xB2, 0x00, 0x02, 0x00, 0x67]))
                # Receive the command acknowledgement
                self.recv_code(5)

                # Receive the detection result from the checker
                data=self.recv()
                if(self.cmd==0x31):
                    self.send(0x31,bytearray([0]))
                    # Storing the result via save.save(data) is currently disabled.
                end = time.perf_counter()
                print("检测耗时:",end-start)
|  | 
 | ||
if __name__ == "__main__":
    # Batch-checker test: the checker on com5 and the coding instrument on
    # com8, both at 115200 baud.
    tester=port()
    tester.open(name="com5",bsp=115200)
    tester.open_code(name="com8",bsp=115200)
    # TCP alternative for the checker link: tester.open("utcp",9527)
    tester.checker_test(
        times=5000,
        scheme="file/EX_Coder_Test_2023-07-4.json",
        script="file/judge-xt.lua",
    )