Files
kunlun_ramtool/kunlun.py

516 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import datetime
import fileinput
import re
import sys
import serial
import time
import xmodem
import os
import binascii
import threading
import argparse
from bin.log import myprint
from bin.log import log_init
from bin.log import mywrite
from bin.bin_to_hex import bin_to_hex_file
from bin.bin_to_hex import bin_file_decrypt
from bin.bin_to_hex import clear_tmp
from bin.bin_to_hex import load_flash_info
from bin.bin_to_hex import check_bin_type
from bin.base import bin_path
from bin.factory_mode import ftm_handle
def init_send(s_port:serial.Serial, send_str:str):
    """Keep sending the boot keyword until the device asks for an image.

    The keyword is written to the port, the function waits 0.1 s, and whatever
    the device answered is appended to a buffer.  It returns as soon as the
    prompt associated with that keyword appears in the buffer.  There is no
    overall timeout: the call only returns once the prompt has been seen.

    Args:
        s_port: Open serial port connected to the device.
        send_str: Boot keyword to send; must be "WQKL" or "HZPG".

    Raises:
        KeyError: If ``send_str`` is not one of the known keywords.
    """
    # Prompt expected for each keyword.  The spelling ("Recieving",
    # "recieve") is kept as-is; presumably it mirrors the device output.
    key_words = {
        b"WQKL": b"Recieving RAM-IMAGE in xmodem : C",
        b"HZPG": b"waiting recieve image: C",
    }
    keyword = send_str.encode('utf-8')
    # Resolve the prompt once, before touching the port, so an unknown keyword
    # fails immediately instead of after the first write.
    prompt = key_words[keyword]
    s_info = bytearray()
    while True:
        s_port.write(keyword)
        time.sleep(0.1)
        # Read one byte first, then drain everything already buffered.
        s_info += s_port.read(1)
        s_info += s_port.read(s_port.in_waiting)
        if prompt in s_info:
            myprint ("Program enters transmission mode...")
            break
def burn_ram_bin(x_modem:xmodem.XMODEM, r_file):
    """Send the RAM image ``r_file`` to the device over XMODEM.

    The elapsed whole seconds are stored in the module-level
    ``time_stamp_end`` and the transfer result is logged.

    Args:
        x_modem: XMODEM session used for the transfer.
        r_file: Path of the image file to send.

    Raises:
        SystemExit: With status -1 when the file cannot be opened.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(r_file, 'rb')
    except OSError:
        myprint(f"Cannot load file {r_file}")
        sys.exit(-1)
    # Close the image as soon as the transfer ends, whatever its outcome.
    with stream:
        myprint (f"Transferring {r_file}...")
        xmodem_send = x_modem.send(stream, callback=download_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ")
def burn_flash_bin(s_port:serial.Serial, x_modem:xmodem.XMODEM, f_file):
    """Wait for the device to request an image, then send ``f_file``.

    Device output is echoed through print_device_str() and accumulated.
    A transfer starts when the accumulated output contains b'CCC' and the
    latest read was issued for exactly one pending byte.  The loop ends when
    a transfer returns False or when the device reports
    "Updating done, PLS reboot the device...".

    The elapsed whole seconds of the transfer are stored in the module-level
    ``time_stamp_end``.

    Args:
        s_port: Open serial port connected to the device.
        x_modem: XMODEM session used for the transfer.
        f_file: Path of the flash image to send.

    Raises:
        SystemExit: With status -1 when the file cannot be opened.
    """
    global time_stamp_end
    s_info = bytearray()
    # NOTE(review): when nothing is pending this asks the port for 0 bytes and
    # loops again without sleeping; presumably that returns immediately, which
    # makes this a busy poll.  Left unchanged because the 'CCC' detection
    # below depends on how the incoming bytes are chunked -- confirm on
    # hardware before adding a delay.
    while True:
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if not tmp:
            continue
        print_device_str(tmp)
        s_info += tmp
        m_flash = b'CCC' in s_info and bytes2read == 1
        m_done = b"Updating done, PLS reboot the device..." in s_info
        if m_flash:
            # Start over so the old 'CCC' cannot trigger a second transfer.
            s_info = bytearray()
            stime = datetime.datetime.now()
            try:
                stream = open(f_file, 'rb')
            except OSError:
                myprint(f"Cannot load file {f_file}")
                sys.exit(-1)
            # Close the image as soon as the transfer ends.
            with stream:
                myprint (f"Transferring {f_file}...")
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback,retry=16)
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint (f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif m_done:
            myprint("Update done.")
            break
def upload_callback(total_packets, success_count, error_count, packet_size):
    """Progress hook for receiving: emit one dot for every tenth packet."""
    if total_packets % 10:
        return
    print('.', end='', flush=True)
def download_callback(total_packets, success_count, error_count):
    """Progress hook for sending: emit one dot for every tenth packet."""
    if total_packets % 10:
        return
    print('.', end='', flush=True)
# Print the characters received from the serial port
def print_device_str(data:bytearray):
    """Write device output to the log, with CRLF turned into LF.

    The bytes are written as UTF-8 text.  If decoding or writing the text
    fails for any reason, the repr of the (normalised) bytes followed by a
    newline is written instead.
    """
    normalised = data.replace(b"\r\n", b"\n")
    try:
        mywrite(normalised.decode('utf-8'))
    except Exception:
        # Deliberately broad: the fallback also covers a failing mywrite().
        mywrite(f"{normalised}\n")
# Run flag for the serial receive loop: recv_ser_data() sets it to True before
# starting its thread, and read_input() / recv_ser_end() set it back to False.
_recv_thread=False
# Read console input
def read_input(s_port:serial.Serial):
    """Forward console lines to the serial port until the user quits.

    Every line read from stdin is sent UTF-8 encoded with a trailing newline.
    The loop ends on the literal line "exit", on Ctrl-C, on end of input, or
    when writing to the port fails.

    On the way out the module-level ``_recv_thread`` flag is cleared; that is
    the flag polled by the receive loop started in recv_ser_data().

    Args:
        s_port: Open serial port to write to.
    """
    global _recv_thread
    try:
        while True:
            try:
                line = input()
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed or redirected from a finished
                # stream; treat it like a request to quit.
                break
            if line == 'exit':
                break
            try:
                s_port.write(line.encode('utf-8') + b'\n')
            except Exception:
                # Best effort: any failure to send ends the session.
                break
    finally:
        # Always clear the flag so the receive loop is not left running.
        _recv_thread = False
def recv_ser_end():
    """Clear the ``_recv_thread`` flag that the receive loop in recv_ser_data() polls."""
    global _recv_thread
    _recv_thread=False
# Receive serial data in a loop
def recv_ser_data(s_port:serial.Serial):
    """Start a thread that copies incoming serial data to the log.

    The thread polls the port every 0.1 s for as long as the module-level
    ``_recv_thread`` flag is true.  Pending bytes are decoded as UTF-8 and
    passed to mywrite(); a chunk that cannot be decoded or written is dropped
    silently.

    Args:
        s_port: Open serial port to read from.
    """
    global _recv_thread

    def pump(port:serial.Serial):
        global _recv_thread
        while _recv_thread:
            time.sleep(0.1)
            pending = port.in_waiting
            if pending <= 0:
                continue
            chunk = port.read(pending)
            try:
                mywrite(chunk.decode('utf-8'))
            except Exception:
                # Best effort: skip this chunk and keep the loop alive.
                pass

    # The flag must be set before the thread starts, or the loop exits at once.
    _recv_thread = True
    threading.Thread(target=pump, args=(s_port,)).start()
# Upload firmware (receive an image from the device into a file)
def upload_bin(x_modem:xmodem.XMODEM, w_file):
    """Receive an image over XMODEM and store it in ``w_file``.

    The elapsed whole seconds are stored in the module-level
    ``time_stamp_end`` and the transfer result is logged.

    Args:
        x_modem: XMODEM session used for the transfer.
        w_file: Path of the file to create (an existing file is truncated).

    Raises:
        SystemExit: With status -1 when the file cannot be created.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(w_file, 'wb+')
    except OSError:
        myprint(f"Cannot cteate file {w_file}")
        sys.exit(-1)
    # Closing here flushes the received data to disk before callers read the
    # file back.
    with stream:
        myprint (f"Receiving {w_file}..." )
        xmodem_send = x_modem.recv(stream, callback=upload_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
def recv_ser_data_ftm(s_port:serial.Serial,timeout:float=1):
    """Collect serial data for about ``timeout`` seconds and pass it to ftm_handle().

    Args:
        s_port: Open serial port to read from.
        timeout: Collection time in seconds, consumed in 0.1 s steps.
    """
    # NOTE(review): indentation was lost in the copy this was reviewed from.
    # ftm_handle() is assumed to run once, after the collection loop --
    # confirm against the original file.
    recv_data=bytearray()
    while timeout>0:
        time.sleep(0.1)
        timeout-=0.1
        bytes2read = s_port.in_waiting
        if(bytes2read>0):
            tmp = s_port.read(bytes2read)
            recv_data+=tmp
    ftm_handle(recv_data)
def getc(size, timeout=1):
    """Read up to ``size`` bytes from the global serial port ``ser``.

    Returns None instead of an empty result.  ``timeout`` is accepted but not
    used.
    """
    return ser.read(size) or None
def putc(data, timeout=1):
    """Write ``data`` to the global serial port ``ser``, then pause 30 ms.

    ``timeout`` is accepted but not used.
    """
    pause_s = 0.03
    ser.write(data)
    time.sleep(pause_s)
# Directory that receives the files generated by this script (see the calc_*
# helpers that build paths inside it).
_work_dir='work'
try:
    os.mkdir(_work_dir)
except FileExistsError:
    # Already present.  Creating first and tolerating "exists" avoids the
    # window between an existence check and the mkdir call.
    pass
# Log file: always stored in the work directory
def calc_log_file_name():
    """Return the work-directory path ``<base name of log_file>.log``."""
    return os.path.join(_work_dir, os.path.basename(log_file) + '.log')
# Uploaded flash image: always stored in the work directory
def calc_upload_name():
    """Return the work-directory path carrying the base name of ``iot_flash_file``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file))
# Hex file generated from the flash image: always stored in the work directory
def calc_hex_name():
    """Return the work-directory path ``<base name of iot_flash_file>.txt``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.txt')
# Uploaded flash information: always stored in the work directory
def calc_flash_info_name():
    """Return the work-directory path ``<base name of iot_flash_file>.info``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.info')
# Upload firmware
def upload_fun():
    """Query the device, save its answers, then read an image back.

    Steps, all on the global serial port ``ser``:
      1. Send "f i" and "f s", echo each reply and store it in the flash-info
         file (the first reply truncates the file, the second is appended).
         The reply is split on CRLF and the pieces are written back without
         any separator.
      2. Send "fw u d <cfg>", where <cfg> comes from get_upload_cfg().
      3. Receive the image over XMODEM and convert it to a hex file.
    """
    def save_reply(command, wait_s, file_mode):
        # Send one query, echo the reply and store it in the flash-info file.
        ser.write(command)
        time.sleep(wait_s)
        reply = ser.read(4096)
        print_device_str(reply)
        with open(calc_flash_info_name(), mode=file_mode, encoding='utf-8') as info:
            info.writelines(reply.decode('utf-8').split('\r\n'))

    # Flash information
    save_reply(b"f i\n", 0.5, 'w+')
    # Image information
    save_reply(b"f s\n", 1, 'a+')

    # Request the upload of the whole image (or of the selected partition)
    request = f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8")
    ser.write(request)
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8'))

    upload_bin(modem, calc_upload_name())
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s" )

    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name(),layout_index)
    myprint("Transform to hex end.")
# Flash the firmware
def burn_fun():
    """Send the flash image to the device at the flashing baud rate.

    Waits 3 s, echoes whatever the device printed in the meantime, switches
    the global serial port ``ser`` to ``nb_rate`` and hands over to
    burn_flash_bin().
    """
    time.sleep(3)
    print_device_str(ser.read(4096))
    ser.baudrate = nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    total_s = time_stamp_start + time_stamp_end
    myprint (f"Total transmission time: {total_s} s")
    # A further 3 s wait followed by reading and echoing the device output
    # used to follow here; it is disabled.
# Work out the configuration of the image to upload
def get_upload_cfg(key:str):
    """Translate an upload key into the argument of the "fw u d" command.

    "all" is returned unchanged.  Any other key is upper-cased and searched
    for in the ``ImgType`` field of the entries loaded from the flash-info
    file; the first hit yields "<Offset> <Size>".  When nothing matches, a
    message is logged and "all" is returned.
    """
    if key == 'all':
        return key
    hit = next(
        (
            entry
            for entry in load_flash_info(calc_flash_info_name())
            if key.upper() in entry["ImgType"]
        ),
        None,
    )
    if hit is None:
        myprint(f'Can not found key:{key}')
        return "all"
    return f"{hit['Offset']} {hit['Size']}"
# List all .bin files
def list_bin_file():
    """Return the names of the .bin files in the current and the bin directory.

    Only file names are returned (no directory part), current-directory
    entries first.  A name present in both directories appears twice.
    """
    local_files = [
        name for name in os.listdir(os.path.curdir) if os.path.isfile(name)
    ]
    bundled_files = [
        name
        for name in os.listdir(bin_path())
        if os.path.isfile(os.path.join(bin_path(), name))
    ]
    return [
        name for name in local_files + bundled_files if name.endswith(".bin")
    ]
# Resolve the path of a RAM image file
def ram_file_redirect(file:str):
    """Return ``file`` if it exists, else the same name inside the bin directory.

    When neither path exists a message is logged and the process exits with
    status -1.
    """
    if os.path.exists(file):
        return file
    # Only consult the bin directory when the path as given does not exist.
    candidate = os.path.join(bin_path(), file)
    if os.path.exists(candidate):
        return candidate
    myprint(f"文件不存在 {file}")
    sys.exit(-1)
def global_def():
    """Parse the command line and fill in the module-level settings.

    Uses the parser stored in the global ``parser`` and sets
    ``function_type`` to one of 'download', 'upload', 'ram', 'convert',
    'console', 'ftm' or 'list'.  ``function_type`` is left untouched when
    none of the corresponding options was given.
    """
    # NOTE(review): indentation was lost in the copy this was reviewed from.
    # The nesting of the -f branch below (in particular the -r override, the
    # -k check and the upload_key handling) is a best guess -- confirm
    # against the original file.
    global init_str
    global serial_com
    global b_rate
    global nb_rate
    global ram_file
    global iot_flash_file
    global ser
    global time_stamp_start
    global time_stamp_end
    global function_type
    global upload_key
    global parser
    global user_log
    global log_timeout
    global layout_index
    global log_file
    global skip_ram
    args = parser.parse_args()
    # Default boot keyword; replaced by "HZPG" below when -k is "4".
    init_str="WQKL"
    serial_com=args.port
    b_rate=args.b_rate
    nb_rate=args.nb_rate
    user_log=args.log
    log_timeout=args.timeout
    layout_index=args.layout_index
    skip_ram=args.skip_ram
    # Upload or download a flash image
    if(args.flash_file is not None):
        iot_flash_file=args.flash_file
        log_file=iot_flash_file
        if(os.path.exists(iot_flash_file)):
            # The file exists: it is sent to the device.
            function_type='download'
            iot_flash_file=bin_file_decrypt(iot_flash_file)
            if args.kunlun_version is None:
                args.kunlun_version=check_bin_type(iot_flash_file)
            ram_file=f'kl{args.kunlun_version}_ram_build.bin'
            ram_file=ram_file_redirect(ram_file)
        else:
            # The file does not exist: it is created from the device content.
            function_type='upload'
            ram_file=f'bootram_kl{args.kunlun_version}.bin'
            ram_file=ram_file_redirect(ram_file)
        # An explicit -r overrides the RAM image derived from the version.
        if(args.ram_file is not None):
            ram_file=ram_file_redirect(args.ram_file)
        # NOTE(review): with the nesting assumed here, a missing version has
        # already gone through ram_file_redirect() above with "None" in the
        # file name, so this message may never be reached -- verify.
        if(args.kunlun_version is None):
            print("请指定参数 -k")
            sys.exit(-1)
        elif(args.kunlun_version == "4"):
            init_str="HZPG"
        if(args.upload_key is not None):
            upload_key = args.upload_key
        else:
            upload_key='all'
    elif(args.ram_file is not None):
        function_type='ram'
        ram_file=ram_file_redirect(args.ram_file)
        log_file=args.ram_file
    elif(args.bin_convert is not None):
        function_type='convert'
        iot_flash_file=args.bin_convert
        log_file=iot_flash_file
    elif(args.console is not None):
        log_file=args.console
        function_type='console'
    elif(args.ftm):
        # No file name is available here, so the log is named after the
        # current time.
        log_file=time.strftime("%Y%m%d-%H%M%S")
        function_type='ftm'
    elif(args.list):
        log_file=time.strftime("%Y%m%d-%H%M%S")
        function_type='list'
def parser_init():
    """Create the command-line parser and store it in the global ``parser``."""
    global parser
    # One (flags, keyword arguments) pair per option, in registration order.
    option_table = (
        (('-p', '--port'),
         dict(action='store', help='输入使用的串口')),
        (('-r', '--ram_file'),
         dict(action='store', help='输入要下载的ram文件名称此项和 -f 选项不能同时指定')),
        (('-f', '--flash_file'),
         dict(action='store', help='输入要下载或上传的flash文件名称如果文件存在则下载不存在则上传')),
        (('-c', '--console'),
         dict(action='store', help='指定控制台保存的文件名,不指定则不进入控制台')),
        (('-u', '--upload_key'),
         dict(action='store', help='上传时保存分区的关键字,不指定则上传所有')),
        (('-k', '--kunlun_version'),
         dict(action='store', help='上传下载时的kunlun版本 3或者1')),
        (('-b', '--bin_convert'),
         dict(action='store', help='转换bin文件')),
        (('-l', '--log'),
         dict(action='store_true', default=False, help='下载ram或flash程序之后是否进入log界面默认否')),
        (('-t', '--timeout'),
         dict(action='store', type=float, default=5, help='启用log时接收log的时间')),
        (('-i', '--layout_index'),
         dict(action='store', type=int, help='解析接收到的flash文件或转换bin文件时使用的layout不指定则不解析')),
        (('--b_rate',),
         dict(action='store', type=int, default=115200, help='下载ram程序 上传flash数据 控制台 接收log等 使用的串口波特率')),
        (('--nb_rate',),
         dict(action='store', type=int, default=1500000, help='下载flash程序使用的串口波特率')),
        (('--ftm',),
         dict(action='store_true', default=False, help='进入工厂模式')),
        (('--list',),
         dict(action='store_true', default=False, help='列出所有.bin文件')),
        (('--skip_ram',),
         dict(action='store_true', default=False, help='下载flash文件时是否跳过下载ram.bin的步骤默认否')),
    )
    parser = argparse.ArgumentParser(description='kunlun相关工具脚本')
    # Register the arguments
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
def print_help():
    """Print a short hint telling the user to run the script with -h."""
    print(f'''
输入:
{sys.argv[0]} -h
查看帮助
''')
# If neither upload nor download is requested explicitly, the script decides
# from whether the input file exists.
# NOTE(review): the usage lines below are positional and do not match the
# options registered in parser_init(); presumably they predate it.
# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] <key>
# kunlun.py [com] [ram.bin]
# kunlun.py [file.bin] <enc>
if __name__ == '__main__':
    function_type=None # Function type: "download" send a flash image, "upload" read one back, "ram" send a RAM image, "convert" convert a bin
    time_stamp_start, time_stamp_end = 0, 0
    init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None
    upload_key, parser, layout_index, log_file=None, None, None, None
    user_log=False
    log_timeout=0
    skip_ram=False
    # Build the parser, then turn the command line into the globals above.
    parser_init()
    global_def()
    if(function_type is None):
        print("param err.")
        print_help()
        sys.exit(-1)
    log_init(calc_log_file_name())
    # 'convert' and 'list' need no serial port and finish here.
    if(function_type=='convert'):
        bin_to_hex_file(iot_flash_file,calc_hex_name(),layout_index)
        sys.exit(0)
    if(function_type=="list"):
        l=list_bin_file()
        myprint("当前目录和bin目录的.bin文件如下")
        for item in l:
            myprint(item)
        sys.exit(0)
    # Everything below talks to the device.
    if(serial_com is None):
        myprint(f"请指定串口 {serial_com}")
        sys.exit(-1)
    try:
        ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
    except Exception:
        myprint(f"serial com:{serial_com} open failed.")
        sys.exit(-1)
    if(function_type=='console'):
        # Interactive console: a background thread prints what the device
        # sends while this thread forwards typed lines.
        recv_ser_data(ser)
        read_input(ser)
        sys.exit(0)
    if(function_type=="ftm"):
        myprint("此功能尚未实现")
        sys.exit(0)
    modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')
    # Send the boot keyword so the device enters XMODEM mode, then load the
    # RAM image (skipped with --skip_ram)
    if not skip_ram:
        init_send(ser, init_str)
        burn_ram_bin(modem, ram_file)
    if(function_type=='upload'):
        upload_fun()
    elif(function_type=='download'):
        burn_fun()
        clear_tmp()
        if(user_log):
            # Back to the normal baud rate, then show device output for
            # log_timeout seconds.
            ser.baudrate=b_rate
            recv_ser_data(ser)
            time.sleep(log_timeout)
            recv_ser_end()
    elif(function_type=='ram'):
        # NOTE(review): indentation was lost in the copy this was reviewed
        # from.  The else below is assumed to pair with "if(user_log)" (a RAM
        # download without -l ends in the interactive console); it could
        # instead close the if/elif chain on function_type -- confirm against
        # the original file.
        if(user_log):
            ser.baudrate=b_rate
            recv_ser_data(ser)
            time.sleep(log_timeout)
            recv_ser_end()
        else:
            ser.baudrate=b_rate
            recv_ser_data(ser)
            read_input(ser)
# if __name__ == "__main__":
# read_input(None)