Files
kunlun_ramtool/kunlun.py
ranchuan fbda160e9a 新增以下提交
1.自动判断bin文件是否需要解密
2.添加下载指定ram.bin的功能,支持命令行交互
3.去除手动指定上传还是下载的命令选项
2024-10-17 16:14:50 +08:00

397 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import datetime
import fileinput
import re
import sys
import serial
import time
import xmodem
import os
import binascii
import threading
from bin.log import myprint
from bin.log import log_init
from bin.log import mywrite
from bin.bin_to_hex import bin_to_hex_file
from bin.bin_to_hex import bin_file_decrypt
from bin.bin_to_hex import clear_tmp
from bin.bin_to_hex import load_flash_info
from bin.base import bin_path
def init_send(s_port:serial.Serial, send_str:str):
    """Repeatedly send ``send_str`` until the device announces XMODEM reception.

    Each iteration writes the UTF-8 encoded string, waits 0.1 s, then appends
    one blocking read of a single byte plus everything currently waiting on
    the port to an accumulating buffer. The loop ends once the buffer
    contains the device's RAM-image prompt.

    Args:
        s_port: Open serial port to write to and read from.
        send_str: Text sent on every iteration.
    """
    # The spelling below is the literal text matched against the device output.
    prompt = b"Recieving RAM-IMAGE in xmodem : C"
    payload = send_str.encode('utf-8')
    received = bytearray()
    while True:
        s_port.write(payload)
        time.sleep(0.1)
        received += s_port.read(1)
        received += s_port.read(s_port.in_waiting)
        if prompt in received:
            myprint ("Program enters transmission mode...")
            break
def burn_ram_bin(x_modem:xmodem.XMODEM, r_file):
    """Send ``r_file`` through ``x_modem`` and record the elapsed seconds.

    The elapsed whole seconds are stored in the global ``time_stamp_end``.
    The file is closed as soon as the transfer call returns or raises.

    Args:
        x_modem: XMODEM instance used for the transfer.
        r_file: Path of the file to send.

    Raises:
        SystemExit: With code -1 when the file cannot be opened.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(r_file, 'rb')
    except (OSError, TypeError, ValueError):
        myprint(f"Cannot load file {r_file}")
        sys.exit(-1)
    # Context manager guarantees the handle is released even if send() raises.
    with stream:
        myprint (f"Transferring {r_file}...")
        xmodem_send = x_modem.send(stream, callback=download_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ")
def burn_flash_bin(s_port:serial.Serial, x_modem:xmodem.XMODEM, f_file):
    """Poll the serial port and send ``f_file`` when the device asks for it.

    Incoming data is echoed with ``print_device_str`` and accumulated. When
    the accumulated data contains ``b'CCC'`` and the latest read was exactly
    one byte, the file is sent through ``x_modem``; the loop stops if that
    transfer returns ``False``. The loop also stops when the device reports
    ``Updating done, PLS reboot the device...``. The elapsed whole seconds of
    the latest transfer are stored in the global ``time_stamp_end``.

    Args:
        s_port: Open serial port that is polled for device output.
        x_modem: XMODEM instance used for the transfer.
        f_file: Path of the image file to send.

    Raises:
        SystemExit: With code -1 when the file cannot be opened.
    """
    global time_stamp_end
    s_info = bytearray()
    while True:
        # NOTE(review): this polls without sleeping while nothing is pending;
        # left unchanged because the one-byte check below is timing sensitive.
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if len(tmp) == 0:
            continue
        print_device_str(tmp)
        s_info += tmp
        m_flash = s_info.find(b'CCC') >= 0 and bytes2read == 1
        m_done = s_info.find(b"Updating done, PLS reboot the device...") >= 0
        if m_flash:
            s_info = bytearray()
            stime = datetime.datetime.now()
            try:
                stream = open(f_file, 'rb')
            except (OSError, TypeError, ValueError):
                myprint(f"Cannot load file {f_file}")
                sys.exit(-1)
            # Context manager guarantees the handle is released even if send() raises.
            with stream:
                myprint (f"Transferring {f_file}...")
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback,retry=16)
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint (f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif m_done:
            myprint("Update done.")
            break
def upload_callback(total_packets, success_count, error_count, packet_size):
    """Write a progress dot to stdout whenever ``total_packets`` is a multiple of 10.

    The remaining arguments are accepted but not used.
    """
    if total_packets % 10 != 0:
        return
    sys.stdout.write('.')
    sys.stdout.flush()
def download_callback(total_packets, success_count, error_count):
    """Write a progress dot to stdout whenever ``total_packets`` is a multiple of 10.

    The remaining arguments are accepted but not used.
    """
    if total_packets % 10 != 0:
        return
    sys.stdout.write('.')
    sys.stdout.flush()
# Print the characters received from the serial port
def print_device_str(data:bytearray):
    """Print ``data`` one CRLF-separated line at a time, prefixed with ``DEVICE:``.

    Each line is decoded as UTF-8 and stripped before printing. If decoding
    or printing the text fails, the raw bytes of that line are printed instead.

    Args:
        data: Bytes received from the device.
    """
    for raw_line in data.split(b"\r\n"):
        try:
            myprint("DEVICE:",raw_line.decode('utf-8').strip())
        except Exception:
            myprint("DEVICE:",raw_line)
# Flag polled by the serial receiver loop; it keeps running while this is True.
_recv_thread = False


# Read console input
def read_input(s_port:serial.Serial):
    """Forward console lines to the serial port until the session ends.

    Every line read with ``input()`` is sent UTF-8 encoded with a trailing
    ``\\n``. The loop ends when the user types ``exit``, on Ctrl-C, when
    stdin reaches end-of-file, or when writing to the port fails.

    The global ``_recv_thread`` flag is always cleared on the way out, even
    if an unexpected exception propagates, so the receiver loop that polls
    the flag is never left running.

    Args:
        s_port: Open serial port that receives the typed lines.
    """
    global _recv_thread
    try:
        while True:
            try:
                line = input()
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed or redirected and has run out.
                break
            if line == 'exit':
                break
            try:
                s_port.write(line.encode('utf-8') + b'\n')
            except Exception:
                break
    finally:
        _recv_thread = False
# Continuously receive serial data
def recv_ser_data(s_port:serial.Serial):
    """Start a background thread that echoes serial output through ``mywrite``.

    The thread polls the port every 0.1 s while the global ``_recv_thread``
    flag is True; this function sets the flag before starting the thread.
    Data is decoded incrementally as UTF-8, so a multi-byte character split
    across two reads is reassembled, and invalid bytes are replaced rather
    than causing the whole chunk to be dropped. The thread is a daemon so it
    cannot keep the process alive after the main thread has ended.

    Args:
        s_port: Open serial port to read from.
    """
    global _recv_thread

    def recv(s_port:serial.Serial):
        global _recv_thread
        import codecs
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        while _recv_thread:
            time.sleep(0.1)
            bytes2read = s_port.in_waiting
            if bytes2read > 0:
                tmp = s_port.read(bytes2read)
                txt = decoder.decode(tmp)
                if not txt:
                    # Only the leading bytes of a character arrived so far.
                    continue
                try:
                    mywrite(txt)
                except Exception:
                    # Echoing is best effort: a failing write must not stop the loop.
                    pass

    _recv_thread = True
    t = threading.Thread(target=recv, args=(s_port,), daemon=True)
    t.start()
# Upload firmware
def upload_bin(x_modem:xmodem.XMODEM, w_file):
    """Receive data through ``x_modem`` into ``w_file`` and record the elapsed seconds.

    The elapsed whole seconds are stored in the global ``time_stamp_end``.
    The file is closed (and therefore flushed) as soon as the receive call
    returns or raises, before this function returns.

    Args:
        x_modem: XMODEM instance used for the transfer.
        w_file: Path of the file to create and fill.

    Raises:
        SystemExit: With code -1 when the file cannot be created.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(w_file, 'wb+')
    except (OSError, TypeError, ValueError):
        myprint(f"Cannot cteate file {w_file}")
        sys.exit(-1)
    # Context manager guarantees the data is flushed and the handle released.
    with stream:
        myprint (f"Receiving {w_file}..." )
        xmodem_send = x_modem.recv(stream, callback=upload_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
def getc(size, timeout=1):
    """Read up to ``size`` bytes from the global ``ser`` port.

    ``timeout`` is accepted but not used. Returns the bytes read, or
    ``None`` when nothing was read.
    """
    received = ser.read(size)
    if not received:
        return None
    return received
def putc(data, timeout=1):
    """Write ``data`` to the global ``ser`` port, then pause for 0.03 s.

    ``timeout`` is accepted but not used.

    Returns:
        The value returned by ``ser.write(data)``. The XMODEM ``putc``
        callback is expected to return the number of bytes written, and
        ``None`` is read as a failed write, so the result is passed through
        instead of being discarded.
    """
    written = ser.write(data)
    time.sleep(0.03)
    return written
# Directory (relative to the current working directory) that holds generated files.
_work_dir = 'work'
# exist_ok avoids the check-then-create race of testing for existence first.
os.makedirs(_work_dir, exist_ok=True)
# The log file is always stored in the work directory
def calc_log_file_name():
    """Return ``<_work_dir>/<base name of iot_flash_file>.log``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.log')
# The uploaded flash image is always stored in the work directory
def calc_upload_name():
    """Return ``<_work_dir>/<base name of iot_flash_file>``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file))
# The hex file generated from the flash image is always stored in the work directory
def calc_hex_name():
    """Return ``<_work_dir>/<base name of iot_flash_file>.txt``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.txt')
# The uploaded flash info is always stored in the work directory
def calc_flash_info_name():
    """Return ``<_work_dir>/<base name of iot_flash_file>.info``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file) + '.info')
# Upload firmware
def upload_fun():
    """Query the device, receive an image over XMODEM and convert it to hex.

    Steps, all on the global ``ser`` port:
      1. Send ``f i`` (flash info) and ``f s`` (image info); echo each reply
         and store it in the flash-info file (first reply overwrites, second
         appends).
      2. Send ``fw u d <cfg>`` where ``<cfg>`` comes from
         ``get_upload_cfg(upload_key)``, then receive the data with
         ``upload_bin``.
      3. Convert the received file with ``bin_to_hex_file``.
    """
    # (command, seconds to wait for the reply, open mode of the info file)
    queries = (
        (b"f i\n", 0.5, 'w+'),  # show flash info
        (b"f s\n", 1, 'a+'),    # show image info
    )
    for command, settle, file_mode in queries:
        ser.write(command)
        time.sleep(settle)
        reply = ser.read(4096)
        print_device_str(reply)
        with open(calc_flash_info_name(),mode=file_mode,encoding='utf-8') as f:
            # NOTE(review): writelines() adds no separators, so the reply's
            # lines end up concatenated - confirm load_flash_info expects that.
            f.writelines(reply.decode('utf-8').split('\r\n'))
    # Upload the requested image (or the whole flash)
    upload_cmd = f"fw u d {get_upload_cfg(upload_key)}\n"
    ser.write(upload_cmd.encode("utf-8"))
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8'))
    upload_bin(modem,calc_upload_name())
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s" )
    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name())
    myprint("Transform to hex end.")
# Burn firmware
def burn_fun():
    """Switch the global ``ser`` port to ``nb_rate`` and burn ``iot_flash_file``.

    Device output is echoed after a 3 s wait both before the baud-rate
    switch and after ``burn_flash_bin`` has returned.
    """
    def _echo_after_wait():
        # Wait 3 s, then echo up to 4096 bytes of device output.
        time.sleep(3)
        print_device_str(ser.read(4096))

    _echo_after_wait()
    ser.baudrate = nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s")
    _echo_after_wait()
# Work out which image region to upload
def get_upload_cfg(key:str):
    """Translate an image key into the argument for the upload command.

    Args:
        key: ``'all'``, or text looked up (upper-cased) in the ``ImgType``
            field of the entries returned by ``load_flash_info``.

    Returns:
        ``'all'`` when ``key`` is ``'all'`` or no entry matches; otherwise
        ``"<Offset> <Size>"`` of the first matching entry.
    """
    if key == 'all':
        return key
    for entry in load_flash_info(calc_flash_info_name()):
        if key.upper() in entry["ImgType"]:
            return f"{entry['Offset']} {entry['Size']}"
    myprint(f'Can not found key:{key}')
    return "all"
def global_def():
    """Derive the run configuration from ``sys.argv`` and store it in globals.

    Recognised command lines (``len(sys.argv)`` in parentheses):
      * (2) ``[file.bin]``                      -> ``function_type='convert'``
      * (3) ``[com] [ram.bin]``                 -> ``function_type='ram'``
      * (4) ``[com] [kl1/kl3] [x.bin]``         -> ``'download'`` if ``x.bin``
        exists, otherwise ``'upload'``; ``upload_key='all'``
      * (5) ``[com] [kl1/kl3] [x.bin] [key]``   -> as above with
        ``upload_key=key``

    For any other form ``function_type`` is left as ``None`` and the function
    returns without touching the file globals, so the caller can report the
    usage error instead of hitting an IndexError/TypeError here.

    Raises:
        SystemExit: With code -1 when the third argument of the 4/5-argument
            forms does not end with ``.bin``.
    """
    global init_str
    global serial_com
    global b_rate
    global nb_rate
    global ram_file
    global iot_flash_file
    global ser
    global time_stamp_start
    global time_stamp_end
    global function_type
    global upload_key
    init_str = "WQKL"
    b_rate = 115200
    nb_rate = 1500000
    function_type = None
    arg_num = len(sys.argv)
    # Without any argument there is no port name to read.
    serial_com = sys.argv[1] if arg_num > 1 else None

    def set_upload():
        global function_type
        if not sys.argv[3].endswith('.bin'):
            print("param err.")
            sys.exit(-1)
        # An existing file means download; a missing file means upload.
        function_type = 'download' if os.path.exists(sys.argv[3]) else 'upload'

    if arg_num == 2:
        if sys.argv[1].endswith('.bin'):
            function_type = 'convert'
    elif arg_num == 3:
        if sys.argv[2].endswith('.bin'):
            function_type = 'ram'
    elif arg_num == 4:
        set_upload()
        upload_key = "all"
    elif arg_num == 5:
        set_upload()
        upload_key = sys.argv[4]

    if function_type == 'upload':
        ram_file = f'bootram_{sys.argv[2]}.bin'
        iot_flash_file = sys.argv[3]
    elif function_type == 'download':
        ram_file = f'{sys.argv[2]}_ram_build.bin'
        iot_flash_file = sys.argv[3]
    elif function_type == 'ram':
        ram_file = sys.argv[2]
        iot_flash_file = sys.argv[2]
        return
    elif function_type == 'convert':
        iot_flash_file = sys.argv[1]
        return
    else:
        # Unrecognised command line: nothing to resolve.
        return
    # upload/download use a RAM image located in the directory given by bin_path().
    ram_file = os.path.join(bin_path(), ram_file)
def print_help():
    """Print the three supported command-line forms, using ``sys.argv[0]`` as the program name."""
    usage_text = f'''
自动判断上传还是下载:
{sys.argv[0]} [com] [kl1/kl3] [upload.bin/download.bin] <key>
下载ram文件:
{sys.argv[0]} [com] [ram.bin]
转换bin文件:
{sys.argv[0]} [file.bin]
'''
    print(usage_text)
# When upload/download is not specified, the script decides between them
# according to whether the input file exists
# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] <key>
# kunlun.py [com] [ram.bin]
# kunlun.py [file.bin] <enc>
if __name__ == '__main__':
    # Operating mode: "download" burns an image, "upload" reads one back,
    # "ram" sends a RAM image and opens a console, "convert" converts a bin file.
    function_type = None
    time_stamp_start = 0
    time_stamp_end = 0
    init_str = None
    serial_com = None
    b_rate = None
    nb_rate = None
    ram_file = None
    iot_flash_file = None
    ser = None
    upload_key = None
    global_def()
    if function_type is None:
        print("param err.")
        print_help()
        sys.exit(-1)
    log_init(calc_log_file_name())
    if function_type == 'convert':
        bin_to_hex_file(iot_flash_file,calc_hex_name())
        sys.exit(0)
    try:
        ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
    except Exception:
        myprint(f"serial com:{serial_com} open failed.")
        sys.exit(-1)
    modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')
    # Send the start string so the device enters xmodem mode
    init_send(ser, init_str)
    burn_ram_bin(modem, ram_file)
    if function_type == 'upload':
        upload_fun()
    elif function_type == 'download':
        iot_flash_file = bin_file_decrypt(iot_flash_file)
        burn_fun()
        clear_tmp()
    elif function_type == 'ram':
        recv_ser_data(ser)
        read_input(ser)
# if __name__ == "__main__":
# read_input(None)