"""Host-side (PC) coding tool for the batch detonator checker.

Opens a serial link to the checker / coding controller, reacts to the frames
it sends, generates shell codes, and logs every coding result to a per-day
CSV file that is also shown in a table.
"""

import os
import sys
import datetime
# NOTE: the next import rebinds the name ``datetime`` to the class.
from datetime import datetime, timedelta, timezone
import prottcp as prot
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import threading
import time
import serial
import serial.tools.list_ports
import coder_params as cpars
import coder_check as chk


# ANSI colour escapes used for console output.
STR_RED = "\033[1;31m"
STR_BLUE = "\033[1;34m"
STR_END = "\033[0m"

# Beijing time is 8 hours ahead of UTC.
_BEIJING_TZ = timezone(timedelta(hours=8))


def get_date():
    """Return the current Beijing date formatted as ``YYYYMMDD``."""
    return datetime.now(_BEIJING_TZ).strftime("%Y%m%d")


def get_time():
    """Return the current Beijing time as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(_BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")


class coder(QObject):
    """Main window and protocol logic of the coding tool.

    ``autotest_is_open`` is a small state machine:
    0 = idle, 1 = running, 2 = starting, 3 = stopping.
    """

    # Signals emitted by the auto-test worker thread so that all widget access
    # and button actions are executed on the GUI thread.
    _autotest_button_signal = pyqtSignal(str, bool)
    _autotest_check_signal = pyqtSignal()
    _autotest_posend_signal = pyqtSignal()

    def __init__(self) -> None:
        QObject.__init__(self)
        self.app = QApplication(sys.argv)
        self.code_id = 0
        self.run_times = 0
        self.slave_num = 10
        # Results remembered across retries until a whole round succeeds.
        self.code_list_lock = []
        self.save_file = "code_list.csv"
        self.ser_is_open = False
        self.recv_handler = None
        self.autoinc_id = False
        self.openignore_flag = False
        self.autotest_is_open = 0
        self.uid_length = 16
        self.code_list_backcolor = False
        self.ser = prot.protu()
        self.widget = QWidget()
        self.widget.resize(1500, 900)
        self.widget.setWindowTitle("批检仪赋码工具")
        # The order below matters: populating a combo box fires its
        # currentIndexChanged slot immediately, and those slots touch widgets
        # created by the earlier calls (missing ones are tolerated).
        self.com_but_init()
        self.combsp_init()
        self.com_init()
        self.code_list_init()
        self.dot_fat_init()
        self.channel_init()
        self.sig_list_init()
        self.moterup_init()
        self.infotext_init()
        self.autoinc_init()
        self.check_but_init()
        self.posend_but_init()
        self.autotest_but_init()
        self.stop_but_init()
        self.code_but_init()
        self.moterdown_init()
        self.openignore_init()
        self.recv_handler_table_init()
        self.device_type_init()
        self._autotest_button_signal.connect(self._autotest_button_update)
        self._autotest_check_signal.connect(self.check_but_clicked)
        self._autotest_posend_signal.connect(self.posend_but_clicked)
        self.widget.destroyed.connect(self.quit)

    def quit(self):
        """Close the serial link and terminate the process."""
        self.close_serial()
        print("app exit.")
        sys.exit(1)

    def init(self, detonator_fat: bytearray, chip_fat: bytearray,
             signature_code: bytearray, slave_num: int):
        """Set the coding parameters and reload today's result file.

        The result file is named ``<YYYYMMDD>.csv``. When it already exists,
        its rows are shown in the table and ``code_id`` continues from the
        number of stored rows.
        """
        self.set_params(detonator_fat, chip_fat, signature_code, slave_num)
        date = get_date().encode('utf-8')
        self.date = date[-5:]
        self.year = date[:4]
        self.save_file = date.decode("utf-8") + ".csv"
        if os.path.exists(self.save_file):
            with open(self.save_file, encoding="utf-8", errors="replace") as f:
                # Blank lines are not records and must not advance the id.
                lines = [ln for ln in f.read().splitlines() if ln.strip()]
            self.code_id = len(lines)
            for line in lines:
                self.code_list_add(line)
            print("code id start as:", self.code_id)
            self.code_list_to_bottom()

    def set_params(self, detonator_fat: bytearray, chip_fat: bytearray,
                   signature_code: bytearray, slave_num: int):
        """Store detonator factory, chip factory, signature code and channel count."""
        self.detonator_fat = detonator_fat
        self.chip_fat = chip_fat
        self.slave_num = slave_num
        self.signature_code = signature_code
        info = "已更换编码参数,现在的编码参数为:" + "雷管厂:" + detonator_fat.decode("utf-8") + ",芯片厂:" + chip_fat.decode("utf-8")
        info += ",特征码:" + signature_code.decode("utf-8") + ",通道数:" + str(slave_num)
        self.set_infotext(info)

    # ------------------------------------------------------------------
    # Widget construction helpers
    # ------------------------------------------------------------------
    def _new_combo(self, name: str, geometry, editable: bool, slot):
        """Create an empty combo box; the caller assigns it before filling it."""
        combo = QComboBox(self.widget)
        combo.setObjectName(name)
        combo.setGeometry(geometry)
        combo.setEditable(editable)
        combo.currentIndexChanged.connect(slot)
        return combo

    def _new_label(self, geometry, text: str):
        """Create a caption label."""
        label = QLabel(self.widget)
        label.setObjectName(u"label")
        label.setGeometry(geometry)
        label.setText(text)
        return label

    def _new_button(self, name: str, x: int, y: int, slot, text=None):
        """Create a 93x28 push button at (x, y) wired to ``slot``."""
        button = QPushButton(self.widget)
        button.setObjectName(name)
        button.setGeometry(QRect(x, y, 93, 28))
        if text is not None:
            button.setText(text)
        button.clicked.connect(slot)
        return button

    # ------------------------------------------------------------------
    # Widget initialisation
    # ------------------------------------------------------------------
    def combsp_init(self):
        """Baud-rate selector."""
        self.combsp = self._new_combo(u"combsp", QRect(465, 10, 85, 25), True, self.com_changed)
        for i in cpars.uartbsp_list():
            self.combsp.addItem(i)
        self.combsp_label = self._new_label(QRect(405, 16, 72, 15), "波特率:")

    def dot_fat_init(self):
        """Detonator-factory selector."""
        self.dot_fat = self._new_combo(u"dot_fat", QRect(645, 10, 400, 25), False, self.params_changed)
        for i in cpars.dot_fat_list():
            self.dot_fat.addItem(i)
        self.dot_fat_label = self._new_label(QRect(570, 16, 72, 15), "雷管厂家:")

    def com_init(self):
        """Serial-port selector, filled with the ports found on this machine."""
        self.com = self._new_combo(u"com", QRect(95, 10, 300, 25), True, self.com_changed)
        # NOTE(review): entries starting with "utcp" (e.g. "utcp:7777") are
        # passed to the link layer unchanged; presumably a TCP endpoint.
        for comport in serial.tools.list_ports.comports():
            self.com.addItem(comport.name + ":" + comport.description)
        self.com_label = self._new_label(QRect(30, 16, 72, 15), "COM口:")

    def sig_list_init(self):
        """Signature-code selector."""
        self.sig_list = self._new_combo(u"sig_list", QRect(1285, 10, 85, 25), False, self.params_changed)
        for i in cpars.code_signature_code_list():
            self.sig_list.addItem(i)
        self.sig_list_label = self._new_label(QRect(1220, 16, 72, 15), "特征码:")

    def channel_init(self):
        """Channel-count selector."""
        self.channel = self._new_combo(u"channel", QRect(1125, 10, 85, 25), True, self.params_changed)
        for i in cpars.code_channel_list():
            self.channel.addItem(i)
        self.channel_label = self._new_label(QRect(1055, 16, 72, 15), "通道数:")

    def infotext_init(self):
        """Status line at the bottom of the window."""
        self.infotext = QLabel(self.widget)
        self.infotext.setGeometry(QRect(30, 865, 1200, 15))
        self.infotext.setText("请打开串口以开始赋码")

    def set_infotext(self, text: str):
        """Show ``text`` in the status line (best effort)."""
        try:
            self.infotext.setText(text)
        except (AttributeError, RuntimeError):
            # AttributeError: called while the window is still being built.
            # RuntimeError: the underlying Qt label was already destroyed.
            pass

    def com_but_init(self):
        """Open/close port button."""
        self.com_but = self._new_button(u"com_but", 1370, 90, self.com_but_clicked, "打开端口")

    def moterup_init(self):
        """Motor-up button."""
        self.moterup = self._new_button(u"moteerup", 1250, 90, self.cmd_moter_up, "电机上升")

    def autoinc_init(self):
        """ID auto-increment toggle; starts enabled."""
        self.autoinc = self._new_button(u"autoinc", 1250, 130, self.autoinc_clicked)
        self.autoinc_clicked()

    def check_but_init(self):
        """Start-check button."""
        self.check_but = self._new_button(u"check_but", 1250, 170, self.check_but_clicked, "开始检测")

    def posend_but_init(self):
        """In-position sensor button."""
        self.posend_but = self._new_button(u"posend_but", 1250, 210, self.posend_but_clicked, "到位感应")

    def autotest_but_init(self):
        """Auto-test toggle button."""
        self.autotest_but = self._new_button(u"autotest_but", 1250, 250, self.autotest_but_clicked, "开自动测试")

    def stop_but_init(self):
        """Emergency-stop / run toggle; the initial click puts it in the running state."""
        self.stop_but = self._new_button(u"stop_but", 1250, 290, self.stop_but_clicked)
        self.stop_state = True
        self.stop_but_clicked()

    def code_but_init(self):
        """Manual coding button."""
        self.code_but = self._new_button(u"code_but", 1250, 330, self.code_but_clicked, "赋码")

    def moterdown_init(self):
        """Motor-down button."""
        self.moterdown = self._new_button(u"moterdown", 1250, 370, self.cmd_moter_down, "电机下降")

    def openignore_init(self):
        """Toggle for ignoring contact errors; starts in the ignoring state."""
        self.openignore = self._new_button(u"openignore", 1250, 410, self.openignore_clicked)
        self.openignore_clicked()

    def device_type_init(self):
        """Device-type selector, filled from the receive-handler table."""
        self.device_type_list = self._new_combo(
            u"device_type_list", QRect(1250, 50, 93 + 120, 28), False, self.device_type_changed)
        for name in self.recv_handler_table:
            self.device_type_list.addItem(name)
        self.device_type_changed(0)

    def code_list_init(self):
        """Table listing the coding results."""
        self.code_list = QTableWidget(self.widget)
        self.code_list.setObjectName(u"code_list")
        self.code_list.setGeometry(QRect(30, 50, 1200, 800))
        self.code_list.setColumnCount(6)
        self.code_list.setHorizontalHeaderLabels(["创建时间", "通道号", "注码结果", "管壳码", "UID", "密码"])
        self.code_list.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)

    def code_list_add(self, code_line: str):
        """Append one CSV record to the table.

        ``code_line`` has the fields time, channel, error code, shell code,
        UID and password. Malformed lines are reported and skipped.
        """
        fields = code_line.rstrip("\r\n").split(",")
        if len(fields) < 6:
            print("skip malformed record:", repr(code_line))
            return
        try:
            errcode = int(fields[2])
        except ValueError:
            print("skip malformed record:", repr(code_line))
            return
        fields[2] = cpars.code_errinfo(errcode)
        # Rounds alternate between grey and white so they are easy to tell apart.
        if self.code_list_backcolor:
            bkcolor = QColor(150, 150, 150, 255)
        else:
            bkcolor = QColor(255, 255, 255, 255)
        row = self.code_list.rowCount()
        self.code_list.insertRow(row)
        for col in range(6):
            self.code_list.setItem(row, col, QTableWidgetItem(fields[col]))
            self.code_list.item(row, col).setBackground(bkcolor)
        if errcode != 0:
            self.code_list.item(row, 2).setBackground(Qt.GlobalColor.red)

    def code_list_to_bottom(self):
        """Scroll the result table to its last row."""
        self.code_list.scrollToBottom()

    # ------------------------------------------------------------------
    # Button / combo slots
    # ------------------------------------------------------------------
    def com_but_clicked(self):
        """Open the selected port, or close it when it is already open."""
        print("com but clicked")
        if not self.ser_is_open:
            # currentText() also honours text typed into the editable boxes.
            item = self.com.currentText()
            bsp = self.combsp.currentText()
            com = item.split(":")[0]
            if com != "utcp":
                item = com + ":" + bsp
            self.open_serial(item)
        else:
            self.close_serial()

    def set_port_state(self, state: bool):
        """Record the port state and update the open/close button caption."""
        self.ser_is_open = state
        self.com_but.setText("关闭端口" if state else "打开端口")

    def autoinc_clicked(self):
        """Toggle automatic increment of the code id."""
        if self.autoinc_id:
            self.autoinc_id = False
            self.autoinc.setText("打开ID自增")
            self.set_infotext("已关闭ID自增")
        else:
            self.autoinc_id = True
            self.autoinc.setText("关闭ID自增")
            self.set_infotext("已打开ID自增")

    def openignore_clicked(self):
        """Toggle whether contact errors are ignored."""
        if not self.openignore_flag:
            self.openignore_flag = True
            self.openignore.setText("拦截接触异常")
            self.set_infotext("当前为忽略接触异常,接触异常时也会跑码。")
        else:
            self.openignore_flag = False
            self.openignore.setText("忽略接触异常")
            self.set_infotext("当前为拦截接触异常,接触异常时不会跑码。")

    def check_but_clicked(self):
        """Send the start-check command."""
        print("send start check cmd.")
        self.cmd_user(0x90, bytearray([2]))

    def posend_but_clicked(self):
        """Send the in-position command."""
        print("send in place cmd.")
        self.cmd_user(0x90, bytearray([3]))

    def stop_but_clicked(self):
        """Toggle between emergency stop and run."""
        print("send stop cmd.")
        if not self.stop_state:
            # Send the emergency-stop command.
            self.cmd_user(0x90, bytearray([0]))
            self.stop_but.setText("运行")
            self.stop_state = True
        else:
            # Send the start command.
            self.cmd_user(0x90, bytearray([1]))
            self.stop_but.setText("急停")
            self.stop_state = False

    def autotest_but_clicked(self):
        """Start the auto-test loop, or ask a running one to stop."""
        if self.autotest_is_open == 0:
            self.autotest_but.setText("启动中...")
            self.autotest_is_open = 2
            self.autotest_but.setEnabled(False)
            self.start_autotest()
        elif self.autotest_is_open == 1:
            self.autotest_but.setText("关闭中...")
            self.autotest_is_open = 3
            self.autotest_but.setEnabled(False)

    def code_but_clicked(self):
        """Send a coding command for all channels."""
        self.set_infotext("发送赋码指令")
        self.cmd_code(self.slave_num)

    def params_active(self, power: bool):
        """Enable or disable the coding-parameter selectors."""
        self.dot_fat.setEnabled(power)
        self.sig_list.setEnabled(power)
        self.channel.setEnabled(power)
        self.device_type_list.setEnabled(power)

    def params_changed(self, index: int):
        """Rebuild the coding parameters from the current selections."""
        try:
            dot_fat = self.dot_fat.itemText(self.dot_fat.currentIndex()).split(":")[0].encode("utf-8")
            chip_fat = b"AD"
            sig_code = self.sig_list.itemText(self.sig_list.currentIndex()).encode("utf-8")
            slave_num = self.channel.itemText(self.channel.currentIndex())
            self.set_params(dot_fat, chip_fat, sig_code, int(slave_num))
        except (AttributeError, ValueError):
            # AttributeError: fired while the other selectors do not exist yet.
            # ValueError: the channel entry is not a number.
            pass

    def device_type_changed(self, index: int):
        """Switch the receive handler and UID length to the selected device type."""
        device_type = self.device_type_list.itemText(index)
        entry = self.recv_handler_table.get(device_type)
        if entry is None:
            return
        self.recv_handler = entry[0]
        self.uid_length = entry[1]
        try:
            # Run every other type's exit hook first, then this type's enter
            # hook, so shared hooks end in the "entered" state.
            for name, other in self.recv_handler_table.items():
                if name != device_type:
                    other[3]()
            entry[2]()
        except Exception as err:
            # The hooks touch buttons that may not exist yet during start-up.
            print(str(err))
        self.set_infotext("设备已切换为:" + device_type)

    def com_changed(self, index: int):
        """Close the current port whenever port or baud rate changes."""
        print("com changed")
        self.set_infotext("串口参数有变动,将自动关闭打开的串口")
        self.close_serial()

    # ------------------------------------------------------------------
    # Serial link
    # ------------------------------------------------------------------
    def open_serial(self, com: str):
        """Open the link described by ``com`` and start receiving."""
        if self.ser.init(com) == True:
            self.ser.recv_signal.connect(self.recv_slot)
            self.ser.start_recv()
            self.set_port_state(True)
            print("open serial.")
            self.set_infotext("打开串口成功,开始监听请求赋码指令")
        else:
            self.set_infotext("打开串口失败,是否有其他程序占用了串口?")
            print("open serial failed.")

    def close_serial(self):
        """Disconnect the receive slot and close the link."""
        try:
            self.ser.recv_signal.disconnect(self.recv_slot)
        except Exception:
            # Best effort: the slot is not connected when the port was never
            # opened, and the link object is project code of unknown behaviour.
            pass
        self.ser.close()
        self.set_infotext("串口已关闭")
        self.set_port_state(False)

    def recv_slot(self, cmd: int, data: bytearray, err: str):
        """Dispatch a received frame to the handler of the current device type.

        Every frame consumes one unit of ``run_times``; once it is used up,
        frames are ignored.
        """
        if self.run_times <= 0:
            print("run times end.")
            self.set_infotext("运行次数已结束,不再响应检测赋码指令")
            return
        self.run_times -= 1
        if self.recv_handler is not None:
            self.recv_handler(cmd, data, err)

    def recv_deal_coder2ch(self, cmd: int, data: bytearray, err: str):
        """Frame handler for the coding controller."""
        if cmd == 0x8b:
            # Device requests coding.
            self.params_active(False)
            self.set_infotext("已接收请求赋码命令")
            self.cmd_code(self.slave_num)
        elif cmd == 0x82:
            # Coding result.
            self.set_infotext("已接收赋码结果")
            if self.decode_code(data) != True:
                print("code failed.")
                self.set_infotext("赋码出现异常,本次赋码结果被丢弃,请排查异常后重试")
            self.params_active(True)
        elif cmd == 0x8c:
            # Device information.
            if data:
                s = "设备信息:{d1},{d2}".format(d1=int(data[0]), d2=data[1:].decode("utf-8", errors="replace"))
                print(s)
                self.set_infotext(s)
        elif cmd == 0x8a:
            # Heartbeat: only a normal heartbeat (first byte 0) is answered.
            if data and data[0] == 0:
                self.cmd_user(0x8a, bytearray([0]))
        elif cmd == 0x91:
            # Input channel state.
            if len(data) >= 2:
                print("input:channel({d1})={d2}".format(d1=int(data[0]), d2=int(data[1])))
        elif cmd == 0x92:
            # Output channel state.
            if len(data) >= 2:
                print("output:channel({d1})={d2}".format(d1=int(data[0]), d2=int(data[1])))

    def recv_code_after_check_checker(self, cmd: int, data: bytearray, err: str):
        """Frame handler for the batch checker: check first, then code."""
        if cmd == 0x81:
            # Check result: code only when the check passed.
            self.set_infotext("已接收检测结果")
            if self.decode_check(data) == True:
                self.cmd_code(self.slave_num)
            else:
                print("check failed.")
                self.set_infotext("检测出现异常,未进行赋码,请排查异常后重试")
                self.cmd_moter_up()
                self.params_active(True)
        elif cmd == 0x82:
            # Coding result: the cycle ends here whether it succeeded or not.
            self.set_infotext("已接收赋码结果")
            if self.decode_code(data) != True:
                print("code failed.")
                self.set_infotext("赋码出现异常,本次赋码结果被丢弃,请排查异常后重试")
            self.cmd_moter_up()
            self.params_active(True)
        elif cmd == 0x37:
            # Key press on the device starts a cycle.
            print("key press.")
            self.params_active(False)
            self.set_infotext("已接收请求检测命令")
            self.cmd_moter_down()
            # NOTE(review): this blocks the slot for 2 s, presumably to let the
            # motor settle before the check command is sent.
            time.sleep(2)
            self.cmd_check(self.slave_num)

    def recv_code_with_check_checker(self, cmd: int, data: bytearray, err: str):
        """Frame handler for the batch checker: coding without a separate check."""
        if cmd == 0x82:
            self.set_infotext("已接收赋码结果")
            if self.decode_code(data) != True:
                print("code failed.")
                self.set_infotext("赋码出现异常,本次赋码结果被丢弃,请排查异常后重试")
            self.cmd_moter_up()
            self.params_active(True)
        elif cmd == 0x37:
            print("key press.")
            self.params_active(False)
            self.set_infotext("已接收请求赋码命令")
            self.cmd_moter_down()
            # NOTE(review): blocks the slot for 2 s before coding starts.
            time.sleep(2)
            self.cmd_code(self.slave_num)

    def recv_handler_table_init(self):
        """Build the device-type table: (handler, UID length, enter hook, exit hook)."""
        self.recv_handler_table = {
            "批检仪检测后赋码": (self.recv_code_after_check_checker, 16,
                          self.change_enter_checker, self.change_exit_checker),
            "批检仪检测时赋码": (self.recv_code_with_check_checker, 15,
                          self.change_enter_checker, self.change_exit_checker),
            "赋码控制器": (self.recv_deal_coder2ch, 15,
                       self.change_enter_coder2ch, self.change_exit_coder2ch),
        }

    # Extra handling when the device type is switched.
    def change_enter_coder2ch(self):
        """Enable the buttons that only apply to the coding controller."""
        self.check_but.setEnabled(True)
        self.posend_but.setEnabled(True)
        self.autotest_but.setEnabled(True)
        self.stop_but.setEnabled(True)

    def change_exit_coder2ch(self):
        """Disable the buttons that only apply to the coding controller."""
        self.check_but.setEnabled(False)
        self.posend_but.setEnabled(False)
        self.autotest_but.setEnabled(False)
        self.stop_but.setEnabled(False)

    def change_enter_checker(self):
        """Enable the motor-up button for the batch checker."""
        self.moterup.setEnabled(True)

    def change_exit_checker(self):
        """Disable the motor-up button when leaving the batch checker."""
        self.moterup.setEnabled(False)

    def run(self, times: int):
        """Show the window and run the event loop until it exits.

        ``times`` is multiplied by four to obtain the number of received
        frames that will be processed (see ``recv_slot``).
        """
        self.run_times = times * 4
        self.widget.show()
        ret = self.app.exec()
        self.close_serial()
        sys.exit(ret)

    # ------------------------------------------------------------------
    # Frame construction and commands
    # ------------------------------------------------------------------
    def calc_shell_code(self, id: int):
        """Return the shell code: factory + date + signature code + 5-digit id."""
        shell = bytearray()
        shell += self.detonator_fat
        shell += self.date
        shell += self.signature_code
        shell += "{d:05d}".format(d=id).encode("utf-8")
        return shell

    def creat_code_data(self, id: int, num: int):
        """Build the coding payload for ``num`` channels starting at ``id``.

        Layout: channel count, year, then per channel its index followed by
        its shell code.
        """
        d = bytearray()
        d.append(num & 0xff)
        d += self.year
        for i in range(num):
            d.append(i & 0xff)
            d += self.calc_shell_code(id + i)
        return d

    def _send(self, cmd: int, data: bytearray) -> bool:
        """Send one frame; report a failure in the status line instead of raising."""
        try:
            self.ser.send(cmd, data)
        except Exception as e:
            # The link layer is project code; any failure here is treated as
            # "port not open" so a Qt slot never propagates an exception.
            print("send failed:", e)
            self.set_infotext("发送命令失败,是否没有打开串口?")
            return False
        return True

    def cmd_check(self, num: int):
        """Send the check command for ``num`` channels."""
        d = bytearray()
        d.append(num & 0xff)
        d.append(0)
        self._send(1, d)
        print("check start.")

    def cmd_moter_up(self):
        """Send the motor-up command."""
        self._send(0x40, bytearray([0x02]))
        print("moter up.")

    def cmd_moter_down(self):
        """Send the motor-down command."""
        self._send(0x40, bytearray([0x03]))
        print("moter down.")

    def cmd_code(self, num: int):
        """Send the coding command for ``num`` channels starting at ``code_id``."""
        try:
            # Building the payload needs the values set by init(); a failure
            # there is reported the same way as a send failure.
            self.ser.send(2, self.creat_code_data(self.code_id, num))
        except Exception as e:
            print("send failed:", e)
            self.set_infotext("发送命令失败,是否没有打开串口?")
        print("code start.")

    def cmd_user(self, cmd: int, data: bytearray):
        """Send an arbitrary command."""
        self._send(cmd, data)

    # ------------------------------------------------------------------
    # Result decoding
    # ------------------------------------------------------------------
    def _ack_failed(self, ack_i: int, contact_codes) -> bool:
        """Return True when ``ack_i`` counts as a failure.

        0 is success. Codes in ``contact_codes`` (contact errors) only count
        as a failure while contact errors are not being ignored.
        """
        if ack_i == 0:
            return False
        if ack_i not in contact_codes:
            return True
        return not self.openignore_flag

    def _close_dialog(self, attr: str):
        """Close the result dialog stored in attribute ``attr``, if there is one."""
        dlog = getattr(self, attr, None)
        if dlog is None:
            return
        try:
            dlog.close()
        except Exception as e:
            # Best effort: the dialog may already have been destroyed.
            print("close dialog failed:", e)

    def decode_check(self, data: bytearray):
        """Parse a check-result frame and return True when every channel passed.

        ``data[0]`` is the channel count; channel ``i`` has its address at
        ``6*i+2`` and its result at ``6*i+3``. A result dialog is shown on
        failure. Truncated frames are reported and treated as a failure.
        """
        if not data:
            print("check result frame is empty.")
            return False
        num = data[0]
        if num and len(data) <= 6 * (num - 1) + 3:
            print("check result frame is too short.")
            return False
        ack = True
        acks_list = []
        for i in range(num):
            ack_i = data[6 * i + 3]
            if self._ack_failed(ack_i, (3,)):
                ack = False
            str_start = STR_RED if ack_i != 0 else ""
            print(str_start + "addr:", int(data[6 * i + 2]), "ack:", ack_i, STR_END)
            acks_list.append(str(ack_i))
        self._close_dialog("ch_dlog")
        self._close_dialog("co_dlog")
        if ack != True:
            self.ch_dlog = chk.check_dlog(self.widget, self.slave_num, "检测结果查看")
            self.ch_dlog.ack_list_init(acks_list)
            self.ch_dlog.show()
        return ack

    def decode_code(self, data: bytearray):
        """Parse a coding-result frame and return True when the round succeeded.

        Each channel record is ``13 + uid_length + 8 + 2`` bytes long with its
        result at offset 2, followed by shell code (13 bytes), UID and
        password (8 bytes). Results are merged with the ones remembered from
        earlier failed attempts, so a channel that already succeeded stays
        successful. When all channels pass, the records are appended to the
        table and the CSV file; otherwise a result dialog is shown.
        Truncated frames are reported and treated as a failure.
        """
        item_len = 13 + self.uid_length + 8 + 2
        if not data:
            print("code result frame is empty.")
            return False
        num = data[0]
        if num and len(data) <= item_len * (num - 1) + 2:
            print("code result frame is too short.")
            return False
        ack = True
        uid_end = 13 + self.uid_length
        # Build the result records for this attempt.
        code_list = []
        for i in range(num):
            ack_i = data[item_len * i + 2]
            d = data[item_len * i + 3:item_len * i + 3 + item_len - 1]
            try:
                shell_code = d[:13].decode("utf-8")
                uid_code = d[13:uid_end].decode("utf-8")
                psw_code = d[uid_end:uid_end + 8].decode("utf-8")
            except UnicodeDecodeError:
                shell_code = "unknown"
                uid_code = "unknown"
                psw_code = "unknown"
            code_list.append(((i + 1), ack_i, shell_code, uid_code, psw_code))
        # Merge with the results remembered from earlier attempts.
        length = len(code_list)
        if len(self.code_list_lock) == 0:
            # First attempt of this round: remember everything.
            self.code_list_lock = code_list
        else:
            if len(self.code_list_lock) > length:
                self.code_list_lock = self.code_list_lock[:length]
            for i in range(length):
                # Channels missing from the remembered list take this result.
                if i >= len(self.code_list_lock):
                    self.code_list_lock.append(code_list[i])
                old = self.code_list_lock[i]
                new = code_list[i]
                # Keep an earlier success; otherwise take the newest result.
                if old[0] == new[0] and (new[1] == 0 or old[1] != 0):
                    self.code_list_lock[i] = new
        # Validate the merged results.
        acks_list = []
        for rec in self.code_list_lock:
            ack_i = rec[1]
            if self._ack_failed(ack_i, (3, 200)):
                ack = False
            str_start = STR_RED if ack_i != 0 else ""
            print(str_start + "addr:", rec[0], "ack:", ack_i, STR_END)
            print("\t", "shell_code:", rec[2])
            print("\t", "uid_code:", rec[3])
            print("\t", "psw_code:", rec[4])
            acks_list.append(str(ack_i))
        if ack == True:
            self.code_list_backcolor = not self.code_list_backcolor
            timestamp = get_time()
            with open(self.save_file, "a", encoding="utf-8") as f:
                for rec in self.code_list_lock:
                    s = timestamp + "," + str(rec[0]) + ',' + str(rec[1]) + ',' + rec[2] + ',' + rec[3] + ',' + rec[4]
                    self.code_list_add(s)
                    f.write(s + '\n')
            self.code_list_to_bottom()
            print("save to file:", self.save_file)
            # Whole round succeeded: forget the remembered results.
            self.code_list_lock.clear()
            if self.autoinc_id:
                self.code_id += self.slave_num
        self._close_dialog("co_dlog")
        if ack != True:
            self.co_dlog = chk.check_dlog(self.widget, self.slave_num, "注码结果查看")
            self.co_dlog.ack_list_init(acks_list)
            self.co_dlog.show()
        return ack

    # ------------------------------------------------------------------
    # Auto test
    # ------------------------------------------------------------------
    def _autotest_button_update(self, text: str, enabled: bool):
        """Apply an auto-test button update requested by the worker thread."""
        self.autotest_but.setText(text)
        self.autotest_but.setEnabled(enabled)

    def start_autotest(self):
        """Start the auto-test loop in a background thread."""
        # Daemon thread: the long sleep in the loop must not delay exit.
        self.thread_ = threading.Thread(target=self.autotest, args=(), daemon=True)
        self.thread_.start()

    def autotest(self):
        """Worker loop: repeat "start check" then "in position" until stopped.

        Runs outside the GUI thread, so widget updates and button actions are
        requested through signals instead of being called directly.
        """
        self.autotest_is_open = 1
        self._autotest_button_signal.emit("关自动测试", True)
        while self.autotest_is_open == 1:
            if self.ser_is_open != True:
                print("ser was closed,stop autotest.")
                break
            self._autotest_check_signal.emit()
            time.sleep(0.3)
            self._autotest_posend_signal.emit()
            time.sleep(11.3)
        self.autotest_is_open = 0
        self._autotest_button_signal.emit("开自动测试", True)


if __name__ == "__main__":
    c = coder()
    c.init(b'00', b'AD', b'A', int(cpars.code_channel_list()[0]))
    c.run(10000)