400 lines
11 KiB
Python
400 lines
11 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: UTF-8 -*-
|
||
|
||
import datetime
|
||
import fileinput
|
||
import re
|
||
import sys
|
||
import serial
|
||
import time
|
||
import xmodem
|
||
import os
|
||
import binascii
|
||
import threading
|
||
from bin.log import myprint
|
||
from bin.log import log_init
|
||
from bin.log import mywrite
|
||
from bin.bin_to_hex import bin_to_hex_file
|
||
from bin.bin_to_hex import bin_file_decrypt
|
||
from bin.bin_to_hex import clear_tmp
|
||
from bin.bin_to_hex import load_flash_info
|
||
from bin.base import bin_path
|
||
|
||
|
||
def init_send(s_port: serial.Serial, send_str: str):
    """Keep sending the start string until the device offers an XMODEM transfer.

    Args:
        s_port: Open serial port to the device.
        send_str: Text written (UTF-8 encoded) to the port on every iteration.

    The function returns once everything read so far contains the marker
    text below; it loops forever if the marker never arrives.
    """
    # Matched verbatim against device output, including the "Recieving"
    # spelling (presumably how the boot ROM prints it -- do not "correct" it).
    ready_marker = b"Recieving RAM-IMAGE in xmodem : C"
    payload = send_str.encode('utf-8')
    received = bytearray()
    while ready_marker not in received:
        s_port.write(payload)
        time.sleep(0.1)
        # One read of a single byte (subject to the port timeout), then drain
        # whatever else is already buffered.
        received += s_port.read(1)
        received += s_port.read(s_port.in_waiting)
    myprint ("Program enters transmission mode...")
|
||
|
||
|
||
def burn_ram_bin(x_modem: xmodem.XMODEM, r_file):
    """Send the RAM image ``r_file`` to the device over XMODEM.

    Args:
        x_modem: XMODEM session used for the transfer.
        r_file: Path of the binary image to send.

    Side effects:
        Stores the elapsed whole seconds in the module-level
        ``time_stamp_end`` and logs the transfer result through ``myprint``.
        Exits the process with status -1 when the file cannot be opened.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(r_file, 'rb')
    except Exception:
        myprint(f"Cannot load file {r_file}")
        sys.exit(-1)
    # Close the image as soon as the transfer ends, whether it succeeded or
    # raised; the handle used to be left open.
    with stream:
        myprint (f"Transferring {r_file}...")
        xmodem_send = x_modem.send(stream, callback=download_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Transferring {r_file} result: {xmodem_send}, consuming time: {(time_stamp_end- time_stamp_start)} s ")
|
||
|
||
def burn_flash_bin(s_port: serial.Serial, x_modem: xmodem.XMODEM, f_file):
    """Wait for the device to request the flash image, then send it over XMODEM.

    Args:
        s_port: Open serial port to the device.
        x_modem: XMODEM session used for the transfer.
        f_file: Path of the flash image to send.

    The loop polls the port, echoes everything received through
    ``print_device_str`` and ends when either the transfer reports ``False``
    or the device prints its "Updating done" message.

    Side effects:
        Stores the elapsed whole seconds of the transfer in the module-level
        ``time_stamp_end``. Exits the process with status -1 when the image
        cannot be opened.
    """
    global time_stamp_end
    s_info = bytearray()
    while True:
        bytes2read = s_port.in_waiting
        tmp = s_port.read(bytes2read)
        if not tmp:
            continue
        print_device_str(tmp)
        s_info += tmp
        # Start sending once "CCC" has been seen and the latest poll found
        # exactly one pending byte (presumably a lone XMODEM 'C' handshake
        # character -- TODO confirm against the bootloader).
        m_flash = b'CCC' in s_info and bytes2read == 1
        m_done = b"Updating done, PLS reboot the device..." in s_info

        if m_flash:
            s_info = bytearray()
            stime = datetime.datetime.now()
            try:
                stream = open(f_file, 'rb')
            except Exception:
                myprint(f"Cannot load file {f_file}")
                sys.exit(-1)
            # Close the image when the transfer ends; the handle used to be
            # left open on every pass through this branch.
            with stream:
                myprint (f"Transferring {f_file}...")
                xmodem_send = x_modem.send(stream, quiet=True, callback=download_callback, retry=16)
            myprint('.')
            etime = datetime.datetime.now()
            time_stamp_end = (etime - stime).seconds
            myprint (f"Transferring {f_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
            if xmodem_send is False:
                break
        elif m_done:
            myprint("Update done.")
            break
|
||
|
||
|
||
def upload_callback(total_packets, success_count, error_count, packet_size):
    """Progress hook for receiving: print a dot every tenth packet.

    Only ``total_packets`` is used; the other arguments are accepted to match
    the callback signature and ignored.
    """
    if total_packets % 10:
        return
    sys.stdout.write('.')
    sys.stdout.flush()
|
||
|
||
|
||
def download_callback(total_packets, success_count, error_count):
    """Progress hook for sending: print a dot every tenth packet.

    Only ``total_packets`` is used; the other arguments are accepted to match
    the callback signature and ignored.
    """
    at_tenth_packet = total_packets % 10 == 0
    if at_tenth_packet:
        out = sys.stdout
        out.write('.')
        out.flush()
|
||
|
||
# Print the characters received from the serial port.
def print_device_str(data: bytearray):
    """Log each CRLF-separated line of ``data`` with a ``DEVICE:`` prefix.

    A line is decoded as UTF-8 and stripped before logging; if decoding or
    logging the text fails, the raw bytes of that line are logged instead.
    """
    for raw_line in data.split(b"\r\n"):
        try:
            myprint("DEVICE:", raw_line.decode('utf-8').strip())
        except Exception:
            myprint("DEVICE:", raw_line)
|
||
|
||
|
||
|
||
_recv_thread=False


# Read console input.
def read_input(s_port: serial.Serial):
    """Forward console lines to the serial port until the user quits.

    Each line typed is written to ``s_port`` as UTF-8 followed by ``\\n``.
    The loop ends on the literal line ``exit``, on Ctrl-C, on end of input
    (closed stdin), or when writing to the port fails.

    Side effects:
        Always resets the module-level ``_recv_thread`` flag to ``False`` on
        the way out, which is what stops the receiver loop started by
        ``recv_ser_data``.
    """
    global _recv_thread
    try:
        while True:
            try:
                line = input()
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed or redirected input ran out.
                # Treat it like Ctrl-C instead of crashing with a traceback.
                break
            if line == 'exit':
                break
            try:
                s_port.write(line.encode('utf-8') + b'\n')
            except Exception:
                break
    finally:
        # Must run even on an unexpected error, otherwise the non-daemon
        # receiver thread keeps the process alive indefinitely.
        _recv_thread = False
|
||
|
||
# Continuously receive serial data.
def recv_ser_data(s_port: serial.Serial):
    """Start a background thread that echoes serial input through ``mywrite``.

    The thread polls ``s_port`` every 0.1 s while the module-level
    ``_recv_thread`` flag is true; this function sets the flag to ``True``
    before starting the thread and returns immediately.
    """
    global _recv_thread
    # Function-scope import: ``codecs`` is not in the file's import block.
    import codecs

    def recv(s_port: serial.Serial):
        # The incremental decoder buffers a multi-byte character split across
        # two reads, and errors='replace' turns stray bytes into U+FFFD.
        # A whole chunk used to be dropped when decoding failed.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        while _recv_thread:
            time.sleep(0.1)
            pending = s_port.in_waiting
            if pending <= 0:
                continue
            txt = decoder.decode(s_port.read(pending))
            if not txt:
                # Only part of a multi-byte character has arrived so far.
                continue
            try:
                mywrite(txt)
            except Exception:
                # Best effort, as before: an output failure must not stop
                # the receiver loop.
                continue

    _recv_thread = True
    t = threading.Thread(target=recv, args=(s_port,))
    t.start()
|
||
|
||
# Upload (read back) firmware from the device.
def upload_bin(x_modem: xmodem.XMODEM, w_file):
    """Receive an image from the device over XMODEM into ``w_file``.

    Args:
        x_modem: XMODEM session used for the transfer.
        w_file: Path of the file to create (truncated if it exists).

    Side effects:
        Stores the elapsed whole seconds in the module-level
        ``time_stamp_end`` and logs the result through ``myprint``.
        Exits the process with status -1 when the file cannot be created.
    """
    global time_stamp_end
    stime = datetime.datetime.now()
    try:
        stream = open(w_file, 'wb+')
    except Exception:
        myprint(f"Cannot cteate file {w_file}")
        sys.exit(-1)
    # Close (and therefore flush) the file before returning, so that code
    # reading it back afterwards sees the complete image.
    with stream:
        myprint (f"Receiving {w_file}..." )
        xmodem_send = x_modem.recv(stream, callback=upload_callback)
    myprint('.')
    etime = datetime.datetime.now()
    time_stamp_end = (etime - stime).seconds
    myprint (f"Receiving {w_file} result: {xmodem_send}, consuming time: {(time_stamp_end-time_stamp_start)} s ")
|
||
|
||
|
||
|
||
|
||
def getc(size, timeout=1):
    """XMODEM read hook: read up to ``size`` bytes from the global ``ser``.

    ``timeout`` is accepted for the callback signature but not used here.
    Returns the bytes read, or ``None`` when nothing was read.
    """
    chunk = ser.read(size)
    if chunk:
        return chunk
    return None
|
||
|
||
|
||
def putc(data, timeout=1):
    """XMODEM write hook: write ``data`` to the global ``ser``.

    ``timeout`` is accepted for the callback signature but not used here.

    Returns:
        Whatever ``ser.write`` returns (the byte count for a pySerial port),
        as the xmodem ``putc`` contract asks. Previously nothing was returned.
    """
    written = ser.write(data)
    # Short pause after every write (presumably to pace the device -- the
    # reason is not visible in this file).
    time.sleep(0.03)
    return written
|
||
|
||
|
||
|
||
|
||
|
||
|
||
# Working directory (relative to the current directory) holding logs,
# uploaded images and generated files.
_work_dir='work'
# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(_work_dir, exist_ok=True)
|
||
|
||
|
||
# The log file always lives in the work directory.
def calc_log_file_name():
    """Return ``<work dir>/<basename of iot_flash_file>.log``."""
    base_name = os.path.basename(iot_flash_file)
    return os.path.join(_work_dir, base_name + '.log')
|
||
|
||
# The uploaded flash image always lives in the work directory.
def calc_upload_name():
    """Return ``<work dir>/<basename of iot_flash_file>``."""
    return os.path.join(_work_dir, os.path.basename(iot_flash_file))
|
||
|
||
# The hex file generated from the flash image always lives in the work directory.
def calc_hex_name():
    """Return ``<work dir>/<basename of iot_flash_file>.txt``."""
    hex_name = os.path.basename(iot_flash_file) + '.txt'
    return os.path.join(_work_dir, hex_name)
|
||
|
||
# The uploaded flash information always lives in the work directory.
def calc_flash_info_name():
    """Return ``<work dir>/<basename of iot_flash_file>.info``."""
    info_name = os.path.basename(iot_flash_file) + '.info'
    return os.path.join(_work_dir, info_name)
|
||
|
||
|
||
|
||
|
||
# Upload (read back) firmware from the device.
def upload_fun():
    """Query flash/image info, read the image back and convert it to hex.

    Uses the module-level ``ser``, ``modem`` and ``upload_key``. The device
    replies to ``f i`` and ``f s`` are echoed and stored in the flash-info
    file, then ``fw u d <cfg>`` starts the XMODEM read-back into the work
    directory, and finally the received image is converted with
    ``bin_to_hex_file``.

    Device output is decoded with ``errors='replace'`` so that stray
    non-UTF-8 bytes on the line no longer abort the upload with
    ``UnicodeDecodeError``.
    """
    # Show flash information.
    ser.write(b"f i\n")
    time.sleep(0.5)
    ser_read_data = ser.read(4096)
    print_device_str(ser_read_data)
    with open(calc_flash_info_name(), mode='w+', encoding='utf-8') as f:
        # NOTE(review): writelines() adds no separators, so the CRLF-split
        # pieces are written back to back. Kept as is because the parser
        # (load_flash_info) is not visible here -- confirm this is intended.
        f.writelines(ser_read_data.decode('utf-8', errors='replace').split('\r\n'))
    # Show image information (appended to the same info file).
    ser.write(b"f s\n")
    time.sleep(1)
    ser_read_data = ser.read(4096)
    print_device_str(ser_read_data)
    with open(calc_flash_info_name(), mode='a+', encoding='utf-8') as f:
        f.writelines(ser_read_data.decode('utf-8', errors='replace').split('\r\n'))
    # Request the image (whole flash, or the region selected by upload_key).
    ser.write(f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8"))
    time.sleep(0.5)
    myprint(ser.read(4096).decode('utf-8', errors='replace'))
    upload_bin(modem, calc_upload_name())

    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s" )
    myprint("Start transform bin to hex.")
    bin_to_hex_file(calc_upload_name(), calc_hex_name())
    myprint("Transform to hex end.")
|
||
|
||
|
||
# Burn (flash) firmware onto the device.
def burn_fun():
    """Switch the port to the fast baud rate and flash ``iot_flash_file``.

    Uses the module-level ``ser``, ``modem``, ``nb_rate`` and
    ``iot_flash_file``. Device output pending before and after the transfer
    is echoed through ``print_device_str``.
    """
    # Wait, then echo whatever the device has printed so far.
    time.sleep(3)
    print_device_str(ser.read(4096))

    ser.baudrate = nb_rate
    burn_flash_bin(ser, modem, iot_flash_file)
    myprint (f"Total transmission time: {(time_stamp_start+time_stamp_end)} s")

    # Wait again and echo the device's closing output.
    time.sleep(3)
    print_device_str(ser.read(4096))
|
||
|
||
|
||
# Select which image region to upload.
def get_upload_cfg(key: str):
    """Translate ``key`` into the argument of the device's upload command.

    Returns ``"all"`` when ``key`` is ``'all'``. Otherwise the entries loaded
    from the flash-info file are searched for the first one whose
    ``"ImgType"`` contains ``key`` upper-cased, and ``"<Offset> <Size>"`` of
    that entry is returned. If none matches, a message is logged and
    ``"all"`` is returned.
    """
    if key == 'all':
        return key
    for entry in load_flash_info(calc_flash_info_name()):
        if entry["ImgType"].find(key.upper()) != -1:
            return f"{entry['Offset']} {entry['Size']}"
    myprint(f'Can not found key:{key}')
    return "all"
|
||
|
||
|
||
|
||
|
||
def global_def():
    """Fill the module-level settings from ``sys.argv``.

    With no arguments nothing is touched. Otherwise the start string, port
    name and both baud rates are set, and ``function_type`` is chosen from
    the argument count:

    * 1 argument ending in ``.bin``: ``'convert'``
    * 2 arguments, the second ending in ``.bin``: ``'ram'``
    * 3 or 4 arguments: ``'download'`` if the third argument (must end in
      ``.bin``) names an existing file, else ``'upload'``; ``upload_key`` is
      the fourth argument or ``"all"``

    ``ram_file`` and ``iot_flash_file`` are then derived from the mode.
    Exits with status -1 when the third argument does not end in ``.bin``.
    """
    global init_str, serial_com, b_rate, nb_rate
    global ram_file, iot_flash_file, ser
    global time_stamp_start, time_stamp_end
    global function_type, upload_key

    argv = sys.argv
    arg_num = len(argv)
    if arg_num <= 1:
        return

    init_str = "WQKL"
    serial_com = argv[1]
    b_rate = 115200
    nb_rate = 1500000

    def set_upload():
        global function_type
        if not argv[3].endswith('.bin'):
            print("param err.")
            sys.exit(-1)
        # An existing file is sent to the device (download); a missing one
        # is the destination for an image read back from it (upload).
        function_type = 'download' if os.path.exists(argv[3]) else 'upload'

    if arg_num == 2:
        if argv[1].endswith('.bin'):
            function_type = 'convert'
    elif arg_num == 3:
        if argv[2].endswith('.bin'):
            function_type = 'ram'
    elif arg_num == 4:
        set_upload()
        upload_key = "all"
    elif arg_num == 5:
        set_upload()
        upload_key = argv[4]

    if function_type == 'convert':
        iot_flash_file = argv[1]
    elif function_type == 'ram':
        ram_file = argv[2]
        iot_flash_file = argv[2]
    elif function_type in ('upload', 'download'):
        if function_type == 'upload':
            image_name = f'bootram_{argv[2]}.bin'
        else:
            image_name = f'{argv[2]}_ram_build.bin'
        iot_flash_file = argv[3]
        ram_file = os.path.join(bin_path(), image_name)
|
||
|
||
|
||
def print_help():
    """Print the command-line usage text, using ``sys.argv[0]`` as the program name."""
    # Named ``usage`` rather than ``help`` so the builtin is not shadowed.
    usage = f'''
自动判断上传还是下载:
{sys.argv[0]} [com] [kl1/kl3] [upload.bin/download.bin] <key>
下载ram文件:
{sys.argv[0]} [com] [ram.bin]
转换bin文件:
{sys.argv[0]} [file.bin]
'''
    print(usage)
|
||
|
||
# When upload or download is not specified, the script decides between them
# based on whether the input file already exists.
# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] <key>
# kunlun.py [com] [ram.bin]
# kunlun.py [file.bin] <enc>
if __name__ == '__main__':
    # These names are module-level globals read by the helper functions, so
    # this code must stay at module scope.
    # Mode: "download" flashes an image, "upload" reads one back,
    # "ram" sends a RAM image, "convert" converts a bin file.
    function_type = None
    time_stamp_start = 0
    time_stamp_end = 0
    init_str = None
    serial_com = None
    b_rate = None
    nb_rate = None
    ram_file = None
    iot_flash_file = None
    ser = None
    upload_key = None

    global_def()
    if function_type is None:
        print("param err.")
        print_help()
        sys.exit(-1)

    log_init(calc_log_file_name())

    # Conversion needs no serial port: convert and stop.
    if function_type == 'convert':
        bin_to_hex_file(iot_flash_file, calc_hex_name())
        sys.exit(0)

    try:
        ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3)
    except Exception:
        myprint(f"serial com:{serial_com} open failed.")
        sys.exit(-1)
    modem = xmodem.XMODEM(getc, putc, mode='xmodem1k')

    # Send the start string so the device enters XMODEM mode, then send the
    # RAM image; every remaining mode begins with these two steps.
    init_send(ser, init_str)
    burn_ram_bin(modem, ram_file)

    if function_type == 'upload':
        upload_fun()
    elif function_type == 'download':
        iot_flash_file = bin_file_decrypt(iot_flash_file)
        burn_fun()
        clear_tmp()
    elif function_type == 'ram':
        # Interactive console: a background thread echoes device output
        # while the main thread forwards typed lines.
        recv_ser_data(ser)
        read_input(ser)
|
||
|
||
# if __name__ == "__main__":
|
||
# read_input(None)
|