添加执行任务命令生成和解析

This commit is contained in:
ranchuan
2023-12-25 18:47:54 +08:00
parent f354a8125d
commit ff87d33675
4 changed files with 316 additions and 3 deletions

302
updata/prot_table.py Normal file
View File

@@ -0,0 +1,302 @@
import prottcp as pt
# task_id,errcode,par_count,ret_count
_g_ew_task_table=[
("电源准备",0,1,10,0),
("延时等待",1,1,2,1),
("测量桥丝阻值",2,4,1,1),
("上电充能",3,2,3,2),
("设置总线电压",4,1,1,1),
("获取总线电流",5,1,0,2),
("使能MTP",6,5,1,0),
("扫描UID",7,5,3,8),
("写配置参数",8,5,7,0),
("检测配置",9,5,7,0),
("验证配置",10,5,4,0),
("模拟注码",11,5,0,0),
("验证模拟注码",12,5,0,0),
("芯片自检",13,5,2,1),
("状态检测",14,5,2,2),
("充电",15,5,2,0),
("放电",16,5,0,0),
("写延时",17,5,1,0),
("读延时",18,5,0,1),
("时钟校准",19,5,2,1),
("设置通信地址",20,5,3,1),
("状态巡检",21,5,3,1),
("起爆使能",22,5,0,0),
("起爆检测",23,5,3,1),
("缓存数据写入MTP",24,5,3,0),
("验证缓存数据",25,5,3,0),
("关总线",26,1,0,0),
("升级",27,5,4,1),
("充能统计",28,2,6,4),
("计算异常",29,0,10,0),
("重新执行任务",30,0,1,0),
("写入三码数据",31,5,0,0),
("验证三码数据",32,5,0,0),
("配置三码数据到小板",33,1,10,0),
("任务插槽",34,0,1,0),
("读MTP",35,5,2,8),
("写MTP",36,5,10,0),
("设置电阻校准值",37,1,1,0),
("读取流水号",38,5,2,2),
("写入流水号",39,5,2,0),
("写固定延时",40,5,1,0),
]
# task_id,errcode,par_count,ret_count
_g_jq_task_table=[
("电源准备",0,1,10,0),
("上电充能",1,2,3,2),
("设置总线电压",2,1,1,1),
("获取总线电流",3,1,0,2),
("扫描UID",4,5,2,8),
("密码验证",5,5,1,0),
("读取芯片代码",6,5,0,1),
("OTP检测",7,5,3,0),
("工厂测试检测",8,5,0,0),
("读取状态",9,5,1,0),
("写工厂信息",10,5,2,0),
("充能统计",11,5,5,4),
("充电电压检测",12,5,2,1),
("延时等待",13,1,2,1),
("写延时",14,5,1,0),
("读延时",15,5,0,1),
("时钟校准",16,5,2,1),
("放电",17,5,2,0),
("复位",18,5,0,0),
("起爆使能",19,5,0,0),
("起爆充能",20,5,3,3),
("使能通讯末电流采集",21,1,0,0),
("获取通讯末电流",22,1,0,1),
("写OTP",23,5,6,0),# opt_addr,len,[data]
("读OTP",24,5,2,10),# 这个任务根据参数1得到实际读取长度
("清除起爆计数",25,5,0,0),
("关总线",26,1,0,0),
("将缓存区数据写入OTP",27,5,3,0),
("写入三码数据",28,5,0,0),
("验证三码数据",29,5,0,0),
("测量电容压差",30,2,1,1),
("测量桥丝阻值",31,4,1,1),
("使能OTP写",32,5,0,0),
("写模块版本",33,5,1,0),
("读取版本号",34,5,0,1),
("写缓存数据",35,5,3,0),
("验证缓存数据",36,5,3,0),
("验证延时时间",37,5,0,0),
("切换总线极性",38,5,2,0),
("计算异常",39,0,10,0),
("重新执行任务",40,0,1,0),
("配置三码数据到小板",41,1,10,0),
("任务插槽",42,1,1,0),
("设置电阻校准值",43,1,1,0),
("写入流水号",44,5,2,0),
("读取流水号",45,5,2,2),
("写固定延时",46,5,1,0),
]
# task_id,errcode,par_count,ret_count
_g_ext_task_table=[
("计算异常",100,0,10,0),
("重新执行任务",101,0,1,0),
("配置三码数据到小板",102,1,10,0),
("任务插槽",103,1,1,0),
("设置电阻校准值",104,1,1,0),
]
# 组装和解析应用层协议
# 填充指定个数的byte
def arr_byte_copy(byte:int,num:int):
t=bytearray()
for i in range(num):
t.append(byte)
return t
# 根据字符找到任务,失败返回nullptr
def coder_find_task(name:str,chip:int):
table=None
print("find task in ext_table.")
table=_g_ext_task_table
for i in table:
if(i[0]==name):
return i
table=None
if(chip==0):
table=_g_jq_task_table
print("find task in jq_table.")
elif(chip==2):
table=_g_ew_task_table
print("find task in ew_table.")
if(table is None):
return None
for i in table:
if(i[0]==name):
print("find task index=%d.",i)
return i
return None
# 添加一个任务,返回0添加成功
def coder_add_task(d:bytearray,chip:int,name:str,slot_index:int,task_index:int,params:list):
task=None
task=coder_find_task(name,chip)
if(task is None):
print(f"can not find task with {name}")
return -1
else:
# task_id,errcode,par_count,ret_count
print(f"task_name:{name},task_id={task[1]},par_count={task[3]},ret_count={task[4]},err={task[2]}")
d.append(0xff&(slot_index))
d.append(0xff&(task[1]))
d.append(0xff&(task[3]))
d.append(0xff&(task[4]))
d.append(0xff&(task[2]))
d.append(0xff&(task_index))
if(task[3]>0):
for i in params:
d.append(i&0xff)
d.append((i>>8)&0xff)
return 0
# 包装为一个小板的数据,返回0成功
# 包装前需要先添加任务
def coder_slave_pack(d:bytearray,addr:int):
d_len=len(d)
d.insert(0,0xff&(d_len))
d.insert(0,0xff&(addr))
return 0
# 找到指定任务序号的数据
def coder_slave_find_ret_data(data:bytearray,task_index:int,size:int):
r=bytearray()
for i in range(len(data)):
if(data[i]==task_index):
r=data[i+2,data[i+1]+i+2]
break
i+=2+data[i+1]
if(len(r)>=size):
r=r[:size]
else:
r_len=size-len(r)
r+=arr_byte_copy(0,r_len)
return r
# 添加分包信息
def coder_pack(d:bytearray):
d_len=len(d)
if(d_len>236):
return None
r=bytearray()
r.append(d_len)
r.append(0)
r.append(0)
r.append(d_len)
r+=d
return r
# 运行检测并读取工厂信息任务
def coder_check_read_fatinfo(chip:int):
d=bytearray()
if(chip==0):
coder_add_task(d,chip,"读OTP",1,1,[20,4])
elif(chip==2):
coder_add_task(d,chip,"读MTP",1,1,[9,4])
coder_add_task(d,chip,"读延时",1,2,[])
coder_slave_pack(d,0x1f)
# 检测模式1,返回异常代码和任务插槽数据
d.insert(0,1)
return coder_pack(d)
# 解析读取到的工厂信息
def coder_check_decode_fatinfo(data:bytearray):
r=bytearray()
data=data[1:]
sret=coder_slave_find_ret_data(data[1:],1,4)
if(len(sret)==4):
sn=sret[0]|(sret[1]<<8)|(sret[2]<<16)|(sret[3]<<24)
c_type=sn&0x1f
sn=sn>>5
print(f"模块类型:{c_type},流水号:{sn}")
sret=coder_slave_find_ret_data(data[1:],2,2)
if(len(sret)==2):
delay=sret[0]|(sret[1]<<8)
print(f"延时:{delay}")
# 运行检测并写入流水号
def coder_check_write_sn(chip:int,sn:int):
d=bytearray()
params=[]
params.append(sn&0xffff)
params.append((sn>>16)&0xffff)
coder_add_task(d,chip,"写入流水号",1,1,params)
coder_add_task(d,chip,"读取流水号",2,2,params)
coder_slave_pack(d,0x1f)
# 检测模式1,返回异常代码和任务插槽数据
d.insert(0,1)
return coder_pack(d)
# 解析读取到的流水号
def coder_check_decode_sn(data:bytearray):
r=bytearray()
data=data[1:]
sret=coder_slave_find_ret_data(data[1:],2,4)
if(len(sret)==4):
sn=sret[0]|(sret[1]<<8)|(sret[2]<<16)|(sret[3]<<24)
print(f"流水号:{sn}")
if __name__ == "__main__":
u=pt.protu()
# # 检测并写入流水号
# data=coder_check_write_sn(2,12345)
# 检测并读取工厂信息和延时
data=coder_check_read_fatinfo(2)
u.cmd=0x21
data=u.encode(data)
print(data.hex(' '))

View File

@@ -132,6 +132,7 @@ def scheme_to_byte(j:json):
t+=scheme_taskids_to_byte(j)
t+=scheme_tasks_to_byte(j)
crc=crc32(t)
print("scheme crc32=",hex(crc))
t.append(crc&0xff)
t.append((crc>>8)&0xff)
t.append((crc>>16)&0xff)