Files
checker_slave/python/updata.py

508 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import time
import portalocker
import serial
import serial.tools.list_ports
import prottcp
STR_RED="\033[1;31m"
STR_BLUE="\033[1;34m"
STR_END="\033[0m"
class progress_box(QObject):
def __init__(self) -> None:
QObject.__init__(self)
def init(self,father:QDialog,text:str,x:int):
# print("text=",text)
self.lable = QLabel(father)
self.lable.setObjectName(u"label")
self.lable.setGeometry(QRect(40, x, 120, 15))
self.lable.setText(text)
self.p=QProgressBar(father)
self.p.setValue(0)
self.p.setGeometry(QRect(40,x+20,450,20))
def set_rate(self,rate:int):
# print(text)
self.p.setValue(rate)
class data_box(QObject):
def __init__(self) -> None:
QObject.__init__(self)
def init(self,father:QDialog,title:str,text:str,x:int):
# print("text=",text)
self.lable = QLabel(father)
self.lable.setObjectName(u"label")
self.lable.setGeometry(QRect(40, x, 120, 20))
self.lable.setText(title)
self.text = QLabel(father)
self.text.setObjectName(u"text")
self.text.setGeometry(QRect(40,x+20,450,30))
self.text.setText(text)
class updata_dlg(QObject):
# 进度信号ip1~100
rate_signal =pyqtSignal([int])
# 结束信号ip成败描述
end_signal = pyqtSignal([bool,str])
failed_signal = pyqtSignal()
def __init__(self):
QObject.__init__(self)
self.app = QApplication(sys.argv)
self.widget = QWidget()
self.widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.widget.resize(703, 409)
self.widget.setWindowTitle("赋码仪程序升级")
self.file_list_init()
self.com_but_init()
self.save_but_init()
self.updatas_but_init()
# self.cmd_but_init()
self.sstate_but_init()
self.scheme_but_init()
self.com_init()
self.combsp_init()
self.widget.destroyed.connect(self.quit)
self.failed_signal.connect(self.updata_failed)
self.cmd=0
self.port_is_open=False
def quit(self):
self.close_port()
# 程序退出
qApp.exit(1)
# 初始化文件列表
def file_list_init(self):
self.file_list = QListWidget(self.widget)
self.file_list.setObjectName(u"file_list")
self.file_list.setGeometry(QRect(25, 60, 531, 310))
self.file_list.setFrameShape(QFrame.Shape.Box)
self.file_list.setMidLineWidth(1)
self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
self.file_list.itemDoubleClicked.connect(self.item_clicked)
# 初始化打开端口按钮
def com_but_init(self):
self.com_but = QPushButton(self.widget)
self.com_but.setObjectName(u"com_but")
self.com_but.setGeometry(QRect(590, 10, 93, 28))
self.com_but.setText("打开端口")
self.com_but.clicked.connect(self.com_but_clicked)
# 初始化发送文件按钮
def save_but_init(self):
self.save_but = QPushButton(self.widget)
self.save_but.setObjectName(u"save_but")
self.save_but.setGeometry(QRect(590, 60, 93, 28))
self.save_but.setText("发送文件")
self.save_but.clicked.connect(self.save_but_clicked)
# 初始化发送命令按钮
def cmd_but_init(self):
self.cmd_but = QPushButton(self.widget)
self.cmd_but.setObjectName(u"save_but")
self.cmd_but.setGeometry(QRect(590, 100, 93, 28))
self.cmd_but.setText("升级MCU")
self.cmd_but.clicked.connect(self.cmd_but_clicked)
# 初始化升级小板按钮
def updatas_but_init(self):
self.updatas_but = QPushButton(self.widget)
self.updatas_but.setObjectName(u"updatas_but")
self.updatas_but.setGeometry(QRect(590, 100, 93, 28))
self.updatas_but.setText("升级小板")
self.updatas_but.clicked.connect(self.updatas_but_clicked)
# 初始化在线状态按钮
def sstate_but_init(self):
self.sstate_but = QPushButton(self.widget)
self.sstate_but.setObjectName(u"sstate_but")
self.sstate_but.setGeometry(QRect(590, 140, 93, 28))
self.sstate_but.setText("主板参数")
self.sstate_but.clicked.connect(self.sstate_but_clicked)
# 初始化方案状态按钮
def scheme_but_init(self):
self.sstate_but = QPushButton(self.widget)
self.sstate_but.setObjectName(u"sstate_but")
self.sstate_but.setGeometry(QRect(590, 180, 93, 28))
self.sstate_but.setText("方案参数")
self.sstate_but.clicked.connect(self.scheme_but_clicked)
# com口
def com_init(self):
self.com = QComboBox(self.widget)
self.com.setObjectName(u"com")
self.com.setGeometry(QRect(85, 10, 300, 25))
self.com.setEditable(True)
self.com.currentIndexChanged.connect(self.com_changed)
self.com.addItem("utcp:7777")
ports_list = list(serial.tools.list_ports.comports())
for comport in ports_list:
# print(comport.name,comport.description)
self.com.addItem(comport.name+":"+comport.description)
self.com_label = QLabel(self.widget)
self.com_label.setObjectName(u"label")
self.com_label.setGeometry(QRect(30, 16, 72, 15))
self.com_label.setText("COM口:")
# 选择波特率
def combsp_init(self):
self.combsp = QComboBox(self.widget)
self.combsp.setObjectName(u"combsp")
self.combsp.setGeometry(QRect(470, 10, 80, 25))
self.combsp.setEditable(True)
self.combsp.addItem("115200")
self.combsp.addItem("9600")
self.combsp_label = QLabel(self.widget)
self.combsp_label.setObjectName(u"label")
self.combsp_label.setGeometry(QRect(410, 16, 72, 15))
self.combsp_label.setText("波特率:")
# 显示消息框
def show_msg(self,msg:str):
m=QMessageBox(self.widget)
m.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
m.setText(msg)
m.setWindowTitle("提示")
m.show()
# 找到已选择的文件
def get_selected_file_by_type(self,type:str):
file_list=[]
items=self.file_list.selectedItems()
for i in items:
if(i.text()[-len(type):]==type):
file_list.append(i.text())
if(len(file_list)!=1):
self.show_msg("请选择一个并且只选择一个 "+type+" 文件")
return ""
return file_list[0]
# 获取已选择的文件列表
def get_selected_fils(self):
file_list=[]
items=self.file_list.selectedItems()
if(len(items)==0):
self.show_msg("请选择至少一个文件")
return []
have_elf=False
have_bin=False
have_lua=False
for i in items:
if(i.text()[-4:]==".elf"):
if(have_elf==True):
self.show_msg("只可选择一个 .elf 文件")
return []
have_elf=True
if(i.text()[-4:]==".bin"):
if(have_bin==True):
self.show_msg("只可选择一个 .bin 文件")
return []
have_bin=True
if(i.text()[-4:]==".lua"):
if(have_lua==True):
self.show_msg("只可选择一个 .lua 文件")
return []
have_lua=True
file_list.append(i.text())
return file_list
def item_clicked(self,item:QListWidgetItem ):
print("item clicked.")
print("slected item is",item.text())
def com_but_clicked(self):
print("com but clicked")
if(self.port_is_open==False):
self.open_port()
else:
self.close_port()
def com_changed(self,index:int):
print("com changed")
self.close_port()
def get_cmd(self,file:str):
l=[(".bin",0xee),(".pkt",0xed),(".json",0x32)]
for i in l:
if(file.endswith(i[0])):
return i[1]
return 0
# 发送文件按钮按下
def save_but_clicked(self):
print("save_but_clicked.")
path = self.getpath()+"file\\"
file_list=self.get_selected_fils()
if(len(file_list)==0):
return
self.cmd=self.get_cmd(file_list[0])
print("cmd=",self.cmd)
w=QDialog(self.widget)
w.resize(703-150, 1*40+20)
w.setWindowTitle("上传文件")
w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.handle_=prottcp.handle()
if(file_list[0].endswith(".json")):
self.handle_.set_json("file/"+file_list[0])
else:
self.handle_.set_file("file/"+file_list[0])
d=self.handle_.get_data()
try:
self.port.send(self.cmd,d)
except Exception as e:
print("com not open")
self.show_msg("端口未打开")
del self.handle_
w.close()
return
self.creat_progress(w)
w.show()
# self.failed_signal.connect(w.close)
# w.destroyed.connect(self.close_port)
def updata_failed(self):
self.show_msg("打开端口失败")
# 创建进度条
def creat_progress(self,father:QDialog):
self.probar_list=[]
for i in range(1):
p=progress_box()
p.init(father,"发送文件",+i*40)
self.rate_signal.connect(p.set_rate)
self.probar_list.append(p)
# 发送命令按钮按下
def cmd_but_clicked(self):
print("cmd_but clicked.")
file=self.get_selected_file_by_type(".bin")
if(len(file)==0):
return
print("file:",file)
w=QDialog(self.widget)
w.resize(703-150, 1*40+20)
w.setWindowTitle("升级mcu")
self.updata_mcu(file)
self.creat_progress(w)
w.show()
def sstate_but_clicked(self):
print("sstate_but clicked.")
try:
self.port.send_str("sysinfo")
except Exception as e:
print("com not open")
print(str(e))
def scheme_but_clicked(self):
print("scheme_but clicked.")
try:
self.port.send_str("scheme")
except Exception as e:
print("com not open")
print(str(e))
def updatas_but_clicked(self):
print("updatas_but clicked.")
try:
self.port.send_str("updatas 1,2,3,4,5,6,7,8,9,10")
self.show_msg("已发送升级指令,请留意小板升级情况")
except Exception as e:
self.show_msg("命令发送失败")
print("com not open")
print(str(e))
# 开始运行
def run(self):
self.widget.show()
sys.exit(self.app.exec())
def set_port_state(self,state:bool):
self.port_is_open=state
if(state==True):
self.com_but.setText("关闭端口")
else:
self.com_but.setText("打开端口")
# 扫描文件
def scan_file(self):
self.file_list.addItems(self.find_type([".bin",".json",".pkt"]))
# 扫描指定类型的文件
def find_type(self,types:list):
path = self.getpath()+"file\\"
if not os.path.exists(path):
os.makedirs(path)
list=os.listdir(path)
file_list=[]
for t in types:
for i in list:
if(len(t)>0):
if(i[-len(t):]==t):
file_list.append(i)
else:
file_list.append(i)
return file_list
# 获得文件绝对路径
def getpath(self):
path=os.path.abspath(sys.argv[0])
list_str=path.split("\\")
return path.replace(list_str[-1],"")
# 调用此函数打开通信
def open_port(self):
t = threading.Thread(target=self.com_thread, args=())
t.start()
def com_thread(self):
self.port=prottcp.protu()
item=self.com.itemText(self.com.currentIndex())
bsp=self.combsp.itemText(self.combsp.currentIndex())
com=item.split(":")[0]
if(com!="utcp"):
item=com+":"+bsp
print("item text:",item)
if(self.port.init(item)==False):
print("init port failed.")
self.failed_signal.emit()
self.set_port_state(False)
return
else:
print("init port success.")
self.set_port_state(True)
self.port.recv_signal.connect(self.recv_slot)
self.port.recv_str_signal.connect(self.recv_str_slot)
self.port.start_recv()
self.port.wait()
def close_port(self):
print("close port")
self.set_port_state(False)
try:
self.port.close()
self.port.recv_signal.disconnect(self.recv_slot)
self.port.recv_str_signal.disconnect(self.recv_str_slot)
except Exception as e:
pass
def recv_str_slot(self,cmd:int,txt:str,err:str):
print("|-|",txt)
def recv_slot(self,cmd:int,data:bytearray,err:str):
# print("recv:",cmd,data)
if(self.cmd!=cmd):
return
try:
data=self.handle_.get_data()
if(len(data)>0):
self.port.send(cmd,data)
rate=self.handle_.get_rate()
self.rate_signal.emit(rate)
else:
del self.handle_
except Exception as e:
print(str(e))
# 开始升级mcu
def updata_mcu(self,file):
pass
# 小板通信测试
def comm_test(self):
pass
def end_slot(self,ip,ack,err):
# print(ip,ack,err)
if(ack==False):
self.show_msg(ip+":"+err)
def rate_slot(self,rate):
# print("rate signal:",ip,rate)
self.rate_signal.emit(rate)
def data_slot(self,ip,text):
pass
# 创建数据显示
def creat_databoxs(self,father:QDialog,data_list:list):
self.datab_list=[]
for i in range(len(data_list)):
p=data_box()
p.init(father,data_list[i][0],data_list[i][1],+i*50)
self.datab_list.append(p)
class locker():
def _get_lock(self):
file_name = os.path.basename(__file__)
# linux等平台依然使用标准的/var/run其他nt等平台使用当前目录
if os.name == "posix":
lock_file_name = f"/var/run/{file_name}.pid"
else:
lock_file_name = f"{file_name}.pid"
self.fd = open(lock_file_name, "w")
try:
portalocker.lock(self.fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
# 将当前进程号写入文件
# 如果获取不到锁上一步就已经异常了,所以不用担心覆盖
self.fd.writelines(str(os.getpid()))
# 写入的数据太少,默认会先被放在缓冲区,我们强制同步写入到文件
self.fd.flush()
except:
print(f"{file_name} have another instance running.")
exit(1)
def __init__(self):
self._get_lock()
# 和fcntl有点区别portalocker释放锁直接有unlock()方法
# 还是一样,其实并不需要在最后自己主动释放锁
def __del__(self):
portalocker.unlock(self.fd)
if __name__ == "__main__":
lock=locker()
dlg=updata_dlg()
dlg.scan_file()
dlg.run()