# Files
# python_tools/updata/console_uart.py
#
# 262 lines
# 9.7 KiB
# Python
# Raw Normal View History
#
# 2023-09-20 18:20:42 +08:00
import sys
import os
import shutil
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import serial
import serial.tools.list_ports
import time
import re
# 2023-09-20 18:20:42 +08:00
# Directory on the target board that holds the update files. The console
# sends "cd <UPDATA_PATH>" followed by "ls" before building the copy commands.
# Alternative value kept from the original source: UPDATA_PATH = "/"
UPDATA_PATH = "/run/media/sda/updata"
# 2023-09-20 18:20:42 +08:00
class console_dlg(QObject):
    """Modal serial-console dialog that drives an update over a UART shell.

    The dialog opens a serial port, shows every received line in a text
    browser and, on request, sends a scripted list of shell commands
    (``cd``/``ls`` followed by a series of ``cp``/``systemctl`` commands) to
    the remote shell.  The next command is sent each time a line recognised as
    a shell prompt ending in ``#`` is received.

    Serial data is read in a background thread.  That thread never touches a
    widget directly: it only emits signals, so all GUI work happens in the
    thread this object lives in.
    """

    # Emitted by the reader thread for every decoded, stripped line.
    recv_str_signal = pyqtSignal([str])
    # Emitted by the reader thread when it starts (True) and stops (False).
    _port_open_signal = pyqtSignal([bool])

    _BAUDRATE = 115200

    # Name under which the daemon unit file is installed; every systemctl
    # command must refer to this same unit.
    _DAEMON_SERVICE = "pydeamon.service"

    # Indices into ``self.str_list`` (see ``__init__``).
    _IDX_LOGIN = 0
    _IDX_APP_BOOT = 1
    _IDX_LS_ECHO = 2
    _IDX_PROMPT = 3
    _IDX_USB_DISK = 4
    _IDX_CD_FAILED = 5
    _IDX_NO_FILE = 6

    # Terminal escape sequences: ESC '[' ... final byte, or ESC ']' ... BEL / ESC '\'.
    _ESCAPE_SEQ_RE = re.compile(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))')

    def __init__(self, father: QDialog, title: str):
        """Build the dialog widgets as children of a new ``QDialog``.

        Args:
            father: Parent dialog of the console window.
            title: Window title.
        """
        QObject.__init__(self)
        self.ser = None          # open serial.Serial, or None when closed
        self.cmd_list = []       # commands still waiting to be sent
        self.ls_cmd = False      # True while the output of "ls" is expected
        self.file_list = []      # file names collected from the "ls" output

        # Substrings used to classify received lines; the position in this
        # list is the "type" returned by type_of_str(), and the first match
        # wins, so the order matters.
        self.str_list = [
            "ATK-MP157 login: root",
            "Booting fw image checker_m4.axf",
            "# ls",
            "root@ATK-MP157:",
            "removable disk",
            "cd: can't cd to " + UPDATA_PATH,
            "No such file or directory",
        ]

        self.w = QDialog(father)
        self.w.resize(1200, 500)
        self.w.setWindowTitle(title)
        self.w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        self.w.setWindowModality(Qt.WindowModality.ApplicationModal)
        self.com_init()
        self.text_init()
        self.com_but_init()
        self.runcmd_but_init()

        # Connected once here rather than from the reader thread; emitting
        # from that thread then delivers the call to this object's thread.
        self.recv_str_signal.connect(self.item_append)
        self._port_open_signal.connect(self._on_port_state)

    def text_init(self):
        """Create the text browser that displays the console output."""
        self.console_text = QTextBrowser(self.w)
        self.console_text.setObjectName(u"str_list")
        self.console_text.setGeometry(QRect(20, 70, 1150, 430))
        self.console_text.setFrameShape(QFrame.Shape.Box)
        self.console_text.setMidLineWidth(1)
        self.console_text.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)

    def com_init(self):
        """Create the editable port selector and its label."""
        self.com = QComboBox(self.w)
        self.com.setObjectName(u"com")
        self.com.setGeometry(QRect(85, 10, 300, 25))
        self.com.setEditable(True)
        for comport in serial.tools.list_ports.comports():
            # Show "name:description" but keep the full device path as item
            # data: on POSIX the short name alone cannot be opened.
            self.com.addItem(comport.name + ":" + comport.description, comport.device)
        self.com.currentIndexChanged.connect(self.com_changed)
        self.com_label = QLabel(self.w)
        self.com_label.setObjectName(u"label")
        self.com_label.setGeometry(QRect(30, 16, 72, 15))
        self.com_label.setText("COM口:")

    def com_but_init(self):
        """Create the open/close port button."""
        self.com_but = QPushButton(self.w)
        self.com_but.setObjectName(u"com_but")
        self.com_but.setGeometry(QRect(590, 10, 93, 28))
        self.com_but.setText("打开端口")
        self.com_but.clicked.connect(self.com_but_clicked)

    def runcmd_but_init(self):
        """Create the button that starts sending the command list."""
        self.runcmd_but = QPushButton(self.w)
        self.runcmd_but.setObjectName(u"runcmd_but")
        self.runcmd_but.setGeometry(QRect(700, 10, 93, 28))
        self.runcmd_but.setText("执行命令")
        self.runcmd_but.clicked.connect(self.start_send_cmds)

    def com_changed(self, index: int):
        """Close the current port when another port is selected."""
        print("com changed")
        ser = self.ser  # local copy: the reader thread may reset self.ser
        if ser is not None:
            ser.close()

    def com_but_clicked(self):
        """Toggle the port: open it and start the reader, or close it."""
        print("com but clicked")
        ser = self.ser
        if ser is not None:
            # Closing makes the blocked readline() fail, which ends the
            # reader thread; that thread then resets self.ser.
            ser.close()
            return

        index = self.com.currentIndex()
        # Prefer the device path stored with the item; entries typed by the
        # user have no item data, so fall back to the text before ':'.
        port = self.com.itemData(index) or self.com.itemText(index).split(":")[0]
        try:
            self.ser = serial.Serial(port=port, baudrate=self._BAUDRATE,
                                     bytesize=serial.EIGHTBITS,
                                     parity=serial.PARITY_NONE,
                                     stopbits=serial.STOPBITS_ONE,
                                     timeout=None)
        except (serial.SerialException, OSError, ValueError) as e:
            print("err:", str(e))
            self.ser = None
            return
        # Daemon thread: a reader blocked in readline() must not keep the
        # process alive on exit.
        threading.Thread(target=self._recv, args=(), daemon=True).start()

    def _emit_safely(self, signal, value):
        """Emit ``signal`` with ``value``, reporting a failed emit instead of raising."""
        try:
            signal.emit(value)
        except RuntimeError as e:
            # Raised when the underlying Qt object no longer exists.
            print("emit err:", str(e))

    def _on_port_state(self, is_open: bool):
        """Update the port button caption to reflect the port state."""
        caption = "关闭端口" if is_open else "打开端口"
        try:
            self.com_but.setText(caption)
        except RuntimeError:
            # The dialog deletes itself on close, so the button may be gone.
            pass

    def _recv(self):
        """Reader-thread body: forward received lines until the port fails.

        Runs outside the GUI thread, so it only emits signals and never
        touches a widget.
        """
        self._emit_safely(self._port_open_signal, True)
        while self.ser is not None:
            try:
                data = self.ser.readline()
            except Exception as e:
                # Thread boundary: any read failure (including the port being
                # closed from the GUI thread) simply ends the reader.
                print("readline err:", str(e))
                break
            # Replace undecodable bytes instead of dropping the whole line.
            line = data.decode("utf-8", errors="replace").strip()
            self._emit_safely(self.recv_str_signal, line)
        self.ser = None
        self._emit_safely(self._port_open_signal, False)

    def send_str(self, text: str):
        """Send ``text`` followed by CR LF if the port is open (best effort)."""
        ser = self.ser
        if ser is None:
            return
        try:
            ser.write(text.encode("utf-8") + b"\r\n")
        except Exception as e:
            # Best effort: a failed write is reported, not raised into the GUI.
            print(str(e))

    def _append_to_console(self, txt: str):
        """Append ``txt`` to the text browser and scroll to the end."""
        try:
            self.console_text.append(txt)
            self.console_text.moveCursor(QTextCursor.MoveOperation.End)
        except RuntimeError as e:
            # The widget may already have been destroyed with the dialog.
            print(str(e))

    def _append_colored(self, color: str, text: str):
        """Append ``text`` in ``color``, then switch the font back to black."""
        self._append_to_console(
            "<font color=" + color + ">" + text + "</font> <font color=black> </font>")

    def item_append(self, text: str):
        """Display one received line and advance the command sequence.

        Recognised lines are shown in blue.  While the output of ``ls`` is
        expected, unrecognised lines are parsed as file names.  A prompt line
        ending in ``#`` triggers sending of the next queued command.
        """
        index, ack = self.type_of_str(text)
        if index == self._IDX_LOGIN:
            print("linux started.")
        elif index == self._IDX_APP_BOOT:
            print("app started.")

        if index != -1:
            if index == self._IDX_LS_ECHO:
                # The following lines are the file listing.
                self.ls_cmd = True
            self._append_colored("blue", text)
        elif self.ls_cmd:
            self._append_to_console(self.decode_file_list(text))
        else:
            self._append_to_console(text)

        if index == self._IDX_PROMPT and ack:
            # The shell is ready for the next command.
            if self.ls_cmd:
                # The listing is complete: build the copy commands from it.
                self.ls_cmd = False
                self._pack_cmd_list()
            self.send_cmdlist()
        elif index == self._IDX_USB_DISK:
            # USB disk detected.
            self._item_append_green_str("已识别到U盘.")
        elif index == self._IDX_CD_FAILED:
            # Changing into the update directory failed: abort the sequence.
            self._item_append_red_str("打开目录失败,请插入U盘并且在根目录建立 'updata' 文件夹.")
            self.cmd_list.clear()
        elif index == self._IDX_NO_FILE:
            # A required file is missing: abort the sequence.
            self._item_append_red_str("找不到需要的文件,请把相应文件复制到U盘中.")
            self.cmd_list.clear()

    def _clear_irc_color(self, string):
        """Return ``string`` with terminal escape sequences removed."""
        return self._ESCAPE_SEQ_RE.sub('', string)

    def decode_file_list(self, file_list: str):
        """Record and display the file names found in one line of ``ls`` output.

        Returns:
            Always the empty string.
        """
        for name in self._clear_irc_color(file_list).split():
            self._item_append_green_str('|----|' + name)
            self.file_list.append(name)
        return ""

    def type_of_str(self, text: str):
        """Classify a received line.

        Returns:
            ``(index, ack)`` where ``index`` is the position of the first
            entry of ``self.str_list`` contained in ``text`` (``-1`` if none)
            and ``ack`` is True when a matched line ends with ``#``.
        """
        for i, pattern in enumerate(self.str_list):
            if pattern in text:
                return i, text.endswith('#')
        return -1, False

    def _item_append_green_str(self, text: str):
        """Append ``text`` to the console in green."""
        self._append_colored("green", text)

    def _item_append_red_str(self, text: str):
        """Append ``text`` to the console in red."""
        self._append_colored("red", text)

    def find_file_by_type(self, type: str):
        """Return the first collected file name ending with ``type``.

        Returns:
            The matching name, or ``"unknown"`` when no file matches.
        """
        for name in self.file_list:
            if name.endswith(type):
                return name
        print("diff:", self.file_list, type)
        return "unknown"

    def start_send_cmds(self):
        """Restart the sequence: queue ``cd`` and ``ls`` and send the first."""
        self.cmd_list.clear()
        self.file_list.clear()
        self._item_append_green_str("开始发送命令行,请不要关闭窗口。")
        self.cmd_list.append("cd " + UPDATA_PATH)
        self.cmd_list.append("ls")
        self.send_cmdlist()

    def _pack_cmd_list(self):
        """Queue the install commands using the files found by ``ls``."""
        find = self.find_file_by_type
        service = self._DAEMON_SERVICE
        self.cmd_list.extend([
            "systemctl stop atk-qtapp-start.service",
            "systemctl stop " + service,
            "systemctl disable " + service,
            "mkdir /home/root/config",
            "cp " + find(".elf") + " /usr/local/QDesktop-fb",
            "chmod 777 /usr/local/QDesktop-fb",
            "cp " + find(".bin") + " /home/root/config/checker_slave.bin",
            "cp " + find("scheme.json") + " /home/root/config/checker_ye_cfg.json",
            "cp cfg.json /home/root/config/cfg.json",
            "cp " + find(".sh") + " /home/root/config/checker_app_pre.sh",
            "cp " + find(".axf") + " /lib/firmware/checker_m4.axf",
            "cp " + find(".dtb") + " /boot/stm32mp157d-atk.dtb",
            "cp " + find(".py") + " /home/root/daemon.py",
            # The unit is installed under the same name the systemctl
            # commands use, so enable/restart act on the file copied here.
            "cp " + find(".service") + " /lib/systemd/system/" + service,
            "sync",
            "systemctl restart atk-qtapp-start.service",
            "systemctl enable " + service,
            "systemctl restart " + service,
        ])

    def send_cmdlist(self):
        """Send the next queued command, or report that the queue is empty."""
        if not self.cmd_list:
            self._item_append_green_str("命令行发送完成。")
            return
        self._item_append_green_str("发送下一个命令,还剩 " + str(len(self.cmd_list)))
        cmd = self.cmd_list.pop(0)
        print("send cmd:", cmd)
        self.send_str(cmd)

    def show(self):
        """Run the dialog modally, then close the port if it is still open."""
        self.w.exec()
        ser = self.ser  # local copy: the reader thread may reset self.ser
        if ser is not None:
            ser.close()