#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Command-line helper for kunlun devices driven over a serial port.

Depending on the command-line options the script downloads a RAM image,
downloads or uploads a flash image over XMODEM, converts a bin file to a hex
text file, opens a serial console, or lists the available ``.bin`` files.

NOTE(review): this file was recovered from a copy whose line structure had
been collapsed.  Block nesting was re-derived from the syntax, and a few
helpers whose text was lost are reconstructed; each such place carries a
``NOTE(review)`` comment and should be checked against the original.
"""
import argparse
import base64
import binascii
import datetime
import fileinput
import os
import re
import sys
import threading
import time

import serial
import xmodem

from bin.base import bin_path
from bin.bin_to_hex import bin_file_decrypt
from bin.bin_to_hex import bin_to_hex_file
from bin.bin_to_hex import check_bin_type
from bin.bin_to_hex import clear_tmp
from bin.bin_to_hex import load_flash_info
from bin.crc import CRC16
from bin.factory_mode import ftm_handle
from bin.get_log import print_log
from bin.log import log_init
from bin.log import myprint
from bin.log import mywrite
from password import bootram_pssword


def _open_or_exit(path, mode, error_text):
    """Open *path* in *mode*; on failure report *error_text* and exit(-1)."""
    try:
        return open(path, mode)
    except (OSError, TypeError, ValueError):
        myprint(error_text)
        sys.exit(-1)


def init_send(s_port:serial.Serial, send_str:str):
    """Send *send_str* repeatedly until the device announces XMODEM mode.

    Args:
        s_port: open serial port connected to the device.
        send_str: start string; must be ``"WQKL"`` or ``"HZPG"`` because the
            expected device reply is looked up by this key.
    """
    key_words = {
        b"WQKL": b"Recieving RAM-IMAGE in xmodem : C",
        b"HZPG": b"waiting receive image: C",
    }
    payload = send_str.encode('utf-8')
    expected = key_words[payload]
    s_info = bytearray()
    while True:
        s_port.write(payload)
        time.sleep(0.1)
        # Blocking read of one byte (bounded by the port timeout), then drain
        # whatever else is already buffered.
        s_info += s_port.read(1)
        s_info += s_port.read(s_port.in_waiting)
        if expected in s_info:
            myprint ("Program enters transmission mode...")
            break


def burn_ram_bin(x_modem:xmodem.XMODEM, r_file):
    """Send the RAM image *r_file* to the device through *x_modem*.

    Stores the elapsed whole seconds in the global ``time_stamp_end``.
    Exits the process with -1 when the file cannot be opened.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    # The stream is closed as soon as the transfer finishes.
    with _open_or_exit(r_file, 'rb', f"Cannot load file {r_file}") as stream:
        myprint (f"Transferring {r_file}...")
        xmodem_send = x_modem.send(stream, callback=download_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    # NOTE(review): time_stamp_start is never assigned anywhere in this file
    # other than its initial 0, so the difference equals time_stamp_end.
    myprint (f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ")


def burn_flash_bin(s_port:serial.Serial, x_modem:xmodem.XMODEM, f_file):
    """Wait for the device's flash prompt, send *f_file*, then wait for completion.

    The loop echoes everything the device prints.  When the flash-image
    prompt is seen the file is sent over XMODEM; the loop ends when the
    transfer reports ``False`` or when the device prints its
    "Updating done" message.
    """
    global time_stamp_end
    s_info = bytearray()
    while True:
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if len(tmp) == 0:
            continue
        print_device_str(tmp)
        s_info += tmp
        m_flash = (
            (s_info.find(b'CCC') >= 0 and bytes2read == 1)
            or s_info.find(b'Recieving FLASH-IMAGE in xmodem : C') >= 0
        )
        m_done = s_info.find(b"Updating done, PLS reboot the device...") >= 0
        if m_flash:
            # Start collecting afresh so the prompt does not match again.
            s_info = bytearray()
            stime = datetime.datetime.now()
            with _open_or_exit(f_file, 'rb', f"Cannot load file {f_file}") as stream:
                myprint (f"Transferring {f_file}...")
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback,retry=16)
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint (f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif m_done:
            myprint("Update done.")
            break


def upload_callback(total_packets, success_count, error_count, packet_size):
    """XMODEM receive progress hook: print a dot every 10 packets."""
    if total_packets % 10 == 0:
        sys.stdout.write('.')
        sys.stdout.flush()


def download_callback(total_packets, success_count, error_count):
    """XMODEM send progress hook: print a dot every 10 packets."""
    if total_packets % 10 == 0:
        sys.stdout.write('.')
        sys.stdout.flush()


# Print the characters received from the serial port.
def print_device_str(data:bytearray):
    """Write device output to the log/console with CRLF folded to LF.

    NOTE(review): only the first statements of this function survived in the
    recovered source (the CRLF replacement and an ``index`` loop header).  The
    decoding below is a reconstruction; confirm it against the original.
    """
    data = data.replace(b"\r\n", b"\n")
    mywrite(data.decode('utf-8', errors='replace'))


# Flag polled by the background receive thread; cleared by recv_ser_end().
_recv_thread = False


def read_input(s_port:serial.Serial):
    """Forward lines typed by the user to the device until input ends.

    NOTE(review): the body of this function was lost in the recovered source;
    only its call sites (``read_input(ser)``) are visible.  This is a
    reconstruction, including the ``\\n`` line terminator; verify.
    """
    try:
        while True:
            line = input()
            s_port.write(f"{line}\n".encode('utf-8'))
    except (KeyboardInterrupt, EOFError):
        pass
    finally:
        recv_ser_end()


def recv_ser_end():
    """Ask the background receive thread to stop.

    NOTE(review): reconstructed; the original body was lost.
    """
    global _recv_thread
    _recv_thread = False


def recv_ser_data(s_port:serial.Serial):
    """Start a background thread that echoes device output via ``mywrite``.

    NOTE(review): the header of this function and of the nested ``recv`` were
    lost in the recovered source; the read/decode/write body and the thread
    start are original.
    """
    global _recv_thread

    def recv(s_port):
        while _recv_thread:
            bytes2read = s_port.in_waiting
            if(bytes2read>0):
                tmp = s_port.read(bytes2read)
                try:
                    txt=tmp.decode('utf-8')
                    mywrite(txt)
                except Exception:
                    # Best effort: a chunk that cannot be decoded or written
                    # is dropped rather than killing the receive thread.
                    pass
            else:
                # Avoid spinning at full speed while the line is idle.
                time.sleep(0.01)

    _recv_thread=True
    t=threading.Thread(target=recv,args=(s_port,))
    t.start()


# Upload firmware.
def upload_bin(x_modem:xmodem.XMODEM, w_file):
    """Receive data over XMODEM into *w_file*.

    Stores the elapsed whole seconds in the global ``time_stamp_end``.
    Exits the process with -1 when the file cannot be created.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    with _open_or_exit(w_file, 'wb+', f"Cannot cteate file {w_file}") as stream:
        myprint (f"Receiving {w_file}..." )
        xmodem_send = x_modem.recv(stream, callback=upload_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")


def recv_ser_data_ftm(s_port:serial.Serial,timeout:float=1):
    """Collect serial data for about *timeout* seconds and pass it to ``ftm_handle``.

    NOTE(review): ``ftm_handle`` is assumed to run once after the polling
    loop; the recovered source does not show the nesting.
    """
    recv_data=bytearray()
    while timeout>0:
        time.sleep(0.1)
        timeout-=0.1
        bytes2read = s_port.in_waiting
        if bytes2read > 0:
            recv_data += s_port.read(bytes2read)
    ftm_handle(recv_data)


def getc(size, timeout=1):
    """XMODEM read hook: read *size* bytes from ``ser``; ``None`` when nothing arrived."""
    data=ser.read(size)
    return data or None


def putc(data, timeout=1):
    """XMODEM write hook: write *data* to ``ser`` and pause 30 ms."""
    ser.write(data)
    time.sleep(0.03)


_work_dir='work'
# exist_ok avoids failing if the directory appears between check and create.
os.makedirs(_work_dir, exist_ok=True)


# The log file is always stored in the work directory.
def calc_log_file_name():
    """Return ``work/<basename of log_file>.log``."""
    base = os.path.split(log_file)[-1]
    return os.path.join(_work_dir, base + '.log')


# The uploaded flash image is always stored in the work directory.
def calc_upload_name():
    """Return ``work/<basename of iot_flash_file>``."""
    return os.path.join(_work_dir, os.path.split(iot_flash_file)[-1])


# The hex file generated from the flash image is always stored in the work directory.
def calc_hex_name():
    """Return ``work/<basename of iot_flash_file>.txt``."""
    base = os.path.split(iot_flash_file)[-1]
    return os.path.join(_work_dir, base + '.txt')


# The uploaded flash information is always stored in the work directory.
def calc_flash_info_name():
    """Return ``work/<basename of iot_flash_file>.info``."""
    base = os.path.split(iot_flash_file)[-1]
    return os.path.join(_work_dir, base + '.info')


# Compute the bootram password.
def calc_bootram_pssword(info_file:str):
    """Derive the bootram password from the ``Chip Id`` line of *info_file*.

    Returns:
        The password formatted as 8 lowercase hex digits.

    Exits the process with -1 when the file has no line starting with
    ``Chip Id`` (previously this raised ``UnboundLocalError``).
    """
    chip_id = None
    with open(info_file,mode='r',encoding='utf-8') as f:
        for line in f:
            if line.startswith('Chip Id'):
                chip_id = line.split(':')[-1].strip()
    if chip_id is None:
        myprint(f"Chip Id not found in {info_file}")
        sys.exit(-1)
    password=bootram_pssword(chip_id)
    return f"{password:08x}"


# Upload firmware.
def upload_fun():
    """Query the device, unlock it, receive the flash image and convert it to hex."""
    info_name = calc_flash_info_name()

    # Show the flash information.
    ser.write(b"f i\n")
    time.sleep(0.5)
    ser_read_data=ser.read(4096)
    print_device_str(ser_read_data)
    with open(info_name,mode='w+',encoding='utf-8') as f:
        f.writelines(ser_read_data.decode('utf-8').split('\r\n'))

    # Enter the password.
    password=calc_bootram_pssword(info_name)
    ser.write(f"PSS {password}\n".encode("utf-8"))
    time.sleep(0.5)
    ser_read_data=ser.read(4096)
    print_device_str(ser_read_data)

    # Show the image information.
    ser.write(b"f s\n")
    time.sleep(1)
    ser_read_data=ser.read(4096)
    print_device_str(ser_read_data)
    with open(info_name,mode='a+',encoding='utf-8') as f:
        f.writelines(ser_read_data.decode('utf-8').split('\r\n'))

    # Upload the whole image (or the region selected by upload_key).
    ser.write(f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8"))
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8'))
    upload_bin(modem,calc_upload_name())
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s" )
    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name(),layout_index)
    myprint("Transform to hex end.")


# Send the configuration information.
def send_cfg_fun():
    """Send the base64-encoded baud-rate configuration line, with CRC, to the device."""
    rate_table={
        115200:0,
        460800:1,
        1500000:2,
    }
    # Rates missing from the table fall back to index 0.
    baud_cfg=f"u_baud={rate_table.get(nb_rate,0)}"
    cfg_str=base64.b64encode(baud_cfg.encode(encoding='utf-8')).decode(encoding='utf-8')
    text=f"ram_config:{cfg_str}:gifnoc_mar"
    crc=CRC16(text.encode('utf-8'))
    cfg_text=f"{text}#crc={crc}"
    ser.write(f"{cfg_text}\n".encode("utf-8"))


# Burn firmware.
def burn_fun():
    """Send the baud configuration, switch to ``nb_rate`` and burn the flash image."""
    send_cfg_fun()
    ser_read_data=ser.read(4096)
    print_device_str(ser_read_data)
    ser.baudrate=nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s")


# Work out which image region should be uploaded.
def get_upload_cfg(key:str):
    """Return the ``fw u d`` argument for *key*.

    ``'all'`` is passed through.  Otherwise the first flash-info entry whose
    ``ImgType`` contains the upper-cased key yields ``"<Offset> <Size>"``;
    when none matches a message is printed and ``"all"`` is returned.
    """
    if key == 'all':
        return key
    wanted = key.upper()
    for item in load_flash_info(calc_flash_info_name()):
        if item["ImgType"].find(wanted) >= 0:
            return f"{item['Offset']} {item['Size']}"
    myprint(f'Can not found key:{key}')
    return "all"


# List all .bin files.
def list_bin_file():
    """Return the names of ``.bin`` files in the current directory and in ``bin_path()``."""
    names:list[str] = [
        entry for entry in os.listdir(os.path.curdir) if os.path.isfile(entry)
    ]
    bin_dir = bin_path()
    names.extend(
        entry for entry in os.listdir(bin_dir)
        if os.path.isfile(os.path.join(bin_dir, entry))
    )
    return [name for name in names if name.endswith(".bin")]


# Resolve the path of the ram file.
def ram_file_redirect(file:str):
    """Return *file* if it exists, else the same name under ``bin_path()``.

    Exits the process with -1 when neither location has the file.
    """
    if os.path.exists(file):
        return file
    candidate = os.path.join(bin_path(), file)
    if os.path.exists(candidate):
        return candidate
    myprint(f"文件不存在 {file}")
    sys.exit(-1)


def global_def():
    """Parse the command line and fill in the module-level settings.

    ``function_type`` is left untouched when no option selects a function;
    the caller treats that as a parameter error.
    """
    global init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file
    global ser, time_stamp_start, time_stamp_end, function_type, upload_key
    global parser, user_log, log_timeout, layout_index, log_file, skip_ram
    global extract_log

    args = parser.parse_args()
    init_str="WQKL"
    serial_com=args.port
    b_rate=args.b_rate
    nb_rate=args.nb_rate
    user_log=args.log
    log_timeout=args.timeout
    layout_index=args.layout_index
    skip_ram=args.skip_ram
    extract_log=args.extract_log

    # Upload or download a flash image.
    if args.flash_file is not None:
        iot_flash_file=args.flash_file
        log_file=iot_flash_file
        if args.ram_file is not None:
            ram_file=ram_file_redirect(args.ram_file)
        if os.path.exists(iot_flash_file):
            function_type='download'
            iot_flash_file=bin_file_decrypt(iot_flash_file)
            if args.kunlun_version is None:
                args.kunlun_version=check_bin_type(iot_flash_file)
            if ram_file is None:
                ram_file=f'kl{args.kunlun_version}_ram_build.bin'
            ram_file=ram_file_redirect(ram_file)
        else:
            function_type='upload'
            if ram_file is None:
                # Check -k before building the default name; otherwise the
                # user is told that "bootram_klNone.bin" does not exist.
                if args.kunlun_version is None:
                    print("请指定参数 -k")
                    sys.exit(-1)
                ram_file=f'bootram_kl{args.kunlun_version}.bin'
            ram_file=ram_file_redirect(ram_file)
        # NOTE(review): assumed to apply to both download and upload; the
        # recovered source does not show the nesting of this check.
        if args.kunlun_version is None:
            print("请指定参数 -k")
            sys.exit(-1)
        elif args.kunlun_version == "4":
            init_str="HZPG"
        if args.upload_key is not None:
            upload_key = args.upload_key
        else:
            upload_key='all'
    elif args.ram_file is not None:
        function_type='ram'
        ram_file=ram_file_redirect(args.ram_file)
        log_file=args.ram_file
        if args.kunlun_version is None:
            print("请指定参数 -k")
            sys.exit(-1)
        elif args.kunlun_version == "4":
            init_str="HZPG"
    elif args.bin_convert is not None:
        function_type='convert'
        iot_flash_file=args.bin_convert
        log_file=iot_flash_file
    elif args.console is not None:
        log_file=args.console
        function_type='console'
    elif args.ftm:
        log_file=time.strftime("%Y%m%d-%H%M%S")
        function_type='ftm'
    elif args.list:
        log_file=time.strftime("%Y%m%d-%H%M%S")
        function_type='list'


def parser_init():
    """Create the global ``argparse`` parser and register every option."""
    global parser
    parser = argparse.ArgumentParser(description='kunlun相关工具脚本')
    # Add the arguments.
    parser.add_argument('-p','--port',action='store', help='输入使用的串口')
    parser.add_argument('-r', '--ram_file', action='store', help='输入要下载的ram文件名称,此项和 -f 选项不能同时指定')
    parser.add_argument('-f', '--flash_file', action='store', help='输入要下载或上传的flash文件名称,如果文件存在则下载;不存在则上传')
    parser.add_argument('-c', '--console', action='store', help='指定控制台保存的文件名,不指定则不进入控制台')
    parser.add_argument('-u','--upload_key',action='store',help='上传时保存分区的关键字,不指定则上传所有')
    parser.add_argument('-k','--kunlun_version',action='store',help='上传下载时的kunlun版本 3或者1')
    parser.add_argument('-b','--bin_convert',action='store',help='转换bin文件')
    parser.add_argument('-l','--log',action='store_true',default=False, help='下载ram或flash程序之后是否进入log界面,默认否')
    parser.add_argument('-t','--timeout',action='store',type=float,default=5,help='启用log时接收log的时间')
    parser.add_argument('-i','--layout_index',action='store',type=int,help='解析接收到的flash文件或转换bin文件时使用的layout,不指定则不解析')
    parser.add_argument('--b_rate',action='store',type=int,default=115200,help='下载ram程序 上传flash数据 控制台 接收log等 使用的串口波特率')
    parser.add_argument('--nb_rate',action='store',type=int,default=1500000,help='下载flash程序使用的串口波特率,支持115200,460800,1500000')
    parser.add_argument('--ftm',action='store_true',default=False,help='进入工厂模式')
    parser.add_argument('--list',action='store_true',default=False,help='列出所有.bin文件')
    parser.add_argument('--skip_ram',action='store_true',default=False,help='下载flash文件时是否跳过下载ram.bin的步骤,默认否')
    parser.add_argument('--extract_log',action='store_true',default=False,help='是否提取bin中的log信息,默认否')


def print_help():
    """Print a one-line hint on how to get the full help text."""
    # NOTE(review): the exact whitespace of this literal was lost in the
    # recovered source.
    help_text=f'''
    输入:
        {sys.argv[0]} -h 查看帮助
    '''
    print(help_text)


if __name__ == '__main__':
    # Function type: "download", "upload", "ram" (download ram only),
    # "convert" (convert bin), plus "console", "ftm" and "list".
    function_type=None
    time_stamp_start, time_stamp_end = 0, 0
    init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None
    upload_key, parser, layout_index, log_file, extract_log=None, None, None, None, False
    user_log=False
    log_timeout=0
    skip_ram=False

    parser_init()
    global_def()
    if function_type is None:
        print("param err.")
        print_help()
        sys.exit(-1)

    log_init(calc_log_file_name())

    # Functions that need no serial port.
    if function_type=='convert':
        bin_to_hex_file(iot_flash_file,calc_hex_name(),layout_index)
        if extract_log:
            print_log(iot_flash_file)
        sys.exit(0)

    if function_type=="list":
        myprint("当前目录和bin目录的.bin文件如下")
        for item in list_bin_file():
            myprint(item)
        sys.exit(0)

    if serial_com is None:
        myprint(f"请指定串口 {serial_com}")
        sys.exit(-1)

    try:
        ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
    except Exception:
        # Top-level boundary: any failure to open the port ends the run.
        myprint(f"serial com:{serial_com} open failed.")
        sys.exit(-1)

    if function_type=='console':
        recv_ser_data(ser)
        read_input(ser)
        sys.exit(0)

    if function_type=="ftm":
        myprint("此功能尚未实现")
        sys.exit(0)

    modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')

    # Send the start string so the device enters XMODEM mode.
    if not skip_ram:
        init_send(ser, init_str)
        burn_ram_bin(modem, ram_file)

    if function_type=='upload':
        upload_fun()
        if extract_log:
            print_log(calc_upload_name())
    elif function_type=='download':
        burn_fun()
        clear_tmp()
        if user_log:
            ser.baudrate=b_rate
            recv_ser_data(ser)
            time.sleep(log_timeout)
            recv_ser_end()
    elif function_type=='ram':
        if user_log:
            ser.baudrate=b_rate
            recv_ser_data(ser)
            time.sleep(log_timeout)
            recv_ser_end()
        else:
            ser.baudrate=b_rate
            recv_ser_data(ser)
            read_input(ser)

# Manual check of the configuration sender:
# if __name__ == "__main__":
#     nb_rate=115200
#     send_cfg_fun()