import os
import sys
import json

# Packet encoding / decoding.
#
# Frame layout produced by encode() and consumed by decode():
#
#   0xff | escaped cmd | 0xf1 | escaped data | 0xf2 | escaped crc | 0xfe
#
# 0xff marks the start of a frame and 0xfe marks its end.
# 0xf0 is the escape marker.
# 0xf1 separates the command from the data.
# 0xf2 separates the data from the checksum.
# Any payload byte >= 0xf0 is escaped so that the control bytes above can
# never appear inside the cmd / data / crc sections of a well-formed frame.
_FRAME_START = 0xFF
_FRAME_END = 0xFE
_ESCAPE = 0xF0
_CMD_SEP = 0xF1
_CRC_SEP = 0xF2


def _crc16(data: bytearray) -> bytearray:
    """Compute the CRC of the whole byte stream.

    Uses a 16-bit register initialised to 0xffff and the reflected
    polynomial 0xa001, processing each byte least-significant bit first.

    Args:
        data: Bytes to checksum.

    Returns:
        A 2-byte bytearray holding the CRC, low byte first. Empty input
        yields ``bytearray([0, 0])`` rather than the initial register value.
    """
    if not data:
        return bytearray([0, 0])
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return bytearray([crc & 0xFF, (crc >> 8) & 0xFF])


def _tran(data: bytearray) -> bytearray:
    """Escape a payload section.

    Every byte >= 0xf0 is replaced by the two bytes ``0xf0, byte - 0xf0``;
    all other bytes are copied unchanged.

    Args:
        data: Raw bytes to escape.

    Returns:
        A new bytearray containing the escaped bytes.
    """
    ret = bytearray()
    for item in data:
        if item >= _ESCAPE:
            ret.append(_ESCAPE)
            ret.append(item - _ESCAPE)
        else:
            ret.append(item)
    return ret


def _untran(data: bytearray) -> bytearray:
    """Reverse the escaping applied by ``_tran``.

    A byte >= 0xf0 is treated as an escape lead: it is added to the byte
    that follows it and the pair is emitted as a single byte.

    Args:
        data: Escaped bytes taken from one section of a frame.

    Returns:
        A new bytearray containing the unescaped bytes.

    Raises:
        ValueError: If an escape lead is the last byte of ``data`` or the
            escaped pair does not fit in a single byte.
    """
    ret = bytearray()
    i = 0
    length = len(data)
    while i < length:
        byte = data[i]
        if byte >= _ESCAPE:
            if i + 1 >= length:
                raise ValueError("truncated escape sequence")
            value = byte + data[i + 1]
            if value > 0xFF:
                raise ValueError("escape sequence out of byte range")
            ret.append(value)
            i += 2  # consume both the escape lead and the escaped value
        else:
            ret.append(byte)
            i += 1
    return ret


def encode(cmd: bytearray, data: bytearray) -> bytearray:
    """Build a frame from a command and its data.

    The CRC is computed over ``cmd + data`` before escaping; each of the
    three sections (cmd, data, crc) is then escaped independently.

    Args:
        cmd: Command bytes.
        data: Data bytes.

    Returns:
        The complete frame, from the 0xff start byte to the 0xfe end byte.
    """
    crc = _crc16(cmd + data)
    ret = bytearray()
    ret.append(_FRAME_START)
    ret += _tran(cmd)
    ret.append(_CMD_SEP)
    ret += _tran(data)
    ret.append(_CRC_SEP)
    ret += _tran(crc)
    ret.append(_FRAME_END)
    return ret


def decode(data: bytearray) -> tuple:
    """Parse a frame back into its command and data.

    Bytes preceding the first 0xff are discarded. The remainder must end
    with 0xfe and contain the 0xf1 and 0xf2 separators in that order, and
    the transmitted CRC must match the CRC of the unescaped cmd + data.

    Args:
        data: Received bytes, possibly with leading garbage.

    Returns:
        ``(cmd, data)`` as two bytearrays when the frame is valid,
        otherwise ``(None, None)``.
    """
    # print('prot_decode:',data.hex(' '))
    start = data.find(_FRAME_START)
    if start < 0:
        return (None, None)
    data = data[start:]
    if data[-1] != _FRAME_END:
        return (None, None)

    body = data[1:-1]
    cmd_index = body.find(_CMD_SEP)
    data_index = body.find(_CRC_SEP)
    # A missing separator makes find() return -1, which would otherwise be
    # silently interpreted as "last byte" by the slices below.
    if cmd_index < 0 or data_index < cmd_index:
        return (None, None)

    try:
        cmd = _untran(body[:cmd_index])
        da = _untran(body[cmd_index + 1:data_index])
        crc = _untran(body[data_index + 1:])
    except ValueError:
        # Broken escape sequence: treat as an invalid frame.
        return (None, None)

    if crc == _crc16(cmd + da):
        return (cmd, da)
    return (None, None)


if __name__ == "__main__":
    data=encode(json.dumps({'device':'client','option':'data'}).encode('utf-8'),bytearray([0xff,0xfd,0xff,0xde,0xde]))
    print(data.hex(' '))
    print(decode(data))
    print(b'\xff'.hex(' '))