Files
kunlun_ramtool/kunlun.py
2024-10-18 19:14:48 +08:00

438 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import datetime
import fileinput
import re
import sys
import serial
import time
import xmodem
import os
import binascii
import threading
import argparse
from bin.log import myprint
from bin.log import log_init
from bin.log import mywrite
from bin.bin_to_hex import bin_to_hex_file
from bin.bin_to_hex import bin_file_decrypt
from bin.bin_to_hex import clear_tmp
from bin.bin_to_hex import load_flash_info
from bin.base import bin_path
def init_send(s_port:serial.Serial, send_str:str):
    """Keep sending ``send_str`` until the port reports XMODEM receive mode.

    Each round writes the UTF-8 encoded ``send_str``, waits 100 ms, reads
    one byte plus whatever else is already buffered, and appends it to the
    accumulated reply. The function returns once the accumulated reply
    contains the "Recieving RAM-IMAGE in xmodem : C" banner; there is no
    timeout, so it loops for as long as the banner is absent.

    Args:
        s_port: Open serial port to write to and read from.
        send_str: Text that is transmitted on every round.
    """
    payload = send_str.encode('utf-8')
    # Spelling matches the literal the port is expected to emit.
    banner = b"Recieving RAM-IMAGE in xmodem : C"
    received = bytearray()
    while True:
        s_port.write(payload)
        time.sleep(0.1)
        received += s_port.read(1)
        received += s_port.read(s_port.in_waiting)
        if banner in received:
            myprint ("Program enters transmission mode...")
            return
def burn_ram_bin(x_modem:xmodem.XMODEM, r_file):
    """Send the file ``r_file`` through ``x_modem.send`` and time the transfer.

    The elapsed whole seconds are stored in the module-level
    ``time_stamp_end``. If the file cannot be opened, a message is printed
    and the process exits with status -1.

    Args:
        x_modem: XMODEM instance used for the transfer.
        r_file: Path of the image file to send.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(r_file, 'rb')
    except OSError:
        myprint(f"Cannot load file {r_file}")
        sys.exit(-1)
    # The context manager closes the image file even when the transfer raises.
    with stream:
        myprint (f"Transferring {r_file}...")
        xmodem_send = x_modem.send(stream, callback=download_callback)
    # Terminates the row of progress dots written by the callback.
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ")
def burn_flash_bin(s_port:serial.Serial, x_modem:xmodem.XMODEM, f_file):
    """Wait for the device to request the flash image, send it, await completion.

    The loop polls ``s_port``, echoes everything received through
    ``print_device_str`` and accumulates it. When the accumulated data
    contains ``CCC`` and the latest poll delivered exactly one byte, the
    file ``f_file`` is sent with ``x_modem.send``. The loop ends when that
    send returns ``False`` or when the device reports
    "Updating done, PLS reboot the device...".

    The elapsed whole seconds of the transfer are stored in the
    module-level ``time_stamp_end``. If the file cannot be opened, a
    message is printed and the process exits with status -1.

    Args:
        s_port: Open serial port connected to the device.
        x_modem: XMODEM instance used for the transfer.
        f_file: Path of the flash image to send.
    """
    global time_stamp_end
    s_info = bytearray()
    while True:
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if not tmp:
            continue
        print_device_str(tmp)
        s_info += tmp
        # NOTE(review): the single-byte condition presumably means the device
        # is idling and emitting one poll character at a time - confirm.
        m_flash = b'CCC' in s_info and bytes2read == 1
        m_done = b"Updating done, PLS reboot the device..." in s_info
        if m_flash:
            # Start a fresh buffer so the same request is not matched again.
            s_info = bytearray()
            stime = datetime.datetime.now()
            try:
                stream = open(f_file, 'rb')
            except OSError:
                myprint(f"Cannot load file {f_file}")
                sys.exit(-1)
            # The context manager closes the image file even when the
            # transfer raises.
            with stream:
                myprint (f"Transferring {f_file}...")
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback,retry=16)
            # Terminates the row of progress dots written by the callback.
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint (f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif m_done:
            myprint("Update done.")
            break
def upload_callback(total_packets, success_count, error_count, packet_size):
    """Write a progress dot to stdout on every tenth packet.

    Only ``total_packets`` is inspected; the other arguments are accepted
    so the function matches the callback signature it is registered with.
    """
    if total_packets % 10:
        return
    out = sys.stdout
    out.write('.')
    out.flush()
def download_callback(total_packets, success_count, error_count):
    """Write a progress dot to stdout on every tenth packet.

    Only ``total_packets`` is inspected; the other arguments are accepted
    so the function matches the callback signature it is registered with.
    """
    if total_packets % 10:
        return
    out = sys.stdout
    out.write('.')
    out.flush()
# Print the characters received from the serial port
def print_device_str(data:bytearray):
    """Print ``data`` piece by piece, each prefixed with ``DEVICE:``.

    ``data`` is split on CRLF. Every piece is decoded as UTF-8 and printed
    stripped of surrounding whitespace; if decoding or printing the text
    raises, the raw bytes of that piece are printed instead.
    """
    for raw_piece in data.split(b"\r\n"):
        try:
            myprint("DEVICE:", raw_piece.decode('utf-8').strip())
        except Exception:
            myprint("DEVICE:", raw_piece)
# Run flag for the serial receive loop: recv_ser_data sets it to True before
# starting its thread, and read_input / recv_ser_end set it back to False.
_recv_thread=False
# Read input
def read_input(s_port:serial.Serial):
    """Forward lines typed on stdin to ``s_port`` until the user stops.

    Each line is UTF-8 encoded, terminated with ``\\n`` and written to the
    port. The loop ends when the user types ``exit``, presses Ctrl-C,
    stdin reaches end-of-file, or writing to the port raises.

    On the way out, including when an unexpected error propagates, the
    module-level ``_recv_thread`` flag is cleared so the receive loop
    started by ``recv_ser_data`` stops and cannot keep the process alive.

    Args:
        s_port: Open serial port that receives the typed lines.
    """
    global _recv_thread
    try:
        while True:
            try:
                line = input()
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed or redirected from an exhausted
                # source; treat it like a request to leave.
                break
            if line == 'exit':
                break
            try:
                s_port.write(line.encode('utf-8') + b'\n')
            except Exception:
                # The port is no longer usable; stop forwarding.
                break
    finally:
        _recv_thread = False
def recv_ser_end():
    """Clear the module-level ``_recv_thread`` flag.

    The receive loop started by ``recv_ser_data`` runs only while the flag
    is true, so clearing it asks that loop to finish.
    """
    global _recv_thread
    _recv_thread = False
# Receive serial data in a loop
def recv_ser_data(s_port:serial.Serial):
    """Start a background thread that copies serial output to ``mywrite``.

    The thread polls ``s_port`` every 100 ms while the module-level
    ``_recv_thread`` flag is true and passes the decoded text to
    ``mywrite``. The function sets the flag to true, starts the thread and
    returns immediately.

    Decoding is incremental, so a multi-byte UTF-8 character split across
    two reads is reassembled, and undecodable bytes become U+FFFD instead
    of causing the whole chunk to be discarded.

    Args:
        s_port: Open serial port to read from.
    """
    global _recv_thread
    import codecs

    def recv(s_port:serial.Serial):
        global _recv_thread
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        while _recv_thread:
            time.sleep(0.1)
            bytes2read = s_port.in_waiting
            if bytes2read <= 0:
                continue
            txt = decoder.decode(s_port.read(bytes2read))
            if not txt:
                # Only the first part of a multi-byte character has arrived.
                continue
            try:
                mywrite(txt)
            except Exception:
                # Best effort, as before: a failure to emit one chunk must
                # not terminate the receive loop.
                pass

    _recv_thread = True
    t = threading.Thread(target=recv, args=(s_port,))
    t.start()
# Upload firmware
def upload_bin(x_modem:xmodem.XMODEM, w_file):
    """Receive data through ``x_modem.recv`` into ``w_file`` and time it.

    The file is created or truncated, and it is closed (and therefore
    flushed) before this function returns, so later readers of ``w_file``
    see the complete data. The elapsed whole seconds are stored in the
    module-level ``time_stamp_end``. If the file cannot be created, a
    message is printed and the process exits with status -1.

    Args:
        x_modem: XMODEM instance used for the transfer.
        w_file: Path of the file that receives the data.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(w_file, 'wb+')
    except OSError:
        myprint(f"Cannot cteate file {w_file}")
        sys.exit(-1)
    # The context manager closes the file even when the transfer raises.
    with stream:
        myprint (f"Receiving {w_file}..." )
        recv_result = x_modem.recv(stream, callback=upload_callback)
    # Terminates the row of progress dots written by the callback.
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Receiving {w_file} result: {recv_result}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
def getc(size, timeout=1):
    """Read up to ``size`` bytes from the module-level ``ser``.

    Returns the data read, or ``None`` when nothing was read. ``timeout``
    is accepted but not used by this function.
    """
    chunk = ser.read(size)
    if chunk:
        return chunk
    return None
def putc(data, timeout=1):
    """Write ``data`` to the module-level ``ser``, then pause for 30 ms.

    ``timeout`` is accepted but not used by this function. Nothing is
    returned.
    """
    pause_after_write = 0.03
    ser.write(data)
    time.sleep(pause_after_write)
# Directory that the calc_*_name helpers place their files in.
_work_dir = 'work'
# exist_ok=True creates the directory without a separate existence check, so
# two instances starting at the same moment cannot fail on each other.
os.makedirs(_work_dir, exist_ok=True)
# The log file is always stored in the work directory
def calc_log_file_name():
    """Return ``<_work_dir>/<basename of iot_flash_file>.log``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.log')
# The uploaded flash image is always stored in the work directory
def calc_upload_name():
    """Return ``<_work_dir>/<basename of iot_flash_file>``."""
    image_name = os.path.basename(iot_flash_file)
    return os.path.join(_work_dir, image_name)
# The hex file generated from the flash image is always stored in the work directory
def calc_hex_name():
    """Return ``<_work_dir>/<basename of iot_flash_file>.txt``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.txt')
# The uploaded flash information is always stored in the work directory
def calc_flash_info_name():
    """Return ``<_work_dir>/<basename of iot_flash_file>.info``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.info')
def _record_device_reply(command, settle_time, file_mode):
    """Send ``command`` on ``ser``, echo the reply and store it in the info file.

    Waits ``settle_time`` seconds after writing, reads up to 4096 bytes,
    prints them via ``print_device_str`` and writes the decoded text to
    the path from ``calc_flash_info_name()`` opened with ``file_mode``.
    Undecodable bytes are replaced with U+FFFD rather than raising, so
    noise on the serial line cannot abort the caller.
    """
    ser.write(command)
    time.sleep(settle_time)
    reply = ser.read(4096)
    print_device_str(reply)
    text = reply.decode('utf-8', errors='replace')
    with open(calc_flash_info_name(), mode=file_mode, encoding='utf-8') as f:
        # NOTE(review): writelines adds no separators, so the CRLF-split
        # pieces are stored back to back. Kept unchanged because the parser
        # (load_flash_info) is not visible here - confirm it expects this.
        f.writelines(text.split('\r\n'))


# Upload firmware
def upload_fun():
    """Query the device, receive the requested image and convert it to hex.

    Steps, in order:
      1. Send ``f i`` (show flash information) and start a fresh info file
         with the reply.
      2. Send ``f s`` (show image information) and append the reply to the
         same info file.
      3. Send ``fw u d <cfg>``, where ``<cfg>`` comes from
         ``get_upload_cfg(upload_key)``, and print the device's answer.
      4. Receive the data with ``upload_bin`` into ``calc_upload_name()``.
      5. Convert the received file with ``bin_to_hex_file`` into
         ``calc_hex_name()``.
    """
    # Show flash information
    _record_device_reply(b"f i\n", 0.5, 'w+')
    # Show image information
    _record_device_reply(b"f s\n", 1, 'a+')
    # Upload the whole image (or the region selected by upload_key)
    ser.write(f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8"))
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8', errors='replace'))
    upload_bin(modem,calc_upload_name())
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s" )
    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name())
    myprint("Transform to hex end.")
# Burn firmware
def burn_fun():
    """Echo pending device output, switch baud rate and send the flash image.

    Waits three seconds, prints up to 4096 bytes read from ``ser`` via
    ``print_device_str``, sets ``ser.baudrate`` to ``nb_rate`` and then
    hands over to ``burn_flash_bin`` with ``iot_flash_file``. Finally the
    sum of ``time_stamp_start`` and ``time_stamp_end`` is printed.
    """
    time.sleep(3)
    print_device_str(ser.read(4096))
    ser.baudrate = nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    total_time = time_stamp_start + time_stamp_end
    myprint (f"Total transmission time: {total_time} s")
    # Disabled: read and echo the device output once more after burning.
    # time.sleep(3)
    # ser_read_data=ser.read(4096)
    # print_device_str(ser_read_data)
# Work out the configuration of the image to upload
def get_upload_cfg(key:str):
    """Translate an upload key into the argument of the upload command.

    ``'all'`` is returned unchanged. Otherwise the entries returned by
    ``load_flash_info(calc_flash_info_name())`` are searched in order for
    the first one whose ``"ImgType"`` contains the upper-cased ``key``, and
    ``"<Offset> <Size>"`` of that entry is returned. If no entry matches,
    a message is printed and ``"all"`` is returned.
    """
    if key == 'all':
        return key
    candidates = (
        entry
        for entry in load_flash_info(calc_flash_info_name())
        if entry["ImgType"].find(key.upper()) != -1
    )
    hit = next(candidates, None)
    if hit is not None:
        return f"{hit['Offset']} {hit['Size']}"
    myprint(f'Can not found key:{key}')
    return "all"
def global_def():
    """Parse the command line and publish the run configuration as globals.

    Fixed values: ``init_str`` is ``"WQKL"``, ``b_rate`` is 115200 and
    ``nb_rate`` is 1500000. ``serial_com``, ``user_log`` and
    ``log_timeout`` come from the ``--port``, ``--log`` and ``--timeout``
    options.

    ``function_type`` is chosen from the first option that is present:
      * ``--flash_file``: ``'download'`` when that file exists, otherwise
        ``'upload'``; ``ram_file`` is the matching helper image under
        ``bin_path()``. Exits with -1 when ``--kunlun_version`` is missing.
      * ``--ram_file``: ``'ram'``.
      * ``--bin_convert``: ``'convert'``.
      * ``--console``: ``'console'``.
    When none is present, ``function_type`` is left untouched.
    """
    global init_str
    global serial_com
    global b_rate
    global nb_rate
    global ram_file
    global iot_flash_file
    global ser
    global time_stamp_start
    global time_stamp_end
    global function_type
    global upload_key
    global parser
    global user_log
    global log_timeout

    args = parser.parse_args()
    init_str = "WQKL"
    serial_com = args.port
    b_rate = 115200
    nb_rate = 1500000
    user_log = args.log
    log_timeout = args.timeout

    # Upload or download a flash image
    if args.flash_file is not None:
        iot_flash_file = args.flash_file
        if os.path.exists(iot_flash_file):
            function_type = 'download'
            helper_image = f'kl{args.kunlun_version}_ram_build.bin'
        else:
            function_type = 'upload'
            helper_image = f'bootram_kl{args.kunlun_version}.bin'
        ram_file = os.path.join(bin_path(), helper_image)
        # NOTE(review): the -k check and the upload_key default are applied
        # to both download and upload here - confirm against the original
        # layout.
        if args.kunlun_version is None:
            print("请指定参数 -k")
            sys.exit(-1)
        upload_key = 'all' if args.upload_key is None else args.upload_key
        return

    if args.ram_file is not None:
        function_type = 'ram'
        ram_file = args.ram_file
        iot_flash_file = ram_file
        return

    if args.bin_convert is not None:
        function_type = 'convert'
        iot_flash_file = args.bin_convert
        return

    if args.console is not None:
        iot_flash_file = args.console
        function_type = 'console'
def parser_init():
    """Create the module-level ``parser`` and register every option.

    Seven options store a string value (``-p``, ``-r``, ``-f``, ``-c``,
    ``-u``, ``-k``, ``-b``); ``-l`` is a flag that defaults to ``False``;
    ``-t`` is a float that defaults to 5.
    """
    global parser
    parser = argparse.ArgumentParser(description='kunlun相关工具脚本')
    # Add the arguments
    value_options = (
        ('-p', '--port', '输入使用的串口'),
        ('-r', '--ram_file', '输入使用的ram程序'),
        ('-f', '--flash_file', '输入使用的flash程序'),
        ('-c', '--console', '指定控制台保存的文件名,不指定则不进入控制台'),
        ('-u', '--upload_key', '上传时保存分区的关键字,不指定则上传所有'),
        ('-k', '--kunlun_version', '上传下载时的kunlun版本 3或者1'),
        ('-b', '--bin_convert', '转换bin文件'),
    )
    for short_flag, long_flag, help_text in value_options:
        parser.add_argument(short_flag, long_flag, action='store', help=help_text)
    parser.add_argument(
        '-l', '--log',
        action='store_true',
        default=False,
        help='下载ram或flash程序之后是否进入log界面默认否',
    )
    parser.add_argument(
        '-t', '--timeout',
        action='store',
        type=float,
        default=5,
        help='启用log时接收log的时间',
    )
def print_help():
    """Print a short hint that tells the user to run the script with ``-h``."""
    hint_lines = ("", "输入:", f"{sys.argv[0]} -h", "查看帮助", "")
    print("\n".join(hint_lines))
# If neither upload nor download is requested explicitly, the script decides
# between them by checking whether the input file exists.
# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] <key>
# kunlun.py [com] [ram.bin]
# kunlun.py [file.bin] <enc>
# Script entry point: initialise the module-level state, parse the command
# line, then dispatch on function_type.
if __name__ == '__main__':
function_type=None # function type: "download" = download, "upload" = upload, "ram" = download ram, "convert" = convert bin
time_stamp_start, time_stamp_end = 0, 0
init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None
upload_key=None
parser=None
user_log=False
log_timeout=0
parser_init()
global_def()
# global_def leaves function_type as None when no mode option was given.
if(function_type is None):
print("param err.")
print_help()
sys.exit(-1)
log_init(calc_log_file_name())
# Conversion only needs the input file, so it finishes before any serial port is opened.
if(function_type=='convert'):
bin_to_hex_file(iot_flash_file,calc_hex_name())
sys.exit(0)
# Every remaining mode talks to the device and therefore needs a port.
if(serial_com is None):
myprint(f"请指定串口 {serial_com}")
sys.exit(-1)
try:
ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
except Exception:
myprint(f"serial com:{serial_com} open failed.")
sys.exit(-1)
# Console mode: start the receive thread, then forward typed lines until the user leaves.
if(function_type=='console'):
recv_ser_data(ser)
read_input(ser)
sys.exit(0)
modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')
# Send the start string so that the device enters xmodem mode
init_send(ser, init_str)
# The RAM image is sent first for upload, download and ram modes alike.
burn_ram_bin(modem, ram_file)
if(function_type=='upload'):
upload_fun()
elif(function_type=='download'):
iot_flash_file=bin_file_decrypt(iot_flash_file)
burn_fun()
clear_tmp()
# With -l, show device output at b_rate for log_timeout seconds, then stop the receiver.
if(user_log):
ser.baudrate=b_rate
recv_ser_data(ser)
time.sleep(log_timeout)
recv_ser_end()
elif(function_type=='ram'):
if(user_log):
ser.baudrate=b_rate
recv_ser_data(ser)
time.sleep(log_timeout)
recv_ser_end()
# NOTE(review): indentation is not visible in this view, so it cannot be
# determined whether this else pairs with the if(user_log) directly above
# (ram mode without -l enters the interactive console) or with the outer
# function_type chain - confirm against the original file.
else:
ser.baudrate=b_rate
recv_ser_data(ser)
read_input(ser)
# if __name__ == "__main__":
# read_input(None)