import sys
import os
import shutil
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import serial
import serial.tools.list_ports
import time


class console_dlg(QObject):
    """Modal serial-console dialog that can push an update command sequence.

    The dialog lists the available serial ports, shows every received line in
    a text browser and, when it recognises certain lines from the target,
    sends a fixed list of shell commands one at a time, waiting for a shell
    prompt between commands.

    Threading model: a background thread only reads from the serial port and
    emits signals.  All widget access and all writes to ``self.ser`` happen
    in slots/methods that run on the thread this object lives in.
    """

    # Emitted by the reader thread for every decoded, stripped line.
    recv_str_signal = pyqtSignal([str])
    # Emitted by the reader thread when it stops; carries the port object it
    # was reading from so that a stale notification can be told apart from
    # one that concerns the currently open port.
    _reader_done_signal = pyqtSignal(object)

    # Lines recognised by type_of_str(); the position in this tuple is the
    # "index" that type_of_str() returns.
    _KNOWN_LINES = (
        "ATK-MP157 login: root",
        "Booting fw image checker_m4.axf",
        "root@ATK-MP157:~#",
        "root@ATK-MP157:/run/media/sda1/updata#",
        "sda: sda1",
    )

    # Shell commands queued by start_send_cmds(), in sending order.
    _UPDATE_COMMANDS = (
        "cd /run/media/sda1/updata",
        "systemctl stop atk-qtapp-start.service",
        "mkdir /home/root/config",
        "cp checker_host.elf /usr/local/QDesktop-fb",
        "chmod 777 /usr/local/QDesktop-fb",
        "cp checker_slave_app_can.bin /home/root/config/checker_slave.bin",
        "cp cfg.json /home/root/config/cfg.json",
        "cp checker_m4.axf /lib/firmware/checker_m4.axf",
        "cp stm32mp157d-atk.dtb /boot/stm32mp157d-atk.dtb",
        "sync",
        "systemctl restart atk-qtapp-start.service",
    )

    def __init__(self, father: QDialog, title: str):
        """Build the dialog as a child of ``father`` with window title ``title``."""
        QObject.__init__(self)
        self.ser = None
        self.cmd_list = []
        self.w = QDialog(father)
        self.w.resize(1200, 500)
        self.w.setWindowTitle(title)
        self.w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        self.w.setWindowModality(Qt.WindowModality.ApplicationModal)
        # Connect once here instead of from the reader thread; emissions made
        # on the reader thread are then delivered to these slots by Qt.
        self.recv_str_signal.connect(self.item_append)
        self._reader_done_signal.connect(self._on_reader_done)
        self.com_init()
        self.text_init()
        self.com_but_init()
        self.runcmd_but_init()

    def text_init(self):
        """Create the text browser that displays received lines."""
        self.console_text = QTextBrowser(self.w)
        self.console_text.setObjectName(u"str_list")
        self.console_text.setGeometry(QRect(20, 70, 1150, 430))
        self.console_text.setFrameShape(QFrame.Shape.Box)
        self.console_text.setMidLineWidth(1)
        self.console_text.setVerticalScrollBarPolicy(
            Qt.ScrollBarPolicy.ScrollBarAsNeeded)

    def com_init(self):
        """Create the port combo box (filled with the detected ports) and its label."""
        self.com = QComboBox(self.w)
        self.com.setObjectName(u"com")
        self.com.setGeometry(QRect(85, 10, 300, 25))
        self.com.setEditable(True)
        for comport in serial.tools.list_ports.comports():
            # Item format is "<name>:<description>"; com_but_clicked() takes
            # the part before the first ':' as the port name.
            self.com.addItem(comport.name + ":" + comport.description)
        self.com.currentIndexChanged.connect(self.com_changed)
        self.com_label = QLabel(self.w)
        self.com_label.setObjectName(u"label")
        self.com_label.setGeometry(QRect(30, 16, 72, 15))
        self.com_label.setText("COM口:")

    def com_but_init(self):
        """Create the open/close-port button."""
        self.com_but = QPushButton(self.w)
        self.com_but.setObjectName(u"com_but")
        self.com_but.setGeometry(QRect(590, 10, 93, 28))
        self.com_but.setText("打开端口")
        self.com_but.clicked.connect(self.com_but_clicked)

    def runcmd_but_init(self):
        """Create the button that starts sending the command sequence."""
        self.runcmd_but = QPushButton(self.w)
        self.runcmd_but.setObjectName(u"runcmd_but")
        self.runcmd_but.setGeometry(QRect(700, 10, 93, 28))
        self.runcmd_but.setText("执行命令")
        self.runcmd_but.clicked.connect(self.start_send_cmds)

    def com_changed(self, index: int):
        """Close the open port (if any) when another combo-box entry is selected."""
        print("com changed")
        self._close_port()

    def com_but_clicked(self):
        """Toggle the port: close it if one is open, otherwise open the selected one."""
        print("com but clicked")
        if self.ser is not None:
            self._close_port()
            return

        item = self.com.itemText(self.com.currentIndex())
        port_name = item.split(":")[0]
        try:
            self.ser = serial.Serial(
                port=port_name,
                baudrate=115200,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=None,
            )
        except (serial.SerialException, OSError, ValueError) as e:
            print("err:", str(e))
            self.ser = None
            return

        print(str(self.ser))
        self._set_port_button_text("关闭端口")
        # Daemon thread: a readline() that never returns must not keep the
        # interpreter alive after the application has finished.
        reader = threading.Thread(target=self._recv, args=(), daemon=True)
        reader.start()

    def _close_port(self):
        """Close the current port, if any, and reset the button caption.

        ``self.ser`` is cleared before closing so that the reader thread sees
        that its port is no longer the current one and stops.
        """
        ser, self.ser = self.ser, None
        if ser is None:
            return
        ser.close()
        self._set_port_button_text("打开端口")

    def _set_port_button_text(self, caption: str):
        """Set the port button caption, tolerating an already deleted dialog."""
        try:
            self.com_but.setText(caption)
        except RuntimeError:
            # The dialog uses WA_DeleteOnClose, so the underlying widget may
            # already be gone; there is nothing left to update then.
            pass

    def _on_reader_done(self, ser):
        """Slot for ``_reader_done_signal``: clean up after the reader stopped.

        Only acts when ``ser`` is still the current port, i.e. the reader
        stopped by itself (read error).  If the port was closed through the
        UI, or another port has been opened meanwhile, nothing is done.
        """
        if self.ser is ser:
            self._close_port()

    def _recv(self):
        """Reader-thread body: forward received lines via ``recv_str_signal``.

        Runs until the port it started with is no longer ``self.ser`` or a
        read fails.  It never touches widgets; it only emits signals.
        """
        ser = self.ser
        if ser is None:
            return
        while self.ser is ser:
            try:
                data = ser.readline()
            except Exception as e:
                # Thread boundary: whatever a read on a closed or unplugged
                # port raises simply ends the reader.
                print("readline err:", str(e))
                break
            try:
                line = data.decode("utf-8").strip()
            except UnicodeDecodeError:
                # Lines that are not valid UTF-8 are skipped.
                continue
            self.recv_str_signal.emit(line)
        self._reader_done_signal.emit(ser)

    def send_str(self, text: str):
        """Write ``text`` followed by CR LF to the open port; no-op when closed."""
        ser = self.ser
        if ser is None:
            return
        try:
            ser.write(text.encode("utf-8") + b"\r\n")
        except (serial.SerialException, OSError) as e:
            print(str(e))

    def _append_line(self, txt: str):
        """Append ``txt`` to the console view and move the cursor to the end."""
        try:
            self.console_text.append(txt)
            self.console_text.moveCursor(QTextCursor.MoveOperation.End)
        except RuntimeError as e:
            # Raised when the text browser has already been deleted.
            print(str(e))

    def item_append(self, text: str):
        """Display one received line and react to the recognised ones.

        Index 0 starts the command sequence, index 1 is only logged, and an
        exact match of index 2 or 3 (a bare shell prompt) sends the next
        queued command.
        """
        index, ack = self.type_of_str(text)
        if index == 0:
            print("linux started.")
            self.start_send_cmds()
        elif index == 1:
            print("app started.")

        # NOTE(review): recognised lines only get a trailing space here; this
        # may be the remainder of lost highlight markup - confirm.
        txt = text + " " if index != -1 else text
        self._append_line(txt)

        if index in (2, 3) and ack:
            # Ready to receive the next command.
            self.send_cmdlist()

    def type_of_str(self, text: str):
        """Classify ``text`` against the known lines.

        Returns ``(index, exact)``: ``index`` is the position of the first
        known line contained in ``text`` (``-1`` if none), ``exact`` is
        ``True`` only when ``text`` equals that known line.
        """
        for i, known in enumerate(self._KNOWN_LINES):
            if known in text:
                return i, known == text
        return -1, False

    def _item_append_green_str(self, text: str):
        """Append a status message produced by this dialog to the console view."""
        # NOTE(review): the method name suggests green text, but no colour
        # markup is applied here - confirm whether markup was lost.
        self._append_line(text + " ")

    def start_send_cmds(self):
        """Queue the update commands and send the first queued command."""
        self._item_append_green_str("开始发送命令行,请不要关闭窗口。")
        self.cmd_list.extend(self._UPDATE_COMMANDS)
        self.send_cmdlist()

    def send_cmdlist(self):
        """Send the next queued command, or report completion when none is left."""
        if self.cmd_list:
            # The count is reported before the command is removed.
            self._item_append_green_str(
                "发送下一个命令,还剩 " + str(len(self.cmd_list)))
            self.send_str(self.cmd_list.pop(0))
        else:
            self._item_append_green_str("命令行发送完成。")

    def show(self):
        """Run the dialog modally and close the port once it has been dismissed."""
        self.w.exec()
        self._close_port()