Files
python_tools/analysis/scheme_data.py

298 lines
8.7 KiB
Python

import json
import csv
import matplotlib.pyplot as plt
import numpy as np
import datetime
from datetime import datetime, timedelta
import os
import sys
import pymysql as mysql
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
# Name of the text file (opened relative to the current working directory)
# in which _save_info() persists the export settings and from which
# _get_info() reads them back.
__INFO_NAME="scheme_export_info.txt"
class quest_text(QObject):
    """Small application-modal dialog with one editable text box.

    The dialog shows ``info`` in a text edit together with a confirm and a
    cancel button.  Call :meth:`show` to run it; the result tells which
    button was used and what the text box contained at that moment.

    Qt widgets require a ``QApplication`` to exist before this class is
    instantiated; this class does not create one.

    Args:
        title: Window title of the dialog.
        info: Initial content of the text box.
    """

    def __init__(self,title:str,info:str):
        QObject.__init__(self)
        # Result flag; stays False unless the confirm button is clicked
        # (this also covers closing the window without pressing a button).
        self.ok_state=False
        self.w=QDialog()
        self.w.resize(320,80)
        self.w.setWindowTitle(title)
        # The dialog is deleted once it is closed, so one instance can only
        # be shown once.
        self.w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
        self.w.setWindowModality(Qt.WindowModality.ApplicationModal)
        self.text=info
        self.text_edit = QTextEdit(self.w)
        self.text_edit.setObjectName(u"text")
        self.text_edit.setGeometry(QRect(10, 10, 300, 30))
        self.text_edit.setFrameShape(QFrame.Shape.Box)
        self.text_edit.setMidLineWidth(1)
        self.text_edit.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
        self.text_edit.setPlainText(self.text)
        # Put the cursor after the pre-filled text so typing appends to it.
        self.text_edit.moveCursor(QTextCursor.MoveOperation.End, QTextCursor.MoveMode.MoveAnchor)
        self.ok_but_init()
        self.cancel_but_init()

    def ok_but_init(self):
        """Create the confirm button and wire it to :meth:`ok_but_clicked`."""
        self.ok_but=QPushButton(self.w)
        self.ok_but.setObjectName(u"ok_but")
        self.ok_but.setGeometry(QRect(210, 50, 90, 28))
        self.ok_but.clicked.connect(self.ok_but_clicked)
        self.ok_but.setText("确认")
        self.ok_but.setCheckable(True)
        self.ok_but.setChecked(True)

    def cancel_but_init(self):
        """Create the cancel button and wire it to :meth:`cancel_but_clicked`."""
        self.cancel_but=QPushButton(self.w)
        self.cancel_but.setObjectName(u"cancel_but")
        self.cancel_but.setGeometry(QRect(100, 50, 90, 28))
        self.cancel_but.clicked.connect(self.cancel_but_clicked)
        self.cancel_but.setText("取消")
        # Previously this line configured ok_but a second time (copy-paste
        # slip); it is meant for the button created in this method.
        self.cancel_but.setCheckable(True)

    def ok_but_clicked(self):
        """Store the edited text, mark the dialog confirmed and close it."""
        self.text=self.text_edit.toPlainText()
        self.ok_state=True
        # done() closes the dialog and ends exec(); no extra close() needed.
        self.w.done(QDialog.DialogCode.Accepted)

    def cancel_but_clicked(self):
        """Store the edited text, mark the dialog cancelled and close it."""
        self.text=self.text_edit.toPlainText()
        self.ok_state=False
        # Report the cancellation through the dialog result code as well.
        self.w.done(QDialog.DialogCode.Rejected)

    def show(self):
        """Run the dialog modally and return ``(ok_state, text)``.

        Returns:
            A tuple ``(ok_state, text)``: ``ok_state`` is True only when the
            confirm button was clicked; ``text`` is the text box content
            captured when a button was clicked, or the initial ``info`` if
            the dialog was closed without pressing a button.
        """
        self.w.exec()
        return self.ok_state,self.text
def _connect_sql(host='47.108.119.117',user='ymp',passwd='Abc@12345',port=15319):
    """Open a MySQL connection and select the ``datacenter`` database.

    NOTE(review): the default arguments embed real-looking credentials in
    source code.  They are kept so existing zero-argument callers continue
    to work, but they should be moved to configuration outside the
    repository and the password rotated.

    Args:
        host: Server address.
        user: Login name.
        passwd: Login password.
        port: Server TCP port.

    Returns:
        ``(connection, cursor)`` on success, ``(None, None)`` if connecting
        or selecting the database failed.
    """
    db=None
    try:
        db=mysql.connect(host=host,user=user,passwd=passwd,port=port)
        cur=db.cursor()
        cur.execute("use datacenter")
        print("connect mysql success.")
        return db,cur
    except Exception as e:
        # Best-effort boundary: callers only check for None, so report the
        # cause here instead of discarding it.
        print("can not connect service.",e)
        if db is not None:
            # The connection was opened but a later step failed; release it
            # so the socket is not leaked.
            try:
                db.close()
            except Exception as close_err:
                print("error while closing connection:",close_err)
        return None,None
# Generate the parameters of one task
def _scheme_task_to_retinfo(j:dict):
    """Return the entries of ``j["ReturnInfo"]`` as a new list.

    Args:
        j: Mapping describing one task; must contain the key
            ``"ReturnInfo"`` holding an iterable.

    Returns:
        A new list with the items of ``j["ReturnInfo"]`` in iteration order.

    Raises:
        KeyError: If ``"ReturnInfo"`` is missing.
    """
    # The former unused ``len(j["TestStandard"])`` lookup was dropped: it
    # contributed nothing to the result but made the call fail for tasks
    # without a "TestStandard" entry.
    return list(j["ReturnInfo"])
# Generate the sequence of task parameters
def _scheme_tasks_to_retinfo(j:json):
    """Return the value stored under ``"CheckerRtvName"`` in ``j``.

    The object is handed back as-is (no copy).  An earlier variant that
    collected the names task by task from ``"TaskArray"`` is no longer used.

    Raises:
        KeyError: If ``"CheckerRtvName"`` is missing.
    """
    return j["CheckerRtvName"]
def _json_extract_retinfo(name:str):
    """Load a scheme JSON file and return its return-value names and plan id.

    Args:
        name: Path of the JSON file.  It is read in binary mode, so the
            ``json`` module detects the text encoding itself.

    Returns:
        ``(names, plan_id)`` where ``names`` is what
        ``_scheme_tasks_to_retinfo`` yields for the document and ``plan_id``
        is the document's ``"PlanID"`` value.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the content is not valid JSON.
        KeyError: If a required key is missing.
    """
    with open(name,"rb") as f:
        json_obj=json.load(f)
    # The old trailing ``return None,None`` could never run (the ``with``
    # body always returned or raised), so failures have always surfaced as
    # exceptions; that contract is kept.
    return _scheme_tasks_to_retinfo(json_obj),json_obj["PlanID"]
def _get_date():
    """Return the current date in the UTC+8 zone formatted as ``YYYYMMDD``."""
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone
    # Ask for an aware "now" directly in UTC+8 instead of shifting the
    # naive result of the deprecated datetime.utcnow().
    utc_plus_8=timezone(timedelta(hours=8))
    return datetime.now(utc_plus_8).strftime("%Y%m%d")
# Save the settings
def _save_info(info):
    """Write the first five items of ``info`` to the settings file.

    Each item goes on its own line, in order.  The item at index 1 is
    converted with ``str()``; the other four must already be strings.
    The file named by ``__INFO_NAME`` is truncated before writing.
    """
    with open(__INFO_NAME,"w+") as out:
        for pos in range(5):
            field=info[pos]
            if pos==1:
                # Only this slot is stringified before being written.
                field=str(field)
            out.write(field+'\n')
def _get_info(info_name=None):
    """Read the persisted export settings.

    Args:
        info_name: Path of the settings file.  Defaults to the module-level
            ``__INFO_NAME``.

    Returns:
        A tuple ``(date, id, name, date_start, date_end)`` where ``id`` is
        an ``int`` and the other items are strings.  Built-in defaults are
        returned when the file does not exist, and they also fill in any
        trailing lines missing from a shorter file.

    Raises:
        ValueError: If the second line is present but is not an integer.
    """
    if info_name is None:
        info_name=__INFO_NAME
    # Defaults, in file order; entries are overwritten by the lines found.
    fields=["20230101","1","方案参数校准","2023-01-01","2023-01-01"]
    if os.path.exists(info_name):
        # ``with`` guarantees the handle is closed even if reading fails.
        with open(info_name,'r') as info_f:
            lines=info_f.readlines()
        # A file with fewer than five lines keeps the defaults for the rest
        # instead of raising IndexError.
        for pos,line in enumerate(lines[:len(fields)]):
            fields[pos]=line.strip()
    date,scheme_id,name,date_start,date_end=fields
    return (date,int(scheme_id),name,date_start,date_end)
# Get the id number of the scheme generated by this run
def _get_scheme_id():
    """Return the next per-day sequence number and persist it.

    The stored date is compared with today's date from ``_get_date()``.
    If they differ the counter restarts at 1, otherwise the stored counter
    is incremented.  The new date and counter are written back with
    ``_save_info``; the remaining settings are carried over unchanged.

    Returns:
        The sequence number to use for this run.
    """
    info=_get_info()
    # Query the date once so the value compared and the value stored cannot
    # disagree if the call happens right at midnight.
    today=_get_date()
    if info[0]!=today:
        seq=1
    else:
        seq=info[1]+1
    _save_info((today,seq,info[2],info[3],info[4]))
    return seq
def _reflush_scheme_id(id):
    """Rebuild a packed scheme id with a fresh serial number and date.

    Bit layout of the packed value, as implemented below:

    * bits 0-6:   serial number (7 bits)
    * bits 7-11:  ``modele`` field (5 bits)
    * bits 12-15: ``chip`` field (4 bits)
    * bits 16-20: day of month (5 bits)
    * bits 21-24: month (4 bits)
    * bits 25-31: year minus 2022 (7 bits)

    Only the ``modele`` and ``chip`` fields are taken from the incoming id.
    The serial comes from ``_get_scheme_id()`` and the date fields from the
    current UTC+8 time.

    Args:
        id: Existing packed scheme id.

    Returns:
        ``(new_id, sid)``.  ``sid`` is the serial number as returned by
        ``_get_scheme_id()``; only its low 7 bits are stored in ``new_id``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone
    # Fields preserved from the old id.  The serial and date fields used to
    # be decoded here too, but were always overwritten before use.
    modele=(id>>7)&0x1f
    chip=(id>>12)&0xf
    now=datetime.now(timezone(timedelta(hours=8)))
    sid=_get_scheme_id()
    day=now.day
    month=now.month
    year=now.year-2022
    new_id=(sid&0x7f)|((modele&0x1f)<<7)|((chip&0xf)<<12)|((day&0x1f)<<16)|((month&0xf)<<21)|((year&0x7f)<<25)
    return new_id,sid
def _main():
    """Filter a test-data CSV into ``check_data.csv``.

    The header line is built from the return-value names of a fixed scheme
    JSON file.  For every CSV row whose fourth column is ``'0'`` the fifth
    column is copied to the output; every other row is printed and counted
    as abnormal.  The count is printed at the end.
    """
    save_name="check_data.csv"
    # _json_extract_retinfo returns (names, plan_id); unpacking it into a
    # single target raised ValueError, so take both and ignore the id.
    title,_plan_id=_json_extract_retinfo("代工厂数据采集EJ旧68uF22um08211.json")
    err_num=0
    # Context managers close both files even if a row fails to process.
    with open(save_name,'w') as save_f:
        save_f.write(','.join(title)+'\n')
        with open("EJ09C测试数据.csv",newline="") as f:
            for row in csv.reader(f):
                if not row:
                    # csv.reader yields [] for blank lines; skip them
                    # instead of failing on row[3].
                    continue
                if row[3]=='0':
                    save_f.write(row[4]+'\n')
                else:
                    err_num+=1
                    print(row)
    print("检测异常数量为:",err_num)
class sch_data(object):
    """Workflow helper around one check scheme.

    Lets the user pick a scheme JSON file and a date range, fetches the
    matching check results from the database, and exports a copy of the
    scheme with updated limits.
    """

    def __init__(self):
        # All three are filled in by select_scheme(); None means "not set",
        # which datas_sql() and export_scheme() test for.
        self.scheme_name=None
        self.date_start=None
        self.date_end=None

    def select_scheme(self):
        """Ask the user for a scheme file and a date interval.

        Returns:
            True when both a file and a valid ``start,end`` interval were
            provided, False when the user cancelled or the interval text
            could not be split into two non-empty parts.
        """
        fileName,fileType = QFileDialog.getOpenFileNames(None, "选取文件", os.getcwd(),
                                                         "检测方案(*.json)")
        # Check for an empty selection before indexing; a cancelled file
        # dialog returns an empty list.
        if not fileName:
            print("user cancelled.")
            return False
        self.scheme_name=fileName[0]
        print(fileName,fileType)
        info=_get_info()
        quest=quest_text("请输入要获取数据的日期",info[3]+','+info[4])
        ack,date_interval=quest.show()
        if not ack:
            print("user cancelled.")
            return False
        # Strip each part: the values are written to the line-based info
        # file, where a stray newline would shift later fields.
        parts=[part.strip() for part in date_interval.split(',')]
        if len(parts)<2 or not parts[0] or not parts[1]:
            print("invalid date interval, expected 'start,end':",date_interval)
            return False
        self.date_start=parts[0]+" 00:00:00"
        self.date_end=parts[1]+" 23:59:59"
        _save_info((info[0],info[1],info[2],parts[0],parts[1]))
        return True

    def datas_sql(self):
        """Fetch check results for the selected scheme and date range.

        Selects rows from ``check_result_detail`` created strictly between
        ``date_start`` and ``date_end`` whose ``result_id`` belongs to a
        ``check_result`` row with the scheme's plan id.  For rows whose
        ``err_code`` is 0, ``valuez`` is split on ``','`` and each value is
        appended, as ``int``, to the column list with the same position.

        Returns:
            ``(title, data_list, row_count)``: the return-value names, one
            list of ints per name, and the number of rows fetched
            (including rows with a non-zero error code).  Returns
            ``(None, None, None)`` when no scheme or date range has been
            selected, or when the database connection could not be opened.
        """
        # Every failure path returns three items, matching the success path,
        # so callers can always unpack the result the same way.
        if self.scheme_name is None:
            return None,None,None
        if self.date_start is None or self.date_end is None:
            return None,None,None
        title,scheme_id=_json_extract_retinfo(self.scheme_name)
        data_list=[[] for _ in title]
        db,cur=_connect_sql()
        if db is None:
            return None,None,None
        try:
            cmd="""SELECT id,create_time,addr,err_code,valuez FROM check_result_detail
            where create_time > %s and create_time < %s
            and result_id in (select pk from check_result where plan_id =%s) ;"""
            cur.execute(cmd,(self.date_start,self.date_end,scheme_id))
            check_data=cur.fetchall()
        finally:
            # Release the cursor and connection whether or not the query
            # succeeded.
            cur.close()
            db.close()
        for row in check_data:
            if row[3]==0:
                for pos,value in enumerate(row[4].split(',')):
                    data_list[pos].append(int(value))
        return title,data_list,len(check_data)

    def export_scheme(self,ret_limit):
        """Write a copy of the selected scheme with new limits and a new id.

        Args:
            ret_limit: Indexable sequence of ``(max, min)`` pairs, consumed
                in order: one pair per ``"TestStandard"`` entry across all
                tasks in ``"TaskArray"``.

        The user is asked for a base file name.  The output is saved as
        ``<name>_<date><serial>.json`` (UTF-8), and the same string is
        stored in the scheme's ``"PlanBrief"``.  Returns None; does nothing
        when no scheme is selected or the user cancels.
        """
        name=self.scheme_name
        if name is None:
            print("no scheme selected.")
            return
        info=_get_info()
        quest=quest_text("请输入要导出的文件名",info[2])
        ack,save_name=quest.show()
        if not ack:
            print("user cancelled.")
            return
        # The name is stored in the line-based info file and used as a file
        # name, so drop surrounding whitespace/newlines from the text box.
        save_name=save_name.strip()
        _save_info((info[0],info[1],save_name,info[3],info[4]))
        with open(name,"r",encoding="utf-8") as f:
            json_obj=json.load(f)
        index=0
        for task in json_obj["TaskArray"]:
            for standard in task["TestStandard"]:
                standard["Max"]=ret_limit[index][0]
                standard["Min"]=ret_limit[index][1]
                index+=1
        json_obj["PlanID"],sid=_reflush_scheme_id(json_obj["PlanID"])
        # Refreshing the scheme id may update the stored date, so re-read it.
        date=_get_info()[0]
        save_name=save_name+"_"+date+"{d:02d}".format(d=sid)
        json_obj["PlanBrief"]=save_name
        json_str=json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '),ensure_ascii=False)
        # ``with`` closes the output file even if the write fails.
        with open(save_name+".json",'wb') as save_f:
            save_f.write(json_str.encode("utf-8"))
# Script entry point: run the CSV filtering routine only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    _main()