仓库迁移

This commit is contained in:
andy
2023-06-10 11:52:00 +08:00
parent e4616bfae5
commit 077c27f2bb
499 changed files with 199745 additions and 92 deletions

View File

@@ -0,0 +1,352 @@
{
"PlanID": 46534786,
"PlanBrief": "模拟工厂注码",
"CheckSoftVersion": [
1
],
"CheckHardVersion": [
1
],
"TimeOutS": 4,
"TimeOutM": 10,
"TimeOutUI": 12,
"TaskIDMax": 26,
"TaskArray": [
{
"TaskID": 0,
"TaskBrief": "电源准备",
"TaskIndex": 0,
"ParamCount": 1,
"ParamInfo": [
"预设电压1"
],
"ParamVal": [
88
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 1,
"TaskBrief": "上电充能",
"TaskIndex": 1,
"ParamCount": 3,
"ParamInfo": [
"总线电压",
"采样超时0.1ms",
"计时启停ADC"
],
"ParamVal": [
88,
5000,
400
],
"TestStandard": [
{
"Max": 95,
"Min": 80
},
{
"Max": 600,
"Min": 100
}
],
"ReturnCount": 2,
"ReturnInfo": [
"总线电压",
"大于启停的时间"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [
23,
22
],
"RetryCount": 0
},
{
"TaskID": 5,
"TaskBrief": "写配置参数",
"TaskIndex": 2,
"ParamCount": 4,
"ParamInfo": [
"UID长度",
"密码长度",
"电流挡位",
"版本号"
],
"ParamVal": [
8,
4,
0,
1
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 32,
"TaskBrief": "加载配置",
"TaskIndex": 3,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [
{
"Max": 8,
"Min": 8
},
{
"Max": 4,
"Min": 4
},
{
"Max": 1,
"Min": 1
}
],
"ReturnCount": 3,
"ReturnInfo": [
"UID长度",
"密码长度",
"版本号"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [
28,
28,
28,
255
],
"RetryCount": 0
},
{
"TaskID": 4,
"TaskBrief": "扫描UID",
"TaskIndex": 4,
"ParamCount": 2,
"ParamInfo": [
"UID长度",
"使1/失0UID对比"
],
"ParamVal": [
0,
0
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 24,
"TaskBrief": "状态码检测",
"TaskIndex": 5,
"ParamCount": 2,
"ParamInfo": [
"比较掩码",
"比较位"
],
"ParamVal": [
128,
0
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 11,
"TaskBrief": "桥丝检测",
"TaskIndex": 6,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [
{
"Max": 240,
"Min": 0
}
],
"ReturnCount": 1,
"ReturnInfo": [
"桥丝ADC"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [
30
],
"RetryCount": 0
},
{
"TaskID": 8,
"TaskBrief": "充能统计",
"TaskIndex": 7,
"ParamCount": 5,
"ParamInfo": [
"充电挡位",
"统计超时",
"电流判线AD",
"充电结束值0.1uA",
"充电超时0.1S"
],
"ParamVal": [
34,
5000,
100,
600,
5
],
"TestStandard": [
{
"Max": 3500,
"Min": 1000
},
{
"Max": 600,
"Min": 200
},
{
"Max": 80,
"Min": 10
}
],
"ReturnCount": 3,
"ReturnInfo": [
"充能值0.1ms",
"充末电流0.1uA",
"最大充电电流0.1mA"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [
34,
35,
32,
255
],
"RetryCount": 0
},
{
"TaskID": 7,
"TaskBrief": "模拟注码",
"TaskIndex": 8,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 40,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 26,
"TaskBrief": "复位",
"TaskIndex": 9,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 45,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 4,
"TaskBrief": "扫描UID",
"TaskIndex": 10,
"ParamCount": 2,
"ParamInfo": [
"UID长度",
"使1/失0UID对比"
],
"ParamVal": [
0,
1
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 31,
"TaskBrief": "密码验证",
"TaskIndex": 11,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 24,
"TaskBrief": "状态码检测",
"TaskIndex": 12,
"ParamCount": 2,
"ParamInfo": [
"比较掩码",
"比较位"
],
"ParamVal": [
32,
32
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
},
{
"TaskID": 29,
"TaskBrief": "赋码设备使能",
"TaskIndex": 13,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 0
}
]
}

BIN
python/file/Ym_Checker.bin Normal file

Binary file not shown.

BIN
python/file/Ym_Coder.bin Normal file

Binary file not shown.

View File

@@ -0,0 +1,928 @@
{
"PlanID": 15,
"PlanBrief": "电解电容雷管厂高压检测",
"CheckSoftVersion": [
6,
7,
8
],
"CheckHardVersion": [
0,
65535
],
"TimeOutS": 34,
"TimeOutM": 40,
"TimeOutUI": 42,
"TaskIDMax": 26,
"MajorErrInfo": [
{
"Info": "执行成功",
"MajorErrCode": 0,
"SubErrCode": []
},
{
"Info": "检测器异常",
"MajorErrCode": 1,
"SubErrCode": [
21,
23,
254
]
},
{
"Info": "主电容异常",
"MajorErrCode": 2,
"SubErrCode": [
30,
31,
32,
33,
34,
35,
42
]
},
{
"Info": "接触异常",
"MajorErrCode": 3,
"SubErrCode": [
27
]
},
{
"Info": "桥丝阻值异常",
"MajorErrCode": 4,
"SubErrCode": [
30,
41,
42
]
},
{
"Info": "芯片异常",
"MajorErrCode": 5,
"SubErrCode": [
22,
24,
25,
26,
27,
28,
29,
30,
32,
33,
35,
36,
37,
38,
39,
40,
41,
42,
43,
44,
45
]
},
{
"Info": "其他异常",
"MajorErrCode": 20,
"SubErrCode": [
255
]
},
{
"Info": "通信失败",
"MajorErrCode": 208,
"SubErrCode": [
]
},
{
"Info": "脚本执行失败",
"MajorErrCode": 209,
"SubErrCode": [
]
},
{
"Info": "数据长度与方案不符",
"MajorErrCode": 210,
"SubErrCode": [
]
},
{
"Info": "方案ID不符",
"MajorErrCode": 211,
"SubErrCode": [
]
},
{
"Info": "检测项目不足,无法判定",
"MajorErrCode": 212,
"SubErrCode": [
]
},
{
"Info": "数据不合规",
"MajorErrCode": 213,
"SubErrCode": [
]
}
],
"SubErrInfo": [
{
"Info": "预设电压异常",
"ErrCode": 21
},
{
"Info": "上电充能错误",
"ErrCode": 22
},
{
"Info": "电压设置异常",
"ErrCode": 23
},
{
"Info": "UID扫描错误",
"ErrCode": 24
},
{
"Info": "OTP错误",
"ErrCode": 25
},
{
"Info": "通讯电流错误",
"ErrCode": 26
},
{
"Info": "基本电流错误",
"ErrCode": 27
},
{
"Info": "读取芯片ID错误",
"ErrCode": 28
},
{
"Info": "密码验证错误",
"ErrCode": 29
},
{
"Info": "DAC比较错误",
"ErrCode": 30
},
{
"Info": "高压充能错误",
"ErrCode": 31
},
{
"Info": "充电电流错误",
"ErrCode": 32
},
{
"Info": "高压充末电流错误",
"ErrCode": 33
},
{
"Info": "低压充电能量错误",
"ErrCode": 34
},
{
"Info": "低压充末电流错误",
"ErrCode": 35
},
{
"Info": "写延时错误",
"ErrCode": 36
},
{
"Info": "读延时错误",
"ErrCode": 37
},
{
"Info": "芯片程测错误",
"ErrCode": 38
},
{
"Info": "时钟校准错误",
"ErrCode": 39
},
{
"Info": "写工厂信息错误",
"ErrCode": 40
},
{
"Info": "模拟起爆错误",
"ErrCode": 41
},
{
"Info": "芯片状态码异常",
"ErrCode": 42
},
{
"Info": "通信电流错误",
"ErrCode": 43
},
{
"Info": "反码时长错误",
"ErrCode": 44
},
{
"Info": "复位放电错误",
"ErrCode": 45
},
{
"Info": "检测器执行异常",
"ErrCode": 254
},
{
"Info": "返回参数判断",
"ErrCode": 255
}
],
"TaskArray": [
{
"TaskID": 0,
"TaskBrief": "电源准备",
"TaskIndex": 0,
"ParamCount": 7,
"ParamInfo": [
"预设电压1",
"预设电压2",
"预设电压3",
"预设电压4",
"预设电压5",
"预设电压6",
"预设电压7"
],
"ParamVal": [
65,
80,
85,
175,
205,
215,
255
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 21,
"ResultErrCode": [],
"RetryCount": 1
},
{
"TaskID": 1,
"TaskBrief": "上电充能",
"TaskIndex": 1,
"ParamCount": 2,
"ParamInfo": [
"总线电压",
"计时启停ADC"
],
"ParamVal": [
80,
2500
],
"TestStandard": [
{
"Max": 85,
"Min": 75
},
{
"Max": 900,
"Min": 150
}
],
"ReturnCount": 2,
"ReturnInfo": [
"总线电压",
"大于启停的时间"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 22,
"ResultErrCode": [
23,
22
],
"RetryCount": 1
},
{
"TaskID": 4,
"TaskBrief": "扫描UID",
"TaskIndex": 2,
"ParamCount": 2,
"ParamInfo": [
"使1/失0反码采集",
"使0/失1UID全0验证"
],
"ParamVal": [
1,
0
],
"TestStandard": [
{
"Max": 90,
"Min": 40
},
{
"Max": 400,
"Min": 250
}
],
"ReturnCount": 2,
"ReturnInfo": [
"最大反馈电流",
"最大反馈时间"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 24,
"ResultErrCode": [
43,
42,
255,
255,
255,
255,
255,
255
],
"RetryCount": 1
},
{
"TaskID": 3,
"TaskBrief": "获取总线电流",
"TaskIndex": 3,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [
{
"Max": 260,
"Min": 120
},
{
"Max": 260,
"Min": 120
}
],
"ReturnCount": 2,
"ReturnInfo": [
"总线正电流",
"总线反电流"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 27,
"ResultErrCode": [
27,
27
],
"RetryCount": 0
},
{
"TaskID": 6,
"TaskBrief": "读取芯片代码",
"TaskIndex": 4,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 26,
"ExecuteErrCode": 28,
"ResultErrCode": [
28
],
"RetryCount": 1
},
{
"TaskID": 2,
"TaskBrief": "设置总线电压",
"TaskIndex": 5,
"ParamCount": 1,
"ParamInfo": [
"总线电压"
],
"ParamVal": [
205
],
"TestStandard": [
{
"Max": 210,
"Min": 200
}
],
"ReturnCount": 1,
"ReturnInfo": [
"总线电压"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 23,
"ResultErrCode": [
23
],
"RetryCount": 1
},
{
"TaskID": 3,
"TaskBrief": "获取总线电流",
"TaskIndex": 6,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [
{
"Max": 270,
"Min": 120
},
{
"Max": 270,
"Min": 120
}
],
"ReturnCount": 2,
"ReturnInfo": [
"总线正电流",
"总线反电流"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 27,
"ResultErrCode": [
27,
27
],
"RetryCount": 0
},
{
"TaskID": 11,
"TaskBrief": "充能统计",
"TaskIndex": 7,
"ParamCount": 4,
"ParamInfo": [
"电流判线AD",
"充电结束值0.1uA",
"充电超时0.1S",
"电流监控0.1S"
],
"ParamVal": [
1000,
500,
60,
20
],
"TestStandard": [
{
"Max": 20000,
"Min": 13000
},
{
"Max": 600,
"Min": 120
}
],
"ReturnCount": 2,
"ReturnInfo": [
"充能值0.1ms",
"充末电流0.1uA"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 31,
"ResultErrCode": [
31,
32,
255,
255
],
"RetryCount": 1
},
{
"TaskID": 12,
"TaskBrief": "充电电压检测",
"TaskIndex": 8,
"ParamCount": 2,
"ParamInfo": [
"起始DAC",
"结束DAC"
],
"ParamVal": [
31,
29
],
"TestStandard": [
{
"Max": 31,
"Min": 29
}
],
"ReturnCount": 1,
"ReturnInfo": [
"扫描通过的DAC"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 30,
"ResultErrCode": [
30
],
"RetryCount": 1
},
{
"TaskID": 2,
"TaskBrief": "设置总线电压",
"TaskIndex": 9,
"ParamCount": 1,
"ParamInfo": [
"总线电压"
],
"ParamVal": [
65
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 23,
"ResultErrCode": [
255
],
"RetryCount": 1
},
{
"TaskID": 13,
"TaskBrief": "延时等待",
"TaskIndex": 10,
"ParamCount": 1,
"ParamInfo": [
"延时时间"
],
"ParamVal": [
30
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [
255
],
"RetryCount": 1
},
{
"TaskID": 12,
"TaskBrief": "充电电压检测",
"TaskIndex": 11,
"ParamCount": 2,
"ParamInfo": [
"起始DAC",
"结束DAC"
],
"ParamVal": [
31,
28
],
"TestStandard": [
{
"Max": 31,
"Min": 28
}
],
"ReturnCount": 1,
"ReturnInfo": [
"扫描通过的DAC"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 30,
"ResultErrCode": [
30
],
"RetryCount": 1
},
{
"TaskID": 18,
"TaskBrief": "复位放电",
"TaskIndex": 12,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 45,
"ResultErrCode": [],
"RetryCount": 2
},
{
"TaskID": 13,
"TaskBrief": "延时等待",
"TaskIndex": 13,
"ParamCount": 1,
"ParamInfo": [
"延时时间"
],
"ParamVal": [
80
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [
255
],
"RetryCount": 1
},
{
"TaskID": 4,
"TaskBrief": "扫描UID",
"TaskIndex": 14,
"ParamCount": 2,
"ParamInfo": [
"使1/失0反码采集",
"使0/失1UID全0验证"
],
"ParamVal": [
0,
0
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 24,
"ResultErrCode": [
255,
255,
255,
255,
255,
255,
255,
255
],
"RetryCount": 1
},
{
"TaskID": 11,
"TaskBrief": "充能统计",
"TaskIndex": 15,
"ParamCount": 4,
"ParamInfo": [
"电流判线AD",
"充电结束值0.1uA",
"充电超时0.1S",
"电流监控0.1S"
],
"ParamVal": [
800,
400,
40,
0
],
"TestStandard": [
{
"Max": 4500,
"Min": 2500
},
{
"Max": 450,
"Min": 120
}
],
"ReturnCount": 2,
"ReturnInfo": [
"充能值0.1ms",
"充末电流0.1uA"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 34,
"ResultErrCode": [
34,
35,
255,
255
],
"RetryCount": 1
},
{
"TaskID": 5,
"TaskBrief": "密码验证",
"TaskIndex": 16,
"ParamCount": 1,
"ParamInfo": [
"0码/1原码验证"
],
"ParamVal": [
0
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 29,
"ResultErrCode": [],
"RetryCount": 1
},
{
"TaskID": 7,
"TaskBrief": "OTP检测",
"TaskIndex": 17,
"ParamCount": 3,
"ParamInfo": [
"1关/0检UID密码为0",
"1关/0检延时保留区为0",
"1关/0检用户区为0"
],
"ParamVal": [
0,
0,
0
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 25,
"ResultErrCode": [],
"RetryCount": 1
},
{
"TaskID": 14,
"TaskBrief": "写延时",
"TaskIndex": 18,
"ParamCount": 1,
"ParamInfo": [
"延时"
],
"ParamVal": [
20
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 36,
"ResultErrCode": [],
"RetryCount": 1
},
{
"TaskID": 15,
"TaskBrief": "读延时",
"TaskIndex": 19,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 37,
"ResultErrCode": [
255
],
"RetryCount": 1
},
{
"TaskID": 16,
"TaskBrief": "时钟校准",
"TaskIndex": 20,
"ParamCount": 2,
"ParamInfo": [
"脉冲周期",
"脉冲个数"
],
"ParamVal": [
1000,
25
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 39,
"ResultErrCode": [],
"RetryCount": 1
},
{
"TaskID": 12,
"TaskBrief": "充电电压检测",
"TaskIndex": 21,
"ParamCount": 2,
"ParamInfo": [
"起始DAC",
"结束DAC"
],
"ParamVal": [
3,
0
],
"TestStandard": [
{
"Max": 3,
"Min": 0
}
],
"ReturnCount": 1,
"ReturnInfo": [
"扫描通过的DAC"
],
"ErrJumpTo": 255,
"ExecuteErrCode": 30,
"ResultErrCode": [
34
],
"RetryCount": 1
},
{
"TaskID": 19,
"TaskBrief": "起爆使能",
"TaskIndex": 22,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 1
},
{
"TaskID": 9,
"TaskBrief": "读芯片状态",
"TaskIndex": 23,
"ParamCount": 1,
"ParamInfo": [
"状态值掩码"
],
"ParamVal": [
254
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 42,
"ResultErrCode": [],
"RetryCount": 10
},
{
"TaskID": 20,
"TaskBrief": "起爆充能",
"TaskIndex": 24,
"ParamCount": 2,
"ParamInfo": [
"电流判线AD",
"起爆脉冲个数"
],
"ParamVal": [
1000,
400
],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 41,
"ResultErrCode": [
255,
255,
255
],
"RetryCount": 1
},
{
"TaskID": 18,
"TaskBrief": "复位放电",
"TaskIndex": 25,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 45,
"ResultErrCode": [],
"RetryCount": 2
},
{
"TaskID": 26,
"TaskBrief": "关总线",
"TaskIndex": 26,
"ParamCount": 0,
"ParamInfo": [],
"ParamVal": [],
"TestStandard": [],
"ReturnCount": 0,
"ReturnInfo": [],
"ErrJumpTo": 255,
"ExecuteErrCode": 254,
"ResultErrCode": [],
"RetryCount": 1
}
]
}

632
python/file/judge.lua Normal file
View File

@@ -0,0 +1,632 @@
---@diagnostic disable: lowercase-global, undefined-global
-- 适配的方案id
PLAN_ID=15
-- 更新日志
-- 2023.4.13
-- 检测流程编写完成
-- 2023.4.14
-- 发现未接触时检测电流有可能为1此时会报芯片异常
-- 把接触异常电流<1改为<5
-- 2023.4.20
-- 打印出出错的任务代码
-- 2023.4.23
-- 调试代码解决异常代码不对的问题
-- 电压无法上升改为检测器异常
-- 修改因脚本产生的错误代码从0xd2(210)开始编码
-- "数据长度与方案不符",210
-- "方案ID不符",211
-- "检测项目不足,无法判定",212
-- "数据不合规",213
-- 方案配置文件:
-- cfg_name
-- 检测数据
-- check_data
-- 当前路径
-- get_path()
-- 打印表
-- prints:print_t()
-- json解码
-- json:decode()
-- 读取方案配置信息
check_cfg_file=io.open(cfg_name,"r")
check_cfg_str=check_cfg_file:read("*a")
check_cfg_file:close()
json_table = json:decode(check_cfg_str)
-- 表长度
function len(t)
local res=0
if(t==nil)then return 0 end
for k,v in pairs(t) do
res=res+1
end
return res
end
-- 截取数组
function mid(array,start,length)
if(len(array)<(start+length-1)) then
print("mid:out of range",start,length)
return nil
end
return table.pack(table.unpack(array,start,start+length-1))
end
-- 把n插入t中不允许重复
function insert(t,n)
for k,v in pairs(t) do
if(v==n) then
return
end
end
table.insert(t,n)
end
-- 删除t中的值n
function del(t,n)
for k,v in pairs(t) do
if(v==n) then
table.remove(t,k)
break
end
end
end
-- 删除t中除n之外的所有值
function only(t,n)
for k,v in pairs(t) do
if(v~=n) then
-- table.remove(t,k)
t[k]=nil
end
end
end
-- 8个字节描述执行结果0成功1失败
function find_ack(index,acks)
if(index>len(acks)*8) or index<1 then
print("index out of range.index=",index)
return false
end
if ((acks[(index-1)//8+1] & (1<<((index-1)%8)))~=0) then
return false
else
return true
end
end
-- 8个字节描述执行与否1执行0未执行
function find_stat(index,stats)
if(index>len(stats)*8) or index<1 then
print("index out of range.index=",index)
return false
end
if ((stats[(index-1)//8+1] & (1<<((index-1)%8)))~=0) then
return true
else
return false
end
end
-- 展示任务回复0成功1失败
function show_acks(acks)
-- prints:print_a(acks)
print("执行结果:(0:ok;1:err)")
local str=""
for i=1,len(json_table["TaskArray"]),1 do
local ack=find_ack(i,acks)
if ack==true then
str=str.."0, "
else
str=str.."1, "
end
end
print(str)
end
-- 展示任务执行状态0未执行1已执行
function show_stats(stats)
-- prints:print_a(stats)
print("执行状态:(0:skiped;1:executed)")
local str=""
for i=1,len(json_table["TaskArray"]),1 do
local stat=find_stat(i,stats)
if stat==true then
str=str.."1, "
else
str=str.."0, "
end
end
print(str)
end
-- 统计指定index之前应该跳过的数据长度和当前index的返回长度
function calc_skip(index,tasks)
local len=0
-- for i=1,table.getn(tasks),1 do
local i=1
while i<index do
-- for i=1,index,1 do
-- print("calc_skip,index=",index,"i=",i)
len=len+tasks[i]["ReturnCount"]
i=i+1
end
-- print("calc_skip,len=",len)
return len*2,tasks[index]["ReturnCount"]*2
end
-- 找到指定index的任务返回数和值,把两个字节合并为一个值
function find_return(index)
local data=check_data
local skip_len,len=calc_skip(index,json_table["TaskArray"])
local ret={}
local temp=mid(data,skip_len+1+16,len)
if(temp==nil) then return ret end
-- print("find_return,temp=")
-- prints:print_a(temp)
for i=1,len//2,1 do
ret[i]=temp[i*2-1]|(temp[i*2]<<8)
end
return ret
end
-- JQ_Test_PowerPrapare, //0 电源准备 1
-- JQ_Test_PowerOn, //1 上电充能 1,(3,充能值(时间)超上限(开路/短路)、总线电压超下限(短路))
-- JQ_Test_SetBusV, //2 设置总线电压 1
-- JQ_Test_BaseCur, //3 获取总线电流 1,3(总线电流偏大(漏流/短路)、总线电流为0(开路))
-- JQ_Test_ScanUID, //4 扫描UID 1,2,3,5(能扫描到uid即认为芯片正常)
-- JQ_Test_PWCheck, //5 密码验证
-- JQ_Test_ReadChipID, //6 读取芯片代码 1,2,3,5(能读到id即认为芯片正常)
-- JQ_Test_OTPCheck, //7 OTP检测
-- JQ_Test_FTCheck, //8 工厂测试检测
-- JQ_Test_ReadState, //9 读取状态
-- JQ_Test_WriteUserInfo, //10 写工厂信息
-- JQ_Test_ChgEnergy, //11 充能统计
-- JQ_Test_CheckDAC, //12 充电电压检测
-- JQ_Test_WaitDelay, //13 延时等待
-- JQ_Test_SetDelay, //14 设置延时
-- JQ_Test_ReadDelay, //15 读取延时
-- JQ_Test_ClkTrim, //16 时钟校准
-- JQ_Test_Discharge, //17 放电
-- JQ_Test_Reset, //18 复位
-- JQ_Test_BootEn, //19 起爆使能
-- JQ_Test_BoomEnergy, //20 起爆充能
-- JQ_Test_EnCommEndCur, //21 使能通讯末电流采集
-- JQ_Test_GetCommEndCur, //22 获取通讯末电流
-- JQ_Test_WriteOTP, //23 写OTP
-- JQ_Test_ReadOTP, //24 读OTP
-- JQ_Test_ClearBoom, //25 清除起爆计数
-- JQ_Test_PowerOff, //26 关总线
-- 根据任务id找到下一个任务
function find_index(index,task_id)
-- print("find_index:index=",index)
local tasks=json_table["TaskArray"]
-- prints:print_t(tasks)
for i=index,len(tasks),1 do
if(tasks[i]["TaskID"]==task_id) then
return tasks[i]
end
end
print("find_index err:index=",index,"task_id=",task_id)
return nil
end
-- 检测器异常,1
-- 主电容异常,2
-- 接触异常,3
-- 桥丝阻值异常,4
-- 芯片异常,5
-- 其他异常,6
-- 判断上电错误,返回下一个检测任务,nil停止检测
function JQ_Test_PowerPrapare(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 上电错误只能是检测器异常,检测结束
print("task failed.index=",task["TaskIndex"])
only(err_code,1)
return nil
else
-- 上电正常,排除检测器异常
del(err_code,1)
print("检测上电充能")
-- 下一步是上电充能
return find_index(task["TaskIndex"]+2,1)
end
end
-- 判断上电充能错误,返回下一个检测任务,nil停止检测
function JQ_Test_PowerOn(task,err_code)
local ret_value={}
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 上电充能异常
del(err_code,0)
print("task failed.index=",task["TaskIndex"])
-- 下一步检测电流
return find_index(task["TaskIndex"]+2,3)
else
-- 充能流程正常,返回第一个值是电压,第二个值是充能时间
ret_value=find_return(task["TaskIndex"]+1)
-- 电压无法上升,检测器异常
if(ret_value[1]<task["TestStandard"][1]["Min"]) then
only(err_code,1)
return nil
end
-- 充电时间过长,进一步检测电流,判断开路还是短路
if(ret_value[2]>39000) then
del(err_code,0)
print("检测电流")
return find_index(task["TaskIndex"]+2,3)
elseif(ret_value[2]<1) then
-- 新小板程序在开路时会返回0保险起见还是看一下电流
del(err_code,0)
print("检测电流")
return find_index(task["TaskIndex"]+2,3)
end
---- 排除接触异常
--del(err_code,3)
print("检测芯片UID")
-- 下一步检测芯片
return find_index(task["TaskIndex"]+2,4)
end
end
-- 扫描uid成功则芯片正常返回下一个检测任务,nil停止检测
function JQ_Test_ScanUID(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 通信失败,返回芯片异常,检测结束
print("task failed.task_index=",task["TaskIndex"])
only(err_code,5)
return nil
else
print("检测电流")
return find_index(task["TaskIndex"]+2,3)
--print("读取芯片代码")
--return find_index(task["TaskIndex"]+2,6)
end
end
-- 判断低压电流,返回下一个检测任务,nil停止检测
function JQ_Test_BaseCurLow(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 电流检测错误只能是检测器异常,检测结束
print("task failed.task_index=",task["TaskIndex"])
only(err_code,1)
return nil
else
-- 返回正向电流,反向电流
ret_value=find_return(task["TaskIndex"]+1)
print("电流:",ret_value[1])
-- 电流为0开路返回接触异常
if(ret_value[1]<10) and (ret_value[2]<10)then
only(err_code,3)
return nil
-- 电流过大,短路,返回芯片异常
elseif((ret_value[1]>task["TestStandard"][1]["Max"]) and (ret_value[1]<1500))
or ((ret_value[2]>task["TestStandard"][2]["Max"]) and (ret_value[2]<1500))
then
only(err_code,5)
return nil
elseif((ret_value[1]>1500) and (ret_value[2]>1500))then
only(err_code,3)
return nil
end
-- 排除接触异常
del(err_code,3)
--print("读取芯片UID")
---- 下一步检测芯片
--return find_index(task["TaskIndex"]+2,6)
print("读取芯片代码")
return find_index(task["TaskIndex"]+2,6)
end
end
-- 读取芯片代码,成功则芯片正常,返回下一个检测任务,nil停止检测
function JQ_Test_ReadChipID(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 通信失败,返回芯片异常,检测结束
print("task failed.task_index=",task["TaskIndex"])
only(err_code,5)
return nil
else
print("检测高压电流")
return find_index(task["TaskIndex"]+2,3)
end
end
-- 判断高压电流,返回下一个检测任务,nil停止检测
function JQ_Test_BaseCurHigh(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 电流检测错误只能是检测器异常,检测结束
print("task failed.task_index=",task["TaskIndex"])
only(err_code,1)
return nil
else
-- 返回正向电流,反向电流
ret_value=find_return(task["TaskIndex"]+1)
print("电流:",ret_value[1])
-- 电流为0开路返回接触异常
if(ret_value[1]<10) and (ret_value[2]<10)then
only(err_code,5)
return nil
-- 电流过大,短路,返回芯片异常
elseif((ret_value[1]>task["TestStandard"][1]["Max"]) and (ret_value[1]<1500))
or ((ret_value[2]>task["TestStandard"][2]["Max"]) and (ret_value[2]<1500))
then
only(err_code,5)
return nil
elseif((ret_value[1]>1500) or (ret_value[2]>1500))then
only(err_code,5)
return nil
end
-- 排除接触异常
del(err_code,3)
print("检测桥丝和电容")
-- 下一步检测桥丝和电容
return find_index(task["TaskIndex"]+2,12)
end
end
-- 检测电容电压,返回下一个检测任务,nil停止检测
function JQ_Test_CheckDAC(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 检测电压失败则桥丝或主电容异常
print("task failed.task_index=",task["TaskIndex"])
-- 这里判断上电充能值,如果偏大则桥丝异常,偏小则电容异常
-- 充能流程正常,返回第一个值是电压,第二个值是充能时间
-- local task_p=find_index(1,1)
-- ret_value=find_return(task_p["TaskIndex"]+1)
-- if(ret_value[2]>39999) then
-- only(err_code,4)
-- return nil
-- else
-- only(err_code,2)
-- return nil
-- end
only(err_code,4)
return nil
else
-- 排除桥丝异常
del(err_code,4)
print("检测电容")
return find_index(task["TaskIndex"]+2,12)
end
end
-- 检测电容电压,返回下一个检测任务,nil停止检测
function JQ_Test_CheckDAC2(task,err_code)
if(find_ack(task["TaskIndex"]+1,mid(check_data,1,8))==false)then
-- 检测电压失败则主电容异常
print("task failed.task_index=",task["TaskIndex"])
only(err_code,2)
return nil
else
-- 排除电容异常
del(err_code,2)
del(err_code,5)
print("检测完毕")
return nil
end
end
-- 定义占位函数,不应该调用
function JQ_Test_Empty(task,err_code)
print("err function call,task index=",task["TaskIndex"])
insert(err_code,6)
return nil
end
-- 这个数组根据方案检测顺序确定
tasks_judge_fun={
JQ_Test_PowerPrapare,
JQ_Test_PowerOn,
JQ_Test_ScanUID,
JQ_Test_BaseCurLow,
JQ_Test_ReadChipID,
JQ_Test_Empty,
JQ_Test_BaseCurHigh,
JQ_Test_Empty,
JQ_Test_CheckDAC,
JQ_Test_Empty,
JQ_Test_Empty,
JQ_Test_CheckDAC2
}
-- 根据任务依次判断
function judge()
local fun_index=1
local err_code={0,1,2,3,4,5}
-- 找到上电任务
local task=find_index(1,0)
while true do
if(task==nil)then
if(len(err_code)>1)then
-- 如果找不到任务,则应该是方案错误,这里返回其他异常
print("can not find task,fun_index=",fun_index)
insert(err_code,6)
end
break
end
if(find_stat(task["TaskIndex"]+1,mid(check_data,9,8))==false)then
-- 如果任务未执行,则应该是方案错误,这里返回其他异常
print("task unexecuted.index=",task["TaskIndex"])
insert(err_code,6)
break
end
fun_index=task["TaskIndex"]+1
task=tasks_judge_fun[fun_index](task,err_code)
end
return err_code
end
-- 判断一个任务的子错误
function judge_task(task,data)
local range=task["TestStandard"]
local ret={}
-- print("judge task")
if range~=nil then
-- print("judge task")
for i=1,len(range),1 do
if(data[i]<range[i]["Min"]) or (data[i]>range[i]["Max"]) then
-- print("task=",task["TaskBrief"],"out of range:")
-- print("err code=",task["ResultErrCode"][i])
table.insert(ret,task["ResultErrCode"][i])
end
end
end
return ret
end
-- 依次判断主错误
function judge_list()
local acks=mid(check_data,1,8)
local stats=mid(check_data,9,8)
for i=1,len(json_table["TaskArray"]),1 do
local stat=find_stat(i,stats)
if stat==true then
local ack=find_ack(i,acks)
local task_p=json_table["TaskArray"][i]
local ret_value=find_return(i)
local err_code=judge_task(task_p,ret_value)
if ack==false then
table.insert(err_code,task_p["ExecuteErrCode"])
end
if len(err_code)>0 then
print("task:",task_p["TaskBrief"],"err_code:")
prints:print_a(err_code)
end
end
end
end
-- 要校验小板的版本号,方案的版本号
-- 脚本产生的异常从210开始0xd2
function check_env()
-- 校验输入数据的长度
local skip,l=calc_skip(len(json_table["TaskArray"]),json_table["TaskArray"])
-- print("requier len=",skip+l)
if(skip+l+16>len(check_data)) then
print("check data too less.")
return "数据长度与方案不符",210
end
if(json_table["PlanID"]~=PLAN_ID) then
print("plan id check err.")
return "方案ID不符",211
end
return "ok",0
end
-- 取得表中第一个值
function get_err_code(err_code)
for k,v in pairs(err_code) do
if(v~=nil) then
return v
end
end
return nil
end
function main()
local err_str_table={"检测器异常","主电容异常","接触异常","桥丝阻值异常","芯片异常"}
local ch_err,ch_code=check_env()
if(ch_code~=0) then
return ch_err,ch_code
end
-- 展示任务执行结果
show_acks(mid(check_data,1,8))
show_stats(mid(check_data,9,8))
-- 开始判定
local err_code=judge()
if(len(err_code)>1) then
prints:print_a(err_code)
return "检测项目不足,无法判定",212
end
prints:print_a(err_code)
local err=get_err_code(err_code)
if(err~=nil) then
if(err==0) then
judge_list()
return "ok",err
else
return err_str_table[err],err
end
else
print("err:err_code is none.")
return "数据不合规",213
end
end
-- 先返回错误描述,再返回错误码
return main()

135
python/protmcu.py Normal file
View File

@@ -0,0 +1,135 @@
import serial
import threading
def crc8(data:bytearray):
crc=0
num=len(data)
for j in range(num):
crc^=data[j]
crc&=0xff
for i in range(8):
if((crc&0x01)!=0):
crc>>=1
crc^=0x8c
crc&=0xff
else:
crc>>=1
return crc
class protm:
def __init__(self,com:str) -> None:
self.cmd=0
self.cmd_no=0
self.addr=0
self.host_addr=0
self.str_err=""
self.is_big_data=False
self.num_to_recv=0
self.recv_data=bytearray()
self.ser = serial.Serial(port=com, baudrate=115200,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,timeout=None)
def encode(self,data:bytearray):
t=bytearray()
length=len(data)+10
t.append(0x59)
t.append(0x65)
t.append(length&0xff)
t.append(length>>8)
t.append(self.addr)
t.append(self.host_addr)
t.append(self.cmd)
t.append(self.cmd_no&0xff)
t.append(self.cmd_no>>8)
t+=data
a=crc8(t)
t.append(a)
# print("encode:",t.hex(","))
return t
def decode(self,data:bytearray):
self.str_err="ok"
if(len(data)<10):
print("recv data len too less.")
self.str_err="recv data len too less."
return bytearray()
self.host_addr=data[4]
self.addr=data[5]# 回应任意地址的命令
length=data[2]|(data[3]<<8)
crc=crc8(data[:-1])
if(length!=len(data)):
print("recv data have lossed")
self.str_err="recv data have lossed"
return bytearray()
self.cmd_no=data[7]|(data[8]<<8)
self.cmd=data[6]
if(crc!=data[-1]):
print("recv data check error.h_crc=%02x,crc=%02x",crc,data[-1])
self.str_err="recv data check error."
return data[9:-1]
def recv(self):
data=bytearray()
while(self.ser.is_open):
d=bytes()
try:
d=self.ser.read(1)
except Exception as a:
print("err:",str(a))
return
data+=d
if(len(data)==2):
if(data[0]==0x59 and data[1]==0x65):
self.num_to_recv=4
else:
print("data={d}".format(d=data.hex(",")))
data=data[1:]
self.num_to_recv=0
elif(len(data)==4):
length=data[2]|(data[3]<<8)
self.num_to_recv=length
if(self.num_to_recv>0 and self.num_to_recv==len(data)):
t=self.decode(data)
self.recv_data+=t
# print("recv",t.hex(","))
self.handle(self.cmd,t)
data.clear()
# else:
# print("len(data)={d1},num_ro_recv={d2}".format(d1=len(data),d2=self.num_to_recv))
def send(self,cmd:int,data:bytearray):
self.cmd=cmd
self.cmd_no+=1
print("send:",data.hex(","))
self.ser.write(self.encode(data))
def start_recv(self):
self.thread_ = threading.Thread(target=self.recv, args=())
self.thread_.start()
def wait(self):
self.thread_.join()
def close(self):
if(self.ser.is_open):
self.ser.close()
def handle(self,cmd:int,data:bytearray):
print("cmd={d1},data={d2}".format(d1=cmd,d2=data.hex(",")))
if(self.addr==1):
print("skip addr:1")
return
t=bytearray()
for i in range(23):
t.append(i+1)
self.send(0x16,t)
u=protm("com5")
u.start_recv()
u.wait()

431
python/prottcp.py Normal file
View File

@@ -0,0 +1,431 @@
import serial
import serial.tools.list_ports
import threading
import time
import json
import socket
from PyQt5.QtCore import *
def crc16(data:bytearray,offset:int,len:int):
if(len>0):
crc=0xffff
for i in range(len-offset):
crc=(crc^data[i+offset])&0xffff
for j in range(8):
if(crc&1)!=0:
crc=((crc>>1)^0xa001)&0xffff
else:
crc=(crc>>1)&0xffff
return crc&0xff,(crc>>8)&0xff
return 0,0
def crc32(data:bytearray):
temp=0
crc=0xffffffff
i=0
if(len(data)%4!=0):
return 0
while(i<len(data)):
temp=data[i]|(data[i+1]<<8)|(data[i+2]<<16)|(data[i+3]<<24)
i+=4
for j in range(32):
if((crc^temp)&0x80000000)!=0:
crc=0x04c11db7^(crc<<1)
else:
crc<<=1
temp<<=1
crc&=0xffffffff
return crc
# 把tcp封装为串口
class utcp:
is_open=False
def __init__(self,port:int)->None:
self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ser.bind(("",port))
self.ser.settimeout(10)
self.ser.listen(128)
print("wait for mcu connect.")
self.client,self.client_addr=self.ser.accept()
print("client:",self.client_addr)
self.is_open=True
def read(self,len:int):
# return self.ser.recv(len)
return self.client.recv(len)
def write(self,data:bytearray):
# return self.ser.send(data)
return self.client.send(data)
def close(self):
self.client.close()
self.ser.close()
self.is_open=False
def scheme_task_to_byte(j:json):
data=bytearray()
data.append(j["TaskID"])
data.append(j["TaskIndex"])
data.append(j["RetryCount"])
data.append(j["ErrJumpTo"])
data.append((j["ParamCount"]&0x0f)|(j["ReturnCount"]<<4))
for i in j["ParamVal"]:
data.append(i&0xff)
data.append(i>>8)
return data
def scheme_tasks_to_byte(j:json):
data=bytearray()
for i in j["TaskArray"]:
data+=scheme_task_to_byte(i)
length=len(data)
if(length<(0x700-4)):
for i in range(0x700-4-length):
data.append(0xff)
return data
def scheme_taskids_to_byte(j:json):
t=bytearray()
t.append(j["PlanID"]&0xff)
t.append((j["PlanID"]>>8)&0xff)
t.append((j["PlanID"]>>16)&0xff)
t.append((j["PlanID"]>>24)&0xff)
for i in j["TaskArray"]:
t.append(i["TaskID"])
length=len(t)
if(length<(0x100)):
for i in range(0x100-length):
t.append(0xff)
return t
def scheme_to_byte(j:json):
t=bytearray()
t+=scheme_taskids_to_byte(j)
t+=scheme_tasks_to_byte(j)
crc=crc32(t)
t.append(crc&0xff)
t.append((crc>>8)&0xff)
t.append((crc>>16)&0xff)
t.append((crc>>24)&0xff)
return t
# int转数组
def arr_from_int(num:int):
return bytearray([num&0xff,(num>>8)&0xff,(num>>16)&0xff,(num>>24)&0xff])
# 先max后min
def scheme_get_task_range(j:json):
t=bytearray()
return_count=j["ReturnCount"]
for i in j["TestStandard"]:
t+=arr_from_int(i["Max"])
t+=arr_from_int(i["Min"])
n=return_count-len(j["TestStandard"])
for i in range(n):
t+=arr_from_int(65535)
t+=arr_from_int(0)
return t
def scheme_to_host(j:json):
t=bytearray()
t+=arr_from_int(j["PlanID"])
t+=arr_from_int(j["TimeOutM"])
b=bytearray()
for i in j["TaskArray"]:
b+=scheme_get_task_range(i)
t+=arr_from_int(len(b)//8)
t+=b
return t
# 发送文件类
class handle:
stat=0
name=""
def __init__(self) -> None:
pass
def get_rate(self):
return self.packet_now*100//self.packet_all
def get_data(self):
if(self.stat==0):
length=len(self.data)
# 1是写0是读
t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff])
t+=self.name.encode()
self.stat=1
return t
elif(self.stat==1):
packet_bytes=200
if(len(self.data)<self.sent_bytes):
packet_bytes=len(self.data)-self.sent_bytes
if(packet_bytes>0):
self.packet_now+=1
t=bytearray()
t.append(2)
t.append(self.packet_now&0xff)
t.append((self.packet_now>>8)&0xff)
t.append(self.packet_all&0xff)
t.append((self.packet_all>>8)&0xff)
t+=self.data[self.sent_bytes:self.sent_bytes+packet_bytes]
self.sent_bytes+=packet_bytes
return t
else:
print("send done.")
return bytearray()
def set_file(self,name:str):
self.data=bytearray()
with open(name,"rb") as f:
self.data=f.read()
self.name=f.name.split('/')[-1]
self.stat=0
self.packet_all=(len(self.data)+199)//200
self.sent_bytes=0
self.packet_now=0
self.packet_bytes=200
def set_json(self,name:str):
self.data=bytearray()
with open(name,"rb") as f:
json_obj=json.loads(f.read())
d=scheme_to_byte(json_obj)
# print("len=",len(d),d.hex(","))
d+=scheme_to_host(json_obj)
# print("len=",len(d),d.hex(","))
self.data=d
self.name=f.name.split('/')[-1]
self.stat=0
self.packet_all=(len(self.data)+199)//200
self.sent_bytes=0
self.packet_now=0
self.packet_bytes=200
class protu(QObject):
# 进度信号ip1~100
rate_signal =pyqtSignal([int])
# 结束信号ip成败描述
end_signal = pyqtSignal([bool,str])
# 接收到数据信号
recv_signal =pyqtSignal([int,bytearray,str])
hand=handle()
def __init__(self) -> None:
QObject.__init__(self)
self.cmd=0
self.cmd_no=0
self.str_err=""
self.is_big_data=False
self.num_to_recv=0
self.recv_data=bytearray()
def init(self,com:str):
s=com.split(":")
try:
if(s[0]=="utcp"):
self.ser = utcp(int(s[1]))
else:
self.ser = serial.Serial(port=s[0], baudrate=115200,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,timeout=None)
except Exception as e:
print(str(e))
return False
return True
def encode(self,data:bytearray):
t=bytearray()
length=len(data)+3
t.append(0x59)
t.append(0x6d)
t.append(length&0xff)
t.append(length>>8)
t.append(self.cmd)
t.append(self.cmd_no&0xff)
t.append(self.cmd_no>>8)
t+=data
a,b=crc16(t,2,length+4)
t.append(a)
t.append(b)
# print("encode:",t.hex(","))
return t
def decode(self,data:bytearray):
self.str_err="ok"
if(len(data)<10):
print("recv data len too less.")
self.str_err="recv data len too less."
return bytearray()
if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
print("frame head not 0x59 0x6d.")
self.str_err="frame head not 0x59 0x6d."
return bytearray()
length=data[3]|(data[4]<<8)
if(length==65535):
# 重新设置数据长度
length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24);
self.is_big_data=True
else:
self.is_big_data=False
if(length+7!=len(data)):
print("recv data have lossed")
self.str_err="recv data have lossed"
return bytearray()
a,b=crc16(data,3,length+5)
if(a!=data[-2] or b!=data[-1]):
print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1])
self.str_err="recv data check error."
self.cmd_no=data[6]|(data[7]<<8)
self.cmd=data[5]
if(self.is_big_data==False):
return data[8:-2]
else:
return data[12:-2]
def recv(self):
# self.recv_signal.connect(self.send_file_next)
data=bytearray()
while(self.ser.is_open):
d=bytes()
try:
d=self.ser.read(1)
except Exception as a:
# print("err:",str(a))
print("port closed")
return
data+=d
if(len(data)==3):
if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
self.num_to_recv=5
else:
data=data[1:]
self.num_to_recv=0
elif(len(data)==5):
length=data[3]|(data[4]<<8)
if(length<65535):
self.num_to_recv+=length+2
self.is_big_data=False
else:
self.num_to_recv=12
self.is_big_data=True
elif(len(data)==12):
if(self.is_big_data==True):
length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
self.num_to_recv=5+length+2
if(self.num_to_recv>0 and self.num_to_recv==len(data)):
t=self.decode(data)
self.recv_data+=t
print("recv",t.hex(","))
self.recv_signal.emit(self.cmd,t,self.str_err)
# self.send_file_next(self.cmd,t,self.str_err)
print("sent signal---")
data.clear()
# else:
# print("len(data)={d1},num_ro_recv={d2}".format(d1=len(data),d2=self.num_to_recv))
def send(self,cmd:int,data:bytearray):
self.cmd=cmd
self.cmd_no+=1
print("send:",data.hex(","))
self.ser.write(self.encode(data))
def start_recv(self):
self.thread_ = threading.Thread(target=self.recv, args=())
self.thread_.start()
def wait(self):
self.thread_.join()
def close(self):
try:
if(self.ser.is_open):
self.ser.close()
except Exception as e:
print(str(e))
def send_file_next(self,cmd:int,data:bytearray,err:str):
print("send_file_next")
data=self.hand.get_data()
if(len(data)>0):
self.send(cmd,data)
else:
self.close()
def send_file(self,cmd:int,name:str):
self.start_recv()
self.hand.set_file(name)
# self.send_file_next(cmd,bytearray([0]),"ok")
self.recv_signal.emit(cmd,bytearray([0]),"ok")
self.wait()
# 0x30 开始检测
# 0x31 检测上报
# 0x32 读写方案
# 0x33 连接维护
# 0x34 自检数据
# 0x35 自检上报
# 0x36 升级脚本
# 0xed 主板升级
# 0xee 小板升级
# with open("file/judge.lua","rb") as f:
# send_file("COM5",0x36,f.name.split('/')[-1],f.read())
# with open("file/JQ_JCXB_V54.bin","rb") as f:
# send_file("COM5",0xee,f.name.split('/')[-1],f.read())
# with open("file/checker_ye_cfg.json","rb") as f:
# json_obj=json.loads(f.read())
# d=scheme_to_byte(json_obj)
# print("len=",len(d),d.hex(","))
# d=scheme_to_host(json_obj)
# print("len=",len(d),d.hex(","))
# send_file("COM5",0x32,f.name.split('/')[-1],f.read())
if __name__ == "__main__":
u=protu()
u.init("utcp:7777")
# u.send_file(0xee,"file/JQ_JCXB_V54.bin")
u.send_file(0xed,"../Objects/checker_gen1_app_20230602.bin")

292
python/protuc.py Normal file
View File

@@ -0,0 +1,292 @@
import serial
import serial.tools.list_ports
import threading
import time
import json
# ports_list = list(serial.tools.list_ports.comports())
# if len(ports_list) <= 0:
# print("无串口设备。")
# else:
# print("可用的串口设备如下:")
# for comport in ports_list:
# print(list(comport)[0], list(comport)[1])
def crc16(data:bytearray,offset:int,len:int):
if(len>0):
crc=0xffff
for i in range(len-offset):
crc=(crc^data[i+offset])&0xffff
for j in range(8):
if(crc&1)!=0:
crc=((crc>>1)^0xa001)&0xffff
else:
crc=(crc>>1)&0xffff
return crc&0xff,(crc>>8)&0xff
return 0,0
def crc32(data:bytearray):
temp=0
crc=0xffffffff
i=0
if(len(data)%4!=0):
return 0
while(i<len(data)):
temp=data[i]|(data[i+1]<<8)|(data[i+2]<<16)|(data[i+3]<<24)
i+=4
for j in range(32):
if((crc^temp)&0x80000000)!=0:
crc=0x04c11db7^(crc<<1)
else:
crc<<=1
temp<<=1
crc&=0xffffffff
return crc
class protu:
def __init__(self,com:str) -> None:
self.cmd=0
self.cmd_no=0
self.str_err=""
self.is_big_data=False
self.num_to_recv=0
self.recv_data=bytearray()
self.ser = serial.Serial(port=com, baudrate=115200,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,timeout=None)
def encode(self,data:bytearray):
t=bytearray()
length=len(data)+3
t.append(0x59)
t.append(0x6d)
t.append(length&0xff)
t.append(length>>8)
t.append(self.cmd)
t.append(self.cmd_no&0xff)
t.append(self.cmd_no>>8)
t+=data
a,b=crc16(t,2,length+4)
t.append(a)
t.append(b)
# print("encode:",t.hex(","))
return t
def decode(self,data:bytearray):
self.str_err="ok"
if(len(data)<10):
print("recv data len too less.")
self.str_err="recv data len too less."
return bytearray()
if(data[0]!=0x59 or data[1]!=0x6d or data[2]!=0x43):
print("frame head not 0x59 0x6d.")
self.str_err="frame head not 0x59 0x6d."
return bytearray()
length=data[3]|(data[4]<<8)
if(length==65535):
# 重新设置数据长度
length=data[7]|(data[8]<<8)|(data[9]<<16)|(data[10]<<24);
self.is_big_data=True
else:
self.is_big_data=False
if(length+7!=len(data)):
print("recv data have lossed")
self.str_err="recv data have lossed"
return bytearray()
a,b=crc16(data,3,length+5)
if(a!=data[-2] or b!=data[-1]):
print("recv data check error.h_crc=%02x %02x,crc=%02x %02x",a,b,data[-2],data[-1])
self.str_err="recv data check error."
self.cmd_no=data[6]|(data[7]<<8)
self.cmd=data[5]
if(self.is_big_data==False):
return data[8:-2]
else:
return data[12:-2]
def recv(self,dolater):
data=bytearray()
while(self.ser.is_open):
d=bytes()
try:
d=self.ser.read(1)
except Exception as a:
print("err:",str(a))
return
data+=d
if(len(data)==3):
if(data[0]==0x59 and data[1]==0x6d and data[2]==0x43):
self.num_to_recv=5
else:
data=data[1:]
self.num_to_recv=0
elif(len(data)==5):
length=data[3]|(data[4]<<8)
if(length<65535):
self.num_to_recv+=length+2
self.is_big_data=False
else:
self.num_to_recv=12
self.is_big_data=True
elif(len(data)==12):
if(self.is_big_data==True):
length=data[8]|(data[9]<<8)|(data[10]<<16)|(data[11]<<24)
self.num_to_recv=5+length+2
if(self.num_to_recv>0 and self.num_to_recv==len(data)):
t=self.decode(data)
self.recv_data+=t
# dolater(self.cmd,t,self.str_err)
print("recv",t.hex(","))
try:
print("send next:")
next(dolater)
except:
print("dolater end.")
return
data.clear()
# else:
# print("len(data)={d1},num_ro_recv={d2}".format(d1=len(data),d2=self.num_to_recv))
def send(self,cmd:int,data:bytearray):
self.cmd=cmd
self.cmd_no+=1
# print("send:",data.hex(","))
self.ser.write(self.encode(data))
def start_recv(self,dolater):
self.thread_ = threading.Thread(target=self.recv, args=(dolater,))
self.thread_.start()
def wait(self):
self.thread_.join()
def close(self):
if(self.ser.is_open):
self.ser.close()
def make_dolater(cmd:int,data:bytearray,u:protu):
dataa=data
def dolater(cmd:int,data_:bytearray,str_err:str):
# print("cmd={d1},data={d2},err={d3}".format(d1=cmd,d2=data.hex(","),d3=str_err))
packet_all=(len(dataa)+199)//200
sent_bytes=0
packet_now=0
packet_bytes=200
while True:
packet_bytes=200
if(len(dataa)<sent_bytes):
packet_bytes=len(dataa)-sent_bytes
if(packet_bytes>0):
packet_now+=1
t=bytearray()
t.append(2)
t.append(packet_now&0xff)
t.append((packet_now>>8)&0xff)
t.append(packet_all&0xff)
t.append((packet_all>>8)&0xff)
t+=dataa[sent_bytes:sent_bytes+packet_bytes]
u.send(cmd,t)
sent_bytes+=packet_bytes
yield
else:
print("send done.")
u.close()
break
return dolater(cmd,data,u)
def scheme_task_to_byte(j:json):
data=bytearray()
data.append(j["TaskID"])
data.append(j["TaskIndex"])
data.append(j["RetryCount"])
data.append(j["ErrJumpTo"])
data.append((j["ParamCount"]&0x0f)|(j["ReturnCount"]<<4))
for i in j["ParamVal"]:
data.append(i&0xff)
data.append(i>>8)
return data
def scheme_tasks_to_byte(j:json):
data=bytearray()
for i in j["TaskArray"]:
data+=scheme_task_to_byte(i)
length=len(data)
if(length<(0x700-4)):
for i in range(0x700-4-length):
data.append(0xff)
return data
def scheme_taskids_to_byte(j:json):
t=bytearray()
t.append(j["PlanID"]&0xff)
t.append((j["PlanID"]>>8)&0xff)
t.append((j["PlanID"]>>16)&0xff)
t.append((j["PlanID"]>>24)&0xff)
for i in j["TaskArray"]:
t.append(i["TaskID"])
length=len(t)
if(length<(0x100)):
for i in range(0x100-length):
t.append(0xff)
return t
def scheme_to_byte(j:json):
t=bytearray()
t+=scheme_taskids_to_byte(j)
t+=scheme_tasks_to_byte(j)
crc=crc32(t)
t.append(crc&0xff)
t.append((crc>>8)&0xff)
t.append((crc>>16)&0xff)
t.append((crc>>24)&0xff)
return t
# 1是写,0,是读
def send_file(com:str,cmd:int,name:str,data:bytearray):
u=protu(com)
u.start_recv(make_dolater(cmd,data,u))
length=len(data)
t=bytearray([1,length&0xff,(length>>8)&0xff,(length>>16)&0xff,(length>>24)&0xff])
t+=name.encode()
u.send(cmd,t)
u.wait()
# 0x30 开始检测
# 0x31 检测上报
# 0x32 读写方案
# 0x33 连接维护
# 0x34 自检数据
# 0x35 自检上报
# 0x36 升级脚本
# 0xed 主板升级
# 0xee 小板升级
# with open("file/judge.lua","rb") as f:
# send_file("COM5",0x36,f.name.split('/')[-1],f.read())
with open("file/checker_ye_cfg.json","rb") as f:
json_obj=json.loads(f.read())
d=scheme_to_byte(json_obj)
print("len=",len(d),d.hex(","))
# send_file("COM5",0x32,f.name.split('/')[-1],f.read())

29
python/scheme_data.c Normal file
View File

@@ -0,0 +1,29 @@
#include "stdint.h"
typedef struct{
uint32_t max;
uint32_t min;
}item_def;
typedef struct
{
uint32_t item_num;
item_def data[0];
}task_def;
typedef struct
{
uint32_t plan_id;
uint32_t timeout_m;
uint32_t task_num;
task_def data[0];
}scheme_def;

48
python/signal_test.py Normal file
View File

@@ -0,0 +1,48 @@
import serial
import serial.tools.list_ports
import threading
import time
import json
import socket
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import sys
class protu(QObject):
# 进度信号ip1~100
rate_signal =pyqtSignal([int])
# 结束信号ip成败描述
end_signal = pyqtSignal([bool,str])
# 接收到数据信号
recv_signal =pyqtSignal([int,bytearray,str])
def __init__(self) -> None:
QObject.__init__(self)
self.app = QApplication(sys.argv)
def wait_in_thread(self):
print("wait int thread")
while(True):
self.recv_signal.emit(0,bytearray(),"ok")
print("in wait")
time.sleep(1)
def start_thread(self):
self.recv_signal.connect(self.recv_slot)
self.thread_ = threading.Thread(target=self.wait_in_thread, args=())
self.thread_.start()
self.recv_signal.emit(0,bytearray(),"ok")
sys.exit(self.app.exec())
def recv_slot(self,ack:int,data:bytearray,err:str):
print("recv slot")
pro=protu()
pro.start_thread()

441
python/updata.py Normal file
View File

@@ -0,0 +1,441 @@
import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import time
import portalocker
import serial
import serial.tools.list_ports
import prottcp
STR_RED="\033[1;31m"
STR_BLUE="\033[1;34m"
STR_END="\033[0m"
class progress_box(QObject):
def __init__(self) -> None:
QObject.__init__(self)
def init(self,father:QDialog,text:str,x:int):
# print("text=",text)
self.lable = QLabel(father)
self.lable.setObjectName(u"label")
self.lable.setGeometry(QRect(40, x, 120, 15))
self.lable.setText(text)
self.p=QProgressBar(father)
self.p.setValue(0)
self.p.setGeometry(QRect(40,x+20,450,20))
def set_rate(self,rate:int):
# print(text)
self.p.setValue(rate)
class data_box(QObject):
def __init__(self) -> None:
QObject.__init__(self)
def init(self,father:QDialog,title:str,text:str,x:int):
# print("text=",text)
self.lable = QLabel(father)
self.lable.setObjectName(u"label")
self.lable.setGeometry(QRect(40, x, 120, 20))
self.lable.setText(title)
self.text = QLabel(father)
self.text.setObjectName(u"text")
self.text.setGeometry(QRect(40,x+20,450,30))
self.text.setText(text)
class updata_dlg(QObject):
# 进度信号ip1~100
rate_signal =pyqtSignal([int])
# 结束信号ip成败描述
end_signal = pyqtSignal([bool,str])
failed_signal = pyqtSignal()
def __init__(self):
QObject.__init__(self)
self.app = QApplication(sys.argv)
self.widget = QWidget()
self.widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.widget.resize(703, 409)
self.widget.setWindowTitle("批检仪程序升级")
self.file_list_init()
self.save_but_init()
self.cmd_but_init()
self.sstate_but_init()
self.com_but_init()
self.com_init()
self.widget.destroyed.connect(self.quit)
self.failed_signal.connect(self.updata_failed)
self.cmd=0
self.port_is_open=False
def quit(self):
self.close_port()
# 程序退出
qApp.exit(1)
# 初始化文件列表
def file_list_init(self):
self.file_list = QListWidget(self.widget)
self.file_list.setObjectName(u"file_list")
self.file_list.setGeometry(QRect(25, 60, 531, 310))
self.file_list.setFrameShape(QFrame.Shape.Box)
self.file_list.setMidLineWidth(1)
self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
self.file_list.itemDoubleClicked.connect(self.item_clicked)
# 初始化打开端口按钮
def com_but_init(self):
self.com_but = QPushButton(self.widget)
self.com_but.setObjectName(u"com_but")
self.com_but.setGeometry(QRect(450, 10, 93, 28))
self.com_but.setText("打开端口")
self.com_but.clicked.connect(self.com_but_clicked)
# 初始化发送文件按钮
def save_but_init(self):
self.save_but = QPushButton(self.widget)
self.save_but.setObjectName(u"save_but")
self.save_but.setGeometry(QRect(590, 60, 93, 28))
self.save_but.setText("发送文件")
self.save_but.clicked.connect(self.save_but_clicked)
# 初始化发送命令按钮
def cmd_but_init(self):
self.cmd_but = QPushButton(self.widget)
self.cmd_but.setObjectName(u"save_but")
self.cmd_but.setGeometry(QRect(590, 100, 93, 28))
self.cmd_but.setText("升级MCU")
self.cmd_but.clicked.connect(self.cmd_but_clicked)
# 初始化在线状态按钮
def sstate_but_init(self):
self.sstate_but = QPushButton(self.widget)
self.sstate_but.setObjectName(u"sstate_but")
self.sstate_but.setGeometry(QRect(590, 140, 93, 28))
self.sstate_but.setText("MCU在线状态")
self.sstate_but.clicked.connect(self.sstate_but_clicked)
# com口
def com_init(self):
self.com = QComboBox(self.widget)
self.com.setObjectName(u"com")
self.com.setGeometry(QRect(100, 10, 300, 21))
self.com.setEditable(True)
self.com.currentIndexChanged.connect(self.com_changed)
self.com.addItem("utcp:7777")
ports_list = list(serial.tools.list_ports.comports())
for comport in ports_list:
# print(comport.name,comport.description)
self.com.addItem(comport.name+":"+comport.description)
self.com_label = QLabel(self.widget)
self.com_label.setObjectName(u"label")
self.com_label.setGeometry(QRect(30, 10, 72, 15))
self.com_label.setText("COM口:")
# 显示消息框
def show_msg(self,msg:str):
m=QMessageBox(self.widget)
m.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
m.setText(msg)
m.setWindowTitle("提示")
m.show()
# 找到已选择的文件
def get_selected_file_by_type(self,type:str):
file_list=[]
items=self.file_list.selectedItems()
for i in items:
if(i.text()[-len(type):]==type):
file_list.append(i.text())
if(len(file_list)!=1):
self.show_msg("请选择一个并且只选择一个 "+type+" 文件")
return ""
return file_list[0]
# 获取已选择的文件列表
def get_selected_fils(self):
file_list=[]
items=self.file_list.selectedItems()
if(len(items)==0):
self.show_msg("请选择至少一个文件")
return []
have_elf=False
have_bin=False
have_lua=False
for i in items:
if(i.text()[-4:]==".elf"):
if(have_elf==True):
self.show_msg("只可选择一个 .elf 文件")
return []
have_elf=True
if(i.text()[-4:]==".bin"):
if(have_bin==True):
self.show_msg("只可选择一个 .bin 文件")
return []
have_bin=True
if(i.text()[-4:]==".lua"):
if(have_lua==True):
self.show_msg("只可选择一个 .lua 文件")
return []
have_lua=True
file_list.append(i.text())
return file_list
def item_clicked(self,item:QListWidgetItem ):
print("item clicked.")
print("slected item is",item.text())
def com_but_clicked(self):
print("com but clicked")
if(self.port_is_open==False):
self.open_port()
else:
self.close_port()
def com_changed(self,index:int):
print("com changed")
self.close_port()
def get_cmd(self,file:str):
l=[(".bin",0xee),(".pkt",0xed),(".json",0x32)]
for i in l:
if(file.endswith(i[0])):
return i[1]
return 0
# 发送文件按钮按下
def save_but_clicked(self):
print("save_but_clicked.")
path = self.getpath()+"file\\"
file_list=self.get_selected_fils()
if(len(file_list)==0):
return
self.cmd=self.get_cmd(file_list[0])
print("cmd=",self.cmd)
w=QDialog(self.widget)
w.resize(703-150, 1*40+20)
w.setWindowTitle("上传文件")
w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.handle_=prottcp.handle()
if(file_list[0].endswith(".json")):
self.handle_.set_json("file/"+file_list[0])
else:
self.handle_.set_file("file/"+file_list[0])
d=self.handle_.get_data()
try:
self.port.send(self.cmd,d)
except Exception as e:
print("com not open")
del self.handle_
w.close()
return
self.creat_progress(w)
w.show()
# self.failed_signal.connect(w.close)
# w.destroyed.connect(self.close_port)
def updata_failed(self):
self.show_msg("打开端口失败")
# 创建进度条
def creat_progress(self,father:QDialog):
self.probar_list=[]
for i in range(1):
p=progress_box()
p.init(father,"发送文件",+i*40)
self.rate_signal.connect(p.set_rate)
self.probar_list.append(p)
# 发送命令按钮按下
def cmd_but_clicked(self):
print("cmd_but clicked.")
file=self.get_selected_file_by_type(".bin")
if(len(file)==0):
return
print("file:",file)
w=QDialog(self.widget)
w.resize(703-150, 1*40+20)
w.setWindowTitle("升级mcu")
self.updata_mcu(file)
self.creat_progress(w)
w.show()
def sstate_but_clicked(self):
print("sstate_but clicked.")
# 开始运行
def run(self):
self.widget.show()
sys.exit(self.app.exec())
def set_port_state(self,state:bool):
self.port_is_open=state
if(state==True):
self.com_but.setText("关闭端口")
else:
self.com_but.setText("打开端口")
# 扫描文件
def scan_file(self):
self.file_list.addItems(self.find_type([".bin",".json",".pkt"]))
# 扫描指定类型的文件
def find_type(self,types:list):
path = self.getpath()+"file\\"
if not os.path.exists(path):
os.makedirs(path)
list=os.listdir(path)
file_list=[]
for t in types:
for i in list:
if(len(t)>0):
if(i[-len(t):]==t):
file_list.append(i)
else:
file_list.append(i)
return file_list
# 获得文件绝对路径
def getpath(self):
path=os.path.abspath(sys.argv[0])
list_str=path.split("\\")
return path.replace(list_str[-1],"")
# 调用此函数打开通信
def open_port(self):
t = threading.Thread(target=self.com_thread, args=())
t.start()
def com_thread(self):
self.port=prottcp.protu()
item=self.com.itemText(self.com.currentIndex())
print("item text:",item)
if(self.port.init(item)==False):
print("init port failed.")
self.failed_signal.emit()
self.set_port_state(False)
return
else:
print("init port success.")
self.set_port_state(True)
self.port.recv_signal.connect(self.recv_slot)
self.port.start_recv()
self.port.wait()
def close_port(self):
print("close port")
self.set_port_state(False)
try:
self.port.close()
except Exception as e:
pass
def recv_slot(self,cmd:int,data:bytearray,err:str):
print("recv:",cmd,data)
if(self.cmd!=cmd):
return
try:
data=self.handle_.get_data()
if(len(data)>0):
self.port.send(cmd,data)
rate=self.handle_.get_rate()
self.rate_signal.emit(rate)
else:
del self.handle_
except Exception as e:
print(str(e))
# 开始升级mcu
def updata_mcu(self,file):
pass
# 小板通信测试
def comm_test(self):
pass
def end_slot(self,ip,ack,err):
# print(ip,ack,err)
if(ack==False):
self.show_msg(ip+":"+err)
def rate_slot(self,rate):
# print("rate signal:",ip,rate)
self.rate_signal.emit(rate)
def data_slot(self,ip,text):
pass
# 创建数据显示
def creat_databoxs(self,father:QDialog,data_list:list):
self.datab_list=[]
for i in range(len(data_list)):
p=data_box()
p.init(father,data_list[i][0],data_list[i][1],+i*50)
self.datab_list.append(p)
class locker():
def _get_lock(self):
file_name = os.path.basename(__file__)
# linux等平台依然使用标准的/var/run其他nt等平台使用当前目录
if os.name == "posix":
lock_file_name = f"/var/run/{file_name}.pid"
else:
lock_file_name = f"{file_name}.pid"
self.fd = open(lock_file_name, "w")
try:
portalocker.lock(self.fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
# 将当前进程号写入文件
# 如果获取不到锁上一步就已经异常了,所以不用担心覆盖
self.fd.writelines(str(os.getpid()))
# 写入的数据太少,默认会先被放在缓冲区,我们强制同步写入到文件
self.fd.flush()
except:
print(f"{file_name} have another instance running.")
exit(1)
def __init__(self):
self._get_lock()
# 和fcntl有点区别portalocker释放锁直接有unlock()方法
# 还是一样,其实并不需要在最后自己主动释放锁
def __del__(self):
portalocker.unlock(self.fd)
if __name__ == "__main__":
lock=locker()
dlg=updata_dlg()
dlg.scan_file()
dlg.run()