#!/usr/bin/env python # -*- coding: UTF-8 -*- import datetime import fileinput import re import sys import serial import time import xmodem import os import binascii from log import myprint from log import log_init from bin_to_hex import bin_to_hex_file def init_send(s_port:serial.Serial, send_str): s_info = bytearray() while True: s_port.write(send_str) time.sleep(0.1) s_info += s_port.read(1) bytes2read = s_port.in_waiting tmp = s_port.read(bytes2read) s_info += tmp if(s_info.find(b"Recieving RAM-IMAGE in xmodem : C")>=0): m_ram=True else: m_ram=False if m_ram: myprint ("Program enters transmission mode...") break def burn_ram_bin(x_modem:xmodem.XMODEM, r_file): global trans_time_0 stime = datetime.datetime.now() myprint ("Transferring %s..." % r_file) try: stream = open(r_file, 'rb') except Exception: myprint("Cannot load file, please check the file path and retry. Press to exit") sys.exit() xmodem_send = x_modem.send(stream, callback=download_callback) etime = datetime.datetime.now() trans_time_0 = (etime - stime).seconds myprint ("\nTransferring %s result: %s, consuming time: %s s \n" % (r_file, xmodem_send, trans_time_0)) def burn_flash_bin(s_port:serial.Serial, x_modem:xmodem.XMODEM, f_file): global trans_time_1 s_info = bytearray() while True: bytes2read = s_port.in_waiting tmp = s_port.read(bytes2read) if(len(tmp)==0): continue print_device_str(tmp) s_info += tmp if(s_info.find(b'C')>=0 and bytes2read==1): m_flash=True else: m_flash=False if(s_info.find(b"Updating done, PLS reboot the device...")>=0): m_done=True else: m_done=False if m_flash: myprint (r"Recieving FLASH-IMAGE in xmodem : C") s_info = bytearray() stime = datetime.datetime.now() myprint ("Transferring %s..." % f_file) try: stream = open(f_file, 'rb') except Exception: myprint("Cannot load file, please check the file path and retry. Press to exit") sys.exit() xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback) etime = datetime.datetime.now() trans_time_1 = (etime - stime).seconds myprint ("\nTransferring %s result: %s, consuming time: %d s \n" % (f_file,xmodem_send, trans_time_1)) elif m_done: myprint("Update done.") break else: pass def upload_callback(total_packets, success_count, error_count, packet_size): if total_packets % 10 == 0: sys.stdout.write('.') sys.stdout.flush() def download_callback(total_packets, success_count, error_count): if total_packets % 10 == 0: sys.stdout.write('.') sys.stdout.flush() # 打印串口收到的字符 def print_device_str(data:bytearray): data:list[bytearray]=data.split(b"\r\n") for item in data: try: d=item.decode('utf-8') myprint(d.strip()) except Exception as e: myprint(f" {item.hex()}") # 上传固件 def upload_bin(x_modem:xmodem.XMODEM, w_file): global trans_time_0 stime = datetime.datetime.now() myprint ("Transferring %s..." % w_file) try: stream = open(w_file, 'wb+') except Exception: myprint("Cannot load file, please check the file path and retry. Press to exit") sys.exit() xmodem_send = x_modem.recv(stream, callback=upload_callback) etime = datetime.datetime.now() trans_time_0 = (etime - stime).seconds myprint ("\nTransferring ram.bin result: %s, consuming time: %s s \n" % (xmodem_send, trans_time_0)) def getc(size, timeout=1): data=ser.read(size) return data or None def putc(data, timeout=1): ser.write(data) time.sleep(0.03) _work_dir='work' if not os.path.exists(_work_dir): os.mkdir(_work_dir) def calc_log_file_name(): log_file=sys.argv[1].split('.')[0]+'.log' return os.path.join(_work_dir,log_file) def calc_upload_name(): name=sys.argv[1] return os.path.join(_work_dir,name) def calc_hex_name(): name=sys.argv[1]+'.txt' return os.path.join(_work_dir,name) def calc_flash_info_name(): name=sys.argv[1].split('.')[0]+'.info' return os.path.join(_work_dir,name) # 上传固件 def upload_fun(): # 显示flash信息 ser.write(b"f i\n") time.sleep(0.5) ser_read_data=ser.read(4096) print_device_str(ser_read_data) with open(calc_flash_info_name(),mode='w+',encoding='utf-8') as f: f.writelines(ser_read_data.decode('utf-8').split('\r\n')) # 显示image信息 ser.write(b"f s\n") time.sleep(0.5) ser_read_data=ser.read(4096) print_device_str(ser_read_data) with open(calc_flash_info_name(),mode='a+',encoding='utf-8') as f: f.writelines(ser_read_data.decode('utf-8').split('\r\n')) # 上传整个镜像 ser.write(b"fw u d all\n") time.sleep(0.5) myprint(ser.read(4096).decode('utf-8')) upload_bin(modem,calc_upload_name()) myprint ("Total transmission time: %s s" % str(trans_time_0+trans_time_1)) myprint("Start transform bin to hex.") bin_to_hex_file(calc_upload_name(), calc_hex_name()) myprint("Transform to hex end.") # 烧录固件 def burn_fun(): time.sleep(3) ser_read_data=ser.read(4096) print_device_str(ser_read_data) ser.baudrate=nb_rate burn_flash_bin(ser, modem, iot_flash_file) myprint ("Total transmission time: %s s" % str(trans_time_0+trans_time_1)) time.sleep(3) ser_read_data=ser.read(4096) print_device_str(ser_read_data) # kunlun.py [upload.bin] if __name__ == '__main__': if(len(sys.argv)<2): print("param too less") exit(-1) log_init(calc_log_file_name()) exit_flag = 0 trans_time_0, trans_time_1 = 0, 0 config_file = r"xmodem_config.txt" init_str, sp_num, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None for eachline in fileinput.FileInput(config_file): m_info = re.match(r"(\w+)\s*=\s*(.+)", eachline) if m_info: if m_info.group(1) == "init_str": init_str = m_info.group(2) elif m_info.group(1) == "serial_port_num": sp_num = int(m_info.group(2)) elif m_info.group(1) == "baud_rate": b_rate = int(m_info.group(2)) elif m_info.group(1) == "new_baud_rate": nb_rate = int(m_info.group(2)) elif m_info.group(1) == "ram_file": ram_file = m_info.group(2) elif m_info.group(1) == "iot_flash_file": iot_flash_file = m_info.group(2) else: pass try: ser = serial.Serial(port='COM' + str(sp_num), baudrate=b_rate, timeout=0.3) except Exception: myprint("Serial Port COM%s Conflicts!!! Press to Close it and retry..." % str(sp_num)) sys.exit() modem = xmodem.XMODEM(getc, putc, mode='xmodem1k') # 发送启动字符让设备进入xmodem模式 init_send(ser, init_str.encode("utf-8")) burn_ram_bin(modem, ram_file) # burn_ram_bin(modem, "kl3_ram_build.bin") upload_fun() # burn_fun()