#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Serial/XMODEM helper script for kunlun devices.

Depending on the command-line options the script:

* downloads a RAM image to the device over XMODEM (``-r``),
* downloads a flash image, or uploads it from the device when the given
  file does not exist yet (``-f``),
* converts a bin file to its hex/text form (``-b``),
* opens a simple interactive serial console (``-c``).

Generated files (log, uploaded image, hex dump, flash info) are stored in
the ``work`` directory.
"""
import argparse
import binascii
import datetime
import fileinput
import os
import re
import sys
import threading
import time

import serial
import xmodem

from bin.log import myprint
from bin.log import log_init
from bin.log import mywrite
from bin.bin_to_hex import bin_to_hex_file
from bin.bin_to_hex import bin_file_decrypt
from bin.bin_to_hex import clear_tmp
from bin.bin_to_hex import load_flash_info
from bin.base import bin_path


def init_send(s_port: serial.Serial, send_str: str):
    """Repeat *send_str* until the device announces XMODEM RAM-image mode.

    Every pass writes the start string, waits 100 ms and collects whatever
    the port returned.  The loop ends when the device banner is seen.  There
    is no timeout: the function blocks until the banner arrives.
    """
    # Text emitted by the device; the spelling must match it byte-for-byte.
    marker = b"Recieving RAM-IMAGE in xmodem : C"
    payload = send_str.encode('utf-8')
    received = bytearray()
    while True:
        s_port.write(payload)
        time.sleep(0.1)
        received += s_port.read(1)
        received += s_port.read(s_port.in_waiting)
        if marker in received:
            myprint("Program enters transmission mode...")
            break
        # Only the last len(marker) - 1 bytes can still be part of a match,
        # so drop the rest instead of rescanning an ever-growing buffer.
        del received[:-(len(marker) - 1)]


def burn_ram_bin(x_modem: xmodem.XMODEM, r_file):
    """Send the RAM image *r_file* to the device over XMODEM.

    Exits the process with status -1 when the file cannot be opened.
    Stores the elapsed whole seconds in the global ``time_stamp_end``.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(r_file, 'rb')
    except OSError:
        myprint(f"Cannot load file {r_file}")
        sys.exit(-1)
    myprint(f"Transferring {r_file}...")
    with stream:  # make sure the image file is closed after the transfer
        xmodem_send = x_modem.send(stream, callback=download_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint(f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ")


def burn_flash_bin(s_port: serial.Serial, x_modem: xmodem.XMODEM, f_file):
    """Wait for the device's XMODEM request and send the flash image *f_file*.

    Device output is echoed through ``print_device_str`` and accumulated.
    The transfer starts once the accumulated output contains ``CCC`` and the
    most recent read returned exactly one byte.  The loop ends when the
    transfer reports failure, or when the device prints its
    "Updating done" message.  Exits the process with status -1 when the file
    cannot be opened.
    """
    global time_stamp_end
    s_info = bytearray()
    while True:
        # NOTE(review): this loop polls without sleeping.  The
        # ``bytes2read == 1`` test below depends on how the incoming bytes
        # are chunked, so the polling cadence is intentionally left as-is.
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if not tmp:
            continue
        print_device_str(tmp)
        s_info += tmp
        ready_for_image = b'CCC' in s_info and bytes2read == 1
        update_done = b"Updating done, PLS reboot the device..." in s_info
        if ready_for_image:
            s_info = bytearray()
            stime = datetime.datetime.now()
            try:
                stream = open(f_file, 'rb')
            except OSError:
                myprint(f"Cannot load file {f_file}")
                sys.exit(-1)
            myprint(f"Transferring {f_file}...")
            with stream:  # close the image file once the transfer returns
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback, retry=16)
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint(f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif update_done:
            myprint("Update done.")
            break


def _progress_dot(total_packets):
    """Write a progress dot to stdout on every tenth packet."""
    if total_packets % 10 == 0:
        sys.stdout.write('.')
        sys.stdout.flush()


def upload_callback(total_packets, success_count, error_count, packet_size):
    """Progress callback passed to ``XMODEM.recv``; prints a dot per 10 packets."""
    _progress_dot(total_packets)


def download_callback(total_packets, success_count, error_count):
    """Progress callback passed to ``XMODEM.send``; prints a dot per 10 packets."""
    _progress_dot(total_packets)


# Print the characters received from the serial port.
def print_device_str(data: bytearray):
    """Print *data* line by line (split on CRLF) with a ``DEVICE:`` prefix.

    Lines that are not valid UTF-8 are printed as raw bytes instead.
    """
    for item in data.split(b"\r\n"):
        try:
            text = item.decode('utf-8')
        except UnicodeDecodeError:
            myprint("DEVICE:", item)
        else:
            myprint("DEVICE:", text.strip())


# True while the background receiver started by recv_ser_data should run.
_recv_thread = False
# Thread object of the running receiver, or None.
_recv_worker = None


# Read lines from stdin and forward them to the serial port.
def read_input(s_port: serial.Serial):
    """Forward stdin lines to *s_port* until ``exit``, Ctrl-C or end of input.

    Each line is sent UTF-8 encoded with a trailing ``\\n``.  A failing write
    also ends the loop.  The background receiver is stopped on the way out.
    """
    try:
        while True:
            try:
                line = input()
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin closed or piped input exhausted.
                break
            if line == 'exit':
                break
            try:
                s_port.write(line.encode('utf-8') + b'\n')
            except Exception:
                # Deliberate best-effort: any write failure ends the console.
                break
    finally:
        recv_ser_end()


def recv_ser_end():
    """Ask the background receiver to stop and wait briefly for it to exit."""
    global _recv_thread, _recv_worker
    _recv_thread = False
    worker = _recv_worker
    _recv_worker = None
    if worker is not None and worker is not threading.current_thread():
        # The worker polls every 100 ms, so one second is ample.
        worker.join(timeout=1)


# Receive serial data in a loop.
def recv_ser_data(s_port: serial.Serial):
    """Start a background thread that copies serial input to ``mywrite``.

    The thread polls *s_port* every 100 ms until ``recv_ser_end`` is called
    (``read_input`` calls it when it returns).
    """
    global _recv_thread, _recv_worker

    def recv(port: serial.Serial):
        while _recv_thread:
            time.sleep(0.1)
            try:
                pending = port.in_waiting
                if pending <= 0:
                    continue
                chunk = port.read(pending)
            except (serial.SerialException, OSError):
                # Port closed or unplugged: nothing more to receive.
                break
            # Escape undecodable bytes instead of dropping the whole chunk.
            text = chunk.decode('utf-8', errors='backslashreplace')
            try:
                mywrite(text)
            except Exception:
                # Deliberate best-effort: a logging failure must not stop
                # the receiver.
                pass

    _recv_thread = True
    # Daemon thread: an unexpected error in the main thread must not leave
    # the process hanging on the receiver.
    _recv_worker = threading.Thread(target=recv, args=(s_port,), daemon=True)
    _recv_worker.start()


# Upload firmware.
def upload_bin(x_modem: xmodem.XMODEM, w_file):
    """Receive data from the device over XMODEM and store it in *w_file*.

    Exits the process with status -1 when the file cannot be created.
    Stores the elapsed whole seconds in the global ``time_stamp_end``.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(w_file, 'wb+')
    except OSError:
        myprint(f"Cannot create file {w_file}")
        sys.exit(-1)
    myprint(f"Receiving {w_file}...")
    with stream:  # flush and close the received image
        xmodem_send = x_modem.recv(stream, callback=upload_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint(f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")


def getc(size, timeout=1):
    """XMODEM read callback: read *size* bytes from ``ser``; None if nothing came.

    *timeout* is ignored; the port's own read timeout applies.
    """
    data = ser.read(size)
    return data or None


def putc(data, timeout=1):
    """XMODEM write callback: write *data* to ``ser`` and pause for 30 ms.

    Returns the value of ``ser.write`` (the number of bytes written).
    *timeout* is ignored.
    """
    written = ser.write(data)
    time.sleep(0.03)
    return written


_work_dir = 'work'
# exist_ok avoids the check-then-create race of exists() + mkdir().
os.makedirs(_work_dir, exist_ok=True)


def _work_path(suffix=''):
    """Return ``work/<basename of iot_flash_file><suffix>``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + suffix)


# Log file; always stored in the work directory.
def calc_log_file_name():
    """Return the log file path derived from ``iot_flash_file``."""
    return _work_path('.log')


# Uploaded flash image; always stored in the work directory.
def calc_upload_name():
    """Return the path where the uploaded flash image is stored."""
    return _work_path()


# Hex file generated from the flash image; always stored in the work directory.
def calc_hex_name():
    """Return the path of the hex (``.txt``) file for the flash image."""
    return _work_path('.txt')


# Uploaded flash information; always stored in the work directory.
def calc_flash_info_name():
    """Return the path of the ``.info`` file holding the flash information."""
    return _work_path('.info')


def _query_and_save(command, delay, mode):
    """Send *command*, wait *delay* seconds, print the reply and save it.

    The reply is written to the flash-info file opened with *mode*.
    """
    ser.write(command)
    time.sleep(delay)
    reply = ser.read(4096)
    print_device_str(reply)
    # Tolerant decode: noise on the serial line must not abort the upload.
    text = reply.decode('utf-8', errors='backslashreplace')
    with open(calc_flash_info_name(), mode=mode, encoding='utf-8') as f:
        # NOTE(review): writelines() adds no separators, so the CRLF-split
        # lines are stored concatenated - confirm load_flash_info expects
        # this.
        f.writelines(text.split('\r\n'))


# Upload firmware.
def upload_fun():
    """Read flash/image info from the device, upload the image, convert it.

    Uses the module globals ``ser``, ``modem``, ``upload_key`` and
    ``layout_index`` set up by the main section.
    """
    # Show the flash information.
    _query_and_save(b"f i\n", 0.5, 'w+')
    # Show the image information.
    _query_and_save(b"f s\n", 1, 'a+')
    # Upload the selected region (the whole image for "all").
    ser.write(f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8"))
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8', errors='backslashreplace'))
    upload_bin(modem, calc_upload_name())
    myprint(f"Total transmission time: {(time_stamp_start+time_stamp_end)} s")
    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name(), layout_index)
    myprint("Transform to hex end.")


# Burn firmware.
def burn_fun():
    """Print pending device output, switch to ``nb_rate`` and burn the flash image."""
    time.sleep(3)
    ser_read_data = ser.read(4096)
    print_device_str(ser_read_data)
    ser.baudrate = nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    myprint(f"Total transmission time: {(time_stamp_start+time_stamp_end)} s")


# Select which image region to upload.
def get_upload_cfg(key: str):
    """Return the argument for the device's upload command.

    ``"all"`` is returned unchanged.  Otherwise the saved flash info is
    searched for the first entry whose ``ImgType`` contains *key*
    (upper-cased) and ``"<Offset> <Size>"`` is returned.  When nothing
    matches a message is printed and ``"all"`` is returned.
    """
    if key == 'all':
        return key
    wanted = key.upper()
    for item in load_flash_info(calc_flash_info_name()):
        if wanted in item["ImgType"]:
            return f"{item['Offset']} {item['Size']}"
    myprint(f'Can not found key:{key}')
    return "all"


def global_def():
    """Parse the command line and fill in the module-level configuration.

    ``function_type`` is left untouched when none of ``-f``, ``-r``, ``-b``
    or ``-c`` is given.  ``-f`` takes precedence over the other three.
    Exits with status -1 when ``-f`` is used without ``-k``.
    """
    global init_str
    global serial_com
    global b_rate
    global nb_rate
    global ram_file
    global iot_flash_file
    global ser
    global time_stamp_start
    global time_stamp_end
    global function_type
    global upload_key
    global parser
    global user_log
    global log_timeout
    global layout_index

    args = parser.parse_args()
    init_str = "WQKL"
    serial_com = args.port
    b_rate = args.b_rate
    nb_rate = args.nb_rate
    user_log = args.log
    log_timeout = args.timeout
    layout_index = args.layout_index

    # Upload or download a flash image.
    if args.flash_file is not None:
        # Validate -k before it is formatted into a file name.
        if args.kunlun_version is None:
            print("请指定参数 -k")
            sys.exit(-1)
        iot_flash_file = args.flash_file
        if os.path.exists(iot_flash_file):
            function_type = 'download'
            ram_name = f'kl{args.kunlun_version}_ram_build.bin'
        else:
            function_type = 'upload'
            ram_name = f'bootram_kl{args.kunlun_version}.bin'
        ram_file = os.path.join(bin_path(), ram_name)
        upload_key = args.upload_key if args.upload_key is not None else 'all'
    elif args.ram_file is not None:
        function_type = 'ram'
        ram_file = args.ram_file
        iot_flash_file = ram_file
    elif args.bin_convert is not None:
        function_type = 'convert'
        iot_flash_file = args.bin_convert
    elif args.console is not None:
        iot_flash_file = args.console
        function_type = 'console'


def parser_init():
    """Create the global ``parser`` and register all command-line options."""
    global parser
    parser = argparse.ArgumentParser(description='kunlun相关工具脚本')
    # Add the arguments.
    parser.add_argument('-p', '--port', action='store', help='输入使用的串口')
    parser.add_argument('-r', '--ram_file', action='store',
                        help='输入要下载的ram文件名称,此项和 -f 选项不能同时指定')
    parser.add_argument('-f', '--flash_file', action='store',
                        help='输入要下载或上传的flash文件名称,如果文件存在则下载;不存在则上传')
    parser.add_argument('-c', '--console', action='store',
                        help='指定控制台保存的文件名,不指定则不进入控制台')
    parser.add_argument('-u', '--upload_key', action='store',
                        help='上传时保存分区的关键字,不指定则上传所有')
    parser.add_argument('-k', '--kunlun_version', action='store',
                        help='上传下载时的kunlun版本 3或者1')
    parser.add_argument('-b', '--bin_convert', action='store', help='转换bin文件')
    parser.add_argument('-l', '--log', action='store_true', default=False,
                        help='下载ram或flash程序之后是否进入log界面,默认否')
    parser.add_argument('-t', '--timeout', action='store', type=float, default=5,
                        help='启用log时接收log的时间')
    parser.add_argument('-i', '--layout_index', action='store', type=int,
                        help='解析接收到的flash文件或转换bin文件时使用的layout,不指定则不解析')
    parser.add_argument('--b_rate', action='store', type=int, default=115200,
                        help='下载ram程序 上传flash数据 控制台 接收log等 使用的串口波特率')
    parser.add_argument('--nb_rate', action='store', type=int, default=1500000,
                        help='下载flash程序使用的串口波特率')


def print_help():
    """Print a short hint on how to display the full help."""
    help_text = f''' 输入: {sys.argv[0]} -h 查看帮助 '''
    print(help_text)


# If neither upload nor download is specified, the script decides between
# them based on whether the input file exists.
# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin]
# kunlun.py [com] [ram.bin]
# kunlun.py [file.bin]
if __name__ == '__main__':
    # Function type: "download", "upload", "ram" (download a RAM image),
    # "convert" (convert a bin file) or "console".
    function_type = None
    time_stamp_start, time_stamp_end = 0, 0
    init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = (
        None, None, None, None, None, None, None)
    upload_key = None
    parser = None
    user_log = False
    log_timeout = 0
    layout_index = None

    parser_init()
    global_def()
    if function_type is None:
        print("param err.")
        print_help()
        sys.exit(-1)

    log_init(calc_log_file_name())

    if function_type == 'convert':
        bin_to_hex_file(iot_flash_file, calc_hex_name(), layout_index)
        sys.exit(0)

    if serial_com is None:
        myprint(f"请指定串口 {serial_com}")
        sys.exit(-1)

    try:
        ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
    except Exception:
        myprint(f"serial com:{serial_com} open failed.")
        sys.exit(-1)

    try:
        if function_type == 'console':
            recv_ser_data(ser)
            read_input(ser)
            sys.exit(0)

        modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')

        # Send the start string so the device enters XMODEM mode.
        init_send(ser, init_str)
        burn_ram_bin(modem, ram_file)

        if function_type == 'upload':
            upload_fun()
        elif function_type == 'download':
            iot_flash_file = bin_file_decrypt(iot_flash_file)
            burn_fun()
            clear_tmp()
            if user_log:
                ser.baudrate = b_rate
                recv_ser_data(ser)
                time.sleep(log_timeout)
                recv_ser_end()
        elif function_type == 'ram':
            ser.baudrate = b_rate
            recv_ser_data(ser)
            if user_log:
                # Capture the device log for a fixed time.
                time.sleep(log_timeout)
                recv_ser_end()
            else:
                # No timed log requested: stay in the interactive console.
                read_input(ser)
    finally:
        # Stop the receiver before closing the port it reads from.
        recv_ser_end()
        ser.close()