import sys import os import shutil from PyQt5.QtCore import * from PyQt5.QtGui import * from PyQt5.QtWidgets import * import threading import serial import serial.tools.list_ports import time import re UPDATA_PATH="/run/media/sda/updata" # UPDATA_PATH="/" class console_dlg(QObject): recv_str_signal = pyqtSignal([str]) def __init__(self,father:QDialog,title:str): QObject.__init__(self) self.ser=None self.cmd_list=[] self.ls_cmd=False self.file_list=[] self.w=QDialog(father) self.w.resize(1200,500) self.w.setWindowTitle(title) self.w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose) self.w.setWindowModality(Qt.WindowModality.ApplicationModal) self.com_init() self.text_init() self.com_but_init() self.runcmd_but_init() self.str_list=["ATK-MP157 login: root", "Booting fw image checker_m4.axf", "# ls", "root@ATK-MP157:", "removable disk", "cd: can't cd to "+UPDATA_PATH, "No such file or directory" ] def text_init(self): self.console_text = QTextBrowser(self.w) self.console_text.setObjectName(u"str_list") self.console_text.setGeometry(QRect(20, 70, 1150, 430)) self.console_text.setFrameShape(QFrame.Shape.Box) self.console_text.setMidLineWidth(1) self.console_text.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded) def com_init(self): self.com = QComboBox(self.w) self.com.setObjectName(u"com") self.com.setGeometry(QRect(85, 10, 300, 25)) self.com.setEditable(True) ports_list = list(serial.tools.list_ports.comports()) for comport in ports_list: # print(comport.name,comport.description) self.com.addItem(comport.name+":"+comport.description) self.com.currentIndexChanged.connect(self.com_changed) self.com_label = QLabel(self.w) self.com_label.setObjectName(u"label") self.com_label.setGeometry(QRect(30, 16, 72, 15)) self.com_label.setText("COM口:") # 初始化打开端口按钮 def com_but_init(self): self.com_but = QPushButton(self.w) self.com_but.setObjectName(u"com_but") self.com_but.setGeometry(QRect(590, 10, 93, 28)) self.com_but.setText("打开端口") self.com_but.clicked.connect(self.com_but_clicked) # 初始化执行命令按钮 def runcmd_but_init(self): self.runcmd_but = QPushButton(self.w) self.runcmd_but.setObjectName(u"runcmd_but") self.runcmd_but.setGeometry(QRect(700, 10, 93, 28)) self.runcmd_but.setText("执行命令") self.runcmd_but.clicked.connect(self.start_send_cmds) def com_changed(self,index:int): print("com changed") if(self.ser!=None): self.ser.close() def com_but_clicked(self): print("com but clicked") item=self.com.itemText(self.com.currentIndex()) com=item.split(":")[0] if(self.ser==None): try: bsp=int(115200) self.ser = serial.Serial(port=com, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,timeout=None) # print(str(self.ser)) t = threading.Thread(target=self._recv, args=()) t.start() except Exception as e: print("err:",str(e)) self.ser=None else: self.ser.close() def _recv(self): self.com_but.setText("关闭端口") self.recv_str_signal.connect(self.item_append) while(self.ser!=None): try: data=self.ser.readline() except Exception as e: print("readline err:",str(e)) break try: self.recv_str_signal.emit(data.decode("utf-8").strip()) except Exception as e: pass self.ser=None self.recv_str_signal.disconnect(self.item_append) try: self.com_but.setText("打开端口") except Exception as e: pass def send_str(self,text:str): if(self.ser!=None): try: self.ser.write(text.encode("utf-8")+b"\r\n") except Exception as e: print(str(e)) # 添加文字到显示区 def item_append(self,text:str): index,ack=self.type_of_str(text) if(index==0): print("linux started.") # self.start_send_cmds() elif(index==1): print("app started.") if(index!=-1): if(index==2): # 文件列表 self.ls_cmd=True txt="" +text+ " " else: if(self.ls_cmd==True): txt=self.decode_file_list(text) else: txt=text try: self.console_text.append(txt) self.console_text.moveCursor(QTextCursor.MoveOperation.End) except Exception as e: print(str(e)) if(index==3) and (ack==True): # 准备好接收下一个指令 if(self.ls_cmd==True): self.ls_cmd=False self._pack_cmd_list() self.send_cmdlist() if(index==4): # 已识别到U盘 self._item_append_green_str("已识别到U盘.") if(index==5): # 打开目录失败 self._item_append_red_str("打开目录失败,其插入U盘并且在根目录建立 'updata' 文件夹.") self.cmd_list.clear() if(index==6): # 找不到相应文件 self._item_append_red_str("找不到需要的文件,请把相应文件复制到U盘中.") self.cmd_list.clear() def _clear_irc_color(self, string): pattern = r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))' return re.sub(pattern, '', string) def decode_file_list(self,file_list:str): s=self._clear_irc_color(file_list) s_list=s.split() for i in s_list: self._item_append_green_str('|----|'+i) self.file_list.append(i) # print(self.file_list) return "" # 查找字幕类型,如果末尾是'#'则返回True def type_of_str(self,text:str): # print("type",text) for i in range(len(self.str_list)): if(text.find(self.str_list[i])!=-1): ack=('#'[0]==text[-1]) # print(text,ack,i) return i,ack return -1,False def _item_append_green_str(self,text:str): txt="" +text+ " " try: self.console_text.append(txt) self.console_text.moveCursor(QTextCursor.MoveOperation.End) except Exception as e: print(str(e)) def _item_append_red_str(self,text:str): txt="" +text+ " " try: self.console_text.append(txt) self.console_text.moveCursor(QTextCursor.MoveOperation.End) except Exception as e: print(str(e)) def file_file_by_type(self,type:str): for i in self.file_list: if(i[-len(type):]==type): return i print("diff:",self.file_list,type) return "unknown" def start_send_cmds(self): self.cmd_list.clear() self.file_list.clear() self._item_append_green_str("开始发送命令行,请不要关闭窗口。") self.cmd_list.append("cd "+UPDATA_PATH) self.cmd_list.append("ls") self.send_cmdlist() # 组装命令列表 def _pack_cmd_list(self): self.cmd_list.append("systemctl stop atk-qtapp-start.service") self.cmd_list.append("mkdir /home/root/config") self.cmd_list.append("cp "+self.file_file_by_type(".elf")+" /usr/local/QDesktop-fb") self.cmd_list.append("chmod 777 /usr/local/QDesktop-fb") self.cmd_list.append("cp "+self.file_file_by_type(".bin")+" /home/root/config/checker_slave.bin") self.cmd_list.append("cp "+self.file_file_by_type("scheme.json")+" /home/root/config/checker_ye_cfg.json") self.cmd_list.append("cp cfg.json /home/root/config/cfg.json") self.cmd_list.append("cp "+self.file_file_by_type(".axf")+" /lib/firmware/checker_m4.axf") self.cmd_list.append("cp "+self.file_file_by_type(".dtb")+" /boot/stm32mp157d-atk.dtb") self.cmd_list.append("sync") self.cmd_list.append("systemctl restart atk-qtapp-start.service") def send_cmdlist(self): if(len(self.cmd_list)>0): self._item_append_green_str("发送下一个命令,还剩 "+str(len(self.cmd_list))) print("send cmd:",self.cmd_list[0]) self.send_str(self.cmd_list[0]) self.cmd_list.pop(0) else: self._item_append_green_str("命令行发送完成。") def show(self): self.w.exec() if(self.ser!=None): self.ser.close()