Files
python_tools/updata/updata_uart.py
ranchuan 238fd1e6bb 解决updata 关闭时串口没有正常关闭的问题
修改服务器文件列表显示
添加.jwt文件解析
2023-11-01 17:58:32 +08:00

622 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import shutil
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import time
import portalocker
import serial
import serial.tools.list_ports
import prottcp
import mysql
import select_list
import file_detail
import float_lable
STR_RED="\033[1;31m"
STR_BLUE="\033[1;34m"
STR_END="\033[0m"
class progress_box(QObject):
def __init__(self) -> None:
QObject.__init__(self)
def init(self,father:QDialog,text:str,x:int):
# print("text=",text)
self.lable = QLabel(father)
self.lable.setObjectName(u"label")
self.lable.setGeometry(QRect(40, x, 120, 15))
self.lable.setText(text)
self.p=QProgressBar(father)
self.p.setValue(0)
self.p.setGeometry(QRect(40,x+20,450,20))
def set_rate(self,rate:int):
# print(text)
self.p.setValue(rate)
class data_box(QObject):
def __init__(self) -> None:
QObject.__init__(self)
self.select_item=""
def init(self,father:QDialog,title:str,text:str,x:int):
# print("text=",text)
self.lable = QLabel(father)
self.lable.setObjectName(u"label")
self.lable.setGeometry(QRect(40, x, 120, 20))
self.lable.setText(title)
self.text = QLabel(father)
self.text.setObjectName(u"text")
self.text.setGeometry(QRect(40,x+20,450,30))
self.text.setText(text)
class updata_dlg(QWidget):
# 进度信号ip1~100
rate_signal =pyqtSignal([int])
# 结束信号ip成败描述
end_signal = pyqtSignal([bool,str])
failed_signal = pyqtSignal()
# 数据库下载文件结束信号
sql_download_end_signal =pyqtSignal([])
def __init__(self):
QWidget.__init__(self)
self.widget = self
self.widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.widget.resize(703, 409)
self.widget.setWindowTitle("赋码仪程序升级")
self.file_list_init()
self.com_but_init()
self.save_but_init()
self.updatas_but_init()
# self.cmd_but_init()
self.sstate_but_init()
self.scheme_but_init()
self.com_init()
self.combsp_init()
self.download_but_init()
self.widget.destroyed.connect(self.quit)
self.failed_signal.connect(self.updata_failed)
self.cmd=0
self.port_is_open=False
self.scan_file()
self.infotext_init()
# 提示信息
def infotext_init(self):
self.infotext=QLabel(self.widget)
self.infotext.setGeometry(QRect(25, 380, 800, 15))
self.infotext.setText("欢迎使用云铭公司设备维护工具.")
def set_infotext(self,text:str):
try:
self.infotext.setText(text)
except Exception as e:
pass
def quit(self):
print("quit")
self.close_port()
# 程序退出
qApp.exit(1)
# 初始化文件列表
def file_list_init(self):
self.file_list = QListWidget(self.widget)
self.file_list.setObjectName(u"file_list")
self.file_list.setGeometry(QRect(25, 60, 531, 310))
self.file_list.setFrameShape(QFrame.Shape.Box)
self.file_list.setMidLineWidth(1)
self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
self.file_list.itemDoubleClicked.connect(self.item_clicked)
ack=self.file_list.setProperty("contextMenuPolicy",Qt.ContextMenuPolicy.CustomContextMenu)
self.file_list.customContextMenuRequested.connect(self.file_item_menu_slot)
def file_item_menu_slot(self,pos:QPoint):
print("show pop menu")
menu=QMenu(self.file_list)
action=QAction("删除选中条目",self.file_list)
action.triggered.connect(self.file_item_delete)
menu.addAction(action)
action=QAction("查看选中条目详情",self.file_list)
action.triggered.connect(self.file_item_detail)
menu.addAction(action)
action=QAction("发送选中条目到主板",self.file_list)
action.triggered.connect(self.save_but_clicked)
menu.addAction(action)
menu.addAction(action)
action=QAction("刷新文件列表",self.file_list)
action.triggered.connect(self.scan_file)
menu.addAction(action)
menu.exec(self.file_list.mapToGlobal(pos))
def file_item_delete(self):
print("delete slected items.")
items=self.file_list.selectedItems()
for i in items:
os.remove("file\\"+i.text())
self.scan_file()
def file_item_detail(self):
print("show detail with slected items.")
items=self.file_list.selectedItems()
details=[]
for i in items:
details+=file_detail.detail("file/"+i.text())
if(len(details)==0):
return
pos=QCursor.pos()
lab=float_lable.floatBox(self.widget,details,x=pos.x(),y=pos.y())
lab.show()
# 初始化打开端口按钮
def com_but_init(self):
self.com_but = QPushButton(self.widget)
self.com_but.setObjectName(u"com_but")
self.com_but.setGeometry(QRect(590, 10, 150, 28))
self.com_but.setText("打开端口")
self.com_but.clicked.connect(self.com_but_clicked)
# 初始化发送文件按钮
def save_but_init(self):
self.save_but = QPushButton(self.widget)
self.save_but.setObjectName(u"save_but")
self.save_but.setGeometry(QRect(590, 60, 150, 28))
self.save_but.setText("发送文件到串口设备")
self.save_but.clicked.connect(self.save_but_clicked)
self.save_but.setToolTip("请选择要升级的文件,将发送选择的文件到串口设备中。")
# 初始化发送命令按钮
def cmd_but_init(self):
self.cmd_but = QPushButton(self.widget)
self.cmd_but.setObjectName(u"save_but")
self.cmd_but.setGeometry(QRect(590, 100, 150, 28))
self.cmd_but.setText("升级从设备")
self.cmd_but.clicked.connect(self.cmd_but_clicked)
# 初始化升级小板按钮
def updatas_but_init(self):
self.updatas_but = QPushButton(self.widget)
self.updatas_but.setObjectName(u"updatas_but")
self.updatas_but.setGeometry(QRect(590, 100, 150, 28))
self.updatas_but.setText("升级小板")
self.updatas_but.clicked.connect(self.updatas_but_clicked)
self.updatas_but.setToolTip("先将小板程序发送到串口设备中,再使用此按钮升级小板。")
# 初始化在线状态按钮
def sstate_but_init(self):
self.sstate_but = QPushButton(self.widget)
self.sstate_but.setObjectName(u"sstate_but")
self.sstate_but.setGeometry(QRect(590, 140, 150, 28))
self.sstate_but.setText("串口设备参数")
self.sstate_but.clicked.connect(self.sstate_but_clicked)
self.sstate_but.setToolTip("显示串口设备的基本参数。")
# 初始化方案状态按钮
def scheme_but_init(self):
self.scheme_but = QPushButton(self.widget)
self.scheme_but.setObjectName(u"scheme_but")
self.scheme_but.setGeometry(QRect(590, 180, 150, 28))
self.scheme_but.setText("方案参数")
self.scheme_but.clicked.connect(self.scheme_but_clicked)
self.scheme_but.setToolTip("显示串口设备的方案参数。")
# 初始化下载文件按钮
def download_but_init(self):
self.download_but = QPushButton(self.widget)
self.download_but.setObjectName(u"download_but")
self.download_but.setGeometry(QRect(590, 220, 150, 28))
self.download_but.setText("从服务器下载文件")
self.download_but.clicked.connect(self.download_but_clicked)
self.download_but.setToolTip("从服务器下载文件到本地。")
# com口
def com_init(self):
self.com = QComboBox(self.widget)
self.com.setObjectName(u"com")
self.com.setGeometry(QRect(85, 10, 300, 25))
self.com.setEditable(True)
self.com.currentIndexChanged.connect(self.com_changed)
self.com.addItem("utcp:7777")
ports_list = list(serial.tools.list_ports.comports())
for comport in ports_list:
# print(comport.name,comport.description)
self.com.addItem(comport.name+":"+comport.description)
self.com_label = QLabel(self.widget)
self.com_label.setObjectName(u"label")
self.com_label.setGeometry(QRect(30, 16, 72, 15))
self.com_label.setText("COM口:")
# 选择波特率
def combsp_init(self):
self.combsp = QComboBox(self.widget)
self.combsp.setObjectName(u"combsp")
self.combsp.setGeometry(QRect(470, 10, 80, 25))
self.combsp.setEditable(True)
self.combsp.addItem("115200")
self.combsp.addItem("57600")
self.combsp.addItem("38400")
self.combsp.addItem("9600")
self.combsp_label = QLabel(self.widget)
self.combsp_label.setObjectName(u"label")
self.combsp_label.setGeometry(QRect(410, 16, 72, 15))
self.combsp_label.setText("波特率:")
# 显示消息框
def show_msg(self,msg:str):
m=QMessageBox(self.widget)
m.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
m.setText(msg)
m.setWindowTitle("提示")
m.show()
# 找到已选择的文件
def get_selected_file_by_type(self,type:str):
file_list=[]
items=self.file_list.selectedItems()
for i in items:
if(i.text()[-len(type):]==type):
file_list.append(i.text())
if(len(file_list)!=1):
self.set_infotext("请选择一个并且只选择一个 "+type+" 文件")
return ""
return file_list[0]
# 获取已选择的文件列表
def get_selected_fils(self):
file_list=[]
items=self.file_list.selectedItems()
if(len(items)==0):
self.set_infotext("请选择至少一个文件")
return []
have_elf=False
have_bin=False
have_lua=False
for i in items:
if(i.text()[-4:]==".elf"):
if(have_elf==True):
self.set_infotext("只可选择一个 .elf 文件")
return []
have_elf=True
if(i.text()[-4:]==".bin"):
if(have_bin==True):
self.set_infotext("只可选择一个 .bin 文件")
return []
have_bin=True
if(i.text()[-4:]==".lua"):
if(have_lua==True):
self.set_infotext("只可选择一个 .lua 文件")
return []
have_lua=True
file_list.append(i.text())
return file_list
def item_clicked(self,item:QListWidgetItem ):
print("item clicked.")
print("slected item is",item.text())
def com_but_clicked(self):
print("com but clicked")
if(self.port_is_open==False):
self.open_port()
else:
self.close_port()
def com_changed(self,index:int):
print("com changed")
self.close_port()
def get_cmd(self,file:str):
l=[(".bin",0xee),(".pkt",0xed),(".json",0x32),(".jwt",0xec)]
for i in l:
if(file.endswith(i[0])):
return i[1]
return 0
# 发送文件按钮按下
def save_but_clicked(self):
print("save_but_clicked.")
path = self.getpath()+"file\\"
file_list=self.get_selected_fils()
if(len(file_list)==0):
return
self.cmd=self.get_cmd(file_list[0])
print("cmd=",self.cmd)
w=QDialog(self.widget)
w.resize(703-150, 1*40+20)
w.setWindowTitle("上传文件")
w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.handle_=prottcp.handle()
if(file_list[0].endswith(".json")):
self.handle_.set_json("file/"+file_list[0])
else:
self.handle_.set_file("file/"+file_list[0])
d=self.handle_.get_data()
try:
self.port.send(self.cmd,d)
except Exception as e:
print("com not open")
self.set_infotext("端口未打开")
del self.handle_
w.close()
return
self.creat_progress(w)
w.show()
# self.failed_signal.connect(w.close)
# w.destroyed.connect(self.close_port)
def updata_failed(self):
self.set_infotext("打开端口失败")
# 创建进度条
def creat_progress(self,father:QDialog):
self.probar_list=[]
for i in range(1):
p=progress_box()
p.init(father,"发送文件",+i*40)
self.rate_signal.connect(p.set_rate)
self.probar_list.append(p)
# 发送命令按钮按下
def cmd_but_clicked(self):
print("cmd_but clicked.")
file=self.get_selected_file_by_type(".bin")
if(len(file)==0):
return
print("file:",file)
w=QDialog(self.widget)
w.resize(703-150, 1*40+20)
w.setWindowTitle("升级mcu")
self.updata_mcu(file)
self.creat_progress(w)
w.show()
def sstate_but_clicked(self):
print("sstate_but clicked.")
try:
self.port.send_str("sysinfo")
except Exception as e:
print("com not open")
print(str(e))
def scheme_but_clicked(self):
print("scheme_but clicked.")
try:
self.port.send_str("scheme")
except Exception as e:
print("com not open")
print(str(e))
def updatas_but_clicked(self):
print("updatas_but clicked.")
try:
self.port.send_str("updatas 1,2,3,4,5,6,7,8,9,10")
self.set_infotext("已发送升级指令,请留意小板升级情况")
except Exception as e:
self.set_infotext("命令发送失败,是否未打开端口?")
print("com not open")
print(str(e))
def download_but_clicked(self):
sql=mysql.sql()
if(sql.init("")):
str_list=sql.show_tables()
sel=select_list.select_list(self.widget,"选择文件目录",str_list)
path=sel.show()
# print("path:",path)
if(len(path)<=0):
print("not select any item,break.")
return
sql.table_name=path
items=sql.items()
str_list=[]
for i in items:
item_name=i[2].replace("\\","/")
item_name=item_name.split("/")[-1]
if(i[4]!=None):
s=str(i[0])+"|"+i[1]+"|"+item_name+"|"+i[4]
else:
s=str(i[0])+"|"+i[1]+"|"+item_name
# print(s)
str_list.append((s,))
sel=select_list.file_list(self.widget,"选择文件",str_list)
item=sel.show()
# print("item:",item)
if(len(item)<=0):
print("not select any item,break.")
return
item=item.split("|")
self.set_infotext("正在从服务器下载文件,可能需要一些时间,请耐心等待.")
t = threading.Thread(target=self.sql_download, args=(sql,int(item[0]),))
t.start()
# 从数据库下载文件,在后台运行
def sql_download(self,sql:mysql.sql,index:int):
file_name=sql.download(index)
dst="file/"+file_name.split("/")[-1]
shutil.copy(file_name,dst)
self.sql_download_end_signal.emit()
self.set_infotext("已下载文件:"+dst)
# 开始运行
def run(self):
self.widget.show()
def set_port_state(self,state:bool):
self.port_is_open=state
if(state==True):
self.com_but.setText("关闭端口")
else:
self.com_but.setText("打开端口")
# 扫描文件
def scan_file(self):
self.file_list.clear()
self.file_list.addItems(self.find_type([".bin",".json",".pkt",".jwt"]))
# 扫描指定类型的文件
def find_type(self,types:list):
path = self.getpath()+"file\\"
if not os.path.exists(path):
os.makedirs(path)
list=os.listdir(path)
file_list=[]
for t in types:
for i in list:
if(len(t)>0):
if(i[-len(t):]==t):
file_list.append(i)
else:
file_list.append(i)
return file_list
# 获得文件绝对路径
def getpath(self):
path=os.path.abspath(sys.argv[0])
list_str=path.split("\\")
return path.replace(list_str[-1],"")
# 调用此函数打开通信
def open_port(self):
t = threading.Thread(target=self.com_thread, args=())
t.start()
def com_thread(self):
self.port=prottcp.protu()
item=self.com.itemText(self.com.currentIndex())
bsp=self.combsp.itemText(self.combsp.currentIndex())
com=item.split(":")[0]
if(com!="utcp"):
item=com+":"+bsp
print("item text:",item)
if(self.port.init(item)==False):
print("init port failed.")
self.failed_signal.emit()
self.set_port_state(False)
return
else:
print("init port success.")
self.set_port_state(True)
self.port.recv_signal.connect(self.recv_slot)
self.port.recv_str_signal.connect(self.recv_str_slot)
self.port.start_recv()
# self.port.wait()
def close_port(self):
print("close port")
self.set_port_state(False)
try:
self.port.close()
self.port.recv_signal.disconnect(self.recv_slot)
self.port.recv_str_signal.disconnect(self.recv_str_slot)
except Exception as e:
pass
def recv_str_slot(self,cmd:int,txt:str,err:str):
print("|-|",txt)
with open("device_log.txt","a+") as f:
f.write(txt+'\n')
def recv_slot(self,cmd:int,data:bytearray,err:str):
# print("recv:",cmd,data)
if(self.cmd!=cmd):
return
try:
data=self.handle_.get_data()
if(len(data)>0):
self.port.send(cmd,data)
rate=self.handle_.get_rate()
self.rate_signal.emit(rate)
else:
del self.handle_
except Exception as e:
print(str(e))
# 开始升级mcu
def updata_mcu(self,file):
pass
# 小板通信测试
def comm_test(self):
pass
def end_slot(self,ip,ack,err):
# print(ip,ack,err)
if(ack==False):
self.show_msg(ip+":"+err)
def rate_slot(self,rate):
# print("rate signal:",ip,rate)
self.rate_signal.emit(rate)
def data_slot(self,ip,text):
pass
# 创建数据显示
def creat_databoxs(self,father:QDialog,data_list:list):
self.datab_list=[]
for i in range(len(data_list)):
p=data_box()
p.init(father,data_list[i][0],data_list[i][1],+i*50)
self.datab_list.append(p)
class locker():
def _get_lock(self):
file_name = os.path.basename(__file__)
# linux等平台依然使用标准的/var/run其他nt等平台使用当前目录
if os.name == "posix":
lock_file_name = f"/var/run/{file_name}.pid"
else:
lock_file_name = f"{file_name}.pid"
self.fd = open(lock_file_name, "w")
try:
portalocker.lock(self.fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
# 将当前进程号写入文件
# 如果获取不到锁上一步就已经异常了,所以不用担心覆盖
self.fd.writelines(str(os.getpid()))
# 写入的数据太少,默认会先被放在缓冲区,我们强制同步写入到文件
self.fd.flush()
except:
print(f"{file_name} have another instance running.")
exit(1)
def __init__(self):
self._get_lock()
# 和fcntl有点区别portalocker释放锁直接有unlock()方法
# 还是一样,其实并不需要在最后自己主动释放锁
def __del__(self):
portalocker.unlock(self.fd)
if __name__ == "__main__":
lock=locker()
app = QApplication(sys.argv)
dlg=updata_dlg()
dlg.run()
app.exec()