115 lines
2.0 KiB
Python
115 lines
2.0 KiB
Python
![]() |
|
|||
|
import os
|
|||
|
import sys
|
|||
|
import json
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
# 数据包编解码
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
# 计算整个数据流的CRC
|
|||
|
def _crc16(data:bytearray):
|
|||
|
lenth=len(data)
|
|||
|
if(lenth>0):
|
|||
|
crc=0xffff
|
|||
|
for i in range(lenth):
|
|||
|
crc=(crc^data[i])&0xffff
|
|||
|
for j in range(8):
|
|||
|
if(crc&1)!=0:
|
|||
|
crc=((crc>>1)^0xa001)&0xffff
|
|||
|
else:
|
|||
|
crc=(crc>>1)&0xffff
|
|||
|
return bytearray([crc&0xff,(crc>>8)&0xff])
|
|||
|
return bytearray([0,0])
|
|||
|
|
|||
|
|
|||
|
# 0xff帧起始,0xfe帧结尾
|
|||
|
# 0xf0转义标识
|
|||
|
# 0xf1命令数据分割
|
|||
|
# 0xf2数据校验分割
|
|||
|
|
|||
|
# 字节数据大于等于0xf0 之后要转义
|
|||
|
def _tran(data:bytearray):
|
|||
|
ret=bytearray()
|
|||
|
for item in data:
|
|||
|
if(item>=0xf0):
|
|||
|
ret.append(0xf0)
|
|||
|
ret.append(item-0xf0)
|
|||
|
else:
|
|||
|
ret.append(item)
|
|||
|
return ret
|
|||
|
|
|||
|
# 去转义
|
|||
|
def _untran(data:bytearray):
|
|||
|
ret=bytearray()
|
|||
|
i=0
|
|||
|
length=len(data)
|
|||
|
while i<length:
|
|||
|
if(data[i]>=0xf0):
|
|||
|
ret.append(data[i]+data[i+1])
|
|||
|
i+=1
|
|||
|
else:
|
|||
|
ret.append(data[i])
|
|||
|
i+=1
|
|||
|
return ret
|
|||
|
|
|||
|
|
|||
|
# 以命令和数据编码
|
|||
|
def encode(cmd:bytearray,data:bytearray):
|
|||
|
crc=_crc16(cmd+data)
|
|||
|
ret=bytearray()
|
|||
|
ret.append(0xff)
|
|||
|
ret+=_tran(cmd)
|
|||
|
ret.append(0xf1)
|
|||
|
ret+=_tran(data)
|
|||
|
ret.append(0xf2)
|
|||
|
ret+=_tran(crc)
|
|||
|
ret.append(0xfe)
|
|||
|
return ret
|
|||
|
|
|||
|
|
|||
|
# 解码为命令和数据
|
|||
|
def decode(data:bytearray):
|
|||
|
# print('prot_decode:',data.hex(' '))
|
|||
|
while len(data)>0:
|
|||
|
if(data[0]!=0xff):
|
|||
|
data=data[1:]
|
|||
|
else:
|
|||
|
break
|
|||
|
if(len(data)==0):
|
|||
|
return (None,None)
|
|||
|
if(data[0]==0xff and data[-1]==0xfe):
|
|||
|
data=data[1:-1]
|
|||
|
cmd_index=data.find(0xf1)
|
|||
|
data_index=data.find(0xf2)
|
|||
|
cmd=_untran(data[:cmd_index])
|
|||
|
da=_untran(data[cmd_index+1:data_index])
|
|||
|
crc=_untran(data[data_index+1:])
|
|||
|
if(crc==_crc16(cmd+da)):
|
|||
|
return (cmd,da)
|
|||
|
return (None,None)
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
if __name__ == "__main__":
|
|||
|
data=encode(json.dumps({'device':'client','option':'data'}).encode('utf-8'),bytearray([0xff,0xfd,0xff,0xde,0xde]))
|
|||
|
print(data.hex(' '))
|
|||
|
print(decode(data))
|
|||
|
print(b'\xff'.hex(' '))
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|