#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Serial/xmodem helper that uploads or burns a flash image.

Usage (if upload/download is not given, the script decides from whether the
image file already exists: existing file -> download/burn, missing -> upload):

    kunlun.py [com] [kl1/kl3] [upload.bin/download.bin]
    kunlun.py [com] [kl1/kl3] [upload/download] [upload.bin/download.bin]
    kunlun.py [file.bin]
"""
import binascii
import datetime
import fileinput
import os
import re
import sys
import time

import serial
import xmodem

from bin.log import myprint
from bin.log import log_init
from bin.bin_to_hex import bin_to_hex_file
from bin.bin_to_hex import bin_file_decrypt
from bin.bin_to_hex import clear_tmp


def _open_or_exit(path, mode, failure_message):
    """Open *path* in *mode*; on failure log *failure_message* and exit(-1)."""
    try:
        return open(path, mode)
    except (OSError, ValueError):
        myprint(failure_message)
        sys.exit(-1)


def init_send(s_port: serial.Serial, send_str: str):
    """Repeatedly send *send_str* until the device announces xmodem mode.

    Loops forever until the accumulated serial input contains the device's
    RAM-image prompt.
    """
    s_info = bytearray()
    payload = send_str.encode('utf-8')
    while True:
        s_port.write(payload)
        time.sleep(0.1)
        s_info += s_port.read(1)
        s_info += s_port.read(s_port.in_waiting)
        # The spelling "Recieving" is matched against the device's own
        # output, so it must stay exactly as written.
        if s_info.find(b"Recieving RAM-IMAGE in xmodem : C") >= 0:
            myprint("Program enters transmission mode...")
            break


def burn_ram_bin(x_modem: xmodem.XMODEM, r_file):
    """Send the RAM image *r_file* through *x_modem*.

    Stores the elapsed whole seconds in the global ``time_stamp_end``.
    Exits the process with -1 when the file cannot be opened.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    with _open_or_exit(r_file, 'rb', f"Cannot load file {r_file}") as stream:
        myprint(f"Transferring {r_file}...")
        xmodem_send = x_modem.send(stream, callback=download_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint(f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end - time_stamp_start)} s ")


def burn_flash_bin(s_port: serial.Serial, x_modem: xmodem.XMODEM, f_file):
    """Wait for the device's xmodem prompt, then send the flash image.

    Polls *s_port*, echoing everything received. When the buffered input
    contains ``CCC`` and the latest read was a single byte, *f_file* is sent
    through *x_modem*. The loop ends when the send result is ``False`` or
    when the device reports that updating is done.
    """
    global time_stamp_end
    s_info = bytearray()
    while True:
        # NOTE(review): this polls without sleeping (read(0) returns at once),
        # so it spins while the device is silent. Left as-is because the
        # ``bytes2read == 1`` check below depends on the polling cadence.
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if not tmp:
            continue
        print_device_str(tmp)
        s_info += tmp
        m_flash = s_info.find(b'CCC') >= 0 and bytes2read == 1
        m_done = s_info.find(b"Updating done, PLS reboot the device...") >= 0
        # NOTE(review): the flattened source does not show whether ``elif
        # m_done`` pairs with ``if m_flash`` or with the send-result check;
        # pairing it with ``if m_flash`` is assumed here. Confirm.
        if m_flash:
            s_info = bytearray()
            stime = datetime.datetime.now()
            with _open_or_exit(f_file, 'rb', f"Cannot load file {f_file}") as stream:
                myprint(f"Transferring {f_file}...")
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback, retry=16)
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint(f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end - time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif m_done:
            myprint("Update done.")
            break


def upload_callback(total_packets, success_count, error_count, packet_size):
    """Progress callback for xmodem receive: print a dot every 10 packets."""
    if total_packets % 10 == 0:
        sys.stdout.write('.')
        sys.stdout.flush()


def download_callback(total_packets, success_count, error_count):
    """Progress callback for xmodem send: print a dot every 10 packets."""
    if total_packets % 10 == 0:
        sys.stdout.write('.')
        sys.stdout.flush()


# Print the characters received from the serial port.
def print_device_str(data: bytes):
    """Log each CRLF-separated chunk of *data* with a ``DEVICE:`` prefix.

    Chunks that are not valid UTF-8 are logged as raw bytes.
    """
    for item in data.split(b"\r\n"):
        try:
            text = item.decode('utf-8').strip()
        except UnicodeDecodeError:
            myprint("DEVICE:", item)
        else:
            myprint("DEVICE:", text)


# Upload firmware.
def upload_bin(x_modem: xmodem.XMODEM, w_file):
    """Receive data through *x_modem* into the file *w_file*.

    Stores the elapsed whole seconds in the global ``time_stamp_end``.
    Exits the process with -1 when the file cannot be created.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    with _open_or_exit(w_file, 'wb+', f"Cannot cteate file {w_file}") as stream:
        myprint(f"Receiving {w_file}...")
        xmodem_send = x_modem.recv(stream, callback=upload_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint(f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end - time_stamp_start)} s ")


def getc(size, timeout=1):
    """xmodem read hook: read *size* bytes from the global serial port.

    Returns ``None`` when nothing was read. *timeout* is not used; the port's
    own timeout applies.
    """
    data = ser.read(size)
    return data or None


def putc(data, timeout=1):
    """xmodem write hook: write *data* to the global serial port, then pause 30 ms."""
    ser.write(data)
    time.sleep(0.03)


_work_dir = 'work'
# exist_ok avoids the check-then-create race of exists() + mkdir().
os.makedirs(_work_dir, exist_ok=True)


# Log file; always stored in the work directory.
def calc_log_file_name():
    """Return ``work/<image file name>.log``."""
    name = os.path.split(iot_flash_file)[-1]
    return os.path.join(_work_dir, name + '.log')


# Uploaded flash image; always stored in the work directory.
def calc_upload_name():
    """Return ``work/<image file name>``."""
    name = os.path.split(iot_flash_file)[-1]
    return os.path.join(_work_dir, name)


# Hex file generated from the flash image; always stored in the work directory.
def calc_hex_name():
    """Return ``work/<image file name>.txt``."""
    name = os.path.split(iot_flash_file)[-1]
    return os.path.join(_work_dir, name + '.txt')


# Uploaded flash info; always stored in the work directory.
def calc_flash_info_name():
    """Return ``work/<image file name>.info``."""
    name = os.path.split(iot_flash_file)[-1]
    return os.path.join(_work_dir, name + '.info')


def _record_device_reply(command: bytes, file_mode: str):
    """Send *command*, echo the reply, and store it in the flash-info file.

    *file_mode* is the ``open`` mode for the info file (``'w'`` to start it,
    ``'a'`` to append).
    """
    ser.write(command)
    time.sleep(0.5)
    reply = ser.read(4096)
    print_device_str(reply)
    # Serial noise must not abort the run, so undecodable bytes are replaced.
    text = reply.decode('utf-8', errors='replace').replace('\r\n', '\n')
    if text and not text.endswith('\n'):
        # Keep consecutive replies from running together in the file.
        text += '\n'
    with open(calc_flash_info_name(), mode=file_mode, encoding='utf-8') as f:
        f.write(text)


# Upload firmware.
def upload_fun():
    """Record flash/image info, receive the whole image, and convert it to hex."""
    # Show flash info.
    _record_device_reply(b"f i\n", 'w')
    # Show image info.
    _record_device_reply(b"f s\n", 'a')
    # Upload the whole image.
    ser.write(b"fw u d all\n")
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8', errors='replace'))
    upload_bin(modem, calc_upload_name())
    # time_stamp_start is only ever initialised to 0 in this file, so this
    # total equals the duration of the most recent transfer.
    myprint(f"Total transmission time: {(time_stamp_start + time_stamp_end)} s")
    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name())
    myprint("Transform to hex end.")


# Burn firmware.
def burn_fun():
    """Switch the port to ``nb_rate`` and burn ``iot_flash_file`` to the device."""
    time.sleep(3)
    print_device_str(ser.read(4096))
    ser.baudrate = nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    myprint(f"Total transmission time: {(time_stamp_start + time_stamp_end)} s")
    time.sleep(3)
    print_device_str(ser.read(4096))


# Get the directory that contains the executable/script.
def bin_path():
    """Return the ``bin`` directory next to the running .exe or this script."""
    if sys.argv[0].endswith('.exe'):
        base = os.path.dirname(os.path.realpath(sys.executable))
    else:
        base = os.path.split(os.path.abspath(__file__))[0]
    return os.path.join(base, 'bin')


def global_def():
    """Fill the module-level settings from ``sys.argv``.

    Requires at least ``sys.argv[1..3]``. Prints ``param err.`` and exits
    with -1 when the third argument is neither ``upload``, ``download`` nor a
    ``.bin`` file name.
    """
    global init_str
    global serial_com
    global b_rate
    global nb_rate
    global ram_file
    global iot_flash_file
    global upload

    init_str = "WQKL"
    serial_com = sys.argv[1]
    b_rate = 115200
    nb_rate = 1500000

    mode_arg = sys.argv[3]
    if mode_arg == 'upload':
        upload = True
    elif mode_arg == 'download':
        upload = False
    elif mode_arg.endswith('.bin'):
        # An existing file means download (burn); a missing file means upload.
        upload = not os.path.exists(mode_arg)
    else:
        print("param err.")
        sys.exit(-1)

    if upload:
        ram_name = f'bootram_{sys.argv[2]}.bin'
    else:
        ram_name = f'{sys.argv[2]}_ram_build.bin'
    ram_file = os.path.join(bin_path(), ram_name)
    iot_flash_file = sys.argv[-1]


def print_help():
    """Print the command-line usage text."""
    usage = f'''
Automatically determine whether to upload or download:"
    {sys.argv[0]} [com] [kl1/kl3] [upload.bin/download.bin]
Manually specify whether to upload or download:
    {sys.argv[0]} [com] [kl1/kl3] [upload/download] [upload.bin/download.bin]
Convert bin file:
    {sys.argv[0]} [file.bin]
'''
    print(usage)


# If upload/download is not specified, the script decides based on whether
# the input file exists.
# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin]
# kunlun.py [com] [kl1/kl3] [upload/download] [upload.bin/download.bin]
# kunlun.py [file.bin]
if __name__ == '__main__':
    if len(sys.argv) < 4:
        if len(sys.argv) >= 2 and sys.argv[1].endswith('.bin'):
            # Convert-only mode: any extra argument selects enc=1.
            # NOTE(review): presumably enc marks the input as encrypted --
            # confirm against bin_to_hex_file.
            enc = 1 if len(sys.argv) >= 3 else 0
            # Use the base name so an input given with a directory part still
            # lands directly in the work directory, like calc_hex_name().
            hex_out = os.path.join(_work_dir, os.path.basename(sys.argv[1]) + ".txt")
            bin_to_hex_file(sys.argv[1], hex_out, enc)
            sys.exit(0)
        print("param too less")
        print_help()
        sys.exit(-1)

    upload = None
    time_stamp_start, time_stamp_end = 0, 0
    init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None
    global_def()
    log_init(calc_log_file_name())

    try:
        ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
    except Exception:
        myprint(f"serial com:{serial_com} open failed.")
        sys.exit(-1)

    modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')
    try:
        # Send the start string so the device enters xmodem mode.
        init_send(ser, init_str)
        burn_ram_bin(modem, ram_file)
        if upload:
            upload_fun()
        else:
            iot_flash_file = bin_file_decrypt(iot_flash_file)
            burn_fun()
        # NOTE(review): the flattened source does not show whether clear_tmp()
        # ran on both paths or only after burning; both paths assumed. Confirm.
        clear_tmp()
    finally:
        # Release the port even when a helper exits via sys.exit().
        ser.close()