批检仪赋码工具根据设备类型禁用按钮

守护进程添加升级小板功能,升级时蜂鸣器提示
This commit is contained in:
ranchuan
2023-10-18 18:23:45 +08:00
parent 67f240b109
commit 6fa4ae47f0
4 changed files with 201 additions and 39 deletions

View File

@@ -6,6 +6,8 @@ import shutil
import threading
import re
import time
import socket
import json
@@ -13,18 +15,87 @@ import time
# 日志文件路径
log_filepath = '/home/root/log/daemon_log.txt'
# 定义udp操作
class myudp(object):
def __init__(self):
self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
local_addr = ("", 0)
self.udp.bind(local_addr)
self.udp.settimeout(10)
def send_cmd(self,text:str):
send_data = text+"\n"
self.udp.sendto(send_data.encode("utf-8"), ("127.0.0.1", 7777))
while True:
recv=self.recv()
print(recv)
if(recv==None) or (recv[:4]=="ack:"):
return recv
def recv(self):
try:
recv_data = self.udp.recvfrom(1024)
recv_msg = recv_data[0]
send_addr = recv_data[1]
# print ("收到的信息为:", recv_msg.decode("utf-8"))
text=recv_msg.decode("utf-8")
return text
except Exception as err:
print("err:",str(err))
return None
def close(self):
# 5.关闭套接字
self.udp.close()
# 定义蜂鸣器
class beep(object):
def __init__(self) -> None:
try:
# self.bee=open("/sys/class/leds/user-led/brightness","w+")
self.bee=open("/sys/class/leds/beep/brightness","w+")
except Exception as err:
write_info("beep init err:"+str(err))
def power(self,p:bool):
if(self.bee!=None):
if(p==True):
self.bee.write("1")
else:
self.bee.write("0")
self.bee.flush()
def close(self):
if(self.bee!=None):
self.power(False)
self.bee.close()
# 配置信息读取
class cfg(object):
def __init__(self) -> None:
try:
file=open("/home/root/config/cfg.json","rb")
self.json_obj=json.loads(file.read())
file.close()
except Exception as err:
print(str(err))
def slave_addrs(self):
addrs=""
if(self.json_obj!=None):
num=self.json_obj["slave_num"]
for i in range(num):
addrs+=str(i+1)+','
return addrs[:-1]
def write_info(text:str):
fm = '%Y-%m-%d %X'
nowtime = time.strftime(fm, time.localtime())
@@ -45,13 +116,20 @@ class auto_updata(object):
# self.file_path=self.sd_path+'/updata'
self.file_path=self.sd_path
self.sd_inserd_state = False
self.beep_power=False
self.file_list=[]
def sd_check(self):
ack=os.path.exists(self.sd_path)
if(ack!=self.sd_inserd_state):
if(ack==True):
write_info("sd inserd.")
self.copy_file()
if(self.copy_file()==True):
time.sleep(5)
self.beep_tip(True)
self.updata_slave()
self.beep_tip(False)
time.sleep(2.3)
self.beep_end()
else:
write_info("sd extracted.")
self.sd_inserd_state=ack
@@ -66,22 +144,72 @@ class auto_updata(object):
write_info("|---| "+i)
_do_cmd("systemctl stop atk-qtapp-start.service")
_do_cmd("mkdir /home/root/config")
_do_cmd("cp "+self.find_file_by_type(".elf")+" /usr/local/QDesktop-fb")
_do_cmd("cp "+self.find_file_by_type([".elf"])+" /usr/local/QDesktop-fb")
_do_cmd("chmod 777 /usr/local/QDesktop-fb")
_do_cmd("cp "+self.find_file_by_type(".bin")+" /home/root/config/checker_slave.bin")
_do_cmd("cp "+self.find_file_by_type([".bin"])+" /home/root/config/checker_slave.bin")
_do_cmd("cp "+self.find_file_by_type([".pkt"])+" /home/root/config/checker_slave.pkt")
_do_cmd("cp "+self.find_file_by_type("scheme.json")+" /home/root/config/checker_ye_cfg.json")
_do_cmd("cp "+self.file_path+"/cfg.json /home/root/config/cfg.json")
_do_cmd("cp "+self.find_file_by_type(".axf")+" /lib/firmware/checker_m4.axf")
# _do_cmd("cp "+self.find_file_by_type(".dtb")+" /boot/stm32mp157d-atk.dtb")
_do_cmd("sync")
_do_cmd("systemctl restart atk-qtapp-start.service")
write_info("autoupdata end.")
write_info("copy file end.")
return True
def find_file_by_type(self,type:str):
def find_file_by_type(self,types:list):
for i in self.file_list:
if(i[-len(type):]==type):
sp=i.split(".")[-1]
sp='.'+sp
if(sp in types):
return self.file_path+'/'+i
return "unknown"
def updata_slave(self):
cfg_f=cfg()
addrs=cfg_f.slave_addrs()
write_info("slave addrs:"+addrs)
slave_file=self.find_file_by_type([".bin"])
if(slave_file!="unknown"):
write_info("updata slave:"+slave_file)
u=myudp()
cmd="mcu updata "+addrs+" /home/root/config/checker_slave.bin"
a=u.send_cmd(cmd)
if(a!=None):
write_info("|--| "+a)
slave_file=self.find_file_by_type([".pkt"])
if(slave_file!="unknown"):
write_info("updata slave:"+slave_file)
u=myudp()
cmd="mcu updata "+addrs+" /home/root/config/checker_slave.pkt"
a=u.send_cmd(cmd)
if(a!=None):
write_info("|--| "+a)
# 升级完成时的连续三声提示
def beep_end(self):
b=beep()
for i in range(3):
b.power(True)
time.sleep(0.05)
b.power(False)
time.sleep(0.2)
b.close()
# 升级时的蜂鸣器提示
def beep_tip(self,power:bool):
if(power==True) and (self.beep_power!=True):
self.beep_power=power
t = threading.Thread(target=self._beep_run, args=())
t.start()
elif(power==False):
self.beep_power=power
def _beep_run(self):
b=beep()
while(self.beep_power==True):
b.power(True)
time.sleep(0.05)
b.power(False)
time.sleep(1.95)
b.close()
if __name__ == '__main__':
dir_name=os.path.dirname(log_filepath)
@@ -91,8 +219,16 @@ if __name__ == '__main__':
while True:
updata.sd_check()
time.sleep(5)
# u=myudp()
# if(len(sys.argv)>=2):
# cmd=sys.argv[1]
# else:
# cmd="whos"
# updata.beep_tip(True)
# u.send_cmd(cmd)
# updata.beep_tip(False)
# time.sleep(5)
# updata.beep_end()
# if __name__ == "__main__":