266 lines
7.6 KiB
Python
266 lines
7.6 KiB
Python
#!/usr/bin/python3
|
|
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import threading
|
|
import re
|
|
import time
|
|
import socket
|
|
import json
|
|
|
|
|
|
|
|
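# Daemon definition
# 2023.10.19
# Beep quickly 5 times when an SD card insertion is detected
# Beep 2 times in a row after the main-board upgrade has finished
# Beep 3 times in a row after the slave-board upgrade has finished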
|
|
|
|
|
|
# Path of the log file (the file that write_info() appends to).
|
|
log_filepath = '/home/root/log/daemon_log.txt'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# UDP operations
|
|
class myudp(object):
    """Small UDP client that sends text commands to 127.0.0.1:7777.

    Each command is sent as one UTF-8 line. Replies are read from the same
    socket until one starting with ``ack:`` arrives or a receive fails.

    The object can be used as a context manager so that the socket is
    released deterministically::

        with myudp() as u:
            u.send_cmd("whos")
    """

    def __init__(self):
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Port 0 lets the OS pick any free local port.
        self.udp.bind(("", 0))
        # A single receive gives up after 10 seconds.
        self.udp.settimeout(10)

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket when the ``with`` block ends; never swallow errors."""
        self.close()
        return False

    def send_cmd(self, text: str):
        """Send ``text`` (a newline is appended) and wait for the acknowledgement.

        Every reply received while waiting is printed. Replies that do not
        start with ``ack:`` are skipped.

        Returns:
            The first reply starting with ``ack:``, or ``None`` as soon as a
            receive fails (for example on timeout).
        """
        payload = text + "\n"
        self.udp.sendto(payload.encode("utf-8"), ("127.0.0.1", 7777))
        while True:
            reply = self.recv()
            print(reply)
            if reply is None or reply.startswith("ack:"):
                return reply

    def recv(self):
        """Receive one datagram (up to 1024 bytes) and decode it as UTF-8.

        Returns:
            The decoded text, or ``None`` if receiving or decoding raised.
            The error is printed, not propagated.
        """
        try:
            data, _sender = self.udp.recvfrom(1024)
            return data.decode("utf-8")
        except Exception as err:
            # Best effort by design: callers treat None as "no answer".
            print("err:", str(err))
            return None

    def close(self):
        """Close the underlying socket."""
        self.udp.close()
|
|
|
|
|
|
|
|
# Buzzer
|
|
class beep(object):
    """Buzzer driven through ``/sys/class/leds/beep/brightness``.

    Writing ``"1"`` to that file switches the buzzer on and ``"0"`` switches
    it off. If the file cannot be opened the error is logged via
    ``write_info`` and every later call becomes a no-op.
    """

    def __init__(self) -> None:
        # Assign first so power()/close() can test the attribute even when
        # open() fails; otherwise they would raise AttributeError.
        self.bee = None
        try:
            self.bee = open("/sys/class/leds/beep/brightness", "w+")
        except Exception as err:
            write_info("beep init err:" + str(err))

    def power(self, p: bool):
        """Switch the buzzer on (``p`` equals True) or off (anything else)."""
        if self.bee is None:
            return
        # Equality (not truthiness) is kept so only True/1 switch the buzzer on.
        self.bee.write("1" if p == True else "0")
        # Flush so the value reaches the file immediately.
        self.bee.flush()

    def close(self):
        """Switch the buzzer off and close the file. Safe to call repeatedly."""
        if self.bee is None:
            return
        try:
            self.power(False)
        finally:
            # Close the handle even if the final write failed.
            self.bee.close()
            self.bee = None
|
|
|
|
|
|
|
|
# Configuration reader
|
|
class cfg(object):
    """Reader for the JSON configuration file.

    Args:
        path: Location of the JSON file. Defaults to
            ``/home/root/config/cfg.json``.

    If the file cannot be read or parsed, the error is printed and
    ``json_obj`` stays ``None``.
    """

    def __init__(self, path: str = "/home/root/config/cfg.json") -> None:
        # Assign first so slave_addrs() works even when loading fails;
        # otherwise it would raise AttributeError.
        self.json_obj = None
        try:
            # The context manager closes the file even if parsing raises.
            with open(path, "rb") as file:
                self.json_obj = json.loads(file.read())
        except Exception as err:
            print(str(err))

    def slave_addrs(self):
        """Return the slave addresses as a comma-separated string.

        The addresses are ``1 .. slave_num`` taken from the ``"slave_num"``
        entry of the configuration, e.g. ``"1,2,3"`` for ``slave_num == 3``.

        Returns:
            The address list, or ``""`` when no configuration was loaded or
            ``slave_num`` is 0.

        Raises:
            KeyError: if the loaded configuration has no ``"slave_num"`` key.
        """
        if self.json_obj is None:
            return ""
        num = self.json_obj["slave_num"]
        return ",".join(str(addr) for addr in range(1, num + 1))
|
|
|
|
def write_info(text:str):
    """Append ``text`` to the log file with a timestamp and echo it to stdout.

    The line written to ``log_filepath`` has the form
    ``<YYYY-mm-dd HH:MM:SS> | <text>``; only the bare ``text`` is printed.
    """
    stamp = time.strftime('%Y-%m-%d %X', time.localtime())
    with open(log_filepath, 'a') as log_file:
        log_file.write("".join((stamp, " | ", text, "\n")))
    print(text)
|
|
def _do_cmd(cmd: str):
    """Run ``cmd`` through the shell and log the command and its output.

    The command line and every line of its standard output are passed to
    ``write_info``.

    Args:
        cmd: Shell command line. It is executed as-is, so callers must quote
            any untrusted parts themselves.

    Returns:
        The standard-output lines with surrounding whitespace stripped.
    """
    write_info(cmd)
    # The context manager closes the pipe (and waits for the child) instead
    # of leaving that to garbage collection.
    with os.popen(cmd) as pipe:
        lines = [line.strip() for line in pipe]
    for line in lines:
        write_info(line)
    return lines
|
|
|
|
class auto_updata(object):
    """Watches for the SD-card update folder and installs the files found there.

    ``sd_check()`` is meant to be polled. When ``/run/media/sda/updata``
    appears it beeps, copies the update files to their fixed destinations
    (``copy_file``), sends the slave update commands over UDP
    (``updata_slave``) and beeps again when done.
    """

    def __init__(self):
        self.sd_path = "/run/media/sda/updata"
        # Directory that is scanned for update files.
        self.file_path = self.sd_path
        # Presence of sd_path as seen by the previous sd_check() call.
        self.sd_inserd_state = False
        # True while the periodic "upgrade in progress" beeper should run.
        self.beep_power = False
        # File names found in file_path by the last copy_file() call.
        self.file_list = []

    def sd_check(self):
        """Handle a change in the presence of the update folder.

        Does nothing if the presence of ``sd_path`` is unchanged since the
        previous call. On insertion it runs the whole update sequence; on
        removal it only logs the event.
        """
        present = os.path.exists(self.sd_path)
        if present == self.sd_inserd_state:
            return
        if present:
            write_info("sd inserd.")
            self.beep_insert()
            if self.copy_file() == True:
                # NOTE(review): presumably gives the restarted application
                # time to come up before commands are sent to it - confirm.
                time.sleep(5)
                self.beep_tip(True)
                try:
                    self.updata_slave()
                finally:
                    # Always stop the periodic beeper; its thread would
                    # otherwise keep beeping forever after an error.
                    self.beep_tip(False)
                # Longer than one 2.0 s cycle of _beep_run, so that thread
                # has left its loop before the final beeps start.
                time.sleep(2.3)
                self.beep_end()
        else:
            write_info("sd extracted.")
        self.sd_inserd_state = present

    def copy_file(self):
        """Copy the update files from ``file_path`` to their destinations.

        The application service is stopped before copying and restarted
        afterwards. A file type that is not present is skipped with a log
        entry instead of running ``cp`` on a non-existent source.

        Returns:
            ``False`` if ``file_path`` cannot be listed, otherwise ``True``.
        """
        self.file_list.clear()
        try:
            self.file_list = os.listdir(self.file_path)
        except Exception as e:
            write_info(str(e))
            return False
        for name in self.file_list:
            write_info("|---| " + name)
        _do_cmd("systemctl stop atk-qtapp-start.service")
        # -p: do not fail when the directory already exists.
        _do_cmd("mkdir -p /home/root/config")
        self._copy_by_type([".elf"], "/usr/local/QDesktop-fb")
        _do_cmd("chmod 777 /usr/local/QDesktop-fb")
        self._copy_by_type([".bin"], "/home/root/config/checker_slave.bin")
        self._copy_by_type([".pkt"], "/home/root/config/checker_slave.pkt")
        self._copy_by_type([".json"], "/home/root/config/checker_ye_cfg.json")
        self._cp(self.file_path + "/cfg.json", "/home/root/config/cfg.json")
        self._copy_by_type([".axf"], "/lib/firmware/checker_m4.axf")
        _do_cmd("sync")
        _do_cmd("systemctl restart atk-qtapp-start.service")
        write_info("copy file end.")
        return True

    def _cp(self, src: str, dest: str):
        """Run ``cp src dest`` with the source path shell-quoted."""
        # Local import: shlex is not among the file-level imports.
        import shlex
        # File names come from removable media, so they may contain spaces
        # or shell metacharacters.
        _do_cmd("cp " + shlex.quote(src) + " " + dest)

    def _copy_by_type(self, types: list, dest: str) -> bool:
        """Copy the first file matching ``types`` to ``dest``; return whether one was found."""
        src = self.find_file_by_type(types)
        if src == "unknown":
            write_info("skip copy, no " + "/".join(types) + " file for " + dest)
            return False
        self._cp(src, dest)
        return True

    def find_file_by_type(self, types: list):
        """Return the path of the first listed file whose extension is in ``types``.

        ``cfg.json`` is never returned; it is handled separately by
        ``copy_file``.

        Args:
            types: Extensions including the leading dot, e.g. ``[".bin"]``.

        Returns:
            ``file_path + '/' + name`` of the first match in ``file_list``,
            or the string ``"unknown"`` when nothing matches.
        """
        for name in self.file_list:
            if name == "cfg.json":
                continue
            # splitext() yields "" for names without a dot, so a file that is
            # literally called "bin" no longer counts as a ".bin" file.
            if os.path.splitext(name)[1] in types:
                return self.file_path + '/' + name
        return "unknown"

    def updata_slave(self):
        """Send the slave update commands for the file types found on the card.

        For a ``.bin`` and/or ``.pkt`` file an ``mcu updata`` command is sent,
        and for a ``.json`` file (other than ``cfg.json``) an ``mcu scheme``
        command, each addressed to the slaves listed in the configuration.
        """
        cfg_f = cfg()
        addrs = cfg_f.slave_addrs()
        write_info("slave addrs:" + addrs)

        slave_file = self.find_file_by_type([".bin"])
        if slave_file != "unknown":
            write_info("updata slave:" + slave_file)
            self._send_cmd("mcu updata " + addrs + " /home/root/config/checker_slave.bin")

        slave_file = self.find_file_by_type([".pkt"])
        if slave_file != "unknown":
            write_info("updata slave:" + slave_file)
            self._send_cmd("mcu updata " + addrs + " /home/root/config/checker_slave.pkt")

        slave_file = self.find_file_by_type([".json"])
        if slave_file != "unknown":
            time.sleep(5)
            write_info("updata scheme:" + slave_file)
            self._send_cmd("mcu scheme " + addrs)

    def _send_cmd(self, cmd: str):
        """Send one command over a fresh UDP client and log the acknowledgement."""
        u = myudp()
        try:
            ack = u.send_cmd(cmd)
        finally:
            # Release the socket; one is created per command.
            u.close()
        if ack is not None:
            write_info("|--| " + ack)

    def _beep_times(self, count: int, off_time: float):
        """Beep ``count`` times: 0.05 s on, then ``off_time`` seconds off."""
        b = beep()
        try:
            for _ in range(count):
                b.power(True)
                time.sleep(0.05)
                b.power(False)
                time.sleep(off_time)
        finally:
            b.close()

    # Three consecutive beeps when the upgrade has finished.
    def beep_end(self):
        """Beep three times (0.05 s on, 0.2 s off)."""
        self._beep_times(3, 0.2)

    # Five consecutive beeps when an SD card insertion is detected.
    def beep_insert(self):
        """Beep five times (0.05 s on, 0.1 s off)."""
        self._beep_times(5, 0.1)

    # Buzzer indication while the upgrade is running.
    def beep_tip(self, power: bool):
        """Start (``True``) or stop (``False``) the periodic beeper.

        Starting launches a background thread running ``_beep_run`` unless
        the beeper is already on. Stopping only clears the flag; the thread
        ends on its own when it next checks it.
        """
        if power:
            if not self.beep_power:
                self.beep_power = power
                t = threading.Thread(target=self._beep_run, args=())
                t.start()
        else:
            self.beep_power = power

    def _beep_run(self):
        """Thread body: one short beep every 2 seconds while ``beep_power`` is set."""
        b = beep()
        try:
            while self.beep_power:
                b.power(True)
                time.sleep(0.05)
                b.power(False)
                time.sleep(1.95)
        finally:
            b.close()
|
|
|
|
|
|
def main():
    """Create the log directory if needed, then poll for the SD card forever.

    ``sd_check()`` is called every 5 seconds; this function never returns.
    """
    log_dir = os.path.dirname(log_filepath)
    # write_info() opens the log file in append mode, so its directory has
    # to exist before the first log line is written.
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    updata = auto_updata()
    while True:
        updata.sd_check()
        time.sleep(5)


if __name__ == '__main__':
    main()
|
|
# u=myudp()
|
|
# if(len(sys.argv)>=2):
|
|
# cmd=sys.argv[1]
|
|
# else:
|
|
# cmd="whos"
|
|
# updata.beep_tip(True)
|
|
# u.send_cmd(cmd)
|
|
# updata.beep_tip(False)
|
|
# time.sleep(5)
|
|
# updata.beep_end()
|
|
|
|
|
|
# if __name__ == "__main__":
|
|
# scan_files("/run/media")
|
|
|
|
|
|
|
|
|
|
|