145 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			145 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import serial
 | |
| import threading
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
def crc8(data: bytearray):
    """Return the 8-bit CRC of *data*.

    The checksum is computed bit by bit, least-significant bit first, with
    the reflected polynomial 0x8C and an initial value of 0.  Because there
    is no final XOR, appending the returned CRC to *data* and running the
    function again yields 0.

    Args:
        data: Bytes to checksum.  Any iterable of ints works; only the low
            8 bits of each item are used.

    Returns:
        The checksum as an int in the range 0..255 (0 for empty input).
    """
    crc = 0
    for byte in data:
        # Masking here keeps crc within 8 bits for the whole bit loop, so no
        # further masking is needed after the shift/XOR below.
        crc = (crc ^ byte) & 0xFF
        for _ in range(8):
            carry = crc & 0x01
            crc >>= 1
            if carry:
                crc ^= 0x8C
    return crc
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
class protm:
    """Encode, decode and exchange frames of a simple serial protocol.

    Frame layout as produced by ``encode`` (16-bit fields are little-endian):

        offset 0-1   magic bytes 0x59 0x65
        offset 2-3   total frame length, header and CRC included
        offset 4     ``addr``
        offset 5     ``host_addr``
        offset 6     ``cmd``
        offset 7-8   ``cmd_no``
        offset 9..   payload
        last byte    ``crc8`` of every preceding byte

    ``str_err`` holds ``"ok"`` or the reason the last ``decode`` failed.
    """

    # Start-of-frame marker.
    _MAGIC = b"\x59\x65"
    # Bytes in a frame besides the payload: 9 header bytes + 1 CRC byte.
    _OVERHEAD = 10

    def __init__(self, com: str) -> None:
        """Store the port name *com*; the port itself is opened by ``init``."""
        self.cmd = 0
        self.cmd_no = 0
        self.addr = 0
        self.host_addr = 0
        self.str_err = ""
        self.is_big_data = False
        self.num_to_recv = 0
        self.recv_data = bytearray()
        self.com = com
        # Set by init() / start_recv(); None lets close() and wait() be
        # called safely before either of them.
        self.ser = None
        self.thread_ = None

    def init(self):
        """Open the serial port at 115200 baud, 8N1, with blocking reads."""
        self.ser = serial.Serial(port=self.com, baudrate=115200, bytesize=serial.EIGHTBITS,
                                 parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
                                 timeout=None)

    def encode(self, data: bytearray):
        """Wrap payload *data* in a frame using the current header fields.

        Returns:
            A new ``bytearray`` holding the complete frame.

        Raises:
            ValueError: If the frame would not fit the 16-bit length field,
                or if ``addr``, ``host_addr`` or ``cmd`` is outside 0..255.
        """
        length = len(data) + self._OVERHEAD
        if length > 0xFFFF:
            raise ValueError(
                "payload too long for 16-bit length field: %d bytes" % len(data))
        frame = bytearray(self._MAGIC)
        frame.append(length & 0xFF)
        frame.append(length >> 8)
        frame.append(self.addr)
        frame.append(self.host_addr)
        frame.append(self.cmd)
        frame.append(self.cmd_no & 0xFF)
        # Mask the high byte too: cmd_no is a plain int and may exceed 16 bits.
        frame.append((self.cmd_no >> 8) & 0xFF)
        frame += data
        frame.append(crc8(frame))
        return frame

    def decode(self, data: bytearray):
        """Validate one complete frame and return its payload.

        On success ``str_err`` is ``"ok"`` and ``host_addr``, ``addr``,
        ``cmd`` and ``cmd_no`` are updated from the header.  On any failure
        ``str_err`` describes the problem, an empty ``bytearray`` is returned
        and the header fields are left untouched.
        """
        self.str_err = "ok"
        if len(data) < self._OVERHEAD:
            print("recv data len too less.")
            self.str_err = "recv data len too less."
            return bytearray()
        length = data[2] | (data[3] << 8)
        if length != len(data):
            print("recv data have lossed")
            self.str_err = "recv data have lossed"
            return bytearray()
        expected = crc8(data[:-1])
        if expected != data[-1]:
            print("recv data check error.h_crc=%02x,crc=%02x" % (expected, data[-1]))
            self.str_err = "recv data check error."
            return bytearray()
        # The two address bytes are stored swapped relative to encode(), so a
        # reply goes back to the sender; commands for any address are answered.
        self.host_addr = data[4]
        self.addr = data[5]
        self.cmd = data[6]
        self.cmd_no = data[7] | (data[8] << 8)
        return bytearray(data[9:-1])

    def recv(self):
        """Read the port byte by byte, reassemble frames and dispatch them.

        Runs until the port is closed or a read raises.  Each frame that
        decodes cleanly is appended to ``recv_data`` and passed to
        ``handle``; frames that fail validation are dropped.
        """
        buf = bytearray()
        while self.ser.is_open:
            try:
                chunk = self.ser.read(1)
            except Exception as exc:
                # Broad on purpose: this also ends the loop when the port is
                # closed from another thread while the read is blocked.
                print("err:", str(exc))
                return
            buf += chunk
            if len(buf) == 2:
                if buf[0] == 0x59 and buf[1] == 0x65:
                    self.num_to_recv = 4
                else:
                    print("data={d}".format(d=buf.hex(",")))
                    # Slide by one byte and keep hunting for the magic bytes.
                    buf = buf[1:]
                    self.num_to_recv = 0
            elif len(buf) == 4:
                length = buf[2] | (buf[3] << 8)
                if length < self._OVERHEAD:
                    # No valid frame is this short.  Waiting for it would
                    # never finish because the buffer is already longer,
                    # so drop it and resynchronise.
                    print("bad frame length {d}, resync".format(d=length))
                    buf.clear()
                    self.num_to_recv = 0
                    continue
                self.num_to_recv = length
            if self.num_to_recv > 0 and self.num_to_recv == len(buf):
                payload = self.decode(buf)
                if self.str_err == "ok":
                    self.recv_data += payload
                    self.handle(self.cmd, payload)
                buf.clear()
                self.num_to_recv = 0

    def send(self, cmd: int, data: bytearray):
        """Frame *data* under command *cmd* and write it to the port."""
        self.cmd = cmd
        # cmd_no travels in a 16-bit field, so wrap instead of overflowing.
        self.cmd_no = (self.cmd_no + 1) & 0xFFFF
        print("send:", data.hex(","))
        self.ser.write(self.encode(data))

    def start_recv(self):
        """Start ``recv`` on a background thread."""
        self.thread_ = threading.Thread(target=self.recv, args=())
        self.thread_.start()

    def wait(self):
        """Block until the receive thread ends; no-op if it was never started."""
        if self.thread_ is not None:
            self.thread_.join()

    def close(self):
        """Close the serial port if it has been opened."""
        if self.ser is not None and self.ser.is_open:
            self.ser.close()

    def handle(self, cmd: int, data: bytearray):
        """Log a received command and answer it with a fixed test reply.

        Nothing is sent when ``addr`` is 1.  Otherwise the reply is command
        0x16 carrying the 23 bytes 1..23.
        """
        print("cmd={d1},data={d2}".format(d1=cmd, d2=data.hex(",")))
        if self.addr == 1:
            print("skip addr:1")
            return
        self.send(0x16, bytearray(range(1, 24)))
 | |
| 
 | |
| 
 | |
| 
 | |
# Offline demo: build one frame and print it as space-separated hex.
# No serial port is opened.  To talk to real hardware instead, call
# u.init(), u.start_recv() and u.wait().
u = protm("com7")
u.addr, u.host_addr, u.cmd = 0, 0, 0x19
data = bytearray.fromhex("11223344")
crc = crc8(data)
# Prefix the payload with a literal 4 (its size here) and its own CRC.
data = bytes((4, crc)) + data
print(u.encode(data).hex(" "))
 | |
| 
 | 
