555 lines
17 KiB
Python
555 lines
17 KiB
Python
|
||
|
||
import sys
|
||
import os
|
||
import shutil
|
||
from PyQt5.QtCore import *
|
||
from PyQt5.QtGui import *
|
||
from PyQt5.QtWidgets import *
|
||
import threading
|
||
import time
|
||
import portalocker
|
||
import serial
|
||
import serial.tools.list_ports
|
||
import prottcp
|
||
import mysql
|
||
import select_list
|
||
|
||
|
||
# ANSI escape sequences used to colour terminal output.
STR_RED = "\033[1;31m"   # bold red
STR_BLUE = "\033[1;34m"  # bold blue
STR_END = "\033[0m"      # reset all attributes
|
||
|
||
|
||
|
||
|
||
|
||
class progress_box(QObject):
    """A caption label with a progress bar placed directly below it."""

    def __init__(self) -> None:
        super().__init__()

    def init(self, father: QDialog, text: str, x: int):
        """Create the label and the progress bar as children of *father*.

        Args:
            father: Dialog that becomes the parent of both widgets.
            text: Caption shown in the label.
            x: Top coordinate of the label rectangle; the bar is placed
                20 pixels below it.
        """
        caption = QLabel(father)
        caption.setObjectName("label")
        caption.setGeometry(QRect(40, x, 120, 15))
        caption.setText(text)
        self.lable = caption

        bar = QProgressBar(father)
        bar.setValue(0)
        bar.setGeometry(QRect(40, x + 20, 450, 20))
        self.p = bar

    def set_rate(self, rate: int):
        """Slot: show *rate* on the progress bar."""
        self.p.setValue(rate)
|
||
|
||
|
||
|
||
class data_box(QObject):
    """A title label with a second label below it that shows a value."""

    def __init__(self) -> None:
        super().__init__()
        self.select_item = ""

    def init(self, father: QDialog, title: str, text: str, x: int):
        """Create the title and value labels as children of *father*.

        Args:
            father: Dialog that becomes the parent of both labels.
            title: Text of the upper label.
            text: Text of the lower label.
            x: Top coordinate of the title rectangle; the value label is
                placed 20 pixels below it.
        """
        heading = QLabel(father)
        heading.setObjectName("label")
        heading.setGeometry(QRect(40, x, 120, 20))
        heading.setText(title)
        self.lable = heading

        value = QLabel(father)
        value.setObjectName("text")
        value.setGeometry(QRect(40, x + 20, 450, 30))
        value.setText(text)
        self.text = value
|
||
|
||
|
||
|
||
|
||
|
||
class updata_dlg(QObject):
    """Main window of the upgrade tool: file list, port selection, actions.

    The object owns the ``QApplication`` and the top-level widget.  The
    communication port (``prottcp.protu``) is opened on a worker thread; that
    thread reports back to the GUI only through signals so that widgets are
    touched from the GUI thread alone.
    """

    # Progress of the running transfer (value shown on the progress bar).
    rate_signal = pyqtSignal([int])
    # End of an operation: success flag and a description.
    end_signal = pyqtSignal([bool, str])
    # Emitted by the port worker when the port could not be opened.
    failed_signal = pyqtSignal()
    # Emitted by the port worker to report the open/closed state of the port.
    port_state_signal = pyqtSignal([bool])

    def __init__(self):
        QObject.__init__(self)
        # State that slots read must exist before any widget signal can
        # fire (filling the combo boxes may already trigger handlers).
        self.cmd = 0
        self.port_is_open = False
        self.port = None
        self.handle_ = None

        self.app = QApplication(sys.argv)
        self.widget = QWidget()
        self.widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        self.widget.resize(703, 409)
        self.widget.setWindowTitle("赋码仪程序升级")
        self.file_list_init()
        self.com_but_init()
        self.save_but_init()
        self.updatas_but_init()
        # NOTE: cmd_but_init() is not called; the button it builds uses the
        # same rectangle as the one created by updatas_but_init().
        self.sstate_but_init()
        self.scheme_but_init()
        self.com_init()
        self.combsp_init()
        self.download_but_init()
        self.widget.destroyed.connect(self.quit)
        self.failed_signal.connect(self.updata_failed)
        self.port_state_signal.connect(self.set_port_state)

    def quit(self):
        """Close the port and leave the Qt event loop."""
        self.close_port()
        # NOTE(review): the loop is left with exit code 1 even on a normal
        # close of the window -- confirm that callers expect this.
        qApp.exit(1)

    def _make_button(self, name: str, top: int, text: str, slot):
        """Build one button of the right-hand column and wire its click."""
        button = QPushButton(self.widget)
        button.setObjectName(name)
        button.setGeometry(QRect(590, top, 93, 28))
        button.setText(text)
        button.clicked.connect(slot)
        return button

    def file_list_init(self):
        """Create the single-selection list that shows the local files."""
        self.file_list = QListWidget(self.widget)
        self.file_list.setObjectName("file_list")
        self.file_list.setGeometry(QRect(25, 60, 531, 310))
        self.file_list.setFrameShape(QFrame.Shape.Box)
        self.file_list.setMidLineWidth(1)
        self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
        self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
        self.file_list.itemDoubleClicked.connect(self.item_clicked)

    def com_but_init(self):
        """Create the open/close port button."""
        self.com_but = self._make_button(
            "com_but", 10, "打开端口", self.com_but_clicked)

    def save_but_init(self):
        """Create the "send file" button."""
        self.save_but = self._make_button(
            "save_but", 60, "发送文件", self.save_but_clicked)

    def cmd_but_init(self):
        """Create the "upgrade MCU" button (currently not used by __init__)."""
        # The object name used to be the copy-pasted "save_but".
        self.cmd_but = self._make_button(
            "cmd_but", 100, "升级MCU", self.cmd_but_clicked)

    def updatas_but_init(self):
        """Create the "upgrade small boards" button."""
        self.updatas_but = self._make_button(
            "updatas_but", 100, "升级小板", self.updatas_but_clicked)

    def sstate_but_init(self):
        """Create the button that requests the main-board parameters."""
        self.sstate_but = self._make_button(
            "sstate_but", 140, "主板参数", self.sstate_but_clicked)

    def scheme_but_init(self):
        """Create the button that requests the scheme parameters."""
        self.scheme_but = self._make_button(
            "scheme_but", 180, "方案参数", self.scheme_but_clicked)

    def download_but_init(self):
        """Create the "download file" button."""
        self.download_but = self._make_button(
            "download_but", 220, "下载文件", self.download_but_clicked)

    def com_init(self):
        """Create the port combo box (TCP entry plus detected serial ports)."""
        self.com = QComboBox(self.widget)
        self.com.setObjectName("com")
        self.com.setGeometry(QRect(85, 10, 300, 25))
        self.com.setEditable(True)
        self.com.addItem("utcp:7777")
        for comport in serial.tools.list_ports.comports():
            self.com.addItem(comport.name+":"+comport.description)
        # Connected only after the box is filled: inserting the first item
        # changes the current index and would call close_port() needlessly.
        self.com.currentIndexChanged.connect(self.com_changed)
        self.com_label = QLabel(self.widget)
        self.com_label.setObjectName("label")
        self.com_label.setGeometry(QRect(30, 16, 72, 15))
        self.com_label.setText("COM口:")

    def combsp_init(self):
        """Create the baud-rate combo box."""
        self.combsp = QComboBox(self.widget)
        self.combsp.setObjectName("combsp")
        self.combsp.setGeometry(QRect(470, 10, 80, 25))
        self.combsp.setEditable(True)
        for baud in ("115200", "57600", "38400", "9600"):
            self.combsp.addItem(baud)
        self.combsp_label = QLabel(self.widget)
        self.combsp_label.setObjectName("label")
        self.combsp_label.setGeometry(QRect(410, 16, 72, 15))
        self.combsp_label.setText("波特率:")

    def show_msg(self, msg: str):
        """Show *msg* in a non-blocking message box that deletes itself."""
        m = QMessageBox(self.widget)
        m.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        m.setText(msg)
        m.setWindowTitle("提示")
        m.show()

    def get_selected_file_by_type(self, type: str):
        """Return the selected file name ending in *type*.

        Returns:
            The file name, or ``""`` (after showing a message) unless exactly
            one selected item has that suffix.
        """
        matches = [item.text() for item in self.file_list.selectedItems()
                   if item.text().endswith(type)]
        if len(matches) != 1:
            self.show_msg("请选择一个并且只选择一个 "+type+" 文件")
            return ""
        return matches[0]

    def get_selected_fils(self):
        """Return the names of the selected files.

        Returns:
            The list of names, or ``[]`` (after showing a message) when
            nothing is selected or when more than one ``.elf``, ``.bin`` or
            ``.lua`` file is selected.
        """
        items = self.file_list.selectedItems()
        if not items:
            self.show_msg("请选择至少一个文件")
            return []
        seen = set()
        names = []
        for item in items:
            name = item.text()
            for suffix in (".elf", ".bin", ".lua"):
                if not name.endswith(suffix):
                    continue
                if suffix in seen:
                    self.show_msg("只可选择一个 "+suffix+" 文件")
                    return []
                seen.add(suffix)
            names.append(name)
        return names

    def item_clicked(self, item: QListWidgetItem):
        """Slot for a double click on a list entry; only logs the item."""
        print("item clicked.")
        print("slected item is", item.text())

    def com_but_clicked(self):
        """Toggle the port: open it when closed, close it when open."""
        print("com but clicked")
        if not self.port_is_open:
            self.open_port()
        else:
            self.close_port()

    def com_changed(self, index: int):
        """Close the port whenever another port entry is selected."""
        print("com changed")
        self.close_port()

    def get_cmd(self, file: str):
        """Map a file name to its command code by suffix (0 if unknown)."""
        for suffix, code in ((".bin", 0xee), (".pkt", 0xed), (".json", 0x32)):
            if file.endswith(suffix):
                return code
        return 0

    def save_but_clicked(self):
        """Start sending the selected file and show a progress dialog.

        The first chunk is sent here; the following chunks are sent from
        recv_slot() each time a packet with the same command arrives.
        """
        print("save_but_clicked.")
        file_list = self.get_selected_fils()
        if not file_list:
            return
        file_name = file_list[0]
        self.cmd = self.get_cmd(file_name)
        print("cmd=", self.cmd)
        # Open the file from the directory that was listed, not from a path
        # relative to the current working directory.
        file_path = os.path.join(self._file_dir(), file_name)
        self.handle_ = prottcp.handle()
        if file_name.endswith(".json"):
            self.handle_.set_json(file_path)
        else:
            self.handle_.set_file(file_path)
        d = self.handle_.get_data()
        try:
            self.port.send(self.cmd, d)
        except Exception as e:
            # Also reached when no port object exists yet (self.port is None).
            print("com not open")
            print(str(e))
            self.show_msg("端口未打开")
            self.handle_ = None
            return
        w = QDialog(self.widget)
        w.resize(703-150, 1*40+20)
        w.setWindowTitle("上传文件")
        w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        self.creat_progress(w)
        w.show()

    def updata_failed(self):
        """Slot for failed_signal: tell the user the port did not open."""
        self.show_msg("打开端口失败")

    def creat_progress(self, father: QDialog):
        """Create the progress bar inside *father* and feed it from rate_signal."""
        self.probar_list = []
        for i in range(1):
            box = progress_box()
            box.init(father, "发送文件", +i*40)
            self.rate_signal.connect(box.set_rate)
            # The bar is a child of the dialog and dies with it; drop the
            # connection then so a later progress update cannot reach it.
            father.destroyed.connect(
                lambda _obj=None, box=box: self._detach_progress(box))
            self.probar_list.append(box)

    def _detach_progress(self, box):
        """Disconnect *box* from rate_signal and forget it."""
        try:
            self.rate_signal.disconnect(box.set_rate)
        except (TypeError, RuntimeError):
            # Already disconnected, or the wrapped object is already gone.
            pass
        if box in self.probar_list:
            self.probar_list.remove(box)

    def cmd_but_clicked(self):
        """Slot of the "upgrade MCU" button: needs exactly one .bin file."""
        print("cmd_but clicked.")
        file = self.get_selected_file_by_type(".bin")
        if len(file) == 0:
            return
        print("file:", file)
        w = QDialog(self.widget)
        w.resize(703-150, 1*40+20)
        w.setWindowTitle("升级mcu")
        self.updata_mcu(file)
        self.creat_progress(w)
        w.show()

    def sstate_but_clicked(self):
        """Send the "sysinfo" text command; failures are only logged."""
        print("sstate_but clicked.")
        try:
            self.port.send_str("sysinfo")
        except Exception as e:
            print("com not open")
            print(str(e))

    def scheme_but_clicked(self):
        """Send the "scheme" text command; failures are only logged."""
        print("scheme_but clicked.")
        try:
            self.port.send_str("scheme")
        except Exception as e:
            print("com not open")
            print(str(e))

    def updatas_but_clicked(self):
        """Send the "updatas" command for boards 1..10 and report the result."""
        print("updatas_but clicked.")
        try:
            self.port.send_str("updatas 1,2,3,4,5,6,7,8,9,10")
            self.show_msg("已发送升级指令,请留意小板升级情况")
        except Exception as e:
            self.show_msg("命令发送失败")
            print("com not open")
            print(str(e))

    def download_but_clicked(self):
        """Let the user pick a table and a record, then fetch that file.

        The file returned by ``sql.download`` is copied into the local file
        directory and the file list is refreshed.
        """
        sql = mysql.sql()
        if not sql.init(""):
            print("sql init failed.")
            return
        tables = sql.show_tables()
        sel = select_list.select_list(self.widget, "选择文件目录", tables)
        path = sel.show()
        if len(path) <= 0:
            print("not select any item,break.")
            return
        sql.table_name = path
        rows = []
        for i in sql.items():
            # Keep only the last path component, whatever the separator.
            item_name = i[2].replace("\\", "/").split("/")[-1]
            s = str(i[0])+"|"+i[1]+"|"+item_name
            if i[4] is not None:
                s = s+"|"+i[4]
            rows.append((s,))
        sel = select_list.select_list(self.widget, "选择文件", rows)
        item = sel.show()
        if len(item) <= 0:
            print("not select any item,break.")
            return
        # The first "|"-separated field is the value passed to download().
        file_name = sql.download(int(item.split("|")[0]))
        base_name = file_name.replace("\\", "/").split("/")[-1]
        dst = os.path.join(self._file_dir(), base_name)
        try:
            shutil.copy(file_name, dst)
        except OSError as e:
            # An exception escaping a slot would take the application down.
            print(str(e))
            self.show_msg("文件复制失败")
            return
        self.scan_file()

    def run(self):
        """Show the window and run the event loop; exits the process."""
        self.widget.show()
        sys.exit(self.app.exec())

    def set_port_state(self, state: bool):
        """Record the port state and update the caption of the port button."""
        self.port_is_open = state
        self.com_but.setText("关闭端口" if state else "打开端口")

    def scan_file(self):
        """Reload the file list with the .bin, .json and .pkt files."""
        self.file_list.clear()
        self.file_list.addItems(self.find_type([".bin", ".json", ".pkt"]))

    def find_type(self, types: list):
        """Return the names in the file directory that end with a given suffix.

        The directory is created when missing.  Names are grouped in the
        order of *types*; an empty suffix matches every name.
        """
        path = self._file_dir()
        os.makedirs(path, exist_ok=True)
        names = os.listdir(path)
        # str.endswith("") is always true, which covers the empty suffix.
        return [name for t in types for name in names if name.endswith(t)]

    def getpath(self):
        """Return the directory of the running script, with a trailing separator."""
        script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        # Joining with "" appends the separator only when it is missing.
        return os.path.join(script_dir, "")

    def _file_dir(self):
        """Return the ``file`` directory located next to the running script."""
        return os.path.join(self.getpath(), "file")

    def open_port(self):
        """Open the selected port on a worker thread."""
        # Read the widgets here, in the GUI thread, and hand plain strings
        # to the worker.
        item = self.com.itemText(self.com.currentIndex())
        bsp = self.combsp.itemText(self.combsp.currentIndex())
        t = threading.Thread(target=self.com_thread, args=(item, bsp))
        t.start()

    def com_thread(self, item=None, bsp=None):
        """Worker body: open the port, start receiving and wait on it.

        Args:
            item: Text of the selected port entry (``name:description`` or
                ``utcp:port``).  Read from the combo box when omitted.
            bsp: Baud rate text.  Read from the combo box when omitted.
        """
        if item is None:
            item = self.com.itemText(self.com.currentIndex())
        if bsp is None:
            bsp = self.combsp.itemText(self.combsp.currentIndex())
        port = prottcp.protu()
        self.port = port
        com = item.split(":")[0]
        if com != "utcp":
            # Serial ports are addressed as "<name>:<baud rate>".
            item = com+":"+bsp
        print("item text:", item)
        if port.init(item) == False:
            print("init port failed.")
            self.failed_signal.emit()
            # State changes go through a signal so the button text is
            # updated in the GUI thread.
            self.port_state_signal.emit(False)
            return
        print("init port success.")
        self.port_state_signal.emit(True)
        port.recv_signal.connect(self.recv_slot)
        port.recv_str_signal.connect(self.recv_str_slot)
        port.start_recv()
        port.wait()

    def close_port(self):
        """Mark the port closed, close it and detach its signals.

        Safe to call when no port was ever opened.
        """
        print("close port")
        self.set_port_state(False)
        port = self.port
        if port is None:
            return
        try:
            port.close()
        except Exception as e:
            # Best effort: a failing close must not prevent the cleanup.
            print(str(e))
        for signal_name, slot in (("recv_signal", self.recv_slot),
                                  ("recv_str_signal", self.recv_str_slot)):
            try:
                getattr(port, signal_name).disconnect(slot)
            except (TypeError, RuntimeError, AttributeError):
                # Not connected (e.g. the port never finished opening).
                pass

    def recv_str_slot(self, cmd: int, txt: str, err: str):
        """Slot for text received from the port: print it."""
        print("|-|", txt)

    def recv_slot(self, cmd: int, data: bytearray, err: str):
        """Slot for a received packet: send the next chunk of the transfer.

        Packets whose command differs from the running transfer, or that
        arrive while no transfer is active, are ignored.
        """
        if self.cmd != cmd:
            return
        handle = self.handle_
        if handle is None:
            return
        try:
            data = handle.get_data()
            if len(data) > 0:
                self.port.send(cmd, data)
                self.rate_signal.emit(handle.get_rate())
            else:
                # Nothing left to send: the transfer is finished.
                self.handle_ = None
        except Exception as e:
            print(str(e))

    def updata_mcu(self, file):
        """Start the MCU upgrade (not implemented)."""
        pass

    def comm_test(self):
        """Small-board communication test (not implemented)."""
        pass

    def end_slot(self, ip, ack, err):
        """Show ``ip:err`` when an operation reports failure."""
        if ack == False:
            self.show_msg(ip+":"+err)

    def rate_slot(self, rate):
        """Forward *rate* to rate_signal."""
        self.rate_signal.emit(rate)

    def data_slot(self, ip, text):
        """Data notification (not implemented)."""
        pass

    def creat_databoxs(self, father: QDialog, data_list: list):
        """Create one data_box per ``(title, text)`` entry, stacked 50 px apart."""
        self.datab_list = []
        for i, entry in enumerate(data_list):
            box = data_box()
            box.init(father, entry[0], entry[1], +i*50)
            self.datab_list.append(box)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
class locker():
    """Single-instance guard based on an exclusively locked PID file.

    Creating an instance takes the lock; when the lock is already held the
    process exits with status 1.  Keep the instance alive for as long as the
    lock is needed.
    """

    def _get_lock(self):
        """Take the lock and write the current PID into the lock file."""
        file_name = os.path.basename(__file__)
        # POSIX systems use the standard /var/run; other platforms (e.g. nt)
        # use the current directory.
        if os.name == "posix":
            lock_file_name = f"/var/run/{file_name}.pid"
        else:
            lock_file_name = f"{file_name}.pid"
        # "a+" rather than "w": opening must not truncate the file, or a
        # second instance would erase the PID of the running one before it
        # even tries to take the lock.
        self.fd = open(lock_file_name, "a+")
        try:
            portalocker.lock(self.fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
            # The lock is ours: replace the old content with our PID.
            self.fd.seek(0)
            self.fd.truncate()
            self.fd.write(str(os.getpid()))
            # So little data would stay in the buffer; force it to the file.
            self.fd.flush()
        except (portalocker.LockException, OSError):
            print(f"{file_name} have another instance running.")
            self.fd.close()
            self.fd = None
            # sys.exit instead of the site-provided exit(), which is not
            # available in every interpreter setup.
            sys.exit(1)

    def __init__(self):
        # Defined first so that __del__ is safe even if opening the file fails.
        self.fd = None
        self._get_lock()

    # Unlike fcntl, portalocker offers unlock() directly.  Releasing the lock
    # explicitly at the end is not strictly required.
    def __del__(self):
        fd = getattr(self, "fd", None)
        if fd is None:
            return
        try:
            portalocker.unlock(fd)
        except Exception:
            # Best effort: finalizers may run during interpreter shutdown,
            # when module globals are no longer usable.
            pass
        finally:
            fd.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Hold the single-instance lock for the whole lifetime of the process.
    instance_lock = locker()
    window = updata_dlg()
    # Fill the file list before the window is shown.
    window.scan_file()
    # Shows the window and runs the event loop; run() ends via sys.exit().
    window.run()
|
||
|
||
|