实现检测数据分析,上下限自动填充,一键导出方案
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -7,3 +7,4 @@ __pycache__/
|
||||
download/
|
||||
*.csv
|
||||
quest_info.txt
|
||||
scheme_export_info.txt
|
@@ -43,3 +43,5 @@
|
||||
整合python目录的代码
|
||||
2023.9.15
|
||||
新增analysis工具
|
||||
2023.9.17
|
||||
实现对检测数据的分析,自动填充上下限,一键导出方案
|
@@ -1,51 +1,177 @@
|
||||
import json
|
||||
import csv
|
||||
#coding:utf-8
|
||||
|
||||
# 导入matplotlib模块并使用Qt5Agg
|
||||
import typing
|
||||
from PyQt5.QtCore import QObject
|
||||
import matplotlib
|
||||
matplotlib.use('Qt5Agg')
|
||||
# 使用 matplotlib中的FigureCanvas (在使用 Qt5 Backends中 FigureCanvas继承自QtWidgets.QWidget)
|
||||
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
||||
from PyQt5.QtCore import *
|
||||
from PyQt5.QtGui import *
|
||||
from PyQt5.QtWidgets import *
|
||||
import matplotlib.pyplot as plt
|
||||
import sys
|
||||
import scheme_data
|
||||
import numpy as np
|
||||
import threading
|
||||
|
||||
|
||||
# 生成一个任务的参数
|
||||
def scheme_task_to_retinfo(j:json):
|
||||
texts=[]
|
||||
length=len(j["TestStandard"])
|
||||
for i in j["ReturnInfo"]:
|
||||
texts.append(i)
|
||||
return texts
|
||||
|
||||
# 生成任务参数序列
|
||||
def scheme_tasks_to_retinfo(j:json):
|
||||
# texts=[]
|
||||
# for i in j["TaskArray"]:
|
||||
# texts+=scheme_task_to_retinfo(i)
|
||||
texts=j["CheckerRtvName"]
|
||||
print(texts)
|
||||
return texts
|
||||
plt.rcParams['font.sans-serif']=['SimHei']
|
||||
plt.rcParams['axes.unicode_minus']=False
|
||||
|
||||
|
||||
def json_extract_retinfo(name:str):
|
||||
with open(name,"rb") as f:
|
||||
json_obj=json.loads(f.read())
|
||||
return scheme_tasks_to_retinfo(json_obj)
|
||||
# 定义一个可以嵌入到qt的figure
|
||||
class QFigure(QObject):
|
||||
def __init__(self,title:str) -> None:
|
||||
QObject.__init__(self)
|
||||
self.__figure = plt.figure(figsize=(14,7))
|
||||
self.__canvas = FigureCanvas(self.__figure)
|
||||
self.__ax = self.__figure.add_axes([0.1,0.1,0.8,0.8])
|
||||
self.__ax.set_title(title)
|
||||
|
||||
def draw(self,x,y,lable:str=None,limit_max:int=None,limit_min:int=None):
|
||||
line,=self.__ax.plot(x,y,)
|
||||
if(lable!=None):
|
||||
line.set_label(lable)
|
||||
self.__ax.legend()
|
||||
if(limit_max!=None):
|
||||
self.__ax.axhline(y=limit_max,color='g',linestyle='--')
|
||||
if(limit_min!=None):
|
||||
self.__ax.axhline(y=limit_min,color='r',linestyle='--')
|
||||
# start, end = self.__ax.get_ylim()
|
||||
# self.__ax.yaxis.set_ticks(np.arange(start, end,(end-start)/20),minor=True)
|
||||
# ticks=self.__ax.yaxis.get_ticklocs()
|
||||
# self.__ax.yaxis.set_ticks(minor=True)
|
||||
# self.__ax.yaxis.set_ticks(ticks)
|
||||
self.__ax.grid(visible=True,which="major",axis="y")
|
||||
self.__canvas.draw()
|
||||
def lable(self,lablex,labley):
|
||||
self.__ax.set_ylabel(labley)
|
||||
self.__ax.set_xlabel(lablex)
|
||||
def canvas(self):
|
||||
return self.__canvas
|
||||
|
||||
|
||||
# 定义一个任务需要的参数布局
|
||||
class TParamLayout(QObject):
|
||||
def __init__(self) -> None:
|
||||
QObject.__init__(self)
|
||||
self.__layout=QFormLayout()
|
||||
self.__items=[]
|
||||
def add_item(self,name:str,value:str="",tooltip:str=None):
|
||||
line_edit = QLineEdit()
|
||||
line_edit.setObjectName(value)
|
||||
line_edit.setText(value)
|
||||
line_edit_label = QLabel()
|
||||
line_edit_label.setObjectName(name)
|
||||
line_edit_label.setText(name)
|
||||
line_edit_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
if(tooltip!=None):
|
||||
line_edit_label.setToolTip(tooltip)
|
||||
line_edit.setToolTip(tooltip)
|
||||
self.__layout.addRow(line_edit_label,line_edit)
|
||||
self.__items.append((line_edit_label,line_edit))
|
||||
def item_value(self,name:str):
|
||||
for i in self.__items:
|
||||
if(i[0].text()==name):
|
||||
return i[1].text()
|
||||
return None
|
||||
def values(self):
|
||||
vals=[]
|
||||
for i in self.__items:
|
||||
vals.append((i[0].text(),i[1].text()))
|
||||
return vals
|
||||
def layout(self):
|
||||
return self.__layout
|
||||
|
||||
class Analysis(QWidget):
|
||||
def __init__(self,parent=None):
|
||||
super(Analysis,self).__init__(parent)
|
||||
self.setWindowTitle("检测数据分析")
|
||||
|
||||
self.__import_but = QPushButton("导入方案",self)
|
||||
self.__import_but.clicked.connect(self.import_but_clicked)
|
||||
self.__import_but.setGeometry(QRect(0,0,100,27))
|
||||
self.__export_but = QPushButton("导出方案")
|
||||
self.__export_but.clicked.connect(self.export_but_clicked)
|
||||
|
||||
self.__scroll=QScrollArea(self)
|
||||
self.__scroll.setGeometry(0,30,1230,700)
|
||||
# 设置布局
|
||||
self.__layout = QFormLayout()
|
||||
self.__layout.addRow(' ',self.__export_but)
|
||||
self.__items=[]
|
||||
|
||||
def main():
|
||||
save_name="check_data.csv"
|
||||
title=json_extract_retinfo("代工厂数据采集EJ旧68uF22um08211.json")
|
||||
save_f=open(save_name,'+w')
|
||||
save_f.write(','.join(title)+'\n')
|
||||
err_num=0
|
||||
with open("EJ09C测试数据.csv",newline="") as f:
|
||||
data=csv.reader(f)
|
||||
for row in data:
|
||||
if(row[3]=='0'):
|
||||
save_f.write(row[4]+'\n')
|
||||
else:
|
||||
err_num+=1
|
||||
print(row)
|
||||
print("检测异常数量为:",err_num)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
def addItem(self,data,name:str,params:list()):
|
||||
length=len(data)
|
||||
if(length<1000):
|
||||
print("data length too less.")
|
||||
return
|
||||
# 排序,从小到大
|
||||
sort_list=np.sort(data)
|
||||
dat_count=np.bincount(data)
|
||||
avg=round(np.average(data))
|
||||
max=sort_list[-1]
|
||||
min=sort_list[0]
|
||||
mid=sort_list[len(sort_list)//2]
|
||||
std=round(np.std(data))
|
||||
# 把众数和原始值对齐
|
||||
dat_count=dat_count[min:]
|
||||
dat_count_max=np.max(dat_count)
|
||||
dat_zoom=length//dat_count_max
|
||||
dat_count=np.multiply(dat_count,dat_zoom)
|
||||
# 去掉极大值,去掉极小值,使用99.8%的数据来统计区间
|
||||
limit_max_t=sort_list[-(length//1000)]
|
||||
limit_min_t=sort_list[(length//1000)]
|
||||
limit_max=limit_max_t+(limit_max_t-limit_min_t)//5
|
||||
limit_min=limit_min_t-(limit_max_t-limit_min_t)//5
|
||||
|
||||
# 使用3倍标准差来统计区间
|
||||
# limit_max=avg+3*std
|
||||
# limit_min=avg-3*std
|
||||
if(limit_min<0):
|
||||
limit_min=0
|
||||
figure = QFigure(name)
|
||||
figure.lable("序号","数值")
|
||||
tplayout=TParamLayout()
|
||||
for i in params:
|
||||
tplayout.add_item(i+":Max",str(max))
|
||||
tplayout.add_item(i+":Min",str(min))
|
||||
tplayout.add_item(i+":Avg",str(avg))
|
||||
tplayout.add_item(i+":Mid",str(mid))
|
||||
tplayout.add_item(i+":Std",str(std))
|
||||
tplayout.add_item(i+":LimitMax",str(limit_max))
|
||||
tplayout.add_item(i+":LimitMin",str(limit_min))
|
||||
self.__layout.addRow(figure.canvas(),tplayout.layout())
|
||||
figure.draw(range(len(data)),data,lable="原始值",limit_max=limit_max,limit_min=limit_min)
|
||||
y=np.add(range(len(dat_count)),min)
|
||||
figure.draw(dat_count,y,lable="权重")
|
||||
self.__items.append((figure,tplayout))
|
||||
|
||||
# 连接的绘制的方法
|
||||
def import_but_clicked(self):
|
||||
# t = threading.Thread(target=self.__anslysis, args=())
|
||||
# t.start()
|
||||
# def __anslysis(self):
|
||||
self.__import_but.setEnabled(False)
|
||||
widget=QWidget()
|
||||
widget.setGeometry(QRect(0,0,1200,20000))
|
||||
self.titles,datas=scheme_data.datas()
|
||||
for i in range(len(self.titles)):
|
||||
self.addItem(datas[i],self.titles[i],[self.titles[i]])
|
||||
widget.setLayout(self.__layout)
|
||||
self.__scroll.setWidget(widget)
|
||||
def export_but_clicked(self):
|
||||
ret_limit=[]
|
||||
for i,t in zip(self.__items,self.titles):
|
||||
max=i[1].item_value(t+":LimitMax")
|
||||
min=i[1].item_value(t+":LimitMin")
|
||||
ret_limit.append((max,min))
|
||||
scheme_data.export_scheme(ret_limit)
|
||||
# 运行程序
|
||||
if __name__ == '__main__':
|
||||
app = QApplication(sys.argv)
|
||||
main_window = Analysis()
|
||||
main_window.show()
|
||||
app.exec()
|
130
analysis/scheme_data.py
Normal file
130
analysis/scheme_data.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import json
|
||||
import csv
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import datetime
|
||||
from datetime import datetime, timedelta
|
||||
import os
|
||||
|
||||
# 生成一个任务的参数
|
||||
def __scheme_task_to_retinfo(j:json):
|
||||
texts=[]
|
||||
length=len(j["TestStandard"])
|
||||
for i in j["ReturnInfo"]:
|
||||
texts.append(i)
|
||||
return texts
|
||||
|
||||
# 生成任务参数序列
|
||||
def __scheme_tasks_to_retinfo(j:json):
|
||||
# texts=[]
|
||||
# for i in j["TaskArray"]:
|
||||
# texts+=__scheme_task_to_retinfo(i)
|
||||
texts=j["CheckerRtvName"]
|
||||
# print(texts)
|
||||
return texts
|
||||
|
||||
|
||||
def __json_extract_retinfo(name:str):
|
||||
with open(name,"rb") as f:
|
||||
json_obj=json.loads(f.read())
|
||||
return __scheme_tasks_to_retinfo(json_obj)
|
||||
return None
|
||||
|
||||
|
||||
|
||||
def __get_date():
|
||||
now_time = datetime.utcnow()
|
||||
utc_time = now_time + timedelta(hours=8)
|
||||
utc_time = utc_time.strftime("%Y%m%d")
|
||||
return utc_time
|
||||
|
||||
# 获取本次生成方案的id号
|
||||
def __get_scheme_id():
|
||||
info_name="scheme_export_info.txt"
|
||||
id=1
|
||||
if os.path.exists(info_name):
|
||||
info_f=open(info_name,'r')
|
||||
lines=info_f.readlines()
|
||||
if(__get_date()==lines[0].strip()):
|
||||
id=int(lines[1].strip())+1
|
||||
info_f.close()
|
||||
with open(info_name,"w+") as f:
|
||||
f.write(__get_date()+'\n')
|
||||
f.write(str(id)+'\n')
|
||||
return id
|
||||
|
||||
def __reflush_scheme_id(id):
|
||||
# 日bit0~bit4 月bit5~bit8 年bit9~bit15
|
||||
sid=(id>>0)&0x7f
|
||||
modele=(id>>7)&0x1f
|
||||
chip=(id>>12)&0xf
|
||||
day=(id>>16)&0x1f
|
||||
month=(id>>21)&0xf
|
||||
year=((id>>25)&0x7f)+2022
|
||||
now_time = datetime.utcnow()
|
||||
utc_time = now_time + timedelta(hours=8)
|
||||
sid=__get_scheme_id()
|
||||
day=utc_time.day
|
||||
month=utc_time.month
|
||||
year=utc_time.year-2022
|
||||
id=(sid&0x7f)|((modele&0x1f)<<7)|((chip&0xf)<<12)|((day&0x1f)<<16)|((month&0xf)<<21)|((year&0x7f)<<25)
|
||||
return id,sid
|
||||
|
||||
|
||||
def main():
|
||||
save_name="check_data.csv"
|
||||
title=__json_extract_retinfo("代工厂数据采集EJ旧68uF22um08211.json")
|
||||
save_f=open(save_name,'+w')
|
||||
save_f.write(','.join(title)+'\n')
|
||||
err_num=0
|
||||
with open("EJ09C测试数据.csv",newline="") as f:
|
||||
data=csv.reader(f)
|
||||
for row in data:
|
||||
if(row[3]=='0'):
|
||||
save_f.write(row[4]+'\n')
|
||||
else:
|
||||
err_num+=1
|
||||
print(row)
|
||||
print("检测异常数量为:",err_num)
|
||||
|
||||
|
||||
def datas():
|
||||
title=__json_extract_retinfo("代工厂数据采集EJ旧68uF22um08211.json")
|
||||
data_list=[]
|
||||
for i in title:
|
||||
t=[]
|
||||
data_list.append(t)
|
||||
with open("EJ09C测试数据.csv",newline="") as f:
|
||||
data=csv.reader(f)
|
||||
for row in data:
|
||||
if(row[3]=='0'):
|
||||
s=row[4].split(',')
|
||||
for i in range(len(s)):
|
||||
data_list[i].append(int(s[i]))
|
||||
return title,data_list
|
||||
|
||||
def export_scheme(ret_limit):
|
||||
name="代工厂数据采集EJ旧68uF22um08211.json"
|
||||
save_name="代工厂参数标定测试EJ"
|
||||
index=0
|
||||
with open(name,"r",encoding="utf-8") as f:
|
||||
json_obj=json.loads(f.read())
|
||||
for i in json_obj["TaskArray"]:
|
||||
for j in i["TestStandard"]:
|
||||
j["Max"]=ret_limit[index][0]
|
||||
j["Min"]=ret_limit[index][1]
|
||||
json_obj["PlanID"],sid=__reflush_scheme_id(json_obj["PlanID"])
|
||||
save_name=save_name+"_"+__get_date()+"{d:02d}".format(d=sid)
|
||||
json_obj["PlanBrief"]=save_name
|
||||
json_str=json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '),ensure_ascii=False)
|
||||
save_f=open(save_name+".json",'+wb')
|
||||
save_f.write(json_str.encode("utf-8"))
|
||||
save_f.close()
|
||||
if __name__ == "__main__":
|
||||
# main()
|
||||
|
||||
x=[1,2,3,4,5,6,7]
|
||||
avg=np.average(x)
|
||||
print(type(x),type(avg))
|
||||
sub=np.abs(x-avg)
|
||||
print(sub)
|
Reference in New Issue
Block a user