# Source: tcp_port_tran/prot_codec.py (Python, 115 lines, 2.0 KiB)

import os
import sys
import json
# Packet encoding and decoding.
# Compute the CRC over the whole data stream.
def _crc16(data:bytearray):
    """Return the 16-bit CRC of ``data`` as two bytes, low byte first.

    The register starts at 0xffff and is processed least-significant bit
    first with the reflected polynomial 0xa001 (the parameters commonly
    known as CRC-16/MODBUS). Empty input is a special case and yields
    ``bytearray([0, 0])`` instead of the untouched initial register.
    """
    if len(data) == 0:
        return bytearray([0, 0])
    reg = 0xffff
    for byte in data:
        reg = (reg ^ byte) & 0xffff
        for _ in range(8):
            lsb_set = reg & 1
            reg >>= 1
            if lsb_set:
                # A bit fell off the low end: fold the polynomial back in.
                reg = (reg ^ 0xa001) & 0xffff
    low, high = reg & 0xff, (reg >> 8) & 0xff
    return bytearray([low, high])
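# 0xff marks the start of a frame and 0xfe marks its end.
# 0xf0 is the escape marker.
# 0xf1 separates the command from the data.
# 0xf2 separates the data from the checksum.
# Any byte greater than or equal to 0xf0 must be escaped; it is sent as
# 0xf0 followed by (byte - 0xf0).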
def _tran(data:bytearray):
    """Escape ``data`` so no byte above the 0xf0 escape marker remains.

    Every byte that is 0xf0 or greater is replaced by the pair
    ``0xf0, byte - 0xf0``; all other bytes are copied unchanged.
    Returns a new bytearray and leaves ``data`` untouched.
    """
    escaped = bytearray()
    for byte in data:
        escaped.extend((0xf0, byte - 0xf0) if byte >= 0xf0 else (byte,))
    return escaped
# Reverse the escaping.
def _untran(data:bytearray):
    """Reverse the byte escaping and return the original bytes.

    Any byte that is 0xf0 or greater is treated as the first half of an
    escape pair: it is consumed together with the following byte and the
    two are added to rebuild the original value. All other bytes are
    copied unchanged.

    Args:
        data: Escaped bytes of a single frame field.

    Returns:
        A new bytearray holding the unescaped bytes.

    Raises:
        ValueError: If the input ends in the middle of an escape pair, or
            if an escape pair adds up to more than 0xff.
    """
    ret = bytearray()
    stream = iter(data)
    for item in stream:
        if item < 0xf0:
            ret.append(item)
            continue
        # Pull the second half of the pair from the same iterator so that
        # it is skipped by the outer loop.
        offset = next(stream, None)
        if offset is None:
            raise ValueError("truncated escape sequence at end of data")
        value = item + offset
        if value > 0xff:
            raise ValueError(
                "escape sequence 0x%02x 0x%02x does not fit in one byte"
                % (item, offset)
            )
        ret.append(value)
    return ret
# Encode a command and its data into a frame.
def encode(cmd:bytearray,data:bytearray):
    """Build one frame from a command and its data.

    Layout: ``0xff <cmd> 0xf1 <data> 0xf2 <crc> 0xfe``. The CRC is computed
    over the unescaped ``cmd + data``; each of the three fields is escaped
    with ``_tran`` before being written, so the marker bytes only ever
    appear as delimiters.

    Returns:
        The complete frame as a new bytearray.
    """
    checksum = _crc16(cmd + data)
    frame = bytearray((0xff,))
    # Each field is followed by the marker that terminates it.
    for field, terminator in ((cmd, 0xf1), (data, 0xf2), (checksum, 0xfe)):
        frame.extend(_tran(field))
        frame.append(terminator)
    return frame
# Decode a frame into its command and data.
def decode(data:bytearray):
    """Decode one frame back into its command and data.

    Bytes before the first 0xff start marker are discarded. What remains
    must be exactly one frame, ``0xff <cmd> 0xf1 <data> 0xf2 <crc> 0xfe``,
    whose end marker is the last byte of ``data``.

    Args:
        data: Received bytes, possibly with garbage before the frame.

    Returns:
        ``(cmd, data)`` as bytearrays when the frame is well formed and its
        CRC matches. ``(None, None)`` in every other case: no start marker,
        no end marker at the end, a missing or misordered separator, a
        broken escape sequence, or a CRC mismatch.
    """
    # Jump straight to the start marker instead of re-slicing byte by byte.
    start = data.find(0xff)
    if start < 0:
        return (None, None)
    frame = data[start:]
    if frame[-1] != 0xfe:
        return (None, None)
    body = frame[1:-1]

    cmd_index = body.find(0xf1)
    data_index = body.find(0xf2)
    # find() returns -1 when a separator is missing; used as a slice bound
    # that would silently select the wrong bytes, so reject the frame.
    if not 0 <= cmd_index < data_index:
        return (None, None)

    try:
        cmd = _untran(body[:cmd_index])
        da = _untran(body[cmd_index + 1:data_index])
        crc = _untran(body[data_index + 1:])
    except (IndexError, ValueError):
        # A corrupted frame can hold a dangling or out-of-range escape
        # pair; report it as undecodable rather than raising.
        return (None, None)

    if crc == _crc16(cmd + da):
        return (cmd, da)
    return (None, None)
if __name__ == "__main__":
    # Manual round-trip check: encode a JSON command with a payload full of
    # bytes that need escaping, then print the frame and the decoded result.
    demo_cmd = json.dumps({'device':'client','option':'data'}).encode('utf-8')
    demo_payload = bytearray([0xff,0xfd,0xff,0xde,0xde])
    data = encode(demo_cmd, demo_payload)
    print(data.hex(' '))
    print(decode(data))
    print(b'\xff'.hex(' '))