diff --git a/bin/bin_to_hex.py b/bin/bin_to_hex.py index 559c306..5ae87f8 100644 --- a/bin/bin_to_hex.py +++ b/bin/bin_to_hex.py @@ -214,6 +214,18 @@ def img_header_check(data:bytearray): return "V0" return "" +# 判断是不是加密了 +def pkt_encrypt_check(data:bytearray): + magic_data=data[12:16] + pat=magic_data[0] + for i in range(len(magic_data)): + magic_data[i]=magic_data[i]^pat + magic=(magic_data[0]<<24)|(magic_data[1]<<16)|(magic_data[2]<<8)|(magic_data[3]) + if (magic==_PKT_HEADER_MAGIC_NO): + if(pat!=0): + return True + return False + # 转化为hex文本 def bin_to_hex(bin:bytearray,f,oem_file=None): @@ -261,11 +273,11 @@ def bin_to_hex(bin:bytearray,f,oem_file=None): return kunlun -def bin_to_hex_file(bin_file_name:str,hex_file_name:str,enc=0): +def bin_to_hex_file(bin_file_name:str,hex_file_name:str): with open(bin_file_name,mode='rb') as f: bin=bytearray(f.read()) - if(enc): - myprint("start decrypt") + if(pkt_encrypt_check(bin)): + myprint("start decrypt:") pat=(bin[12]^0x00)&0xff myprint(f"decrypt image:{hex(pat)}({_PatternType.get(pat,'unknown')})") for index in range(len(bin)): @@ -291,7 +303,7 @@ def bin_file_decrypt(bin_file:str): with open(bin_file,mode='rb') as f: bin=bytearray(f.read()) # 自动判断是否需要解密 - if ((bin[12]==bin[13])and(bin[12]!=0x00)): + if (pkt_encrypt_check(bin)): pat=(bin[12]^0x00)&0xff myprint(f"decrypt image:{hex(pat)}({_PatternType.get(pat,'unknown')})") for index in range(len(bin)): @@ -313,8 +325,4 @@ def clear_tmp(): # bin_to_hex.py input_file if __name__ == "__main__": - if(len(sys.argv)>=3): - enc=1 - else: - enc=0 - bin_to_hex_file(sys.argv[1],"work/"+sys.argv[1]+".txt",enc) + bin_to_hex_file(sys.argv[1],"work/"+sys.argv[1]+".txt") diff --git a/bin/bootram_kl1_build.bin b/bin/bootram_kl1_build.bin new file mode 100644 index 0000000..9b3f094 Binary files /dev/null and b/bin/bootram_kl1_build.bin differ diff --git a/bin/log.py b/bin/log.py index cc31776..1f52929 100644 --- a/bin/log.py +++ b/bin/log.py @@ -1,5 +1,5 @@ import time - +import sys # 同一个进程中所有调用这个文件的 .py 文件都使用这个变量 @@ -23,6 +23,16 @@ def myprint_dec(func): myprint=myprint_dec(print) +def mywrite(data:str): + if(_log_fp is not None): + txt=data.replace('\r','\n') + txt=txt.replace('\n\n','\n') + txt=txt.replace('\n\n','\n') + _log_fp.write(txt) + _log_fp.flush() + sys.stdout.write(data) + sys.stdout.flush() + def log_init(file_name:str): global _log_fp if _log_fp is None: diff --git a/kunlun.py b/kunlun.py index e460a2f..a48f465 100644 --- a/kunlun.py +++ b/kunlun.py @@ -10,11 +10,14 @@ import time import xmodem import os import binascii +import threading from bin.log import myprint from bin.log import log_init +from bin.log import mywrite from bin.bin_to_hex import bin_to_hex_file from bin.bin_to_hex import bin_file_decrypt from bin.bin_to_hex import clear_tmp +from bin.bin_to_hex import load_flash_info from bin.base import bin_path @@ -117,6 +120,46 @@ def print_device_str(data:bytearray): except Exception as e: myprint("DEVICE:",item) + + +_recv_thread=False +# 读取input +def read_input(s_port:serial.Serial): + global _recv_thread + def read(): + while True: + try: + d=input() + except KeyboardInterrupt: + break + if(d=='exit'): + break + try: + s_port.write(d.encode('utf-8')+b'\n') + except Exception: + break + read() + _recv_thread=False + +# 循环接收串口数据 +def recv_ser_data(s_port:serial.Serial): + global _recv_thread + def recv(s_port:serial.Serial): + global _recv_thread + while _recv_thread: + time.sleep(0.1) + bytes2read = s_port.in_waiting + if(bytes2read>0): + tmp = s_port.read(bytes2read) + try: + txt=tmp.decode('utf-8') + mywrite(txt) + except Exception as e: + pass + _recv_thread=True + t=threading.Thread(target=recv,args=(s_port,)) + t.start() + # 上传固件 def upload_bin(x_modem:xmodem.XMODEM, w_file): global time_stamp_end @@ -193,13 +236,13 @@ def upload_fun(): f.writelines(ser_read_data.decode('utf-8').split('\r\n')) # 显示image信息 ser.write(b"f s\n") - time.sleep(0.5) + time.sleep(1) ser_read_data=ser.read(4096) print_device_str(ser_read_data) with open(calc_flash_info_name(),mode='a+',encoding='utf-8') as f: f.writelines(ser_read_data.decode('utf-8').split('\r\n')) # 上传整个镜像 - ser.write(b"fw u d all\n") + ser.write(f"fw u d {get_upload_cfg(upload_key)}\n".encode("utf-8")) time.sleep(0.5) myprint(ser.read(4096).decode('utf-8')) upload_bin(modem,calc_upload_name()) @@ -224,6 +267,21 @@ def burn_fun(): print_device_str(ser_read_data) +# 配置要上传的镜像配置 +def get_upload_cfg(key:str): + if(key=='all'): + return key + info_list=load_flash_info(calc_flash_info_name()) + # print(info_list) + for item in info_list: + index=item["ImgType"].find(key.upper()) + if(index>=0): + return f"{item['Offset']} {item['Size']}" + myprint(f'Can not found key:{key}') + return "all" + + + def global_def(): global init_str @@ -235,78 +293,84 @@ def global_def(): global ser global time_stamp_start global time_stamp_end - global upload - global encrypt + global function_type + global upload_key init_str="WQKL" serial_com=sys.argv[1] b_rate=115200 nb_rate=1500000 - arg_num=len(sys.argv) - - if(arg_num==2): - if(sys.argv[1].endswith('.bin')): - iot_flash_file=sys.argv[1] - return - elif(arg_num==3): - if(sys.argv[1].endswith('.bin')): - iot_flash_file=sys.argv[1] - encrypt=1 - return - elif(arg_num==5): - if(sys.argv[3]=='upload'): - upload=True - elif(sys.argv[3]=='download'): - upload=False - iot_flash_file=sys.argv[4] - elif(arg_num==4): + def set_upload(): + global function_type if(sys.argv[3].endswith('.bin')): if(os.path.exists(sys.argv[3])): # 存在这个文件 是下载 - upload=False + function_type='download' else: # 不存在这个文件 是上传 - upload=True + function_type='upload' else: print("param err.") sys.exit(-1) - iot_flash_file=sys.argv[3] - if(upload): + if(arg_num==2): + if(sys.argv[1].endswith('.bin')): + function_type='convert' + elif(arg_num==3): + if(sys.argv[2].endswith('.bin')): + function_type='ram' + elif(arg_num==4): + set_upload() + upload_key="all" + elif(arg_num==5): + set_upload() + upload_key=sys.argv[4] + + if(function_type=='upload'): ram_file=f'bootram_{sys.argv[2]}.bin' - else: + iot_flash_file=sys.argv[3] + elif(function_type=='download'): ram_file=f'{sys.argv[2]}_ram_build.bin' + iot_flash_file=sys.argv[3] + elif(function_type=='ram'): + ram_file=sys.argv[2] + iot_flash_file=sys.argv[2] + return + elif(function_type=='convert'): + iot_flash_file=sys.argv[1] + return ram_file=os.path.join(bin_path(),ram_file) def print_help(): help=f''' - Automatically determine whether to upload or download:" - {sys.argv[0]} [com] [kl1/kl3] [upload.bin/download.bin] - Manually specify whether to upload or download: - {sys.argv[0]} [com] [kl1/kl3] [upload/download] [upload.bin/download.bin] - Convert bin file: - {sys.argv[0]} [file.bin] + 自动判断上传还是下载: + {sys.argv[0]} [com] [kl1/kl3] [upload.bin/download.bin] + 下载ram文件: + {sys.argv[0]} [com] [ram.bin] + 转换bin文件: + {sys.argv[0]} [file.bin] ''' print(help) # 如果不指定上传还是下载 脚本会根据输入文件是否存在来决定上传还是下载 -# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] -# kunlun.py [com] [kl1/kl3] [upload/download] [upload.bin/download.bin] +# kunlun.py [com] [kl1/kl3] [upload.bin/download.bin] +# kunlun.py [com] [ram.bin] # kunlun.py [file.bin] if __name__ == '__main__': - upload=None + function_type=None # 功能类型 "download" 下载,"upload" 上传,"ram" 下载ram,"convert"" 转换bin time_stamp_start, time_stamp_end = 0, 0 init_str, serial_com, b_rate, nb_rate, ram_file, iot_flash_file, ser = None, None, None, None, None, None, None - encrypt=0 + upload_key=None global_def() - if(iot_flash_file is None): + if(function_type is None): print("param err.") print_help() sys.exit(-1) log_init(calc_log_file_name()) - if(ram_file is None): - bin_to_hex_file(iot_flash_file,calc_hex_name(),encrypt) + if(function_type=='convert'): + bin_to_hex_file(iot_flash_file,calc_hex_name()) sys.exit(0) + try: ser = serial.Serial(port=serial_com, baudrate=b_rate, timeout=0.3) except Exception: @@ -318,9 +382,15 @@ if __name__ == '__main__': init_send(ser, init_str) burn_ram_bin(modem, ram_file) - if(upload): + if(function_type=='upload'): upload_fun() - else: + elif(function_type=='download'): iot_flash_file=bin_file_decrypt(iot_flash_file) burn_fun() clear_tmp() + elif(function_type=='ram'): + recv_ser_data(ser) + read_input(ser) + +# if __name__ == "__main__": +# read_input(None)