115 lines
2.0 KiB
Python
115 lines
2.0 KiB
Python
|
||
import os
|
||
import sys
|
||
import json
|
||
|
||
|
||
|
||
|
||
# 数据包编解码
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
# 计算整个数据流的CRC
|
||
def _crc16(data:bytearray):
|
||
lenth=len(data)
|
||
if(lenth>0):
|
||
crc=0xffff
|
||
for i in range(lenth):
|
||
crc=(crc^data[i])&0xffff
|
||
for j in range(8):
|
||
if(crc&1)!=0:
|
||
crc=((crc>>1)^0xa001)&0xffff
|
||
else:
|
||
crc=(crc>>1)&0xffff
|
||
return bytearray([crc&0xff,(crc>>8)&0xff])
|
||
return bytearray([0,0])
|
||
|
||
|
||
# 0xff帧起始,0xfe帧结尾
|
||
# 0xf0转义标识
|
||
# 0xf1命令数据分割
|
||
# 0xf2数据校验分割
|
||
|
||
# 字节数据大于等于0xf0 之后要转义
|
||
def _tran(data:bytearray):
|
||
ret=bytearray()
|
||
for item in data:
|
||
if(item>=0xf0):
|
||
ret.append(0xf0)
|
||
ret.append(item-0xf0)
|
||
else:
|
||
ret.append(item)
|
||
return ret
|
||
|
||
# 去转义
|
||
def _untran(data:bytearray):
|
||
ret=bytearray()
|
||
i=0
|
||
length=len(data)
|
||
while i<length:
|
||
if(data[i]>=0xf0):
|
||
ret.append(data[i]+data[i+1])
|
||
i+=1
|
||
else:
|
||
ret.append(data[i])
|
||
i+=1
|
||
return ret
|
||
|
||
|
||
# 以命令和数据编码
|
||
def encode(cmd: bytearray, data: bytearray):
    """Build one frame from a command and its data.

    Layout of the returned ``bytearray``::

        0xff <cmd> 0xf1 <data> 0xf2 <crc> 0xfe

    where ``<cmd>``, ``<data>`` and ``<crc>`` are each passed through
    ``_tran``, and ``<crc>`` is ``_crc16(cmd + data)`` computed on the
    unescaped bytes.
    """
    checksum = _crc16(cmd + data)
    frame = bytearray([0xff])  # frame start marker
    # Each escaped segment is followed by its terminating marker:
    # 0xf1 after the command, 0xf2 after the data, 0xfe after the CRC.
    for segment, marker in ((cmd, 0xf1), (data, 0xf2), (checksum, 0xfe)):
        frame += _tran(segment)
        frame.append(marker)
    return frame
|
||
|
||
|
||
# 解码为命令和数据
|
||
def decode(data: bytearray):
    """Decode one frame into its command and data parts.

    Expected layout (the one ``encode`` produces)::

        0xff <cmd> 0xf1 <data> 0xf2 <crc> 0xfe

    Bytes before the first 0xff are skipped. The frame must extend to the
    last byte of *data*, which must be 0xfe. Each segment is unescaped with
    ``_untran`` and the CRC segment is compared with ``_crc16(cmd + data)``.

    Args:
        data: Received bytes containing at most one frame.

    Returns:
        ``(cmd, data)`` as two bytearrays when the frame is well formed and
        the CRC matches; ``(None, None)`` otherwise (no start marker, no end
        marker, missing or out-of-order separators, a malformed escape
        sequence, or a CRC mismatch).
    """
    failure = (None, None)

    # Locate the start marker in one pass instead of re-slicing per byte.
    start = data.find(0xff)
    if start < 0 or data[-1] != 0xfe:
        return failure

    body = data[start + 1:-1]
    cmd_end = body.find(0xf1)
    data_end = body.find(0xf2)
    # find() returns -1 when a separator is absent; using that as a slice
    # bound would silently produce wrong segments, so reject such frames.
    if cmd_end < 0 or data_end < cmd_end:
        return failure

    try:
        cmd = _untran(body[:cmd_end])
        payload = _untran(body[cmd_end + 1:data_end])
        crc = _untran(body[data_end + 1:])
    except (IndexError, ValueError):
        # A malformed escape sequence means the frame is corrupt.
        return failure

    if crc != _crc16(cmd + payload):
        return failure
    return (cmd, payload)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
if __name__ == "__main__":
    # Round-trip demo: encode a JSON command plus a payload containing
    # bytes that need escaping, print the frame as hex, then decode it.
    command = json.dumps({'device':'client','option':'data'}).encode('utf-8')
    payload = bytearray([0xff, 0xfd, 0xff, 0xde, 0xde])
    frame = encode(command, payload)
    print(frame.hex(' '))
    print(decode(frame))
    print(b'\xff'.hex(' '))
|
||
|
||
|
||
|
||
|