#!/usr/bin/env python # -*- coding: UTF-8 -*- import datetime import fileinput import re import sys import serial import time import xmodem import os import binascii import threading import argparse from bin.log import myprint from bin.log import log_init from bin.log import mywrite from bin.bin_to_hex import bin_to_hex_file from bin.bin_to_hex import bin_file_decrypt from bin.bin_to_hex import clear_tmp from bin.bin_to_hex import load_flash_info from bin.bin_to_hex import check_bin_type from bin.base import bin_path from bin.factory_mode import ftm_handle def init_send(s_port:serial.Serial, send_str:str): s_info = bytearray() send_str=send_str.encode('utf-8') key_words={ b"WQKL":b"Recieving RAM-IMAGE in xmodem : C", b"HZPG":b"waiting recieve image: C" } while True: s_port.write(send_str) time.sleep(0.1) s_info += s_port.read(1) bytes2read = s_port.in_waiting tmp = s_port.read(bytes2read) s_info += tmp if(s_info.find(key_words[send_str])>=0): m_ram=True else: m_ram=False if m_ram: myprint ("Program enters transmission mode...") break def burn_ram_bin(x_modem:xmodem.XMODEM, r_file): global time_stamp_end stime = datetime.datetime.now() try: stream = open(r_file, 'rb') except Exception: myprint(f"Cannot load file {r_file}") sys.exit(-1) myprint (f"Transferring {r_file}...") xmodem_send = x_modem.send(stream, callback=download_callback) myprint('.') etime = datetime.datetime.now() time_stamp_end = (etime - stime).seconds myprint (f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ") def burn_flash_bin(s_port:serial.Serial, x_modem:xmodem.XMODEM, f_file): global time_stamp_end s_info = bytearray() while True: bytes2read = s_port.in_waiting tmp = s_port.read(bytes2read) if(len(tmp)==0): continue print_device_str(tmp) s_info += tmp if(s_info.find(b'CCC')>=0 and bytes2read==1): m_flash=True else: m_flash=False if(s_info.find(b"Updating done, PLS reboot the device...")>=0): m_done=True else: m_done=False if m_flash: s_info = bytearray() stime = datetime.datetime.now() try: stream = open(f_file, 'rb') except Exception: myprint(f"Cannot load file {f_file}") sys.exit(-1) myprint (f"Transferring {f_file}...") xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback,retry=16) myprint('.') etime = datetime.datetime.now() time_stamp_end = (etime - stime).seconds myprint (f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ") if(xmodem_send is False): break elif m_done: myprint("Update done.") break else: pass def upload_callback(total_packets, success_count, error_count, packet_size): if total_packets % 10 == 0: sys.stdout.write('.') sys.stdout.flush() def download_callback(total_packets, success_count, error_count): if total_packets % 10 == 0: sys.stdout.write('.') sys.stdout.flush() # 打印串口收到的字符 def print_device_str(data:bytearray): data=data.replace(b"\r\n",b"\n") try: d=data.decode('utf-8') mywrite(d) except Exception as e: mywrite(f"{data}\n") _recv_thread=False # 读取input def read_input(s_port:serial.Serial): global _recv_thread def read(): while True: try: d=input() except KeyboardInterrupt: break if(d=='exit'): break try: s_port.write(d.encode('utf-8')+b'\n') except Exception: break read() _recv_thread=False def recv_ser_end(): global _recv_thread _recv_thread=False # 循环接收串口数据 def recv_ser_data(s_port:serial.Serial): global _recv_thread def recv(s_port:serial.Serial): global _recv_thread while _recv_thread: time.sleep(0.1) bytes2read = s_port.in_waiting if(bytes2read>0): tmp = s_port.read(bytes2read) try: txt=tmp.decode('utf-8') mywrite(txt) except Exception as e: pass _recv_thread=True t=threading.Thread(target=recv,args=(s_port,)) t.start() # 上传固件 def upload_bin(x_modem:xmodem.XMODEM, w_file): global time_stamp_end stime = datetime.datetime.now() try: stream = open(w_file, 'wb+') except Exception: myprint(f"Cannot cteate file {w_file}") sys.exit(-1) myprint (f"Receiving {w_file}..." ) xmodem_send = x_modem.recv(stream, callback=upload_callback) myprint('.') etime = datetime.datetime.now() time_stamp_end = (etime - stime).seconds myprint (f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ") def recv_ser_data_ftm(s_port:serial.Serial,timeout:float=1): recv_data=bytearray() while timeout>0: time.sleep(0.1) timeout-=0.1 bytes2read = s_port.in_waiting if(bytes2read>0): tmp = s_port.read(bytes2read) recv_data+=tmp ftm_handle(recv_data) def getc(size, timeout=1): data=ser.read(size) return data or None def putc(data, timeout=1): ser.write(data) time.sleep(0.03) _work_dir='work' if not os.path.exists(_work_dir): os.mkdir(_work_dir) # 日志文件 固定存放在work目录 def calc_log_file_name(): name=os.path.split(log_file)[-1] file=name+'.log' return os.path.join(_work_dir,file) # 上传的flash镜像 固定存放在work目录 def calc_upload_name(): name=os.path.split(iot_flash_file)[-1] return os.path.join(_work_dir,name) # 根据flash镜像生成的hex文件 固定存放在work目录 def calc_hex_name(): name=os.path.split(iot_flash_file)[-1] name=name+'.txt' return os.path.join(_work_dir,name) # 上传的flash信息 固定存放在work目录 def calc_flash_info_name(): name=os.path.split(iot_flash_file)[-1] name=name+'.info' return os.path.join(_work_dir,name) # 上传固件 def upload_fun(): # 显示flash信息 ser.write(b"f i\n") time.sleep(0.5) ser_read_data=ser.read(4096) print_device_str(ser_read_data) with open(calc_flash_info_name(),mode='w+',encoding='utf-8') as f: f.writelines(ser_read_data.decode('utf-8').split('\r\n')) # 显示image信息 ser.write(b"f s\n") time.sleep(1) ser_read_data=ser.read(4096) print_device_str(ser_read_data) with open(calc_flash_info_name(),mode='a+',encoding='utf-8') as f: f.writelines(ser_read_data.decode('utf-8').split('\r\n')) # 上传整个镜像 ser.write(f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8")) time.sleep(0.5) myprint(ser.read(4096).decode('utf-8')) upload_bin(modem,calc_upload_name()) myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s" ) myprint("Start transform bin to hex.") bin_to_hex_file(calc_upload_name(), calc_hex_name(),layout_index) myprint("Transform to hex end.") # 烧录固件 def burn_fun(): time.sleep(3) ser_read_data=ser.read(4096) print_device_str(ser_read_data) ser.baudrate=nb_rate burn_flash_bin(ser, modem, iot_flash_file) myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s") # time.sleep(3) # ser_read_data=ser.read(4096) # print_device_str(ser_read_data) # 配置要上传的镜像配置 def get_upload_cfg(key:str): if(key=='all'): return key info_list=load_flash_info(calc_flash_info_name()) # print(info_list) for item in info_list: index=item["ImgType"].find(key.upper()) if(index>=0): return f"{item['Offset']} {item['Size']}" myprint(f'Can not found key:{key}') return "all" # 列出所有 .bin 文件 def list_bin_file(): path=os.path.curdir items:list[str]=[] l=os.listdir(path) for item in l: if(os.path.isfile(item)): items.append(item) l=os.listdir(bin_path()) for item in l: tmp=os.path.join(bin_path(),item) if(os.path.isfile(tmp)): items.append(item) ret_list:list[str]=[] for item in items: if(item.endswith(".bin")): ret_list.append(item) return ret_list # 获取ram文件的路径 def ram_file_redirect(file:str): if os.path.exists(file): return file else: p=os.path.join(bin_path(),file) if os.path.exists(p): return p myprint(f"文件不存在 {file}") sys.exit(-1) def global_def(): global init_str global serial_com global b_rate global nb_rate global ram_file global iot_flash_file global ser global time_stamp_start global time_stamp_end global function_type global upload_key global parser global user_log global log_timeout global layout_index global log_file global skip_ram args = parser.parse_args() init_str="WQKL" serial_com=args.port b_rate=args.b_rate nb_rate=args.nb_rate user_log=args.log log_timeout=args.timeout layout_index=args.layout_index skip_ram=args.skip_ram # 上传或者下载flash镜像 if(args.flash_file is not None): iot_flash_file=args.flash_file log_file=iot_flash_file if(os.path.exists(iot_flash_file)): function_type='download' iot_flash_file=bin_file_decrypt(iot_flash_file) if args.kunlun_version is None: args.kunlun_version=check_bin_type(iot_flash_file) ram_file=f'kl{args.kunlun_version}_ram_build.bin' ram_file=os.path.join(bin_path(),ram_file) else: function_type='upload' ram_file=f'bootram_kl{args.kunlun_version}.bin' ram_file=os.path.join(bin_path(),ram_file) if(args.kunlun_version is None): print("请指定参数 -k") sys.exit(-1) elif(args.kunlun_version == "4"): init_str="HZPG" if(args.upload_key is not None): upload_key = args.upload_key else: upload_key='all' elif(args.ram_file is not None): function_type='ram' ram_file=ram_file_redirect(args.ram_file) log_file=args.ram_file elif(args.bin_convert is not None): function_type='convert' iot_flash_file=args.bin_convert log_file=iot_flash_file elif(args.console is not None): log_file=args.console function_type='console' elif(args.ftm): log_file=time.strftime("%Y%m%d-%H%M%S") function_type='ftm' elif(args.list): log_file=time.strftime("%Y%m%d-%H%M%S") function_type='list' def parser_init(): global parser parser = argparse.ArgumentParser(description='kunlun相关工具脚本') # 添加参数 parser.add_argument('-p','--port',action='store', help='输入使用的串口') parser.add_argument('-r', '--ram_file', action='store', help='输入要下载的ram文件名称,此项和 -f 选项不能同时指定') parser.add_argument('-f', '--flash_file', action='store', help='输入要下载或上传的flash文件名称,如果文件存在则下载;不存在则上传') parser.add_argument('-c', '--console', action='store', help='指定控制台保存的文件名,不指定则不进入控制台') parser.add_argument('-u','--upload_key',action='store',help='上传时保存分区的关键字,不指定则上传所有') parser.add_argument('-k','--kunlun_version',action='store',help='上传下载时的kunlun版本 3或者1') parser.add_argument('-b','--bin_convert',action='store',help='转换bin文件') parser.add_argument('-l','--log',action='store_true',default=False, help='下载ram或flash程序之后是否进入log界面,默认否') parser.add_argument('-t','--timeout',action='store',type=float,default=5,help='启用log时接收log的时间') parser.add_argument('-i','--layout_index',action='store',type=int,help='解析接收到的flash文件或转换bin文件时使用的layout,不指定则不解析') parser.add_argument('--b_rate',action='store',type=int,default=115200,help='下载ram程序 上传flash数据 控制台 接收log等 使用的串口波特率') parser.add_argument('--nb_rate',action='store',type=int,default=1500000,help='下载flash程序使用的串口波特率') parser.add_argument('--ftm',action='store_true',default=False,help='进入工厂模式') parser.add_argument('--list',action='store_true',default=False,help='列出所有.bin文件') parser.add_argument('--skip_ram',action='store_true',default=False,help='下载flash文件时是否跳过下载ram.bin的步骤,默认否') def print_help(): help=f''' 输入: {sys.argv[0]} -h 查看帮助 ''' print(help) # 如果不指定上传还是下载 脚本会根据输入文件是否存在来决定上传还是下载 # kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] # kunlun.py [com] [ram.bin] # kunlun.py [file.bin] if __name__ == '__main__': function_type=None # 功能类型 "download" 下载,"upload" 上传,"ram" 下载ram,"convert"" 转换bin time_stamp_start, time_stamp_end = 0, 0 init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None upload_key, parser, layout_index, log_file=None, None, None, None user_log=False log_timeout=0 skip_ram=False parser_init() global_def() if(function_type is None): print("param err.") print_help() sys.exit(-1) log_init(calc_log_file_name()) if(function_type=='convert'): bin_to_hex_file(iot_flash_file,calc_hex_name(),layout_index) sys.exit(0) if(function_type=="list"): l=list_bin_file() myprint("当前目录和bin目录的.bin文件如下") for item in l: myprint(item) sys.exit(0) if(serial_com is None): myprint(f"请指定串口 {serial_com}") sys.exit(-1) try: ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3) except Exception: myprint(f"serial com:{serial_com} open failed.") sys.exit(-1) if(function_type=='console'): recv_ser_data(ser) read_input(ser) sys.exit(0) if(function_type=="ftm"): myprint("此功能尚未实现") sys.exit(0) modem = xmodem.XMODEM(getc, putc, mode='xmodem1k') # 发送启动字符让设备进入xmodem模式 if not skip_ram: init_send(ser, init_str) burn_ram_bin(modem, ram_file) if(function_type=='upload'): upload_fun() elif(function_type=='download'): burn_fun() clear_tmp() if(user_log): ser.baudrate=b_rate recv_ser_data(ser) time.sleep(log_timeout) recv_ser_end() elif(function_type=='ram'): if(user_log): ser.baudrate=b_rate recv_ser_data(ser) time.sleep(log_timeout) recv_ser_end() else: ser.baudrate=b_rate recv_ser_data(ser) read_input(ser) # if __name__ == "__main__": # read_input(None)