"""PyQt5 tool that uploads files to boards over FTP and sends MCU commands over UDP.

The transport helpers live in the project modules ``udp`` and ``ftp``; their
exact semantics are not visible from this file, so comments about them below
describe only how this file uses them.
"""
import udp
import ftp
import sys
import os
import shutil
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import time
import portalocker
import encodings.idna
import base64
import memory_pic
import ctypes

# The AppUserModelID call only exists on Windows; guard it so importing the
# module does not raise on other platforms (locker below has a posix branch).
if os.name == "nt":
    ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID("myappid")

# ANSI colour escape sequences used for console output.
STR_RED = "\033[1;31m"
STR_BLUE = "\033[1;34m"
STR_END = "\033[0m"


class progress_box(QObject):
    """One progress row: a title label, a status label and a progress bar."""

    rate_signal = pyqtSignal([int])

    def __init__(self) -> None:
        QObject.__init__(self)

    def init(self, father: QDialog, text: str, x: int):
        """Create the widgets inside *father*.

        Args:
            father: dialog that owns the widgets.
            text: title of the row; also the key matched by set_rate/set_txt.
            x: vertical offset of the row inside the dialog, in pixels.
        """
        self.lable = QLabel(father)
        self.lable.setObjectName(u"label")
        self.lable.setGeometry(QRect(40, x, 120, 15))
        self.lable.setText(text)

        self.stat = QLabel(father)
        self.stat.setObjectName(u"label")
        self.stat.setGeometry(QRect(500, x + 20, 120, 15))
        self.stat.setText("进行中...")

        self.p = QProgressBar(father)
        self.p.setValue(0)
        self.p.setGeometry(QRect(40, x + 20, 450, 20))
        # Route updates through a signal so the bar is driven via Qt's
        # signal/slot mechanism rather than by a direct call.
        self.rate_signal.connect(self.p.setValue)

    def set_rate(self, text: str, rate: int):
        """Update the progress bar when *text* matches this row's title."""
        if text == self.lable.text():
            self.rate_signal.emit(rate)

    def set_txt(self, text: str, ack: bool, stat: str):
        """Update the status label when *text* matches this row's title.

        *ack* is accepted to match the end-signal signature but is not used.
        """
        if text == self.lable.text():
            self.stat.setText(stat)


class data_box(QObject):
    """One data row: a title label with a text label underneath."""

    def __init__(self) -> None:
        QObject.__init__(self)

    def init(self, father: QDialog, title: str, text: str, x: int):
        """Create the two labels inside *father* at vertical offset *x*."""
        self.lable = QLabel(father)
        self.lable.setObjectName(u"label")
        self.lable.setGeometry(QRect(40, x, 120, 20))
        self.lable.setText(title)

        self.text = QLabel(father)
        self.text.setObjectName(u"text")
        self.text.setGeometry(QRect(40, x + 20, 450, 30))
        self.text.setText(text)


class updata_dlg(QObject):
    """Main window: board list, file list and the action buttons."""

    # Progress signal: ip, 1~100
    rate_signal = pyqtSignal([str, int])
    # End signal: ip, success flag, description
    end_signal = pyqtSignal([str, bool, str])

    def __init__(self):
        QObject.__init__(self)
        self.app = QApplication(sys.argv)
        self.widget = QWidget()
        self.widget.resize(703, 409)
        self.widget.setWindowTitle("批检仪程序升级")
        self.widget.setWindowFlags(Qt.WindowStaysOnTopHint)

        self.file_list_init()
        self.slave_list_init()
        self.save_but_init()
        self.cmd_but_init()
        self.refresh_but_init()
        self.sstate_but_init()
        self.hand_but_init()
        self.addfile_but_init()
        self.ip_prefix_init()
        self.ip_hand_init()
        self.widget.destroyed.connect(self.quit)

        self.scan_file()
        self.scan_slave()

        # The window icon is embedded as base64 in the memory_pic module.
        Logo = QPixmap()
        Logo.loadFromData(base64.b64decode(memory_pic.icon_ico))
        icon = QIcon()
        icon.addPixmap(Logo, QIcon.Normal, QIcon.Off)
        self.widget.setWindowIcon(icon)

    def quit(self):
        """Exit the application (connected to the main widget's destroyed signal)."""
        qApp.exit(1)

    def file_list_init(self):
        """Create the file list widget and its caption."""
        self.file_list = QListWidget(self.widget)
        self.file_list.setObjectName(u"file_list")
        self.file_list.setGeometry(QRect(25, 250, 531, 141))
        self.file_list.setFrameShape(QFrame.Box)
        self.file_list.setMidLineWidth(1)
        self.file_list.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded)
        self.file_list.setSelectionMode(QAbstractItemView.MultiSelection)

        self.file_list_label = QLabel(self.widget)
        self.file_list_label.setObjectName(u"label")
        self.file_list_label.setGeometry(QRect(30, 230, 200, 15))
        self.file_list_label.setText("默认文件列表:")

    def slave_list_init(self):
        """Create the board (slave) list widget and its caption."""
        self.slave_list = QListWidget(self.widget)
        self.slave_list.setObjectName(u"slave_list")
        self.slave_list.setGeometry(QRect(25, 80, 531, 141))
        self.slave_list.setFrameShape(QFrame.Box)
        self.slave_list.setMidLineWidth(1)
        self.slave_list.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded)
        self.slave_list.setSelectionMode(QAbstractItemView.MultiSelection)
        # editItem() takes a QListWidgetItem, so it must be driven by
        # itemDoubleClicked; doubleClicked would hand it a QModelIndex.
        self.slave_list.itemDoubleClicked.connect(self.slave_list.editItem)

        self.slave_list_label = QLabel(self.widget)
        self.slave_list_label.setObjectName(u"label")
        self.slave_list_label.setGeometry(QRect(30, 60, 200, 15))
        self.slave_list_label.setText("在线主板列表:")

    def hand_but_init(self):
        """Create the "add IP manually" button."""
        self.hand_but = QPushButton(self.widget)
        self.hand_but.setObjectName(u"hand_but")
        self.hand_but.setGeometry(QRect(590, 60, 93, 28))
        self.hand_but.setText("手动添加IP")
        self.hand_but.clicked.connect(self.hand_but_clicked)

    def save_but_init(self):
        """Create the "send files" button."""
        self.save_but = QPushButton(self.widget)
        self.save_but.setObjectName(u"save_but")
        self.save_but.setGeometry(QRect(590, 100, 93, 28))
        self.save_but.setText("发送文件")
        self.save_but.clicked.connect(self.save_but_clicked)

    def cmd_but_init(self):
        """Create the "upgrade MCU" button."""
        self.cmd_but = QPushButton(self.widget)
        self.cmd_but.setObjectName(u"save_but")
        self.cmd_but.setGeometry(QRect(590, 140, 93, 28))
        self.cmd_but.setText("升级MCU")
        self.cmd_but.clicked.connect(self.cmd_but_clicked)

    def refresh_but_init(self):
        """Create the "refresh IP addresses" button."""
        self.refresh_but = QPushButton(self.widget)
        self.refresh_but.setObjectName(u"save_but")
        self.refresh_but.setGeometry(QRect(590, 180, 93, 28))
        self.refresh_but.setText("刷新IP地址")
        self.refresh_but.clicked.connect(self.refresh_but_clicked)

    def sstate_but_init(self):
        """Create the "MCU online state" button."""
        self.sstate_but = QPushButton(self.widget)
        self.sstate_but.setObjectName(u"sstate_but")
        self.sstate_but.setGeometry(QRect(590, 220, 93, 28))
        self.sstate_but.setText("MCU在线状态")
        self.sstate_but.clicked.connect(self.sstate_but_clicked)

    def addfile_but_init(self):
        """Create the "add file" button."""
        self.addfile_but = QPushButton(self.widget)
        self.addfile_but.setObjectName(u"addfile_but")
        self.addfile_but.setGeometry(QRect(590, 260, 93, 28))
        self.addfile_but.setText("添加文件")
        self.addfile_but.clicked.connect(self.addfile_but_clicked)

    def ip_prefix_init(self):
        """Create the IP prefix line edit and its caption."""
        self.ip_prefix = QLineEdit(self.widget)
        self.ip_prefix.setObjectName(u"ip_prefix")
        self.ip_prefix.setGeometry(QRect(30, 30, 113, 21))
        self.ip_prefix.setText("192.168.80")

        self.ip_prefix_label = QLabel(self.widget)
        self.ip_prefix_label.setObjectName(u"label")
        self.ip_prefix_label.setGeometry(QRect(30, 10, 72, 15))
        self.ip_prefix_label.setText("IP前缀")

    def ip_hand_init(self):
        """Create the manual IP line edit, pre-filled from the IP prefix."""
        self.ip_hand = QLineEdit(self.widget)
        self.ip_hand.setObjectName(u"ip_hand")
        self.ip_hand.setGeometry(QRect(300, 30, 120, 21))
        ip = self.ip_prefix.text()
        self.ip_hand.setText(ip + ".81")

        self.ip_hand_label = QLabel(self.widget)
        self.ip_hand_label.setObjectName(u"ip_hand_label")
        self.ip_hand_label.setGeometry(QRect(300, 10, 120, 15))
        self.ip_hand_label.setText("手动添加IP地址")

    def show_msg(self, msg: str):
        """Show *msg* in a non-blocking message box."""
        m = QMessageBox(self.widget)
        m.setText(msg)
        m.setWindowTitle("提示")
        m.show()

    def get_selected_file_by_type(self, type: str):
        """Return the single selected file whose name ends with *type*.

        Shows a message and returns "" unless exactly one such file is selected.
        """
        matches = [
            item.text()
            for item in self.file_list.selectedItems()
            if item.text().endswith(type)
        ]
        if len(matches) != 1:
            self.show_msg("请选择一个并且只选择一个 " + type + " 文件")
            return ""
        return matches[0]

    def get_selected_fils(self):
        """Return the selected file names.

        Shows a message and returns [] when nothing is selected or when more
        than one .elf, .bin or .lua file is selected.
        """
        items = self.file_list.selectedItems()
        if not items:
            self.show_msg("请选择至少一个文件")
            return []

        seen = set()
        file_list = []
        for item in items:
            name = item.text()
            for ext in (".elf", ".bin", ".lua"):
                if not name.endswith(ext):
                    continue
                if ext in seen:
                    self.show_msg("只可选择一个 " + ext + " 文件")
                    return []
                seen.add(ext)
            file_list.append(name)
        return file_list

    def get_selected_slave(self):
        """Return the selected boards as (ip, id) tuples; [] if none selected.

        Each list entry is expected to look like "ip,id".
        """
        items = self.slave_list.selectedItems()
        if not items:
            self.show_msg("请选择至少一个ip地址")
            return []
        slave_list = []
        for item in items:
            fields = item.text().split(",")
            slave_list.append((fields[0], fields[1]))
        return slave_list

    def build_dst_list(self, file_list):
        """Map each local file name to its destination path on the board."""
        dst_list = []
        for name in file_list:
            if name.endswith(".elf"):
                dst_list.append("/usr/local/QDesktop-fb")
            elif name.endswith(".dtb"):
                dst_list.append("/run/media/mmcblk2p2/" + name)
            elif name.find("checker_ye_cfg") >= 0:
                dst_list.append("/home/root/config/" + "checker_ye_cfg.json")
            elif name.endswith(".lua"):
                dst_list.append("/home/root/config/" + "judge.lua")
            else:
                dst_list.append("/home/root/config/" + name)
        return dst_list

    def hand_but_clicked(self):
        """Add the manually entered IP to the board list."""
        print("hand_but clicked.")
        ip = self.ip_hand.text()
        self.slave_list.addItem(ip + ",local_id_0")

    def save_but_clicked(self):
        """Upload the selected files to the selected boards."""
        print("save_but_clicked.")
        path = self.getpath() + "file\\"
        file_list = self.get_selected_fils()
        if not file_list:
            return
        # Destinations are derived from the bare names, before the local
        # directory is prepended.
        dst_list = self.build_dst_list(file_list)
        file_list = [path + name for name in file_list]
        for src, dst in zip(file_list, dst_list):
            print("file:", src, dst)

        slave_list = self.get_selected_slave()
        if not slave_list:
            return
        print("slaves:", slave_list)

        w = QDialog(self.widget)
        w.resize(703 - 100, len(slave_list) * 40 + 20)
        w.setWindowTitle("上传文件")
        # Connect the progress widgets before starting the workers so that
        # no early rate/end signal is emitted with nothing listening.
        self.creat_progress(w, slave_list)
        w.show()
        self.updata(slave_list, dst_list, file_list)

    def creat_progress(self, father: QDialog, ip_list: list):
        """Create one progress row per board and hook it to the signals."""
        self.probar_list = []
        for i in range(len(ip_list)):
            p = progress_box()
            p.init(father, ip_list[i][0], +i * 40)
            self.rate_signal.connect(p.set_rate)
            self.end_signal.connect(p.set_txt)
            self.probar_list.append(p)

    def cmd_but_clicked(self):
        """Send the MCU upgrade command for the selected .bin file."""
        print("cmd_but clicked.")
        slave_list = self.get_selected_slave()
        if not slave_list:
            return
        print("slaves:", slave_list)
        file = self.get_selected_file_by_type(".bin")
        if not file:
            return
        print("file:", file)

        w = QDialog(self.widget)
        w.resize(703 - 100, len(slave_list) * 40 + 20)
        w.setWindowTitle("升级mcu")
        # Same ordering as save_but_clicked: listeners first, worker second.
        self.creat_progress(w, slave_list)
        w.show()
        self.updata_mcu(slave_list, file)

    def refresh_but_clicked(self):
        """Clear the board list and scan again."""
        print("refresh_but clicked.")
        self.slave_list.clear()
        self.scan_slave()

    def sstate_but_clicked(self):
        """Run the MCU communication test on the selected boards."""
        print("sstate_but clicked.")
        slave_list = self.get_selected_slave()
        if not slave_list:
            return
        self.data_list = []
        self.comm_test(slave_list)

    def addfile_but_clicked(self):
        """Let the user pick files and copy them into the local file directory."""
        print("addfile_but clicked")
        fileName, fileType = QFileDialog.getOpenFileNames(
            None,
            "选取文件",
            os.getcwd(),
            "主板程序(*.elf);;小板程序(*.bin);;检测方案(*.json);;判定脚本(*.lua);;任意文件(*)",
        )
        print(fileName, fileType)
        path = self.getpath() + "file\\"
        for src in fileName:
            try:
                shutil.copy(src, path + os.path.basename(src))
            except shutil.SameFileError:
                # The file was picked from the file directory itself; it is
                # already in place, so there is nothing to copy.
                continue
        self.scan_file()

    def run(self):
        """Show the main window and enter the Qt event loop."""
        self.widget.show()
        sys.exit(self.app.exec())

    def scan_file(self):
        """Reload the file list from the local file directory."""
        self.file_list.clear()
        self.file_list.addItems(
            self.find_type([".sh", ".elf", ".bin", ".lua", ".json", ".dtb"])
        )

    def scan_slave(self):
        """Discover boards under the IP prefix and add them to the board list.

        When nothing is found the user may choose to add .81 ~ .88 by default.
        """
        u = udp.myudp(1, 255)
        ip_prefix = self.ip_prefix.text()
        u.find(ip_prefix)
        ip_list = u.dst_ip_list
        if len(ip_list) == 0:
            ret = QMessageBox.question(
                self.widget,
                "提示",
                "未扫描到从机,是否自动添加?",
                QMessageBox.Yes | QMessageBox.No,
            )
            if ret == QMessageBox.Yes:
                for n in range(1, 9):
                    ip_list.append((ip_prefix + "." + str(80 + n), "local_id_" + str(n)))
        self.slave_list.addItems([i[0] + ',' + i[1] for i in u.dst_ip_list])

    def find_type(self, types: list):
        """Return the names in the local file directory matching *types*.

        The directory is created when missing. Results are grouped in the
        order of *types*; an empty suffix matches every file.
        """
        path = self.getpath() + "file\\"
        os.makedirs(path, exist_ok=True)
        names = os.listdir(path)
        file_list = []
        for t in types:
            for name in names:
                if not t or name.endswith(t):
                    file_list.append(name)
        return file_list

    def getpath(self):
        """Return the directory of the running script, with trailing backslash.

        Only the final path component is stripped; the previous
        str.replace() also removed earlier occurrences of the script name
        inside the directory part of the path.
        """
        path = os.path.abspath(sys.argv[0])
        head, sep, _ = path.rpartition("\\")
        return head + sep

    def updata(self, ip_list, dst_list, src_list):
        """Start the file upload in a background thread."""
        t = threading.Thread(
            target=self.updata_thread, args=(ip_list, dst_list, src_list)
        )
        t.start()

    def updata_mcu(self, ip_list, file):
        """Send the MCU upgrade command to *ip_list* in a background thread."""
        u = udp.myudp(1, 255)
        u.dst_ip_list = ip_list
        u.rate_signal.connect(self.rate_slot)
        u.end_signal.connect(self.end_slot)
        updata_cmd = "mcu updata 1,2,3,4,5,6,7,8,9,10 "
        updata_cmd += "/home/root/config/" + file
        # NOTE(review): the tuple is presumably (command, reply count,
        # timeout in seconds), by analogy with comm_test — confirm in udp.
        cmd_list = [(updata_cmd, 1, 900)]
        t = threading.Thread(target=u.bordcast, args=(cmd_list,))
        t.start()

    def comm_test(self, ip_list):
        """Send the MCU communication test command in a background thread."""
        u = udp.myudp(1, 255)
        u.dst_ip_list = ip_list
        u.data_signal.connect(self.data_slot)
        updata_cmd = "mcu comm_test"
        cmd_list = [(updata_cmd, 2, 5)]  # two replies, five second timeout
        t = threading.Thread(target=u.bordcast, args=(cmd_list,))
        t.start()

    def thread_test(self, ip_list, dst_list, src_list):
        """Test helper: fake progress from 1 to 100 for every board."""
        for i in range(100):
            for ip in ip_list:
                self.rate_slot(ip[0], i + 1)
            # NOTE(review): the original layout was lost; the sleep is
            # assumed to run once per progress step — confirm.
            time.sleep(0.02)

    def updata_thread(self, ip_list, dst_list, src_list):
        """Upload to every board in parallel and wait for all uploads."""
        threads = []
        for i in ip_list:
            t = threading.Thread(
                target=self.save_file, args=(i[0], True, dst_list, src_list)
            )
            threads.append(t)
            t.start()
        # Wait for all worker threads to finish.
        for t in threads:
            t.join()

    def end_slot(self, ip, ack, err):
        """Handle a finished command: show the error or mark the row done."""
        if ack == False:
            self.show_msg(ip + ":" + err)
        else:
            self.end_signal.emit(ip, True, "完成")

    def rate_slot(self, ip, rate):
        """Forward a progress update to the progress rows."""
        self.rate_signal.emit(ip, rate)

    def data_slot(self, ip, text):
        """Collect returned data; show it once every selected board replied."""
        self.data_list.append((ip, text))
        slave_len = len(self.get_selected_slave())
        if len(self.data_list) >= slave_len:
            w = QDialog(self.widget)
            w.resize(703 - 150, slave_len * 50 + 20)
            w.setWindowTitle("返回数据展示")
            self.creat_databoxs(w, self.data_list)
            w.show()

    def creat_databoxs(self, father: QDialog, data_list: list):
        """Create one data row per (title, text) entry of *data_list*."""
        self.datab_list = []
        for i in range(len(data_list)):
            p = data_box()
            p.init(father, data_list[i][0], data_list[i][1], +i * 50)
            self.datab_list.append(p)

    def save_file(self, ip, restart: bool, dst_list, src_list):
        """Upload *src_list* to *dst_list* on board *ip* over FTP.

        Runs in a worker thread. When *restart* is true the board's app
        service is stopped before the transfer and restarted afterwards.
        The outcome is reported through end_signal; errors are never raised.
        """
        try:
            f = ftp.myftp(ip, "root", "")
            try:
                f.rate_signal.connect(self.rate_slot)
                if restart == True:
                    print(ip, "|stop app.")
                    f.send_cmd("systemctl stop atk-qtapp-start.service")
                f.put_file(ip, dst_list, src_list)
                if restart == True:
                    print(ip, "|start app.")
                    f.send_cmd("systemctl restart atk-qtapp-start.service")
                # NOTE(review): original layout was lost; sync is assumed to
                # run regardless of *restart* — confirm.
                f.send_cmd("sync", sleep_s=3)
            finally:
                # Release the connection even when a step above failed.
                f.close()
            self.end_signal.emit(ip, True, "完成")
        except Exception as r:
            err_str = str(r)
            print(STR_RED, ip, "|" + "try exception " + err_str, STR_END)
            # Report through the signal instead of touching widgets from
            # this worker thread, so the row no longer stays "in progress".
            self.end_signal.emit(ip, False, err_str)


class locker():
    """Single-instance guard based on an exclusive lock on a pid file."""

    def __init__(self):
        self._get_lock()

    def _get_lock(self):
        """Take the lock, or print a message and exit with status 1."""
        self.fd = None
        file_name = os.path.basename(__file__)
        # posix platforms use the standard /var/run; others (e.g. nt) use
        # the current directory.
        if os.name == "posix":
            lock_file_name = f"/var/run/{file_name}.pid"
        else:
            lock_file_name = f"{file_name}.pid"
        fd = open(lock_file_name, "w")
        try:
            portalocker.lock(fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
            # Locking fails before this point when another instance holds
            # the lock, so its pid is not overwritten by this write.
            fd.write(str(os.getpid()))
            # The payload is tiny and would sit in the buffer; force it out.
            fd.flush()
        except (portalocker.LockException, OSError):
            fd.close()
            print(f"{file_name} have another instance running.")
            sys.exit(1)
        self.fd = fd

    def __del__(self):
        # Only release what was actually acquired; fd stays None when the
        # lock could not be taken.
        fd = getattr(self, "fd", None)
        if fd is not None:
            portalocker.unlock(fd)
            fd.close()


if __name__ == "__main__":
    # Keep a reference so the lock lives as long as the process.
    lock = locker()
    dlg = updata_dlg()
    dlg.run()