Files
checker_slave/python/updata.py
2023-06-14 18:05:04 +08:00

442 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import time
import portalocker
import serial
import serial.tools.list_ports
import prottcp
# ANSI SGR escape sequences for colouring terminal output.
# "1;31" selects bold + red foreground, "1;34" selects bold + blue foreground,
# and "0" resets all attributes back to the terminal default.
STR_RED="\033[1;31m"
STR_BLUE="\033[1;34m"
STR_END="\033[0m"
class progress_box(QObject):
    """A caption label with a progress bar underneath, placed in a parent widget."""

    def __init__(self) -> None:
        super().__init__()

    def init(self, father: QDialog, text: str, x: int):
        """Create the label and the progress bar as children of ``father``.

        father: widget that becomes the parent of both child widgets.
        text:   caption shown in the label.
        x:      passed as the top coordinate of the label rectangle; the
                progress bar is placed 20 px further down.
        """
        caption = QLabel(father)
        caption.setObjectName("label")
        caption.setGeometry(QRect(40, x, 120, 15))
        caption.setText(text)
        self.lable = caption

        bar = QProgressBar(father)
        bar.setValue(0)
        bar.setGeometry(QRect(40, x + 20, 450, 20))
        self.p = bar

    def set_rate(self, rate: int):
        """Show ``rate`` as the current value of the progress bar."""
        self.p.setValue(rate)
class data_box(QObject):
    """A title label with a second text label underneath, placed in a parent widget."""

    def __init__(self) -> None:
        super().__init__()

    def init(self, father: QDialog, title: str, text: str, x: int):
        """Create the title and text labels as children of ``father``.

        father: widget that becomes the parent of both labels.
        title:  content of the upper label.
        text:   content of the lower label.
        x:      passed as the top coordinate of the title rectangle; the
                text label is placed 20 px further down.
        """
        heading = QLabel(father)
        heading.setObjectName("label")
        heading.setGeometry(QRect(40, x, 120, 20))
        heading.setText(title)
        self.lable = heading

        body = QLabel(father)
        body.setObjectName("text")
        body.setGeometry(QRect(40, x + 20, 450, 30))
        body.setText(text)
        self.text = body
class updata_dlg(QObject):
    """Main window of the upgrade tool.

    Lists the files found in the ``file`` directory next to the script,
    opens a ``prottcp.protu`` port on a background thread and sends the
    selected file over it chunk by chunk, driven by the port's receive signal.
    """

    # Progress signal (the original note gives the range as 1~100).
    rate_signal = pyqtSignal([int])
    # End signal: success flag and a description.
    end_signal = pyqtSignal([bool, str])
    # Emitted by the communication thread when the port cannot be opened.
    failed_signal = pyqtSignal()
    # Emitted by the communication thread so that the button text and the
    # open/closed flag are updated on the GUI thread, not on the worker.
    port_state_signal = pyqtSignal([bool])

    def __init__(self):
        QObject.__init__(self)
        # State is initialised before the widgets are built: com_init()
        # triggers currentIndexChanged while filling the combo box, and that
        # slot calls close_port(), which reads these attributes.
        self.cmd = 0
        self.port_is_open = False
        self.port = None      # prottcp.protu instance once open_port() ran
        self.handle_ = None   # prottcp.handle of the transfer in progress
        self.probar_list = []
        self.datab_list = []

        self.app = QApplication(sys.argv)
        self.widget = QWidget()
        self.widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        self.widget.resize(703, 409)
        self.widget.setWindowTitle("批检仪程序升级")
        self.file_list_init()
        self.save_but_init()
        self.cmd_but_init()
        self.sstate_but_init()
        self.com_but_init()
        self.com_init()
        self.widget.destroyed.connect(self.quit)
        self.failed_signal.connect(self.updata_failed)
        self.port_state_signal.connect(self.set_port_state)

    def quit(self):
        """Close the port and leave the event loop (exit code 1, as before)."""
        self.close_port()
        self.app.exit(1)

    def file_list_init(self):
        """Create the list widget that shows the available files."""
        self.file_list = QListWidget(self.widget)
        self.file_list.setObjectName("file_list")
        self.file_list.setGeometry(QRect(25, 60, 531, 310))
        self.file_list.setFrameShape(QFrame.Shape.Box)
        self.file_list.setMidLineWidth(1)
        self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
        self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
        self.file_list.itemDoubleClicked.connect(self.item_clicked)

    def com_but_init(self):
        """Create the open/close port button."""
        self.com_but = QPushButton(self.widget)
        self.com_but.setObjectName("com_but")
        self.com_but.setGeometry(QRect(450, 10, 93, 28))
        self.com_but.setText("打开端口")
        self.com_but.clicked.connect(self.com_but_clicked)

    def save_but_init(self):
        """Create the "send file" button."""
        self.save_but = QPushButton(self.widget)
        self.save_but.setObjectName("save_but")
        self.save_but.setGeometry(QRect(590, 60, 93, 28))
        self.save_but.setText("发送文件")
        self.save_but.clicked.connect(self.save_but_clicked)

    def cmd_but_init(self):
        """Create the "upgrade MCU" button."""
        self.cmd_but = QPushButton(self.widget)
        # Was "save_but" (copy-paste), which duplicated the send button's name.
        self.cmd_but.setObjectName("cmd_but")
        self.cmd_but.setGeometry(QRect(590, 100, 93, 28))
        self.cmd_but.setText("升级MCU")
        self.cmd_but.clicked.connect(self.cmd_but_clicked)

    def sstate_but_init(self):
        """Create the "MCU online state" button."""
        self.sstate_but = QPushButton(self.widget)
        self.sstate_but.setObjectName("sstate_but")
        self.sstate_but.setGeometry(QRect(590, 140, 93, 28))
        self.sstate_but.setText("MCU在线状态")
        self.sstate_but.clicked.connect(self.sstate_but_clicked)

    def com_init(self):
        """Create the port combo box and fill it with the available ports."""
        self.com = QComboBox(self.widget)
        self.com.setObjectName("com")
        self.com.setGeometry(QRect(100, 10, 300, 21))
        self.com.setEditable(True)
        # Connected before the items are added, so adding the first item
        # already runs com_changed() once during start-up.
        self.com.currentIndexChanged.connect(self.com_changed)
        self.com.addItem("utcp:7777")
        for comport in serial.tools.list_ports.comports():
            self.com.addItem(comport.name + ":" + comport.description)
        self.com_label = QLabel(self.widget)
        self.com_label.setObjectName("label")
        self.com_label.setGeometry(QRect(30, 10, 72, 15))
        self.com_label.setText("COM口:")

    def show_msg(self, msg: str):
        """Show ``msg`` in a non-blocking message box owned by the main window."""
        box = QMessageBox(self.widget)
        box.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        box.setText(msg)
        box.setWindowTitle("提示")
        box.show()

    def get_selected_file_by_type(self, type: str):
        """Return the single selected file whose name ends with ``type``.

        Shows a message and returns "" unless exactly one selected file matches.
        """
        matches = [
            item.text()
            for item in self.file_list.selectedItems()
            if item.text().endswith(type)
        ]
        if len(matches) != 1:
            self.show_msg("请选择一个并且只选择一个 " + type + " 文件")
            return ""
        return matches[0]

    def get_selected_fils(self):
        """Return the names of the selected files.

        Shows a message and returns [] when nothing is selected or when more
        than one .elf, .bin or .lua file is selected.
        """
        items = self.file_list.selectedItems()
        if not items:
            self.show_msg("请选择至少一个文件")
            return []
        seen = set()
        file_list = []
        for item in items:
            name = item.text()
            for ext in (".elf", ".bin", ".lua"):
                if not name.endswith(ext):
                    continue
                if ext in seen:
                    self.show_msg("只可选择一个 " + ext + " 文件")
                    return []
                seen.add(ext)
            file_list.append(name)
        return file_list

    def item_clicked(self, item: QListWidgetItem):
        """Slot for a double click on a list entry; only logs the entry."""
        print("item clicked.")
        print("slected item is", item.text())

    def com_but_clicked(self):
        """Toggle the port: open it when closed, close it when open."""
        print("com but clicked")
        if not self.port_is_open:
            self.open_port()
        else:
            self.close_port()

    def com_changed(self, index: int):
        """Close the current port whenever another combo box entry is chosen."""
        print("com changed")
        self.close_port()

    def get_cmd(self, file: str):
        """Return the command code for ``file`` based on its suffix, or 0."""
        for suffix, code in ((".bin", 0xee), (".pkt", 0xed), (".json", 0x32)):
            if file.endswith(suffix):
                return code
        return 0

    def _new_progress_dialog(self, title: str):
        """Create the small child dialog that hosts the progress bar."""
        dialog = QDialog(self.widget)
        dialog.resize(703 - 150, 1 * 40 + 20)
        dialog.setWindowTitle(title)
        # Without this every closed dialog would stay alive as a hidden
        # child of the main window.
        dialog.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        return dialog

    def save_but_clicked(self):
        """Start sending the selected file over the open port."""
        print("save_but_clicked.")
        file_list = self.get_selected_fils()
        if not file_list:
            return
        name = file_list[0]
        # Read from the directory that find_type() scanned, not from
        # whatever the current working directory happens to be.
        file_path = os.path.join(self.getpath(), "file", name)
        self.cmd = self.get_cmd(name)
        print("cmd=", self.cmd)
        handle = prottcp.handle()
        if name.endswith(".json"):
            handle.set_json(file_path)
        else:
            handle.set_file(file_path)
        first_chunk = handle.get_data()
        # Must be in place before sending: recv_slot() pulls the following
        # chunks from it.
        self.handle_ = handle
        try:
            if self.port is None:
                raise RuntimeError("port has not been opened")
            self.port.send(self.cmd, first_chunk)
        except Exception as e:
            print("com not open:", e)
            self.handle_ = None
            return
        dialog = self._new_progress_dialog("上传文件")
        self.creat_progress(dialog)
        dialog.show()

    def updata_failed(self):
        """Slot for failed_signal: tell the user the port could not be opened."""
        self.show_msg("打开端口失败")

    def creat_progress(self, father: QDialog):
        """Create the progress bar inside ``father`` and feed it from rate_signal."""
        self.probar_list = []
        for i in range(1):
            box = progress_box()
            box.init(father, "发送文件", i * 40)
            self.rate_signal.connect(box.set_rate)
            self.probar_list.append(box)

    def cmd_but_clicked(self):
        """Start an MCU upgrade with the selected .bin file."""
        print("cmd_but clicked.")
        file = self.get_selected_file_by_type(".bin")
        if not file:
            return
        print("file:", file)
        dialog = self._new_progress_dialog("升级mcu")
        self.updata_mcu(file)
        self.creat_progress(dialog)
        dialog.show()

    def sstate_but_clicked(self):
        """Slot for the MCU state button; only logs the click."""
        print("sstate_but clicked.")

    def run(self):
        """Show the main window and run the event loop until it exits."""
        self.widget.show()
        sys.exit(self.app.exec())

    def set_port_state(self, state: bool):
        """Record the port state and update the port button text to match."""
        self.port_is_open = state
        self.com_but.setText("关闭端口" if state else "打开端口")

    def scan_file(self):
        """Fill the file list with the .bin, .json and .pkt files found."""
        self.file_list.addItems(self.find_type([".bin", ".json", ".pkt"]))

    def find_type(self, types: list):
        """Return the names in the ``file`` directory that end with one of ``types``.

        The directory is created when it does not exist. Results are grouped
        in the order of ``types``; an empty suffix matches every name.
        """
        path = os.path.join(self.getpath(), "file")
        os.makedirs(path, exist_ok=True)
        names = os.listdir(path)
        # str.endswith("") is True, which keeps the "empty suffix = all" rule.
        return [name for suffix in types for name in names if name.endswith(suffix)]

    def getpath(self):
        """Return the directory of the running script, with a trailing separator."""
        script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        # join(dir, "") appends the separator only when it is missing.
        return os.path.join(script_dir, "")

    def open_port(self):
        """Start the communication thread for the selected port."""
        # The combo box is read here, on the GUI thread, and handed over.
        item = self.com.itemText(self.com.currentIndex())
        t = threading.Thread(target=self.com_thread, args=(item,))
        t.start()

    def com_thread(self, item=None):
        """Thread body: open the port and run its receive loop until it ends.

        item: port description to open; when omitted it is read from the
              combo box as the original code did.
        """
        if item is None:
            item = self.com.itemText(self.com.currentIndex())
        port = prottcp.protu()
        self.port = port
        print("item text:", item)
        # Compared with == on purpose: only an explicit False/0 result counts
        # as failure, exactly as before.
        if port.init(item) == False:
            print("init port failed.")
            self.failed_signal.emit()
            self.port_state_signal.emit(False)
            return
        print("init port success.")
        # Widgets must not be touched from this thread; the signal hands the
        # update to set_port_state().
        self.port_state_signal.emit(True)
        port.recv_signal.connect(self.recv_slot)
        port.start_recv()
        port.wait()

    def close_port(self):
        """Mark the port as closed and close it if one was created."""
        print("close port")
        self.set_port_state(False)
        if self.port is None:
            return
        try:
            self.port.close()
        except Exception as e:
            # Best effort: closing must never abort the caller.
            print("close port failed:", e)

    def recv_slot(self, cmd: int, data: bytearray, err: str):
        """Slot for the port's receive signal: send the next chunk of the file."""
        if self.cmd != cmd:
            return
        handle = self.handle_
        if handle is None:
            # No transfer in progress (finished or never started).
            return
        try:
            chunk = handle.get_data()
            if len(chunk) > 0:
                self.port.send(cmd, chunk)
                self.rate_signal.emit(handle.get_rate())
            else:
                self.handle_ = None
        except Exception as e:
            print(str(e))

    def updata_mcu(self, file):
        """Start the MCU upgrade (not implemented)."""
        pass

    def comm_test(self):
        """Board communication test (not implemented)."""
        pass

    def end_slot(self, ip, ack, err):
        """Show ``ip:err`` in a message box when ``ack`` reports failure."""
        if ack == False:
            self.show_msg(ip + ":" + err)

    def rate_slot(self, rate):
        """Forward ``rate`` to rate_signal."""
        self.rate_signal.emit(rate)

    def data_slot(self, ip, text):
        """Placeholder slot; does nothing."""
        pass

    def creat_databoxs(self, father: QDialog, data_list: list):
        """Create one data_box per (title, text) entry, stacked 50 px apart."""
        self.datab_list = []
        for i, entry in enumerate(data_list):
            box = data_box()
            box.init(father, entry[0], entry[1], i * 50)
            self.datab_list.append(box)
class locker():
    """Single-instance guard based on an exclusive lock on a PID file.

    Creating an instance takes the lock and writes the current process id
    into the file; if the lock is already held, a message is printed and the
    process exits with status 1.
    """

    def _get_lock(self):
        """Open the PID file, lock it without blocking and record our PID.

        Raises:
            OSError: if the PID file cannot be opened.
            SystemExit: with status 1 if the lock cannot be taken.
        """
        file_name = os.path.basename(__file__)
        # POSIX platforms use the conventional /var/run; other platforms
        # (e.g. nt) use the current directory.
        if os.name == "posix":
            lock_file_name = f"/var/run/{file_name}.pid"
        else:
            lock_file_name = f"{file_name}.pid"
        # Append mode creates the file when needed but does NOT truncate it,
        # so a second launch cannot wipe the PID written by the running
        # instance before its own lock attempt fails.
        self.fd = open(lock_file_name, "a")
        try:
            portalocker.lock(self.fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
        except (portalocker.LockException, OSError):
            print(f"{file_name} have another instance running.")
            self.fd.close()
            self.fd = None
            sys.exit(1)
        # The lock is ours now, so replacing the old content is safe.
        self.fd.truncate(0)
        self.fd.write(str(os.getpid()))
        # So little data would otherwise sit in the buffer; force it out.
        self.fd.flush()

    def __init__(self):
        self._get_lock()

    def __del__(self):
        """Release the lock and close the file, if the lock was ever taken."""
        # Unlike fcntl, portalocker offers an explicit unlock(). Releasing by
        # hand is not strictly required at process exit.
        fd = getattr(self, "fd", None)
        if fd is None or fd.closed:
            return
        try:
            portalocker.unlock(fd)
        finally:
            fd.close()
if __name__ == "__main__":
    # Keep a reference to the lock object for the whole run so that the
    # single-instance lock is not released early.
    instance_lock = locker()
    # Build the window, fill the file list, then enter the event loop.
    window = updata_dlg()
    window.scan_file()
    window.run()