387 lines
11 KiB
Python
387 lines
11 KiB
Python
import json
|
|
import csv
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import datetime
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import sys
|
|
import pymysql as mysql
|
|
from PyQt5.QtCore import *
|
|
from PyQt5.QtGui import *
|
|
from PyQt5.QtWidgets import *
|
|
|
|
|
|
|
|
|
|
|
|
__INFO_NAME="scheme_export_info.txt"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class quest_text(QObject):
|
|
def __init__(self,title:str,info:str):
|
|
QObject.__init__(self)
|
|
# self.app = QApplication(sys.argv)
|
|
self.w=QDialog()
|
|
self.w.resize(320,80)
|
|
self.w.setWindowTitle(title)
|
|
self.w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
|
|
self.w.setWindowModality(Qt.WindowModality.ApplicationModal)
|
|
self.text=info
|
|
self.text_edit = QTextEdit(self.w)
|
|
self.text_edit.setObjectName(u"text")
|
|
self.text_edit.setGeometry(QRect(10, 10, 300, 30))
|
|
self.text_edit.setFrameShape(QFrame.Shape.Box)
|
|
self.text_edit.setMidLineWidth(1)
|
|
self.text_edit.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
|
|
self.text_edit.setPlainText(self.text)
|
|
self.text_edit.moveCursor(QTextCursor.MoveOperation.End, QTextCursor.MoveMode.MoveAnchor)
|
|
self.ok_but_init()
|
|
self.cancel_but_init()
|
|
def ok_but_init(self):
|
|
self.ok_but=QPushButton(self.w)
|
|
self.ok_but.setObjectName(u"ok_but")
|
|
self.ok_but.setGeometry(QRect(210, 50, 90, 28))
|
|
self.ok_but.clicked.connect(self.ok_but_clicked)
|
|
self.ok_but.setText("确认")
|
|
self.ok_but.setCheckable(True)
|
|
self.ok_but.setChecked(True)
|
|
self.ok_state=False
|
|
def cancel_but_init(self):
|
|
self.cancel_but=QPushButton(self.w)
|
|
self.cancel_but.setObjectName(u"cancel_but")
|
|
self.cancel_but.setGeometry(QRect(100, 50, 90, 28))
|
|
self.cancel_but.clicked.connect(self.cancel_but_clicked)
|
|
self.cancel_but.setText("取消")
|
|
self.ok_but.setCheckable(True)
|
|
self.ok_state=False
|
|
def ok_but_clicked(self):
|
|
self.text=self.text_edit.toPlainText()
|
|
self.ok_state=True
|
|
self.w.done(QDialog.DialogCode.Accepted)
|
|
self.w.close()
|
|
def cancel_but_clicked(self):
|
|
self.text=self.text_edit.toPlainText()
|
|
self.ok_state=False
|
|
self.w.done(QDialog.DialogCode.Accepted)
|
|
self.w.close()
|
|
def show(self):
|
|
self.w.exec()
|
|
return self.ok_state,self.text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _connect_sql():
|
|
try:
|
|
db=mysql.connect(host='47.108.119.117',user='ymp',passwd='Abc@12345',port=15319)
|
|
cur=db.cursor()
|
|
cur.execute("use datacenter")
|
|
print("connect mysql success.")
|
|
return db,cur
|
|
except Exception as e:
|
|
print("can not connect service.")
|
|
return None,None
|
|
|
|
|
|
|
|
|
|
# 生成一个任务的参数
|
|
def _scheme_task_to_retinfo(j:json):
|
|
texts=[]
|
|
length=len(j["TestStandard"])
|
|
for i in j["ReturnInfo"]:
|
|
texts.append(i)
|
|
return texts
|
|
|
|
# 生成任务参数序列
|
|
def _scheme_tasks_to_retinfo(j:json):
|
|
# texts=[]
|
|
# for i in j["TaskArray"]:
|
|
# texts+=__scheme_task_to_retinfo(i)
|
|
texts=j["CheckerRtvName"]
|
|
# print(texts)
|
|
return texts
|
|
|
|
|
|
def _json_extract_retinfo(name:str):
|
|
with open(name,"rb") as f:
|
|
json_obj=json.loads(f.read())
|
|
return _scheme_tasks_to_retinfo(json_obj),json_obj["PlanID"]
|
|
return None,None
|
|
|
|
|
|
|
|
def _get_date():
|
|
now_time = datetime.utcnow()
|
|
utc_time = now_time + timedelta(hours=8)
|
|
utc_time = utc_time.strftime("%Y%m%d")
|
|
return utc_time
|
|
|
|
|
|
# 保存设置
|
|
def _save_info(info):
|
|
info_name=__INFO_NAME
|
|
with open(info_name,"w+") as f:
|
|
# print("saving info:",date,id,save_name)
|
|
f.write(info[0]+'\n')
|
|
f.write(str(info[1])+'\n')
|
|
f.write(info[2]+'\n')
|
|
f.write(info[3]+'\n')
|
|
f.write(info[4]+'\n')
|
|
|
|
def _get_info():
|
|
info_name=__INFO_NAME
|
|
if os.path.exists(info_name):
|
|
info_f=open(info_name,'r')
|
|
lines=info_f.readlines()
|
|
date=lines[0].strip()
|
|
id=int(lines[1].strip())
|
|
name=lines[2].strip()
|
|
date_start=lines[3].strip()
|
|
date_end=lines[4].strip()
|
|
info_f.close()
|
|
else:
|
|
date="20230101"
|
|
id=1
|
|
name="方案参数校准"
|
|
date_start="2023-01-01"
|
|
date_end="2023-01-01"
|
|
# print("get info:",date,id,name)
|
|
return (date,id,name,date_start,date_end)
|
|
|
|
# 获取本次生成方案的id号
|
|
def _get_scheme_id():
|
|
info=_get_info()
|
|
date=info[0]
|
|
id=info[1]
|
|
if(date!=_get_date()):
|
|
date=_get_date()
|
|
id=1
|
|
else:
|
|
id+=1
|
|
_save_info((date,id,info[2],info[3],info[4]))
|
|
return id
|
|
|
|
def _reflush_scheme_id(id):
|
|
# 日bit0~bit4 月bit5~bit8 年bit9~bit15
|
|
sid=(id>>0)&0x7f
|
|
modele=(id>>7)&0x1f
|
|
chip=(id>>12)&0xf
|
|
day=(id>>16)&0x1f
|
|
month=(id>>21)&0xf
|
|
year=((id>>25)&0x7f)+2022
|
|
now_time = datetime.utcnow()
|
|
utc_time = now_time + timedelta(hours=8)
|
|
sid=_get_scheme_id()
|
|
day=utc_time.day
|
|
month=utc_time.month
|
|
year=utc_time.year-2022
|
|
id=(sid&0x7f)|((modele&0x1f)<<7)|((chip&0xf)<<12)|((day&0x1f)<<16)|((month&0xf)<<21)|((year&0x7f)<<25)
|
|
return id,sid
|
|
|
|
|
|
def _main():
|
|
save_name="check_data.csv"
|
|
title,=_json_extract_retinfo("代工厂数据采集EJ旧68uF22um08211.json")
|
|
save_f=open(save_name,'+w')
|
|
save_f.write(','.join(title)+'\n')
|
|
err_num=0
|
|
with open("EJ09C测试数据.csv",newline="") as f:
|
|
data=csv.reader(f)
|
|
for row in data:
|
|
if(row[3]=='0'):
|
|
save_f.write(row[4]+'\n')
|
|
else:
|
|
err_num+=1
|
|
print(row)
|
|
print("检测异常数量为:",err_num)
|
|
|
|
|
|
class sch_data(object):
|
|
def __init__(self):
|
|
pass
|
|
# self.scheme_name=""
|
|
# self.date_start=""
|
|
# self.date_end=""
|
|
def select_scheme(self):
|
|
fileName,fileType = QFileDialog.getOpenFileNames(None, "选取文件", os.getcwd(),
|
|
"检测方案(*.json)")
|
|
if(len(fileName)==0):
|
|
print("user cancelled.")
|
|
return False
|
|
self.scheme_name=fileName[0]
|
|
print(fileName,fileType)
|
|
info=_get_info()
|
|
quest=quest_text("请输入要获取数据的日期",info[3]+','+info[4])
|
|
ack,date_interval=quest.show()
|
|
if(ack!=True):
|
|
print("user cancelled.")
|
|
return False
|
|
date_interval=date_interval.split(',')
|
|
self.date_start=date_interval[0]+" 00:00:00"
|
|
self.date_end=date_interval[1]+" 23:59:59"
|
|
_save_info((info[0],info[1],info[2],date_interval[0],date_interval[1]))
|
|
return True
|
|
def datas_sql(self):
|
|
if(self.scheme_name==None):
|
|
return None,None
|
|
if(self.date_start==None) or (self.date_end==None):
|
|
return None,None
|
|
scheme_name=self.scheme_name
|
|
title,scheme_id=_json_extract_retinfo(scheme_name)
|
|
data_list=[]
|
|
for i in title:
|
|
t=[]
|
|
data_list.append(t)
|
|
db,cur=_connect_sql()
|
|
if(db!=None):
|
|
cmd="""SELECT id,create_time,addr,err_code,run,valuez FROM check_result_detail
|
|
where create_time > %s and create_time < %s
|
|
and result_id in (select pk from check_result where plan_id =%s) ;"""
|
|
cur.execute(cmd,(self.date_start,self.date_end,scheme_id))
|
|
check_data=cur.fetchall()
|
|
for row in check_data:
|
|
# 这里过滤检测异常的结果
|
|
# if(row[3]==0):
|
|
err=str(row[3])
|
|
if((err=='3') or (err=='20') or (err=='208') or (err=='255')):
|
|
continue
|
|
s=row[5].split(',')
|
|
for i in range(len(s)):
|
|
data_list[i].append(int(s[i]))
|
|
self._check_data=check_data
|
|
self._titles=title
|
|
return title,data_list,len(check_data)
|
|
|
|
def export_check_data(self,path:str):
|
|
if(self._check_data==None):
|
|
return
|
|
if(self._titles==None):
|
|
return
|
|
if not os.path.exists(path):
|
|
os.makedirs(path)
|
|
save_name=self.scheme_name.split('/')[-1].split('.')[0]
|
|
save_name=os.path.join(path,save_name)
|
|
with open(save_name+".csv","w+") as f:
|
|
title_list=["时间","错误码","执行结果"]
|
|
title_list+=self._titles
|
|
title_list.append('\n')
|
|
s=','.join(title_list)
|
|
f.write(s)
|
|
for row in self._check_data:
|
|
time=row[1].strftime("%Y-%m-%d %H:%M:%S")
|
|
err=str(row[3])
|
|
run="=\""+row[4]+"\""
|
|
data=row[5]
|
|
s=','.join([time,err,run,data,'\n'])
|
|
f.write(s)
|
|
print("export check data end.")
|
|
|
|
|
|
def export_scheme(self,path:str,ret_limit):
|
|
if not os.path.exists(path):
|
|
os.makedirs(path)
|
|
name=self.scheme_name
|
|
info=_get_info()
|
|
date=info[0]
|
|
id=info[1]
|
|
save_name=name.split('/')[-1].split('.')[0]
|
|
quest=quest_text("请输入要导出的文件名",save_name)
|
|
ack,save_name=quest.show()
|
|
if(ack!=True):
|
|
print("user cancelled.")
|
|
return
|
|
_save_info((date,id,save_name,info[3],info[4]))
|
|
index=0
|
|
with open(name,"r",encoding="utf-8") as f:
|
|
json_obj=json.loads(f.read())
|
|
for i in json_obj["TaskArray"]:
|
|
for j in i["TestStandard"]:
|
|
j["Max"]=ret_limit[index][0]
|
|
j["Min"]=ret_limit[index][1]
|
|
index+=1
|
|
json_obj["PlanID"],sid=_reflush_scheme_id(json_obj["PlanID"])
|
|
# 刷新方案id号之后更新日期
|
|
date=_get_info()[0]
|
|
# 保存到指定路径下
|
|
save_name=os.path.join(path,save_name)
|
|
save_name=save_name+"_"+date+"{d:02d}".format(d=sid)
|
|
json_obj["PlanBrief"]=save_name
|
|
json_str=json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '),ensure_ascii=False)
|
|
save_f=open(save_name+".json",'+wb')
|
|
save_f.write(json_str.encode("utf-8"))
|
|
save_f.close()
|
|
print("scheme export end.")
|
|
|
|
|
|
|
|
|
|
# 显示所有方案文件的方案id
|
|
|
|
|
|
# 找到指定后缀的文件
|
|
def find_type(path:str,fix:str):
|
|
# path = os.getcwd()
|
|
# print("path",path)
|
|
list=os.listdir(path)
|
|
# print("list",list)
|
|
file_list=[]
|
|
for i in list:
|
|
ps=os.path.join(path, i)
|
|
if os.path.isdir(ps):
|
|
# print("path join",ps)
|
|
file_list+=find_type(ps,fix)
|
|
else:
|
|
if(ps[-len(fix):]==fix):
|
|
file_list.append(ps)
|
|
# print("file_list", file_list)
|
|
return file_list
|
|
|
|
|
|
|
|
def show_scheme_id(file_list):
|
|
id_list=[]
|
|
for i in file_list:
|
|
with open(i,"rb") as f:
|
|
json_obj=json.loads(f.read())
|
|
try:
|
|
id_list.append((i,json_obj["PlanID"]))
|
|
except Exception as e:
|
|
id_list.append((i,0))
|
|
return id_list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if(len(sys.argv)>=3):
|
|
path=sys.argv[1]
|
|
file_list=find_type(path,'.json')
|
|
id_list=show_scheme_id(file_list)
|
|
for i in id_list:
|
|
if(int(i[1])==int(sys.argv[2])):
|
|
print("-->",i[1],i[0])
|
|
else:
|
|
print(" ",i[1],i[0])
|
|
else:
|
|
print("python scheme_data.py [path] [plan_id]")
|