updata.py 后台下载服务器文件

updata 文件列表添加右键菜单,查看文件详情
This commit is contained in:
andy
2023-10-29 09:19:11 +08:00
parent ca66d05f2d
commit 2660834312
4 changed files with 270 additions and 28 deletions

View File

@@ -92,4 +92,7 @@
每次赋码结果采用不同的颜色
2023.10.27
修改updata工具按钮描述
2023.10.28
updata.py 后台下载服务器文件
2023.10.29
updata 文件列表添加右键菜单,查看文件详情

144
updata/file_detail.py Normal file
View File

@@ -0,0 +1,144 @@
import os
import sys
import json
# a=os.path.split("E:\\abc/def.bin")
# a=os.path.splitext("E:\\abc/def/")
# print(a)
def _bytes_to_int(d:bytearray):
ret=0
num=len(d)
for i in d:
ret>>=8
ret|=i<<(8*(num-1))
return ret
def _bytes_to_str(d:bytearray):
d=d.replace(b'\0xff',b'\0x00')
text=d.strip(b'\0x00').decode("utf-8")
return text
def _detail_elf(name:str):
with open(name,"rb") as f:
data=f.read()
off_index=16+4*4
sh_off=_bytes_to_int(data[off_index:off_index+4])
off_index+=2*4+3*2
sh_size=_bytes_to_int(data[off_index:off_index+2])
off_index+=2
sh_num=_bytes_to_int(data[off_index:off_index+2])
addr=sh_off+sh_num*sh_size
size=len(data)
ret=[" |文件类型:批检仪/赋码仪主板程序"]
ret+=[" |打包文件为:"]
while (addr<size):
d_size=_bytes_to_int(data[addr:addr+4])
item_name=data[addr+4:addr+256].decode("utf-8")
ret.append(" |-"+item_name)
addr+=d_size
def get_file(name:str):
addr=sh_off+sh_num*sh_size
while (addr<size):
d_size=_bytes_to_int(data[addr:addr+4])
item_name=_bytes_to_str(data[addr+4:addr+256])
if(item_name==name):
return data[addr+256:addr+d_size]
addr+=d_size
return bytearray()
def decode_info():
d=get_file("info.json")
j=json.loads(d)
ret.append(" |编译时间:{d}".format(d=j["build_date"]))
ret.append(" |软件版本:{d}".format(d=j["soft_version"]))
decode_info()
return ret
def _detail_axf(name:str):
ret=[" |文件类型:批检仪/赋码仪主板协处理器程序"]
return ret
def _detail_bin(name:str):
ret=[" |文件类型:批检仪/赋码仪小板程序"]
return ret
def _detail_json(name:str):
file_name=os.path.split(name)
def scheme_decode_id(id:int):
# 日bit0~bit4 月bit5~bit8 年bit9~bit15
sid=(id>>0)&0x7f
model=(id>>7)&0x1f
chip=(id>>12)&0xf
day=(id>>16)&0x1f
month=(id>>21)&0xf
year=((id>>25)&0x7f)+2022
s="{y}-{m}-{d},chip:{chip},model:{model},id:{id}".format(y=year,m=month,d=day,chip=chip,model=model,id=sid)
return s
with open(name,"rb") as f:
j=json.loads(f.read())
if(file_name!="cfg.json"):
ret=[" |文件类型:检测方案文件"]
ret.append(" |方案描述:{d}".format(d=j["PlanBrief"]))
ret.append(" |方案ID:{id}".format(id=j["PlanID"]))
ret.append(" |方案ID解析:{s}".format(s=scheme_decode_id(j["PlanID"])))
else:
ret=[" |文件类型:批检仪/赋码仪主板配置文件"]
return ret
def _detail_sh(name:str):
ret=[" |文件类型:批检仪/赋码仪主板程序启动脚本"]
return ret
def _detail_lua(name:str):
ret=[" |文件类型:批检仪/赋码仪检测结果判定脚本"]
return ret
def _detail_pkt(name:str):
ret=[" |文件类型:MCU程序在线升级文件"]
with open(name,"rb") as f:
data=f.read()
ret.append(" |打包时间:{d}".format(d=_bytes_to_str(data[4:4+20])))
ret.append(" |主机接口:{d}".format(d=_bytes_to_str(data[40:40+8])))
ret.append(" |设备类型:{d}".format(d=_bytes_to_str(data[48:48+12])))
return ret
def _detail_py(name:str):
return ["暂未实现"]
def _detail_service(name:str):
return ["暂未实现"]
def _detail_dtb(name:str):
return ["暂未实现"]
_fun_table={
".elf":_detail_elf,
".axf":_detail_axf,
".bin":_detail_bin,
".json":_detail_json,
".sh":_detail_sh,
".lua":_detail_lua,
".pkt":_detail_pkt,
".py":_detail_py,
".service":_detail_service,
".dtb":_detail_dtb
}
def detail(path:str):
tailfix=os.path.splitext(path)
file_name=os.path.split(path)
fun=_fun_table[tailfix[-1]]
ret=[file_name[-1]]
if(fun is not None):
ret+=fun(path)
return ret
ret+=["未找到此类文件的解析器"]
return ret
if __name__ == "__main__":
print(_bytes_to_int([0x12,0x34,0x56,0x78]))
print(_bytes_to_int([0x12,0x34]))

48
updata/float_lable.py Normal file
View File

@@ -0,0 +1,48 @@
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import sys
class floatBox(QWidget):
def __init__(self,parent:QWidget=None,items:list=None,x:int=0,y:int=0):
QWidget.__init__(self,parent)
self.setMouseTracking(True)
self.setWindowFlags(Qt.WindowType.FramelessWindowHint|Qt.WindowType.NoDropShadowWindowHint|Qt.WindowType.Popup)
# self.setWindowFlags(Qt.WindowType.FramelessWindowHint|Qt.WindowType.Popup)
self.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
# self.setBackgroundRole()
self.setStyleSheet ("background-color: rgb(245, 180, 245);")
y_size=self._add_items(items)
x_size=500
desk=QApplication.desktop()
if(y+y_size>desk.height()):
y=desk.height()-y_size
if(x+x_size>desk.width()):
x=desk.width()-x_size
self.setGeometry(QRect(x, y, x_size, y_size))
def _add_items(self,items:list):
if(items is None):
items=["没有要显示的提示"]
for i in range(len(items)):
lable=QLabel(self)
lable.setText(items[i])
lable.setGeometry(QRect(5,5+i*25,500,25))
return len(items)*25+10
if __name__ == "__main__":
app = QApplication(sys.argv)
widget = QWidget()
widget.resize(870, 410)
widget.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
label=floatBox(items=["aaaaaa","bbbbbbbb","cccccccc"])
widget.destroyed.connect(label.close)
widget.show()
label.show()
a=app.exec()

View File

@@ -18,6 +18,8 @@ import select_list
import dhcp.dhcp as dhcp
import mysql
import console_uart
import file_detail
import float_lable
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID("myappid")
@@ -104,7 +106,8 @@ class updata_dlg(QObject):
rate_signal =pyqtSignal([str,int])
# 结束信号ip成败描述
end_signal = pyqtSignal([str,bool,str])
# 数据库下载文件结束信号
sql_download_end_signal =pyqtSignal([])
def __init__(self):
QObject.__init__(self)
@@ -136,9 +139,11 @@ class updata_dlg(QObject):
self.ip_prefix_init()
self.ip_hand_init()
self.channel_init()
self.widget.destroyed.connect(self.quit)
self.scan_file()
self.scan_slave()
self.infotext_init()
self.widget.destroyed.connect(self.quit)
self.sql_download_end_signal.connect(self.scan_file)
Logo = QPixmap()
Logo.loadFromData(base64.b64decode(memory_pic.icon_ico))
@@ -165,7 +170,42 @@ class updata_dlg(QObject):
self.file_list_label.setText("默认文件列表(选中以发送):")
self.file_list_label.setToolTip("请选择要升级的文件,只有选中的文件会发送到主板中。")
self.file_list.setToolTip("请选择要升级的文件,只有选中的文件会发送到主板中。")
# self.file_list_label.setToolTipDuration(1)
ack=self.file_list.setProperty("contextMenuPolicy",Qt.ContextMenuPolicy.CustomContextMenu)
self.file_list.customContextMenuRequested.connect(self.file_item_menu_slot)
def file_item_menu_slot(self,pos:QPoint):
print("show pop menu")
menu=QMenu(self.file_list)
action=QAction("删除选中条目",self.file_list)
action.triggered.connect(self.file_item_delete)
menu.addAction(action)
action=QAction("查看选中条目详情",self.file_list)
action.triggered.connect(self.file_item_detail)
menu.addAction(action)
action=QAction("发送选中条目到主板",self.file_list)
action.triggered.connect(self.save_but_clicked)
menu.addAction(action)
menu.addAction(action)
action=QAction("刷新文件列表",self.file_list)
action.triggered.connect(self.scan_file)
menu.addAction(action)
menu.exec(self.file_list.mapToGlobal(pos))
def file_item_delete(self):
print("delete slected items.")
items=self.file_list.selectedItems()
for i in items:
os.remove("file\\"+i.text())
self.scan_file()
def file_item_detail(self):
print("show detail with slected items.")
items=self.file_list.selectedItems()
details=[]
for i in items:
details+=file_detail.detail("file/"+i.text())
if(len(details)==0):
return
pos=QCursor.pos()
lab=float_lable.floatBox(self.widget,details,x=pos.x(),y=pos.y())
lab.show()
# 初始化从机列表
def slave_list_init(self):
@@ -260,7 +300,7 @@ class updata_dlg(QObject):
self.refresh_but.setObjectName(u"save_but")
self.refresh_but.setGeometry(QRect(590, self.but_y, 150, 28))
self.but_y+=self.but_y_step
self.refresh_but.setText("刷新IP地址")
self.refresh_but.setText("刷新主板IP地址")
self.refresh_but.clicked.connect(self.refresh_but_clicked)
self.refresh_but.setToolTip("点击此按钮刷新主板列表。")
# self.refresh_but.setToolTipDuration(1)
@@ -308,6 +348,17 @@ class updata_dlg(QObject):
self.download_but.setText("从服务器下载文件")
self.download_but.clicked.connect(self.download_but_clicked)
# 提示信息
def infotext_init(self):
self.infotext=QLabel(self.widget)
self.infotext.setGeometry(QRect(25, 380, 800, 15))
self.infotext.setText("欢迎使用云铭公司产线设备维护工具.")
def set_infotext(self,text:str):
try:
self.infotext.setText(text)
except Exception as e:
pass
# ip前缀
def ip_prefix_init(self):
self.ip_prefix = QLineEdit(self.widget)
@@ -378,7 +429,7 @@ class updata_dlg(QObject):
if(sp in types):
file_list.append(i.text())
if(len(file_list)!=1):
self.show_msg("请选择一个并且只选择一个 "+str(types)+" 文件")
self.set_infotext("请选择一个并且只选择一个 "+str(types)+" 文件")
return ""
return file_list[0]
@@ -387,7 +438,7 @@ class updata_dlg(QObject):
file_list=[]
items=self.file_list.selectedItems()
if(len(items)==0):
self.show_msg("请选择至少一个文件")
self.set_infotext("请选择至少一个文件")
return []
have_elf=False
have_bin=False
@@ -395,17 +446,17 @@ class updata_dlg(QObject):
for i in items:
if(i.text()[-4:]==".elf"):
if(have_elf==True):
self.show_msg("只可选择一个 .elf 文件")
self.set_infotext("只可选择一个 .elf 文件")
return []
have_elf=True
if(i.text()[-4:]==".bin"):
if(have_bin==True):
self.show_msg("只可选择一个 .bin 文件")
self.set_infotext("只可选择一个 .bin 文件")
return []
have_bin=True
if(i.text()[-4:]==".lua"):
if(have_lua==True):
self.show_msg("只可选择一个 .lua 文件")
self.set_infotext("只可选择一个 .lua 文件")
return []
have_lua=True
file_list.append(i.text())
@@ -415,7 +466,7 @@ class updata_dlg(QObject):
slave_list=[]
items=self.slave_list.selectedItems()
if(len(items)==0):
self.show_msg("请选择至少一个ip地址")
self.set_infotext("请选择至少一个主板ip地址")
return []
for i in items:
str_list=i.text().split(",")
@@ -632,10 +683,16 @@ class updata_dlg(QObject):
print("not select any item,break.")
return
item=item.split("|")
file_name=sql.download(int(item[0]))
self.set_infotext("正在从服务器下载文件,可能需要一些时间,请耐心等待.")
t = threading.Thread(target=self.sql_download, args=(sql,int(item[0]),))
t.start()
# 从数据库下载文件,在后台运行
def sql_download(self,sql:mysql.sql,index:int):
file_name=sql.download(index)
dst="file/"+file_name.split("/")[-1]
shutil.copy(file_name,dst)
self.scan_file()
self.sql_download_end_signal.emit()
self.set_infotext("已下载文件:"+dst)
# 开始运行
def run(self):
@@ -644,7 +701,6 @@ class updata_dlg(QObject):
print("run end.")
if(self.dhcp_server!=None):
self.dhcp_server.close()
# sys.exit(a)
# 扫描文件
def scan_file(self):
@@ -653,24 +709,15 @@ class updata_dlg(QObject):
# 扫描从机
def scan_slave(self):
self.set_infotext("正在刷新主板IP地址")
u=udp.myudp(1,255)
ip_prefix=self.ip_prefix.text()
u.find(ip_prefix)
ip_list=u.dst_ip_list
# if(len(ip_list)==0):
# ret=self.show_question("提示","未扫描到从机,是否自动添加?")
# if(ret==True):
# ip_list.append((ip_prefix+".81","local_id_1"))
# ip_list.append((ip_prefix+".82","local_id_2"))
# ip_list.append((ip_prefix+".83","local_id_3"))
# ip_list.append((ip_prefix+".84","local_id_4"))
# ip_list.append((ip_prefix+".85","local_id_5"))
# ip_list.append((ip_prefix+".86","local_id_6"))
# ip_list.append((ip_prefix+".87","local_id_7"))
# ip_list.append((ip_prefix+".88","local_id_8"))
list_str=[]
for i in u.dst_ip_list:
list_str.append(i[0]+','+i[1])
if(len(list_str)==0):
self.set_infotext("未找到主板IP地址,请确保主板已正常连接并运行")
self.slave_list.addItems(list_str)
# 添加单个从机地址