1163 lines
57 KiB
Python
1163 lines
57 KiB
Python
#通用库
|
||
import os
|
||
import sys
|
||
import codecs
|
||
import numpy
|
||
import serial
|
||
import time
|
||
import datetime
|
||
from serial.serialutil import Timeout
|
||
import serial.tools.list_ports
|
||
from logging import exception, warning
|
||
from xml.dom import minidom
|
||
import xmodem
|
||
from multiprocessing import Process, Queue, freeze_support
|
||
|
||
#QT库
|
||
from PyQt5 import QtWidgets, QtCore
|
||
from PyQt5.QtGui import QCursor, QColor
|
||
from PyQt5.QtWidgets import QApplication, QMenu, QMessageBox, QFileDialog, QColorDialog
|
||
from PyQt5.QtWidgets import QTableWidgetItem, QTreeWidgetItem, QHeaderView
|
||
from PyQt5.QtCore import QTimer
|
||
from xmb import Ui_MainWindow
|
||
from config import Ui_config
|
||
|
||
#全局变量,这里当做宏定义使用
|
||
g_file_name_dtest_xml = "dtest.xml"
|
||
g_module_connect_key = b"WQKL"
|
||
g_module_connected_key = b"C"
|
||
# g_module_connected_key = b"Recieving RAM-IMAGE in xmodem : C"
|
||
g_module_relay_on_key = bytes.fromhex('FF0F0000000801FF301D')
|
||
g_module_relay_off_key = bytes.fromhex('FF0F000000080100705D')
|
||
g_module_log_key_reboot = "[REBOOT]"
|
||
g_module_log_key_info = "[INFO]"
|
||
g_module_log_key_start = "[START]"
|
||
g_module_log_key_result = "[RESULT]"
|
||
g_module_log_key_data = "[DATA]"
|
||
g_rule_number = 20
|
||
|
||
#考虑到涉及xml配置信息时都需要读取minidom和treewidget,采用变量保存信息能提高效率
|
||
#python可以使用numpy定义结构体,相当与字典,但是没有append方法,所以采用列表,下标固定
|
||
g_idx_name = 0 #dtest节点name下标
|
||
g_idx_note = 1 #dtest节点note下标
|
||
g_idx_eb = 2 #dtest节点enable下标
|
||
g_idx_slave = 3 #dtest节点slave_mode下标
|
||
g_idx_name_c = 0 #case节点name下标
|
||
g_idx_note_c = 1 #case节点note下标
|
||
g_idx_eb_c = 2 #case节点enable下标
|
||
#关键字规则采用list保存,为了方便后续拓展,下标使用固定数值
|
||
g_idx_rule_key = 0
|
||
g_idx_rule_color = 1
|
||
g_idx_rule_eb = 2
|
||
|
||
#状态机对应的状态
|
||
g_fsm_free = 0
|
||
g_fsm_connect = 1
|
||
g_fsm_download = 2
|
||
g_fsm_running = 3
|
||
|
||
# 消息弹窗对应的消息类型
|
||
g_msg_about = 0
|
||
g_msg_crirical = 1
|
||
g_msg_info = 2
|
||
g_msg_question = 3
|
||
g_msg_warning = 4
|
||
|
||
#这里的全局变量用于xmodem操作,保存2个下载串口的句柄
|
||
g_port_xmodex = []
|
||
|
||
#这是是用于给子进程抛消息的队列
|
||
g_list_queue = []
|
||
|
||
#这里的全局函数也是为了给xmodem使用
|
||
def xmodem_transf_0_getc(size, timeout = 1):
|
||
return g_port_xmodex[0].read(size) or None
|
||
def xmodem_transf_0_putc(data, timeout = 1):
|
||
return g_port_xmodex[0].write(data)
|
||
def xmodem_transf_0_callback(total, succeed, failed):
|
||
if total % 10 == 0:
|
||
print(".", end='')
|
||
|
||
def xmodem_transf_1_getc(size, timeout = 1):
|
||
return g_port_xmodex[1].read(size) or None
|
||
def xmodem_transf_1_putc(data, timeout = 1):
|
||
return g_port_xmodex[1].write(data)
|
||
def xmodem_transf_1_callback(total, succeed, failed):
|
||
if total % 10 == 0:
|
||
print(".", end='')
|
||
|
||
def process_log_file_write(g_list_queue):
|
||
msg_process = "" # python子进程不能使用print(stdout),所以打印到文件
|
||
dir_script = os.getcwd() # 当前脚本所在目录
|
||
# 创建debug日志文件和终端日志文件,路径固定为软件所在目录下的log文件夹
|
||
dir_log = os.path.join(dir_script, "log")
|
||
path_debug = dir_log + "\\debug " + \
|
||
datetime.datetime.now().strftime('%Y-%m-%d %H%M%S') + ".log"
|
||
path_terminal = dir_log + "\\terminal " + \
|
||
datetime.datetime.now().strftime('%Y-%m-%d %H%M%S') + ".log"
|
||
|
||
# 每个dtest打印单独存为一个log文件,位于history目录下时间戳最新的子目录下
|
||
# os.listdir是名称从小到大返回列表
|
||
dir_dtest_history = os.path.join(dir_script, "history")
|
||
dir_dtest_list = os.listdir(dir_dtest_history)
|
||
dir_dtest = dir_dtest_list[len(dir_dtest_list) - 1]
|
||
|
||
if not os.access(dir_log, os.F_OK):
|
||
msg_process = "创建log目录: " + dir_log + "\r\n"
|
||
os.mkdir(dir_log)
|
||
fd_debug = codecs.open(path_debug, 'w', 'utf-8')
|
||
fd_terminal = codecs.open(path_terminal, 'w', 'utf-8')
|
||
fd_dtest = None
|
||
|
||
while True:
|
||
if g_list_queue[0].full():
|
||
msg_process += "调试log队列已满!!!\r\n"
|
||
|
||
if not g_list_queue[1].empty():
|
||
if g_list_queue[1].full():
|
||
msg_process += "终端队列已满!!!\r\n"
|
||
msg = g_list_queue[1].get()
|
||
msg = str(msg).replace("\r\n", "")
|
||
# 写入单独的log文件
|
||
if not g_list_queue[2].empty():
|
||
if fd_dtest:
|
||
fd_dtest.close()
|
||
name_dtest = g_list_queue[2].get()
|
||
name_dtest = str(name_dtest).replace(".bin", "")
|
||
path_dtest_log = os.path.join(dir_dtest_history, dir_dtest,
|
||
name_dtest, name_dtest + ".log")
|
||
fd_dtest = codecs.open(path_dtest_log, 'w', 'utf-8')
|
||
msg_process += "创建" + name_dtest + "对应的log文件,路径:" + \
|
||
path_dtest_log + "\r\n"
|
||
if fd_dtest:
|
||
# msg_process += "写入单独的log文件:" + msg + "\r\n"
|
||
fd_dtest.write(msg)
|
||
fd_dtest.flush()
|
||
# 写入总的log文件
|
||
fd_terminal.write(msg)
|
||
fd_terminal.flush()
|
||
# 写入调试日志
|
||
if not g_list_queue[0].empty():
|
||
msg = g_list_queue[0].get()
|
||
fd_debug.write(msg + "\r\n")
|
||
fd_debug.flush()
|
||
if len(msg_process):
|
||
fd_debug.write(msg_process + "\r\n")
|
||
fd_debug.flush()
|
||
msg_process = ""
|
||
|
||
class kl3_test_app(QtWidgets.QMainWindow, Ui_MainWindow):
|
||
def __init__(self, argv):
|
||
self.auto_test_mode = 0
|
||
if len(argv) > 1:
|
||
if argv[1] == "auto":
|
||
self.auto_test_mode = 1
|
||
print("init argv:", argv, ", auto test mode:", self.auto_test_mode)
|
||
super(kl3_test_app, self).__init__() #super调用父类的构造函数
|
||
self.setupUi(self)
|
||
#配置主界面标题栏
|
||
self.setWindowTitle("kl3 dtest测试工具 V1.0")
|
||
#使能debug、终端文本框只读属性
|
||
self.edit_debug.setReadOnly(True)
|
||
self.edit_terminal.setReadOnly(True)
|
||
#dtest\case\rule设置表头宽度自适应,不能手动更改(表头由设计工具直接产生)
|
||
self.widget_dtest.header().setSectionResizeMode(QHeaderView.ResizeToContents)
|
||
self.widget_case.header().setSectionResizeMode(QHeaderView.ResizeToContents)
|
||
self.widget_rule.horizontalHeader().setSectionResizeMode(\
|
||
0, QHeaderView.ResizeToContents)
|
||
self.widget_rule.horizontalHeader().setSectionResizeMode(\
|
||
1, QHeaderView.ResizeToContents)
|
||
self.widget_rule.horizontalHeader().setSectionResizeMode(\
|
||
2, QHeaderView.ResizeToContents)
|
||
self.widget_rule.horizontalHeader().setSectionResizeMode(\
|
||
3, QHeaderView.Stretch)
|
||
self.widget_rule.setRowCount(g_rule_number) #需要先设置行数,默认0行不显示
|
||
#使能dtest和case界面右键菜单
|
||
self.widget_dtest.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
|
||
self.widget_case.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
|
||
#连接按键信号与槽函数
|
||
self.pbt_cfg.clicked.connect(self.pbt_handle_cfg)
|
||
self.pbt_start_test.clicked.connect(self.pbt_handle_test_act)
|
||
self.widget_dtest.customContextMenuRequested.connect(
|
||
lambda: self.mouse_handle_widget_right_clicked(QCursor.pos(), 0))
|
||
self.widget_case.customContextMenuRequested.connect(
|
||
lambda: self.mouse_handle_widget_right_clicked(QCursor.pos(), 1))
|
||
#dtest、case、rule界面在初始化时会触发itemchanged信号,
|
||
# 因此itemchanged信号会在xml_file_load函数中解绑并重新绑定
|
||
self.widget_dtest.itemChanged.connect(self.tree_widget_item_changed_handle)
|
||
self.widget_case.itemChanged.connect(self.tree_widget_item_changed_handle)
|
||
self.widget_rule.cellChanged.connect(self.table_widget_cell_changed)
|
||
self.widget_rule.cellClicked.connect(self.table_widget_cell_clicked)
|
||
#连接动作信号与槽函数
|
||
self.act_debug_visible.triggered.connect(
|
||
lambda: self.act_handle_debug_interface_config(0))
|
||
self.act_debug_invisible.triggered.connect(
|
||
lambda: self.act_handle_debug_interface_config(1))
|
||
self.act_sel_all.triggered.connect(
|
||
lambda: self.act_handle_option_select(QApplication.focusWidget(), 2))
|
||
self.act_sel_none.triggered.connect(
|
||
lambda: self.act_handle_option_select(QApplication.focusWidget(), 0))
|
||
self.act_load.triggered.connect(lambda: self.act_handle_xml_file_open_save(0))
|
||
self.act_save.triggered.connect(lambda: self.act_handle_xml_file_open_save(1))
|
||
self.act_save_as.triggered.connect(lambda: self.act_handle_xml_file_open_save(2))
|
||
#配置界面实例化,连接信号与槽
|
||
self.cfg = Ui_config()
|
||
self.ui_cfg = QtWidgets.QDialog(self)
|
||
self.cfg.setupUi(self.ui_cfg)
|
||
self.ui_cfg.setWindowTitle("配置")
|
||
self.cfg.pbt_box_cfg.accepted.connect(self.pbt_handle_cfg_accept)
|
||
self.cfg.pbt_box_cfg.rejected.connect(self.pbt_handle_cfg_reject)
|
||
self.cfg.pbt_sel_file_0.clicked.connect(
|
||
lambda: self.pbt_handle_cfg_file_select(0))
|
||
self.cfg.pbt_sel_file_1.clicked.connect(
|
||
lambda: self.pbt_handle_cfg_file_select(1))
|
||
#IO操作比较费时,会导致界面刷新缓慢,改为使用队列+子进程方式实现写文件
|
||
g_list_queue.append(Queue(300)) # 软件调试打印队列
|
||
g_list_queue.append(Queue(300)) # dtest日志打印队列
|
||
g_list_queue.append(Queue(5)) # 当前dtest名称队列
|
||
thread_log = Process(target=process_log_file_write, args=(g_list_queue,))
|
||
thread_log.daemon = True
|
||
thread_log.start()
|
||
#手动模式时标准输出重定向到界面,自动测试模式时输出到标准输出(jenkins)
|
||
if self.auto_test_mode:
|
||
#自动测试模式时隐藏调试窗口
|
||
self.widget_debug.setVisible(False)
|
||
else:
|
||
self.stdout_old = sys.stdout
|
||
sys.stdout = self
|
||
#调用初始化函数
|
||
self.init()
|
||
|
||
def init(self):
|
||
#结构体定义
|
||
cfg_type = numpy.dtype({
|
||
'names':['fd', 'com', 'baud', 'data', 'parity', 'stop'],
|
||
'formats':['O', 'U32', 'i', 'i', 'U32', 'f']
|
||
})
|
||
#串口列表初始化
|
||
self.init_com_port()
|
||
#4个串口对应的数组,参数初始化为115200,8N1
|
||
self.np_port_info = numpy.array([(0, "", 115200, 8, 0, 0), (0, "", 115200, 8, 0, 0),
|
||
(0, "", 115200, 8, 0, 0), (0, "", 115200, 8, 0, 0)], dtype=cfg_type)
|
||
#固件路径初始化
|
||
self.list_path = ['', '']
|
||
# result文件(保存测试结果的文件)路径初始化
|
||
self.path_result_file = os.path.join(os.getcwd(), "dtest_file", "result.log")
|
||
#初始化一个保存xml配置信息的列表
|
||
#格式为[[[dtest1 info1, dtest1 info2], [case info1], [case info2]], [dtest2]]
|
||
self.list_dtest_info = []
|
||
#初始化关键字规则信息,格式为[[keyword1, color1, enable1], [keyword2...]]
|
||
self.list_rule_info = []
|
||
#dtest固件配置xml读取
|
||
self.xml_file_load('')
|
||
#初始化状态机(字典)
|
||
self.fsm = {'dtest':0, 'reboot':0, 'state':0, 'cycle':0}
|
||
#dtest测试采用timer+状态机的方式(进程while1和sleep会导致界面卡死)
|
||
self.timer = QTimer()
|
||
self.timer.timeout.connect(self.test_fsm_handle)
|
||
if self.auto_test_mode:
|
||
# 模拟触发开始测试按钮
|
||
print("开始自动测试")
|
||
self.pbt_handle_test_act()
|
||
|
||
def init_com_port(self):
|
||
#当前串口列表初始化,使用列表获取,没有则退出程序,有则私有字典保存com口字符串
|
||
self.dict_com = {}
|
||
list_com_temp = list(serial.tools.list_ports.comports())
|
||
if len(list_com_temp) <= 0:
|
||
print("未发现串口,程序退出")
|
||
self.msg_box_show(g_msg_warning, "未发现串口")
|
||
exit(1)
|
||
self.cfg.combo_com_0.addItem('')
|
||
self.cfg.combo_com_1.addItem('')
|
||
self.cfg.combo_com_2.addItem('')
|
||
self.cfg.combo_com_3.addItem('')
|
||
for port in list_com_temp:
|
||
self.dict_com["%s" % port[0]] = "%s" % port[1]
|
||
self.cfg.combo_com_0.addItem(port[0])
|
||
self.cfg.combo_com_1.addItem(port[0])
|
||
self.cfg.combo_com_2.addItem(port[0])
|
||
self.cfg.combo_com_3.addItem(port[0])
|
||
print("串口列表: %s" % self.dict_com)
|
||
|
||
def xml_file_load(self, path_file):
|
||
if (len(path_file) == 0): #传参为空,则使用默认文件
|
||
print("使用默认文件(%s)" % g_file_name_dtest_xml)
|
||
path_file = os.getcwd() + "\\" + g_file_name_dtest_xml
|
||
if not os.access(path_file, os.F_OK | os.R_OK):
|
||
msg = "文件不存在或不可读:\n" + path_file
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return
|
||
else:
|
||
#清空界面和相关列表
|
||
print("清空dtest、case和rule配置界面,清空列表内容")
|
||
# self.widget_case.setHeaderHidden(True)
|
||
self.widget_dtest.clear()
|
||
self.widget_case.clear()
|
||
self.widget_rule.clearContents()
|
||
# self.list_dtest_info.clear()
|
||
self.list_dtest_info = []
|
||
self.list_rule_info = []
|
||
self.path_xml_file = path_file
|
||
#解绑信号,防止在加载xml过程中更新界面触发信号
|
||
self.widget_dtest.itemChanged.disconnect(self.tree_widget_item_changed_handle)
|
||
self.widget_case.itemChanged.disconnect(self.tree_widget_item_changed_handle)
|
||
self.widget_rule.cellChanged.disconnect(self.table_widget_cell_changed)
|
||
#读取xml文件
|
||
cnt_dtest = 0
|
||
cnt_case = 0
|
||
self.xml_tree = minidom.parse(path_file)
|
||
root = self.xml_tree.documentElement
|
||
#读取debug界面使能信息
|
||
debug_interface_eb = root.getElementsByTagName("debug_interface")[0].firstChild.data
|
||
print("从xml文件中读取到的调试界面使能信息:%s" % debug_interface_eb)
|
||
if debug_interface_eb == "false": #默认是打开的
|
||
self.act_handle_debug_interface_config(0)
|
||
#读取关键字规则信息并显示
|
||
rules = root.getElementsByTagName("rule")
|
||
for i in range(0, g_rule_number):
|
||
# id_rule = int(rule.getAttribute("id"))
|
||
id_rule = i
|
||
item_rule = QTableWidgetItem("", 0)
|
||
item_rule.setCheckState(QtCore.Qt.Unchecked)
|
||
keyword = ""
|
||
color_str = "#000000"
|
||
eb = "false"
|
||
if i < len(rules):
|
||
if rules[i].getElementsByTagName("keyword")[0].hasChildNodes():
|
||
keyword = rules[i].getElementsByTagName("keyword")[0].childNodes[0].data
|
||
color_str = rules[i].getElementsByTagName("color")[0].childNodes[0].data
|
||
eb = rules[i].getElementsByTagName("enable")[0].childNodes[0].data
|
||
if eb == "true":
|
||
item_rule.setCheckState(QtCore.Qt.Checked)
|
||
self.list_rule_info.append([keyword, color_str, eb])
|
||
self.widget_rule.setItem(id_rule, 0, item_rule)
|
||
item_rule = QTableWidgetItem("0", 1)
|
||
item_rule.setFlags(item_rule.flags() & (~QtCore.Qt.ItemIsEditable))
|
||
self.widget_rule.setItem(id_rule, 1, item_rule)
|
||
color = QColor(color_str)
|
||
item_rule = QTableWidgetItem("", 0)
|
||
item_rule.setBackground(color)
|
||
self.widget_rule.setItem(id_rule, 2, item_rule)
|
||
self.widget_rule.setItem(id_rule, 3, QTableWidgetItem(keyword, 3))
|
||
print("从xml文件中读取的关键字规则信息:", self.list_rule_info)
|
||
#读取串口信息
|
||
ports = root.getElementsByTagName("port")
|
||
for port in ports:
|
||
# if port.hasChildNodes():
|
||
id_port = int(port.getAttribute("id"))
|
||
if port.getElementsByTagName("com")[0].hasChildNodes():
|
||
self.np_port_info[id_port]['com'] = \
|
||
port.getElementsByTagName("com")[0].childNodes[0].data
|
||
self.np_port_info[id_port]['baud'] = \
|
||
port.getElementsByTagName("baud")[0].childNodes[0].data
|
||
self.np_port_info[id_port]['data'] = \
|
||
port.getElementsByTagName("data")[0].childNodes[0].data
|
||
self.np_port_info[id_port]['parity'] = \
|
||
port.getElementsByTagName("parity")[0].childNodes[0].data
|
||
self.np_port_info[id_port]['stop'] = \
|
||
port.getElementsByTagName("stop")[0].childNodes[0].data
|
||
print("从xml文件中读取的串口信息:", self.np_port_info)
|
||
#获取路径信息
|
||
self.list_path[0] = root.getElementsByTagName("dir")[0].firstChild.data
|
||
if root.getElementsByTagName("dir")[1].hasChildNodes():
|
||
self.list_path[1] = root.getElementsByTagName("dir")[1].firstChild.data
|
||
print("从xml文件中读取的文件夹地址:", self.list_path)
|
||
#读取dtest信息
|
||
dtests = root.getElementsByTagName("dtest")
|
||
for dtest in dtests:
|
||
cnt_dtest += 1
|
||
id_dtest = int(dtest.getAttribute("id"))
|
||
name = dtest.getElementsByTagName("name")[0].childNodes[0].data
|
||
note = dtest.getElementsByTagName("note")[0].childNodes[0].data
|
||
eb = dtest.getElementsByTagName("enable")[0].childNodes[0].data
|
||
slave = dtest.getElementsByTagName("slave_mode")[0].childNodes[0].data
|
||
print("找到第%d个dtest\n\tname:%s\n\tnote:%s\n\tenable:%s\n"
|
||
"\tslave:%s" % (id_dtest, name, note, eb, slave))
|
||
self.list_dtest_info.append([])
|
||
self.list_dtest_info[id_dtest].append([name, note, eb, slave])
|
||
#添加dtest和case界面,还有一种方法是添加一个check box,选中时可以触发信号
|
||
item_dtest = QTreeWidgetItem(self.widget_dtest)
|
||
item_dtest.setText(0, str(id_dtest))
|
||
item_dtest.setText(1, name)
|
||
item_case_1 = QTreeWidgetItem(self.widget_case)
|
||
item_case_1.setText(0, str(id_dtest))
|
||
item_case_1.setText(1, name)
|
||
if (eb == "true"):
|
||
item_dtest.setCheckState(0, QtCore.Qt.Checked)
|
||
else:
|
||
item_dtest.setCheckState(0, QtCore.Qt.Unchecked)
|
||
item_case_1.setCheckState(0, QtCore.Qt.Unchecked)
|
||
item_case_1.setExpanded(True) #默认展开
|
||
#case界面填充
|
||
for node in dtest.getElementsByTagName("case"):
|
||
if node.hasChildNodes():
|
||
cnt_case += 1
|
||
id_case = int(node.getAttribute("id"))
|
||
name_case = node.getElementsByTagName("name")[0].childNodes[0].data
|
||
note_case = node.getElementsByTagName("note")[0].childNodes[0].data
|
||
eb_case = node.getElementsByTagName("enable")[0].childNodes[0].data
|
||
print("第%d个case,enable:%s, name:%s, note:%s" %
|
||
(id_case, eb_case, name_case, note_case))
|
||
self.list_dtest_info[id_dtest].append([name_case, note_case, eb_case])
|
||
item_case_2 = QTreeWidgetItem(item_case_1)
|
||
item_case_2.setText(0, str(id_case))
|
||
item_case_2.setText(1, name_case)
|
||
if eb_case == "true":
|
||
item_case_2.setCheckState(0, QtCore.Qt.Checked)
|
||
else:
|
||
item_case_2.setCheckState(0, QtCore.Qt.Unchecked)
|
||
print("dtest信息:", self.list_dtest_info)
|
||
#连接界面信号与槽
|
||
self.widget_dtest.itemChanged.connect(self.tree_widget_item_changed_handle)
|
||
self.widget_case.itemChanged.connect(self.tree_widget_item_changed_handle)
|
||
self.widget_rule.cellChanged.connect(self.table_widget_cell_changed)
|
||
|
||
def xml_file_save(self, path):
|
||
#更新dom tree中dir、dtest、port、rule部分
|
||
root = self.xml_tree.documentElement
|
||
#由于存在较多空行,会导致生成的xml文件有较多空行,这里选择删除
|
||
nodes = root.childNodes
|
||
i = 0
|
||
while(i < nodes.length):
|
||
if nodes[i].nodeType == minidom.Node.TEXT_NODE \
|
||
and str(nodes[i].data).find("\n") >= 0:
|
||
root.removeChild(nodes[i])
|
||
else:
|
||
nodes_son = nodes[i].childNodes
|
||
j = 0
|
||
while j < nodes_son.length:
|
||
if nodes_son[j].nodeType == minidom.Node.TEXT_NODE \
|
||
and str(nodes_son[j].data).find("\n") >= 0:
|
||
nodes[i].removeChild(nodes_son[j])
|
||
else:
|
||
nodes_grandson = nodes_son[j].childNodes
|
||
m = 0
|
||
while m < nodes_grandson.length:
|
||
if nodes_grandson[m].nodeType == minidom.Node.TEXT_NODE \
|
||
and str(nodes_grandson[m].data).find("\n") >= 0:
|
||
nodes_son[j].removeChild(nodes_grandson[m])
|
||
else:
|
||
m += 1
|
||
j += 1
|
||
i += 1
|
||
#调试界面使能flag不采用变量保存,而是直接读取界面状态
|
||
debug_interface = root.getElementsByTagName("debug_interface")
|
||
eb = "true" if self.widget_debug.isVisible() else "false"
|
||
debug_interface[0].firstChild.data = eb
|
||
|
||
dirs = root.getElementsByTagName("dir")
|
||
for i in range(len(self.list_path)):
|
||
if dirs[i].hasChildNodes():
|
||
dirs[i].firstChild.data = self.list_path[i]
|
||
else:
|
||
if len(self.list_path[i]):
|
||
dir_text = self.xml_tree.createTextNode(self.list_path[i])
|
||
dirs[i].appendChild(dir_text)
|
||
|
||
dtests = root.getElementsByTagName("dtest")
|
||
for i in range(0, len(self.list_dtest_info)):
|
||
dtests[i].getElementsByTagName("enable")[0].firstChild.data = \
|
||
self.list_dtest_info[i][0][g_idx_eb]
|
||
for j in range(1, len(self.list_dtest_info[i])):
|
||
dtests[i].getElementsByTagName("case")[j -1]\
|
||
.getElementsByTagName("enable")[0].firstChild.data = \
|
||
self.list_dtest_info[i][j][g_idx_eb_c]
|
||
|
||
ports = root.getElementsByTagName("port")
|
||
for i in range(0, len(self.np_port_info)):
|
||
if ports[i].getElementsByTagName("com")[0].hasChildNodes():
|
||
ports[i].getElementsByTagName("com")[0].firstChild.data = \
|
||
self.np_port_info[i]['com']
|
||
else:
|
||
com_text = self.xml_tree.createTextNode(self.np_port_info[i]['com'])
|
||
ports[i].getElementsByTagName("com")[0].appendChild(com_text)
|
||
ports[i].getElementsByTagName("baud")[0].firstChild.data = \
|
||
self.np_port_info[i]['baud']
|
||
ports[i].getElementsByTagName("data")[0].firstChild.data = \
|
||
self.np_port_info[i]['data']
|
||
ports[i].getElementsByTagName("parity")[0].firstChild.data = \
|
||
self.np_port_info[i]['parity']
|
||
ports[i].getElementsByTagName("stop")[0].firstChild.data = \
|
||
self.np_port_info[i]['stop']
|
||
|
||
rules = root.getElementsByTagName("rule")
|
||
for i in range(0, len(self.list_rule_info)):
|
||
rule_num_xml = len(rules)
|
||
if i < rule_num_xml:
|
||
if rules[i].getElementsByTagName("keyword")[0].hasChildNodes():
|
||
rules[i].getElementsByTagName("keyword")[0].firstChild.data = \
|
||
self.list_rule_info[i][g_idx_rule_key]
|
||
else:
|
||
keyword_text = self.xml_tree.createTextNode(\
|
||
self.list_rule_info[i][g_idx_rule_key])
|
||
rules[i].getElementsByTagName("keyword")[0].appendChild(keyword_text)
|
||
rules[i].getElementsByTagName("color")[0].firstChild.data = \
|
||
self.list_rule_info[i][1]
|
||
rules[i].getElementsByTagName("enable")[0].firstChild.data = \
|
||
self.list_rule_info[i][2]
|
||
else:
|
||
# if self.list_rule_info[i][g_idx_rule_eb] == "false":
|
||
# continue
|
||
rule_new = self.xml_tree.createElement("rule")
|
||
rule_new.setAttribute("id", str(i))
|
||
keyword = self.xml_tree.createElement("keyword")
|
||
keyword_text = self.xml_tree.createTextNode(\
|
||
self.list_rule_info[i][g_idx_rule_key])
|
||
keyword.appendChild(keyword_text)
|
||
color = self.xml_tree.createElement("color")
|
||
color_text = self.xml_tree.createTextNode(\
|
||
self.list_rule_info[i][g_idx_rule_color])
|
||
color.appendChild(color_text)
|
||
eb = self.xml_tree.createElement("enable")
|
||
eb_text = self.xml_tree.createTextNode(\
|
||
self.list_rule_info[i][g_idx_rule_eb])
|
||
eb.appendChild(eb_text)
|
||
rule_new.appendChild(keyword)
|
||
rule_new.appendChild(color)
|
||
rule_new.appendChild(eb)
|
||
root.appendChild(rule_new)
|
||
#保存xml文件,使用w模式打开,如文件存在则新建文件
|
||
# fd_xml = open(path, 'w')
|
||
fd_xml = codecs.open(path, 'wb', 'utf-8') #这里必须用codecs.open指定编码
|
||
if fd_xml == None:
|
||
msg = path + " 文件打开失败"
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return
|
||
self.xml_tree.writexml(fd_xml, indent='',
|
||
addindent=' ', newl='\n', encoding='UTF-8')
|
||
# self.xml_tree.writexml(fd_xml, encoding='utf-8')
|
||
fd_xml.close()
|
||
return
|
||
|
||
def pbt_handle_cfg(self):
|
||
print("配置按钮被按下")
|
||
print("默认窗口配置:", self.np_port_info)
|
||
#某些配置选项置为默认值
|
||
self.cfg.combo_com_0.setCurrentText(self.np_port_info[0]['com'])
|
||
self.cfg.combo_com_1.setCurrentText(self.np_port_info[1]['com'])
|
||
self.cfg.combo_com_2.setCurrentText(self.np_port_info[2]['com'])
|
||
self.cfg.combo_com_3.setCurrentText(self.np_port_info[3]['com'])
|
||
self.cfg.combo_baud_0.setCurrentText(str(self.np_port_info[0]['baud']))
|
||
self.cfg.combo_baud_1.setCurrentText(str(self.np_port_info[1]['baud']))
|
||
self.cfg.combo_baud_2.setCurrentText(str(self.np_port_info[2]['baud']))
|
||
self.cfg.combo_baud_3.setCurrentText(str(self.np_port_info[3]['baud']))
|
||
self.cfg.combo_data_0.setCurrentText(str(self.np_port_info[0]['data']))
|
||
self.cfg.combo_data_1.setCurrentText(str(self.np_port_info[1]['data']))
|
||
self.cfg.combo_data_2.setCurrentText(str(self.np_port_info[2]['data']))
|
||
self.cfg.combo_data_3.setCurrentText(str(self.np_port_info[3]['data']))
|
||
self.cfg.edit_fw_path_0.setText(str(self.list_path[0]))
|
||
self.cfg.edit_fw_path_1.setText(str(self.list_path[1]))
|
||
self.ui_cfg.show()
|
||
|
||
def pbt_handle_cfg_accept(self):
|
||
slave_take_effect = 0
|
||
list_cfg_valid = [0, 0]
|
||
print("配置界面 确定 按钮被按下,获取配置参数")
|
||
#串口0
|
||
self.np_port_info[0]['com'] = self.cfg.combo_com_0.currentText()
|
||
self.np_port_info[0]['baud'] = int(self.cfg.combo_baud_0.currentText())
|
||
self.np_port_info[0]['data'] = int(self.cfg.combo_data_0.currentText())
|
||
self.np_port_info[0]['parity'] = self.cfg.combo_parity_0.currentText()
|
||
self.np_port_info[0]['stop'] = float(self.cfg.combo_stop_0.currentText())
|
||
#串口1
|
||
self.np_port_info[1]['com'] = self.cfg.combo_com_1.currentText()
|
||
self.np_port_info[1]['baud'] = int(self.cfg.combo_baud_1.currentText())
|
||
self.np_port_info[1]['data'] = int(self.cfg.combo_data_1.currentText())
|
||
self.np_port_info[1]['parity'] = self.cfg.combo_parity_1.currentText()
|
||
self.np_port_info[1]['stop'] = float(self.cfg.combo_stop_1.currentText())
|
||
#串口2
|
||
self.np_port_info[2]['com'] = self.cfg.combo_com_2.currentText()
|
||
self.np_port_info[2]['baud'] = int(self.cfg.combo_baud_2.currentText())
|
||
self.np_port_info[2]['data'] = int(self.cfg.combo_data_2.currentText())
|
||
self.np_port_info[2]['parity'] = self.cfg.combo_parity_2.currentText()
|
||
self.np_port_info[2]['stop'] = float(self.cfg.combo_stop_2.currentText())
|
||
#串口3
|
||
self.np_port_info[3]['com'] = self.cfg.combo_com_3.currentText()
|
||
self.np_port_info[3]['baud'] = int(self.cfg.combo_baud_3.currentText())
|
||
self.np_port_info[3]['data'] = int(self.cfg.combo_data_3.currentText())
|
||
self.np_port_info[3]['parity'] = self.cfg.combo_parity_3.currentText()
|
||
self.np_port_info[3]['stop'] = float(self.cfg.combo_stop_3.currentText())
|
||
print("配置的串口参数:", self.np_port_info)
|
||
#文件路径
|
||
self.list_path[0] = self.cfg.edit_fw_path_0.text();
|
||
self.list_path[1] = self.cfg.edit_fw_path_1.text();
|
||
print("配置的固件路径参数:", self.list_path)
|
||
#判断配置是否合理,要求至少配主模式2个串口和一个文件路径
|
||
if (len(self.np_port_info[0]['com']) == 0
|
||
or len(self.np_port_info[1]['com']) == 0
|
||
or len(self.list_path[0]) == 0):
|
||
print("配置无效,至少需要2个串口和一个路径")
|
||
self.msg_box_show(g_msg_warning, '至少需要配置主模式的\n'
|
||
'2个串口和固件路径')
|
||
list_cfg_valid[0] = 0
|
||
else:
|
||
list_cfg_valid[0] = 1
|
||
#如果配置了从模式,同样需要检查参数是否有效
|
||
if (len(self.np_port_info[2]['com']) > 0
|
||
or len(self.np_port_info[3]['com']) > 0
|
||
or len(self.list_path[1]) > 0):
|
||
slave_take_effect = 1
|
||
if (len(self.np_port_info[2]['com']) == 0
|
||
or len(self.np_port_info[3]['com']) == 0
|
||
or len(self.list_path[1]) == 0):
|
||
print("从模式配置无效,需要配置从模式2个串口和一个路径")
|
||
self.msg_box_show(g_msg_warning, '需要配置从模式的\n'
|
||
'2个串口和固件路径')
|
||
list_cfg_valid[1] = 0
|
||
else:
|
||
list_cfg_valid[1] = 1
|
||
else:
|
||
list_cfg_valid[1] = 0
|
||
#如果配置无效,则保持配置界面处于最上层
|
||
if (list_cfg_valid[0] == 1
|
||
and list_cfg_valid[1] == slave_take_effect):
|
||
#检查串口是否冲突
|
||
cycle_index = 4 if list_cfg_valid[1] == 1 else 2
|
||
for i in range(0, int(cycle_index / 2)):
|
||
for j in range(0, cycle_index):
|
||
if self.np_port_info[i]['com'] == self.np_port_info[j]['com']\
|
||
and i != j:
|
||
print("串口冲突")
|
||
self.msg_box_show(g_msg_warning, "串口冲突")
|
||
return
|
||
#检查路径是否存在,防止手动输入错误路径
|
||
if (os.path.exists(self.list_path[0]) == False
|
||
or os.path.exists(self.list_path[1]) != slave_take_effect):
|
||
print("文件夹路径不存在")
|
||
self.msg_box_show(g_msg_warning, "文件夹不存在")
|
||
return
|
||
self.ui_cfg.close()
|
||
|
||
def pbt_handle_cfg_reject(self):
|
||
print("配置界面 取消 按钮被按下")
|
||
self.ui_cfg.close()
|
||
|
||
def pbt_handle_cfg_file_select(self, pbt_num):
|
||
print("配置界面 路径选择按钮%d 被按下" % pbt_num)
|
||
path_temp = str(QFileDialog.getExistingDirectory(self,
|
||
'请选择固件所在目录', './', QFileDialog.ShowDirsOnly))
|
||
print("获取到文件夹路径:", path_temp)
|
||
if (os.path.exists(path_temp) == False):
|
||
print("文件夹路径不存在")
|
||
path_temp = ''
|
||
if (len(path_temp) > 0):
|
||
self.list_path[pbt_num] = path_temp
|
||
if (pbt_num == 0):
|
||
self.cfg.edit_fw_path_0.setText(path_temp)
|
||
else:
|
||
self.cfg.edit_fw_path_1.setText(path_temp)
|
||
|
||
def pbt_handle_test_act(self):
|
||
list_failed_dtest = []
|
||
list_succeed_dtest = []
|
||
pbt_text = self.pbt_start_test.text()
|
||
print("%s 按钮被按下" % pbt_text)
|
||
if pbt_text == "开始测试":
|
||
if (self.test_start_file_check() < 0) or (self.test_start_port_check() < 0):
|
||
if self.auto_test_mode:
|
||
exit(1)
|
||
return
|
||
#由于工期紧张,从模块的连接和下载由多线程同时工作更改为顺序执行,下述循环:
|
||
#从模式连接->从模式下载->主模式连接->主模式下载->发送case组合->打印串口
|
||
#【注意】:该流程及后续的流程中存在较多常量下标,以后拓展时需要注意
|
||
#【注意】:由于现有流程在单线程执行且部分流程为死循环,这会导致界面卡住
|
||
self.fsm['state'] = g_fsm_free
|
||
self.fsm['dtest'] = -1
|
||
self.fsm['cycle'] = 0
|
||
self.fsm['reboot'] = 0
|
||
#使用timer+状态机的方式
|
||
self.timer.start(1000)
|
||
self.pbt_start_test.setText("停止测试")
|
||
self.pbt_cfg.setEnabled(False)
|
||
self.edit_cycle_index.setReadOnly(True)
|
||
# 删除result文件
|
||
if os.path.exists(self.path_result_file):
|
||
os.remove(self.path_result_file)
|
||
# 在result文件中写入当前测试固件所在文件夹
|
||
dir_dtest_history = os.path.join(os.getcwd(), "history")
|
||
dir_dtest_list = os.listdir(dir_dtest_history)
|
||
dir_dtest = dir_dtest_list[len(dir_dtest_list) - 1]
|
||
self.test_save_dtest_result("当前测试固件所在目录",
|
||
"file://TESTER-IP-11/Users/jenkins/AppData/Local/Jenkins/.jenkins/workspace/kl3_dtest_test/scripts/history/" + dir_dtest)
|
||
else:
|
||
self.test_stop_handle()
|
||
|
||
def test_fsm_handle(self):
|
||
# print("定时器超时信号触发")
|
||
if self.fsm['state'] == g_fsm_free:
|
||
#获取下一个将要执行的dtest
|
||
overflow = 0
|
||
dtest_n = self.fsm['dtest']
|
||
if not self.fsm['reboot']:
|
||
if self.fsm['dtest'] + 1 >= len(self.list_dtest_info):#一轮执行完毕
|
||
overflow = 1
|
||
for i in range(dtest_n + 1, len(self.list_dtest_info)):
|
||
if self.list_dtest_info[i][0][g_idx_eb] == "true":
|
||
dtest_n = i
|
||
break
|
||
if i == len(self.list_dtest_info) - 1:
|
||
overflow = 1
|
||
if overflow:
|
||
self.fsm['cycle'] += 1
|
||
cycle_curr = self.fsm['cycle']
|
||
cycle_total = int(self.edit_cycle_index.text())
|
||
print("当前循环次数:%d,总循环次数:%d" % (cycle_curr, cycle_total))
|
||
if (cycle_total <= 0) or (cycle_curr < cycle_total): #下一轮
|
||
self.fsm['dtest'] = -1
|
||
self.fsm['state'] = g_fsm_free
|
||
return
|
||
else: #所有测试结束
|
||
print("所有测试执行完毕")
|
||
self.test_stop_handle()
|
||
if self.auto_test_mode:
|
||
exit(0)
|
||
return
|
||
else:
|
||
self.fsm['state'] = g_fsm_download
|
||
self.fsm['dtest'] = dtest_n
|
||
print("下一个将要执行的dtest编号:%d" % dtest_n)
|
||
elif self.fsm['state'] == g_fsm_download:
|
||
failed_flag = 0
|
||
failed_str = ["从模块连接", "从模块下载", "主模块连接", "主模块下载"]
|
||
#这里部分操作为阻塞模式,因此要先关掉timer
|
||
self.timer.stop()
|
||
dtest_index = self.fsm['dtest']
|
||
name_file = self.list_dtest_info[dtest_index][0][g_idx_name]
|
||
if name_file.rfind(".bin") < 0:
|
||
name_file += ".bin"
|
||
path = self.list_path[0] + "\\" + name_file
|
||
print("开始测试第%d个dtest(%s),路径:%s" % (dtest_index, name_file, path))
|
||
if self.auto_test_mode:
|
||
g_list_queue[2].put_nowait(name_file)
|
||
if self.list_dtest_info[dtest_index][0][g_idx_slave] == "true":
|
||
name_file_slave = name_file.replace('.', '_slave.')
|
||
path_slave = self.list_path[1] + '\\' + name_file_slave
|
||
print("从模式使能,bin文件:", path_slave)
|
||
if self.test_start_connect_module(self.np_port_info[2]['fd'], \
|
||
self.np_port_info[3]['fd']):
|
||
failed_flag = 1
|
||
if failed_flag == 0 and \
|
||
self.test_start_xmodem_transf_1(self.np_port_info[2]['fd'], \
|
||
path_slave) < 0:
|
||
failed_flag = 2
|
||
if failed_flag == 0 and \
|
||
self.test_start_connect_module(self.np_port_info[0]['fd'], \
|
||
self.np_port_info[1]['fd']):
|
||
failed_flag = 3
|
||
if failed_flag == 0 and \
|
||
self.test_start_xmodem_transf_0(self.np_port_info[0]['fd'], path) < 0:
|
||
failed_flag = 4
|
||
if failed_flag:
|
||
print("%s执行失败,原因:%s 失败" % (name_file, failed_str[failed_flag - 1]))
|
||
self.fsm['state'] = g_fsm_free
|
||
self.test_save_dtest_result(name_file.replace(".bin", ""), "FAILED")
|
||
self.timer.start(100) # start next test
|
||
else:
|
||
self.fsm['state'] = g_fsm_running
|
||
self.fsm_timestamp_download_complete = datetime.datetime.now()
|
||
self.fsm_dtest_running_flag = 0
|
||
self.timer.start(200)
|
||
elif self.fsm['state'] == g_fsm_running:
|
||
complete_flag = 0
|
||
#【TODO】:这里计划增加一个更改串口波特率的流程
|
||
# 10s没有收到数据,则认为dtest执行失败了
|
||
name_dtest = self.list_dtest_info[self.fsm['dtest']][0][g_idx_name]
|
||
if not self.fsm_dtest_running_flag:
|
||
if ((datetime.datetime.now() - self.fsm_timestamp_download_complete).seconds > 10):
|
||
print("未收到dtest发送的START字段,也许dtest执行失败了")
|
||
self.test_save_dtest_result(name_dtest, "FAILED")
|
||
self.fsm['state'] = g_fsm_free
|
||
self.timer.start(1000)
|
||
#串口接收并打印
|
||
# read_bytes = b"this is test"
|
||
# read_bytes = self.np_port_info[0]['fd'].readall()
|
||
ready_num = self.np_port_info[0]['fd'].inWaiting()
|
||
if ready_num == 0:
|
||
self.timer.start(100)
|
||
return
|
||
if not self.fsm_dtest_running_flag:
|
||
if ready_num < len(g_module_log_key_start):
|
||
print("串口数据长度不够,收到的数据长度:", ready_num)
|
||
self.timer.start(100)
|
||
return
|
||
read_bytes = self.np_port_info[0]['fd'].read(ready_num)
|
||
string = read_bytes.decode('utf-8') #这里已知串口数据均为字符
|
||
|
||
#字符串规则匹配,如需处理原始串口数据,则额外提供函数(放到规则匹配后面)
|
||
#串口收到的字符串可能发送粘连,因此以换行符为间隔进行匹配
|
||
list_str = string.splitlines()
|
||
for i in range(0, len(list_str)):
|
||
if len(list_str[i]) == 0:
|
||
continue
|
||
string_temp = str(list_str[i])
|
||
color = QColor("#000000") #黑色
|
||
for i in range(0, len(self.list_rule_info)):
|
||
if self.list_rule_info[i][g_idx_rule_eb] == "false":
|
||
continue
|
||
if string_temp.find(self.list_rule_info[i][g_idx_rule_key]) >= 0:
|
||
item_cnt = self.widget_rule.item(i, 1)
|
||
keyword_cnt = int(item_cnt.text()) + 1
|
||
item_cnt.setText(str(keyword_cnt))
|
||
color = QColor(self.list_rule_info[i][g_idx_rule_color])
|
||
#[TODO]:颜色更改显示字符串
|
||
self.edit_terminal.setTextColor(color)
|
||
self.edit_terminal.append(string_temp)
|
||
# self.edit_terminal.moveCursor(QTextCursor.End)
|
||
# print("当前字符串颜色:%s,内容:%s" % (color.name(), string_temp))
|
||
#发送消息到写文件的进程
|
||
g_list_queue[1].put_nowait(string)
|
||
|
||
#预定义的关键字处理
|
||
if string.find(g_module_log_key_result) >= 0:
|
||
if string.find("fail") > 0 or string.find("FAIL") > 0:
|
||
result = "FAILED"
|
||
else:
|
||
result = "SUCCEED"
|
||
print(name_dtest, " 执行完成,结果:", result)
|
||
self.test_save_dtest_result(name_dtest, result)
|
||
complete_flag = 1
|
||
elif string.find(g_module_log_key_reboot) >= 0:
|
||
print("dtest需要重启")
|
||
self.fsm['reboot'] = 1
|
||
complete_flag = 1
|
||
elif string.find(g_module_log_key_start) >= 0 and \
|
||
not self.fsm_dtest_running_flag:
|
||
print("received start keyword, send dtest case group")
|
||
self.fsm_dtest_running_flag = 1
|
||
#发送case组合
|
||
time.sleep(0.8)
|
||
self.test_start_send_case_group(self.fsm['dtest'], self.np_port_info[0]['fd'])
|
||
if complete_flag:
|
||
self.timer.start(1000)
|
||
self.fsm['state'] = g_fsm_free
|
||
self.timer.start(100)
|
||
|
||
def test_start_send_case_group(self, dtest_index, port):
|
||
group = 0
|
||
for i in range(1, len(self.list_dtest_info[dtest_index])):
|
||
if self.list_dtest_info[dtest_index][i][g_idx_eb_c] == "true":
|
||
group |= (1 << (i - 1))
|
||
cmd = ("[CONFIG] - 0x" + str("{:08X}".format(group))).encode(encoding='utf-8')
|
||
print("%s case组合:%d,发送到dtest的指令:%s" % \
|
||
(self.list_dtest_info[dtest_index][0][g_idx_name], group, cmd))
|
||
port.write(cmd)
|
||
return 0
|
||
|
||
def test_start_xmodem_transf_0(self, port, path_bin):
|
||
print("开始xmodem下载固件: ", path_bin)
|
||
x_modem = xmodem.XMODEM(xmodem_transf_0_getc,
|
||
xmodem_transf_0_putc, mode='xmodem1k')
|
||
time_start = datetime.datetime.now()
|
||
try:
|
||
fd_bin = open(path_bin, 'rb')
|
||
except exception as e:
|
||
msg = "文件:" + path_bin + " 打开失败"
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return -1
|
||
send_flag = x_modem.send(fd_bin, callback=xmodem_transf_0_callback)
|
||
time_end = datetime.datetime.now()
|
||
time_delta = (time_end - time_start).seconds
|
||
print("固件(%s)发送完成,用时:%ss" % (path_bin, time_delta))
|
||
return 0
|
||
|
||
def test_start_xmodem_transf_1(self, port, path_bin):
|
||
self.fsm['state'] = g_fsm_download
|
||
print("开始xmodem下载固件: ", path_bin)
|
||
x_modem = xmodem.XMODEM(xmodem_transf_1_getc,
|
||
xmodem_transf_1_putc, mode='xmodem1k')
|
||
time_start = datetime.datetime.now()
|
||
try:
|
||
fd_bin = open(path_bin, 'rb')
|
||
except exception as e:
|
||
msg = "文件:" + path_bin + " 打开失败"
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return -1
|
||
send_flag = x_modem.send(fd_bin, callback=xmodem_transf_1_callback)
|
||
time_end = datetime.datetime.now()
|
||
time_delta = (time_end - time_start).seconds
|
||
print("固件(%s)发送完成,用时:%ss" % (path_bin, time_delta))
|
||
return 0
|
||
|
||
def test_start_connect_module(self, port, port_relay):
|
||
print("尝试连接模块")
|
||
str_recv = b''
|
||
time_start = datetime.datetime.now()
|
||
self.fsm['state'] = g_fsm_connect
|
||
while 1:
|
||
#【注意】:这里并没有判断继电器的返回值
|
||
port_relay.write(g_module_relay_off_key)
|
||
time.sleep(0.5)
|
||
port_relay.write(g_module_relay_on_key)
|
||
time.sleep(0.2)
|
||
for i in range(0, 5):
|
||
port.write(g_module_connect_key)
|
||
time.sleep(0.1)
|
||
# temp = port.readall()
|
||
ready_num = self.np_port_info[0]['fd'].inWaiting()
|
||
if ready_num == 0:
|
||
continue
|
||
temp = self.np_port_info[0]['fd'].read(ready_num)
|
||
if len(temp):
|
||
print("port rec len: %d, data:%s" % (len(temp), temp))
|
||
str_recv += temp
|
||
if str_recv.find(g_module_connected_key) >= 0:
|
||
print("模块连接成功")
|
||
return 0
|
||
time_now = datetime.datetime.now()
|
||
time_delta = (time_now - time_start).seconds
|
||
if (time_delta > 120):
|
||
print("模块连接超时:%s -> %s" % (time_start, time_now))
|
||
return -1
|
||
|
||
def test_start_port_check(self):
|
||
list_slave = []
|
||
for i in range(0, len(self.list_dtest_info)):
|
||
if self.list_dtest_info[i][0][g_idx_eb] == "true":
|
||
list_slave.append(self.list_dtest_info[i][0][g_idx_slave])
|
||
print("从模式使能情况:%s" % list_slave)
|
||
for i in range(0, len(self.np_port_info)):
|
||
if (i > 1):
|
||
if list_slave.count("true") > 0:
|
||
if self.np_port_info[i]['com'] == '' \
|
||
or self.np_port_info[i + 1]['com'] == '':
|
||
msg = "使能了从模式,但是未配置从模式串口"
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return -1
|
||
else:
|
||
continue
|
||
port = self.np_port_info[i]['com']
|
||
baud = self.np_port_info[i]['baud']
|
||
data = self.np_port_info[i]['data']
|
||
parity = self.np_port_info[i]['parity'][0] #取第一个字符
|
||
stop = self.np_port_info[i]['stop']
|
||
print("尝试打开串口,com号(%s),波特率(%d),数据位(%d),校验位(%s),"
|
||
"停止位(%f)" % (port, baud, data, parity, stop))
|
||
try:
|
||
self.np_port_info[i]['fd'] = serial.Serial(port = port,
|
||
baudrate = baud, bytesize = data, parity = parity,
|
||
stopbits = stop, timeout = 0.3)
|
||
print("%s 串口打开成功" % port)
|
||
g_port_xmodex.append(self.np_port_info[i]['fd'])
|
||
except Exception as e:
|
||
msg = port + "串口打开失败,原因:" + str(e)
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return -1
|
||
return 0
|
||
|
||
def test_start_file_check(self):
|
||
#判断dtest文件是否存在,不存在则enable标签置位false,且dtest界面标红
|
||
file_lack = []
|
||
for i in range(0, len(self.list_dtest_info)):
|
||
if self.list_dtest_info[i][0][g_idx_eb] == "false":
|
||
continue
|
||
item = self.widget_dtest.topLevelItem(i)
|
||
name_file = self.list_dtest_info[i][0][g_idx_name]
|
||
if name_file.rfind('.bin') < 0:
|
||
name_file += '.bin'
|
||
# if str_file_name.find(name_file) < 0:
|
||
path = self.list_path[0] + '\\' + name_file
|
||
print("当前检查的文件名为:%s, 路径为:%s" % (name_file, path))
|
||
if not os.access(path, os.F_OK | os.R_OK):
|
||
self.list_dtest_info[i][0][g_idx_eb] = "false"
|
||
item.setCheckState(0, 0)
|
||
item.setForeground(0, QtCore.Qt.red)
|
||
item.setForeground(1, QtCore.Qt.red)
|
||
file_lack.append(name_file)
|
||
else:
|
||
if self.list_dtest_info[i][0][g_idx_slave] == "true" \
|
||
and self.list_dtest_info[i][0][g_idx_eb] == "true":
|
||
if len(self.list_path[1]) == 0:
|
||
msg = "使能了从模式,但是未配置从固件文件夹路径"
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return -1
|
||
name_file_slave = name_file.replace('.', '_slave.')
|
||
path_slave = self.list_path[1] + '\\' + name_file_slave
|
||
print("slave模式使能,slave文件名:%s, 路径:%s" %
|
||
(name_file_slave, path_slave))
|
||
# if str_file_name.find(name_file_slave) < 0:
|
||
if not os.access(path_slave, os.F_OK | os.R_OK):
|
||
self.list_dtest_info[i][0][g_idx_eb] = "false"
|
||
item.setCheckState(0, 0)
|
||
item.setForeground(0, QtCore.Qt.red)
|
||
item.setForeground(1, QtCore.Qt.red)
|
||
file_lack.append(name_file_slave)
|
||
else:
|
||
item.setForeground(0, QtCore.Qt.black)
|
||
item.setForeground(1, QtCore.Qt.black)
|
||
if len(file_lack) > 0:
|
||
msg = "缺少以下固件:" + str(file_lack)
|
||
print(msg)
|
||
self.msg_box_show(g_msg_warning, msg)
|
||
return -1
|
||
return 0
|
||
|
||
def test_stop_handle(self):
|
||
print("停止测试,关闭串口和文件,释放变量")
|
||
self.timer.stop()
|
||
self.pbt_cfg.setEnabled(True)
|
||
self.edit_cycle_index.setReadOnly(False)
|
||
g_port_xmodex.clear()
|
||
self.fsm['state'] = g_fsm_free
|
||
self.fsm['dtest'] = -1
|
||
self.fsm['cycle'] = 0
|
||
self.fsm['reboot'] = 0
|
||
for i in range(0, len(self.np_port_info)):
|
||
if self.np_port_info[i]['fd']:
|
||
self.np_port_info[i]['fd'].close()
|
||
self.pbt_start_test.setText("开始测试")
|
||
return
|
||
|
||
def test_save_dtest_result(self, dtest, result):
|
||
path_result_file = self.path_result_file
|
||
try:
|
||
fd_result = open(path_result_file, "a")
|
||
except exception as e:
|
||
print(path_result_file, "文件打开失败, 原因:", e)
|
||
fd_result.write(dtest + " --> " + result + "\n")
|
||
fd_result.close()
|
||
|
||
def mouse_handle_widget_right_clicked(self, pos, src):
|
||
print("%s右键单击被触发, 显示右键菜单" % ("dtest" if src == 0 else "case"))
|
||
menu = QMenu(self.widget_dtest)
|
||
menu.addAction(self.act_sel_all)
|
||
menu.addAction(self.act_sel_none)
|
||
menu.exec(pos)
|
||
|
||
def tree_widget_item_changed_handle(self, item, column):
|
||
check_state = 'true' if item.checkState(0) else 'false'
|
||
print("复选框状态:%s" % check_state)
|
||
#更新dtest_info列表enable选项
|
||
if item.treeWidget() is self.widget_dtest:
|
||
index = int(item.text(0))
|
||
self.list_dtest_info[index][0][g_idx_eb] = check_state
|
||
else:
|
||
if str(item.parent()) != 'None':
|
||
index = int(item.parent().text(0))
|
||
index_case = int(item.text(0))
|
||
self.list_dtest_info[index][index_case + 1][g_idx_eb_c] = check_state
|
||
else:
|
||
index = int(item.text(0))
|
||
for i in range(1, len(self.list_dtest_info[index])):
|
||
self.list_dtest_info[index][i][g_idx_eb_c] = check_state
|
||
#更新界面
|
||
for i in range(0, item.childCount()):
|
||
item_child = item.child(i)
|
||
item_child.setCheckState(0, item.checkState(0))
|
||
print("修改后的dtest list", self.list_dtest_info)
|
||
|
||
def table_widget_cell_changed(self, row, column):
|
||
item = self.widget_rule.item(row, column)
|
||
if column == 1: #计数列的更改不处理
|
||
return
|
||
print("表格数据变化,坐标(%d, %d)" % (row, column))
|
||
if column == 0: #复选框
|
||
check_state = 'true' if item.checkState() else 'false'
|
||
print("复选框状态:", check_state)
|
||
self.list_rule_info[row][g_idx_rule_eb] = check_state
|
||
elif column == 2: #颜色更改
|
||
pass
|
||
elif column == 3: #关键字更改
|
||
keyword = item.text()
|
||
print("关键字更改为:", keyword)
|
||
self.list_rule_info[row][g_idx_rule_key] = keyword
|
||
print("修改之后的rule info:", self.list_rule_info)
|
||
|
||
def table_widget_cell_clicked(self, row, column):
|
||
if column != 2: #只响应颜色按钮
|
||
return
|
||
print("表格颜色列表被单击,坐标(%d, %d)" % (row, column))
|
||
color = QColorDialog.getColor()
|
||
if not color.isValid():
|
||
return
|
||
color_str = str(color.name())
|
||
print("选中的颜色:%s" % color_str)
|
||
self.list_rule_info[row][g_idx_rule_color] = color_str
|
||
item = self.widget_rule.item(row, column)
|
||
item.setBackground(color)
|
||
|
||
def act_handle_debug_interface_config(self, act):
|
||
print("显示0/关闭1调试窗口 动作(%d)被按下" % act)
|
||
if act == 0:
|
||
self.widget_debug.setVisible(False)
|
||
else:
|
||
self.widget_debug.setVisible(True)
|
||
|
||
def act_handle_option_select(self, src, act):
|
||
print("%s 右键菜单 %s 动作被触发" %
|
||
("dtest" if src is self.widget_dtest else "case",
|
||
"全选" if act == 2 else "全取消"))
|
||
#更新dtest_info列表enable选项
|
||
str_eb = "true" if act == 2 else "false"
|
||
if src is self.widget_dtest:
|
||
widget_temp = self.widget_dtest
|
||
for i in range(0, len(self.list_dtest_info)):
|
||
self.list_dtest_info[i][0][g_idx_eb] = str_eb
|
||
else:
|
||
widget_temp = self.widget_case
|
||
for i in range(0, len(self.list_dtest_info)):
|
||
for j in range(1, len(self.list_dtest_info[i])):
|
||
self.list_dtest_info[i][j][g_idx_eb_c] = str_eb
|
||
#更新界面
|
||
for i in range(0, widget_temp.topLevelItemCount()):
|
||
item = widget_temp.topLevelItem(i)
|
||
item.setCheckState(0, act)
|
||
for i in range(0, item.childCount()):
|
||
item_child = item.child(i)
|
||
item_child.setCheckState(0, item.checkState(0))
|
||
print("修改后的dtest list", self.list_dtest_info)
|
||
|
||
def act_handle_xml_file_open_save(self, act):
|
||
print("%s 动作被触发" %
|
||
("导入" if act == 0 else ("保存" if act == 1 else "另存")))
|
||
if act == 0: #导入文件
|
||
path_file, _ = QFileDialog.getOpenFileName(self,
|
||
"选择要导入的xml文件", "./", 'xml文件 (*.xml)') #返回一个元组
|
||
if len(path_file) == 0:
|
||
print("选中的文件路径无效: ", path_file)
|
||
return
|
||
print("选中的文件: ", path_file)
|
||
self.xml_file_load(path_file)
|
||
elif act == 2: #另存文件
|
||
path_file, _ = QFileDialog.getSaveFileName(self,
|
||
"另存为", "./", "xml文件 (*.xml)")
|
||
if len(path_file) == 0:
|
||
print("选中的文件路径无效: ", path_file)
|
||
return
|
||
self.xml_file_save(path_file)
|
||
else: #保存文件
|
||
self.xml_file_save(self.path_xml_file)
|
||
|
||
def write(self, str):
|
||
if str == ' ' or str == '\n':
|
||
return
|
||
self.edit_debug.append(str)
|
||
g_list_queue[0].put_nowait(str)
|
||
|
||
def msg_box_show(self, type, msg):
|
||
# 自动化测试时不弹窗
|
||
if self.auto_test_mode:
|
||
return
|
||
if type == g_msg_warning:
|
||
QMessageBox.warning(self, '警告', msg, QMessageBox.Yes)
|
||
else:
|
||
return
|
||
|
||
def main():
|
||
app = QtWidgets.QApplication(sys.argv)
|
||
my_app = kl3_test_app(argv=sys.argv)
|
||
my_app.show()
|
||
exit_flag = app.exec_()
|
||
print(sys.argv[0], "exit flag: ", exit_flag)
|
||
sys.exit(exit_flag)
|
||
|
||
if __name__ == '__main__':
|
||
freeze_support() #解决打包exe后出现的循环启动
|
||
main() |