import sys import os import shutil from PyQt5.QtCore import * from PyQt5.QtGui import * from PyQt5.QtWidgets import * import threading import time import portalocker import serial import serial.tools.list_ports import prottcp import mysql import select_list STR_RED="\033[1;31m" STR_BLUE="\033[1;34m" STR_END="\033[0m" class progress_box(QObject): def __init__(self) -> None: QObject.__init__(self) def init(self,father:QDialog,text:str,x:int): # print("text=",text) self.lable = QLabel(father) self.lable.setObjectName(u"label") self.lable.setGeometry(QRect(40, x, 120, 15)) self.lable.setText(text) self.p=QProgressBar(father) self.p.setValue(0) self.p.setGeometry(QRect(40,x+20,450,20)) def set_rate(self,rate:int): # print(text) self.p.setValue(rate) class data_box(QObject): def __init__(self) -> None: QObject.__init__(self) self.select_item="" def init(self,father:QDialog,title:str,text:str,x:int): # print("text=",text) self.lable = QLabel(father) self.lable.setObjectName(u"label") self.lable.setGeometry(QRect(40, x, 120, 20)) self.lable.setText(title) self.text = QLabel(father) self.text.setObjectName(u"text") self.text.setGeometry(QRect(40,x+20,450,30)) self.text.setText(text) class updata_dlg(QObject): # 进度信号,ip,1~100 rate_signal =pyqtSignal([int]) # 结束信号,ip,成败,描述 end_signal = pyqtSignal([bool,str]) failed_signal = pyqtSignal() def __init__(self): QObject.__init__(self) self.app = QApplication(sys.argv) self.widget = QWidget() self.widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose) self.widget.resize(703, 409) self.widget.setWindowTitle("赋码仪程序升级") self.file_list_init() self.com_but_init() self.save_but_init() self.updatas_but_init() # self.cmd_but_init() self.sstate_but_init() self.scheme_but_init() self.com_init() self.combsp_init() self.download_but_init() self.widget.destroyed.connect(self.quit) self.failed_signal.connect(self.updata_failed) self.cmd=0 self.port_is_open=False def quit(self): self.close_port() # 程序退出 qApp.exit(1) # 初始化文件列表 def file_list_init(self): self.file_list = QListWidget(self.widget) self.file_list.setObjectName(u"file_list") self.file_list.setGeometry(QRect(25, 60, 531, 310)) self.file_list.setFrameShape(QFrame.Shape.Box) self.file_list.setMidLineWidth(1) self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded) self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection) self.file_list.itemDoubleClicked.connect(self.item_clicked) # 初始化打开端口按钮 def com_but_init(self): self.com_but = QPushButton(self.widget) self.com_but.setObjectName(u"com_but") self.com_but.setGeometry(QRect(590, 10, 93, 28)) self.com_but.setText("打开端口") self.com_but.clicked.connect(self.com_but_clicked) # 初始化发送文件按钮 def save_but_init(self): self.save_but = QPushButton(self.widget) self.save_but.setObjectName(u"save_but") self.save_but.setGeometry(QRect(590, 60, 93, 28)) self.save_but.setText("发送文件") self.save_but.clicked.connect(self.save_but_clicked) # 初始化发送命令按钮 def cmd_but_init(self): self.cmd_but = QPushButton(self.widget) self.cmd_but.setObjectName(u"save_but") self.cmd_but.setGeometry(QRect(590, 100, 93, 28)) self.cmd_but.setText("升级MCU") self.cmd_but.clicked.connect(self.cmd_but_clicked) # 初始化升级小板按钮 def updatas_but_init(self): self.updatas_but = QPushButton(self.widget) self.updatas_but.setObjectName(u"updatas_but") self.updatas_but.setGeometry(QRect(590, 100, 93, 28)) self.updatas_but.setText("升级小板") self.updatas_but.clicked.connect(self.updatas_but_clicked) # 初始化在线状态按钮 def sstate_but_init(self): self.sstate_but = QPushButton(self.widget) self.sstate_but.setObjectName(u"sstate_but") self.sstate_but.setGeometry(QRect(590, 140, 93, 28)) self.sstate_but.setText("主板参数") self.sstate_but.clicked.connect(self.sstate_but_clicked) # 初始化方案状态按钮 def scheme_but_init(self): self.scheme_but = QPushButton(self.widget) self.scheme_but.setObjectName(u"scheme_but") self.scheme_but.setGeometry(QRect(590, 180, 93, 28)) self.scheme_but.setText("方案参数") self.scheme_but.clicked.connect(self.scheme_but_clicked) # 初始化下载文件按钮 def download_but_init(self): self.download_but = QPushButton(self.widget) self.download_but.setObjectName(u"download_but") self.download_but.setGeometry(QRect(590, 220, 93, 28)) self.download_but.setText("下载文件") self.download_but.clicked.connect(self.download_but_clicked) # com口 def com_init(self): self.com = QComboBox(self.widget) self.com.setObjectName(u"com") self.com.setGeometry(QRect(85, 10, 300, 25)) self.com.setEditable(True) self.com.currentIndexChanged.connect(self.com_changed) self.com.addItem("utcp:7777") ports_list = list(serial.tools.list_ports.comports()) for comport in ports_list: # print(comport.name,comport.description) self.com.addItem(comport.name+":"+comport.description) self.com_label = QLabel(self.widget) self.com_label.setObjectName(u"label") self.com_label.setGeometry(QRect(30, 16, 72, 15)) self.com_label.setText("COM口:") # 选择波特率 def combsp_init(self): self.combsp = QComboBox(self.widget) self.combsp.setObjectName(u"combsp") self.combsp.setGeometry(QRect(470, 10, 80, 25)) self.combsp.setEditable(True) self.combsp.addItem("115200") self.combsp.addItem("57600") self.combsp.addItem("38400") self.combsp.addItem("9600") self.combsp_label = QLabel(self.widget) self.combsp_label.setObjectName(u"label") self.combsp_label.setGeometry(QRect(410, 16, 72, 15)) self.combsp_label.setText("波特率:") # 显示消息框 def show_msg(self,msg:str): m=QMessageBox(self.widget) m.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose) m.setText(msg) m.setWindowTitle("提示") m.show() # 找到已选择的文件 def get_selected_file_by_type(self,type:str): file_list=[] items=self.file_list.selectedItems() for i in items: if(i.text()[-len(type):]==type): file_list.append(i.text()) if(len(file_list)!=1): self.show_msg("请选择一个并且只选择一个 "+type+" 文件") return "" return file_list[0] # 获取已选择的文件列表 def get_selected_fils(self): file_list=[] items=self.file_list.selectedItems() if(len(items)==0): self.show_msg("请选择至少一个文件") return [] have_elf=False have_bin=False have_lua=False for i in items: if(i.text()[-4:]==".elf"): if(have_elf==True): self.show_msg("只可选择一个 .elf 文件") return [] have_elf=True if(i.text()[-4:]==".bin"): if(have_bin==True): self.show_msg("只可选择一个 .bin 文件") return [] have_bin=True if(i.text()[-4:]==".lua"): if(have_lua==True): self.show_msg("只可选择一个 .lua 文件") return [] have_lua=True file_list.append(i.text()) return file_list def item_clicked(self,item:QListWidgetItem ): print("item clicked.") print("slected item is",item.text()) def com_but_clicked(self): print("com but clicked") if(self.port_is_open==False): self.open_port() else: self.close_port() def com_changed(self,index:int): print("com changed") self.close_port() def get_cmd(self,file:str): l=[(".bin",0xee),(".pkt",0xed),(".json",0x32),(".jwt",0xec)] for i in l: if(file.endswith(i[0])): return i[1] return 0 # 发送文件按钮按下 def save_but_clicked(self): print("save_but_clicked.") path = self.getpath()+"file\\" file_list=self.get_selected_fils() if(len(file_list)==0): return self.cmd=self.get_cmd(file_list[0]) print("cmd=",self.cmd) w=QDialog(self.widget) w.resize(703-150, 1*40+20) w.setWindowTitle("上传文件") w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose) self.handle_=prottcp.handle() if(file_list[0].endswith(".json")): self.handle_.set_json("file/"+file_list[0]) else: self.handle_.set_file("file/"+file_list[0]) d=self.handle_.get_data() try: self.port.send(self.cmd,d) except Exception as e: print("com not open") self.show_msg("端口未打开") del self.handle_ w.close() return self.creat_progress(w) w.show() # self.failed_signal.connect(w.close) # w.destroyed.connect(self.close_port) def updata_failed(self): self.show_msg("打开端口失败") # 创建进度条 def creat_progress(self,father:QDialog): self.probar_list=[] for i in range(1): p=progress_box() p.init(father,"发送文件",+i*40) self.rate_signal.connect(p.set_rate) self.probar_list.append(p) # 发送命令按钮按下 def cmd_but_clicked(self): print("cmd_but clicked.") file=self.get_selected_file_by_type(".bin") if(len(file)==0): return print("file:",file) w=QDialog(self.widget) w.resize(703-150, 1*40+20) w.setWindowTitle("升级mcu") self.updata_mcu(file) self.creat_progress(w) w.show() def sstate_but_clicked(self): print("sstate_but clicked.") try: self.port.send_str("sysinfo") except Exception as e: print("com not open") print(str(e)) def scheme_but_clicked(self): print("scheme_but clicked.") try: self.port.send_str("scheme") except Exception as e: print("com not open") print(str(e)) def updatas_but_clicked(self): print("updatas_but clicked.") try: self.port.send_str("updatas 1,2,3,4,5,6,7,8,9,10") self.show_msg("已发送升级指令,请留意小板升级情况") except Exception as e: self.show_msg("命令发送失败") print("com not open") print(str(e)) def download_but_clicked(self): sql=mysql.sql() if(sql.init("")): str_list=sql.show_tables() sel=select_list.select_list(self.widget,"选择文件目录",str_list) path=sel.show() # print("path:",path) if(len(path)<=0): print("not select any item,break.") return sql.table_name=path items=sql.items() str_list=[] for i in items: item_name=i[2].replace("\\","/") item_name=item_name.split("/")[-1] if(i[4]!=None): s=str(i[0])+"|"+i[1]+"|"+item_name+"|"+i[4] else: s=str(i[0])+"|"+i[1]+"|"+item_name # print(s) str_list.append((s,)) sel=select_list.select_list(self.widget,"选择文件",str_list) item=sel.show() # print("item:",item) if(len(item)<=0): print("not select any item,break.") return item=item.split("|") file_name=sql.download(int(item[0])) dst="file/"+file_name.split("/")[-1] shutil.copy(file_name,dst) self.scan_file() # 开始运行 def run(self): self.widget.show() sys.exit(self.app.exec()) def set_port_state(self,state:bool): self.port_is_open=state if(state==True): self.com_but.setText("关闭端口") else: self.com_but.setText("打开端口") # 扫描文件 def scan_file(self): self.file_list.clear() self.file_list.addItems(self.find_type([".bin",".json",".pkt",".jwt"])) # 扫描指定类型的文件 def find_type(self,types:list): path = self.getpath()+"file\\" if not os.path.exists(path): os.makedirs(path) list=os.listdir(path) file_list=[] for t in types: for i in list: if(len(t)>0): if(i[-len(t):]==t): file_list.append(i) else: file_list.append(i) return file_list # 获得文件绝对路径 def getpath(self): path=os.path.abspath(sys.argv[0]) list_str=path.split("\\") return path.replace(list_str[-1],"") # 调用此函数打开通信 def open_port(self): t = threading.Thread(target=self.com_thread, args=()) t.start() def com_thread(self): self.port=prottcp.protu() item=self.com.itemText(self.com.currentIndex()) bsp=self.combsp.itemText(self.combsp.currentIndex()) com=item.split(":")[0] if(com!="utcp"): item=com+":"+bsp print("item text:",item) if(self.port.init(item)==False): print("init port failed.") self.failed_signal.emit() self.set_port_state(False) return else: print("init port success.") self.set_port_state(True) self.port.recv_signal.connect(self.recv_slot) self.port.recv_str_signal.connect(self.recv_str_slot) self.port.start_recv() self.port.wait() def close_port(self): print("close port") self.set_port_state(False) try: self.port.close() self.port.recv_signal.disconnect(self.recv_slot) self.port.recv_str_signal.disconnect(self.recv_str_slot) except Exception as e: pass def recv_str_slot(self,cmd:int,txt:str,err:str): print("|-|",txt) with open("device_log.txt","a+") as f: f.write(txt+'\n') def recv_slot(self,cmd:int,data:bytearray,err:str): # print("recv:",cmd,data) if(self.cmd!=cmd): return try: data=self.handle_.get_data() if(len(data)>0): self.port.send(cmd,data) rate=self.handle_.get_rate() self.rate_signal.emit(rate) else: del self.handle_ except Exception as e: print(str(e)) # 开始升级mcu def updata_mcu(self,file): pass # 小板通信测试 def comm_test(self): pass def end_slot(self,ip,ack,err): # print(ip,ack,err) if(ack==False): self.show_msg(ip+":"+err) def rate_slot(self,rate): # print("rate signal:",ip,rate) self.rate_signal.emit(rate) def data_slot(self,ip,text): pass # 创建数据显示 def creat_databoxs(self,father:QDialog,data_list:list): self.datab_list=[] for i in range(len(data_list)): p=data_box() p.init(father,data_list[i][0],data_list[i][1],+i*50) self.datab_list.append(p) class locker(): def _get_lock(self): file_name = os.path.basename(__file__) # linux等平台依然使用标准的/var/run,其他nt等平台使用当前目录 if os.name == "posix": lock_file_name = f"/var/run/{file_name}.pid" else: lock_file_name = f"{file_name}.pid" self.fd = open(lock_file_name, "w") try: portalocker.lock(self.fd, portalocker.LOCK_EX | portalocker.LOCK_NB) # 将当前进程号写入文件 # 如果获取不到锁上一步就已经异常了,所以不用担心覆盖 self.fd.writelines(str(os.getpid())) # 写入的数据太少,默认会先被放在缓冲区,我们强制同步写入到文件 self.fd.flush() except: print(f"{file_name} have another instance running.") exit(1) def __init__(self): self._get_lock() # 和fcntl有点区别,portalocker释放锁直接有unlock()方法 # 还是一样,其实并不需要在最后自己主动释放锁 def __del__(self): portalocker.unlock(self.fd) if __name__ == "__main__": lock=locker() dlg=updata_dlg() dlg.scan_file() dlg.run()