#!/usr/bin/python3 import os import sys import shutil import threading import re import time import socket import json # 定义守护进程 # 2023.10.19 # 检测到插入sd卡时快速响5声 # 升级主板完成后连续响2声 # 小板升级完成后连续响3声 # 日志文件路径 log_filepath = '/home/root/log/daemon_log.txt' # 定义udp操作 class myudp(object): def __init__(self): self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) local_addr = ("", 0) self.udp.bind(local_addr) self.udp.settimeout(10) def send_cmd(self,text:str): send_data = text+"\n" self.udp.sendto(send_data.encode("utf-8"), ("127.0.0.1", 7777)) while True: recv=self.recv() print(recv) if(recv==None) or (recv[:4]=="ack:"): return recv def recv(self): try: recv_data = self.udp.recvfrom(1024) recv_msg = recv_data[0] send_addr = recv_data[1] # print ("收到的信息为:", recv_msg.decode("utf-8")) text=recv_msg.decode("utf-8") return text except Exception as err: print("err:",str(err)) return None def close(self): # 5.关闭套接字 self.udp.close() # 定义蜂鸣器 class beep(object): def __init__(self) -> None: try: # self.bee=open("/sys/class/leds/user-led/brightness","w+") self.bee=open("/sys/class/leds/beep/brightness","w+") except Exception as err: write_info("beep init err:"+str(err)) def power(self,p:bool): if(self.bee!=None): if(p==True): self.bee.write("1") else: self.bee.write("0") self.bee.flush() def close(self): if(self.bee!=None): self.power(False) self.bee.close() # 配置信息读取 class cfg(object): def __init__(self) -> None: try: file=open("/home/root/config/cfg.json","rb") self.json_obj=json.loads(file.read()) file.close() except Exception as err: print(str(err)) def slave_addrs(self): addrs="" if(self.json_obj!=None): num=self.json_obj["slave_num"] for i in range(num): addrs+=str(i+1)+',' return addrs[:-1] return addrs def write_info(text:str): fm = '%Y-%m-%d %X' nowtime = time.strftime(fm, time.localtime()) with open(log_filepath, 'a') as fp: fp.write(nowtime+ " | "+text+"\n") print(text) def _do_cmd(cmd:str): write_info(cmd) ret = os.popen(cmd).readlines() for i in range(len(ret)): ret[i]=ret[i].strip() write_info(ret[i]) return ret class auto_updata(object): def __init__(self): self.sd_path="/run/media/sda/updata" # self.file_path=self.sd_path+'/updata' self.file_path=self.sd_path self.sd_inserd_state = False self.beep_power=False self.file_list=[] def sd_check(self): ack=os.path.exists(self.sd_path) if(ack!=self.sd_inserd_state): if(ack==True): write_info("sd inserd.") self.beep_insert() if(self.copy_file()==True): time.sleep(5) self.beep_tip(True) self.updata_slave() self.beep_tip(False) time.sleep(2.3) self.beep_end() else: write_info("sd extracted.") self.sd_inserd_state=ack def copy_file(self): self.file_list.clear() try: self.file_list=os.listdir(self.file_path) except Exception as e: write_info(str(e)) return False for i in self.file_list: write_info("|---| "+i) _do_cmd("systemctl stop atk-qtapp-start.service") _do_cmd("mkdir /home/root/config") _do_cmd("cp "+self.find_file_by_type([".elf"])+" /usr/local/QDesktop-fb") _do_cmd("chmod 777 /usr/local/QDesktop-fb") _do_cmd("cp "+self.find_file_by_type([".bin"])+" /home/root/config/checker_slave.bin") _do_cmd("cp "+self.find_file_by_type([".pkt"])+" /home/root/config/checker_slave.pkt") _do_cmd("cp "+self.find_file_by_type([".json"])+" /home/root/config/checker_ye_cfg.json") _do_cmd("cp "+self.file_path+"/cfg.json /home/root/config/cfg.json") _do_cmd("cp "+self.find_file_by_type([".axf"])+" /lib/firmware/checker_m4.axf") # _do_cmd("cp "+self.find_file_by_type(".dtb")+" /boot/stm32mp157d-atk.dtb") _do_cmd("sync") _do_cmd("systemctl restart atk-qtapp-start.service") write_info("copy file end.") return True def find_file_by_type(self,types:list): for i in self.file_list: sp=i.split(".")[-1] sp='.'+sp if(i!="cfg.json"): if(sp in types): return self.file_path+'/'+i return "unknown" def updata_slave(self): cfg_f=cfg() addrs=cfg_f.slave_addrs() write_info("slave addrs:"+addrs) slave_file=self.find_file_by_type([".bin"]) if(slave_file!="unknown"): write_info("updata slave:"+slave_file) u=myudp() cmd="mcu updata "+addrs+" /home/root/config/checker_slave.bin" a=u.send_cmd(cmd) if(a!=None): write_info("|--| "+a) slave_file=self.find_file_by_type([".pkt"]) if(slave_file!="unknown"): write_info("updata slave:"+slave_file) u=myudp() cmd="mcu updata "+addrs+" /home/root/config/checker_slave.pkt" a=u.send_cmd(cmd) if(a!=None): write_info("|--| "+a) slave_file=self.find_file_by_type([".json"]) if(slave_file!="unknown"): time.sleep(5) write_info("updata scheme:"+slave_file) u=myudp() cmd="mcu scheme "+addrs a=u.send_cmd(cmd) if(a!=None): write_info("|--| "+a) # 升级完成时的连续三声提示 def beep_end(self): b=beep() for i in range(3): b.power(True) time.sleep(0.05) b.power(False) time.sleep(0.2) b.close() # 检测到sd卡插入时连续响5声 def beep_insert(self): b=beep() for i in range(5): b.power(True) time.sleep(0.05) b.power(False) time.sleep(0.1) b.close() # 升级时的蜂鸣器提示 def beep_tip(self,power:bool): if(power==True) and (self.beep_power!=True): self.beep_power=power t = threading.Thread(target=self._beep_run, args=()) t.start() elif(power==False): self.beep_power=power def _beep_run(self): b=beep() while(self.beep_power==True): b.power(True) time.sleep(0.05) b.power(False) time.sleep(1.95) b.close() if __name__ == '__main__': dir_name=os.path.dirname(log_filepath) if not os.path.exists(dir_name): os.makedirs(dir_name) updata=auto_updata() while True: updata.sd_check() time.sleep(5) # u=myudp() # if(len(sys.argv)>=2): # cmd=sys.argv[1] # else: # cmd="whos" # updata.beep_tip(True) # u.send_cmd(cmd) # updata.beep_tip(False) # time.sleep(5) # updata.beep_end() # if __name__ == "__main__": # scan_files("/run/media")