diff --git a/ReadMe.txt b/ReadMe.txt index 146d2c1..d3a3dbf 100644 --- a/ReadMe.txt +++ b/ReadMe.txt @@ -47,3 +47,8 @@ 实现对检测数据的分析,自动填充上下限,一键导出方案 2023.9.18 实现根据方案自动从数据库下载检测数据并分析 +2023.9.20 + updata添加dhcp功能,cfg.json 视为配置文件 + updata添加串口控制台功能,配合U盘,自动把文件复制到设备中 + + diff --git a/coder_2ch/coder_main.py b/coder_2ch/coder_main.py index a2d7eeb..c69706e 100644 --- a/coder_2ch/coder_main.py +++ b/coder_2ch/coder_main.py @@ -552,7 +552,7 @@ class coder(QObject): except Exception as e: pass if(ack!=True): - self.ch_dlog=chk.check_dlog(self.widget,self.slave_num) + self.ch_dlog=chk.check_dlog(self.widget,self.slave_num,"检测结果查看") self.ch_dlog.ack_list_init(acks_list) self.ch_dlog.show() return ack diff --git a/updata/console_uart.py b/updata/console_uart.py new file mode 100644 index 0000000..857b763 --- /dev/null +++ b/updata/console_uart.py @@ -0,0 +1,186 @@ +import sys +import os +import shutil +from PyQt5.QtCore import * +from PyQt5.QtGui import * +from PyQt5.QtWidgets import * +import threading +import serial +import serial.tools.list_ports +import time + + + + + + + + + +class console_dlg(QObject): + + recv_str_signal = pyqtSignal([str]) + def __init__(self,father:QDialog,title:str): + QObject.__init__(self) + self.ser=None + self.cmd_list=[] + self.w=QDialog(father) + self.w.resize(1200,500) + self.w.setWindowTitle(title) + self.w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose) + self.w.setWindowModality(Qt.WindowModality.ApplicationModal) + self.com_init() + self.text_init() + self.com_but_init() + self.runcmd_but_init() + def text_init(self): + self.console_text = QTextBrowser(self.w) + self.console_text.setObjectName(u"str_list") + self.console_text.setGeometry(QRect(20, 70, 1150, 430)) + self.console_text.setFrameShape(QFrame.Shape.Box) + self.console_text.setMidLineWidth(1) + self.console_text.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded) + def com_init(self): + self.com = QComboBox(self.w) + self.com.setObjectName(u"com") + self.com.setGeometry(QRect(85, 10, 300, 25)) + self.com.setEditable(True) + ports_list = list(serial.tools.list_ports.comports()) + for comport in ports_list: + # print(comport.name,comport.description) + self.com.addItem(comport.name+":"+comport.description) + self.com.currentIndexChanged.connect(self.com_changed) + self.com_label = QLabel(self.w) + self.com_label.setObjectName(u"label") + self.com_label.setGeometry(QRect(30, 16, 72, 15)) + self.com_label.setText("COM口:") + # 初始化打开端口按钮 + def com_but_init(self): + self.com_but = QPushButton(self.w) + self.com_but.setObjectName(u"com_but") + self.com_but.setGeometry(QRect(590, 10, 93, 28)) + self.com_but.setText("打开端口") + self.com_but.clicked.connect(self.com_but_clicked) + # 初始化执行命令按钮 + def runcmd_but_init(self): + self.runcmd_but = QPushButton(self.w) + self.runcmd_but.setObjectName(u"runcmd_but") + self.runcmd_but.setGeometry(QRect(700, 10, 93, 28)) + self.runcmd_but.setText("执行命令") + self.runcmd_but.clicked.connect(self.start_send_cmds) + def com_changed(self,index:int): + print("com changed") + if(self.ser!=None): + self.ser.close() + def com_but_clicked(self): + print("com but clicked") + item=self.com.itemText(self.com.currentIndex()) + com=item.split(":")[0] + if(self.ser==None): + try: + bsp=int(115200) + self.ser = serial.Serial(port=com, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE,timeout=None) + print(str(self.ser)) + t = threading.Thread(target=self._recv, args=()) + t.start() + except Exception as e: + print("err:",str(e)) + self.ser=None + else: + self.ser.close() + def _recv(self): + self.com_but.setText("关闭端口") + self.recv_str_signal.connect(self.item_append) + while(self.ser!=None): + try: + data=self.ser.readline() + except Exception as e: + print("readline err:",str(e)) + break + try: + self.recv_str_signal.emit(data.decode("utf-8").strip()) + except Exception as e: + pass + self.ser=None + self.recv_str_signal.disconnect(self.item_append) + try: + self.com_but.setText("打开端口") + except Exception as e: + pass + def send_str(self,text:str): + if(self.ser!=None): + try: + self.ser.write(text.encode("utf-8")+b"\r\n") + except Exception as e: + print(str(e)) + def item_append(self,text:str): + index,ack=self.type_of_str(text) + if(index==0): + print("linux started.") + self.start_send_cmds() + elif(index==1): + print("app started.") + if(index!=-1): + txt="" +text+ " " + else: + txt=text + try: + self.console_text.append(txt) + self.console_text.moveCursor(QTextCursor.MoveOperation.End) + except Exception as e: + print(str(e)) + if((index==2) or (index==3))and (ack==True): + # 准备好接收下一个指令 + self.send_cmdlist() + # 查找字幕类型,如果完全相等则返回True + def type_of_str(self,text:str): + str_list=["ATK-MP157 login: root","Booting fw image checker_m4.axf","root@ATK-MP157:~#","root@ATK-MP157:/run/media/sda1/updata#","sda: sda1"] + for i in range(len(str_list)): + if(text.find(str_list[i])!=-1): + if(str_list[i]==text): + return i,True + else: + return i,False + return -1,False + def _item_append_green_str(self,text:str): + txt="" +text+ " " + try: + self.console_text.append(txt) + self.console_text.moveCursor(QTextCursor.MoveOperation.End) + except Exception as e: + print(str(e)) + def start_send_cmds(self): + self._item_append_green_str("开始发送命令行,请不要关闭窗口。") + self.cmd_list.append("cd /run/media/sda1/updata") + self.cmd_list.append("systemctl stop atk-qtapp-start.service") + self.cmd_list.append("mkdir /home/root/config") + self.cmd_list.append("cp checker_host.elf /usr/local/QDesktop-fb") + self.cmd_list.append("chmod 777 /usr/local/QDesktop-fb") + self.cmd_list.append("cp checker_slave_app_can.bin /home/root/config/checker_slave.bin") + self.cmd_list.append("cp cfg.json /home/root/config/cfg.json") + self.cmd_list.append("cp checker_m4.axf /lib/firmware/checker_m4.axf") + self.cmd_list.append("cp stm32mp157d-atk.dtb /boot/stm32mp157d-atk.dtb") + self.cmd_list.append("sync") + self.cmd_list.append("systemctl restart atk-qtapp-start.service") + self.send_cmdlist() + def send_cmdlist(self): + if(len(self.cmd_list)>0): + self._item_append_green_str("发送下一个命令,还剩 "+str(len(self.cmd_list))) + self.send_str(self.cmd_list[0]) + self.cmd_list.pop(0) + else: + self._item_append_green_str("命令行发送完成。") + def show(self): + self.w.exec() + if(self.ser!=None): + self.ser.close() + + + + + + + + + diff --git a/updata/dhcp/dhcp.py b/updata/dhcp/dhcp.py new file mode 100644 index 0000000..9361598 --- /dev/null +++ b/updata/dhcp/dhcp.py @@ -0,0 +1,595 @@ +#!/usr/bin/python3 +import time +import threading +import struct +import queue +import collections +import traceback +import random +import socket +import os +import sys + +from PyQt5.QtCore import * +from PyQt5.QtGui import * +from PyQt5.QtWidgets import * + +from dhcp.listener import * + +def get_host_ip_addresses(): + return gethostbyname_ex(gethostname())[2] + + +class WriteBootProtocolPacket(object): + + message_type = 2 # 1 for client -> server 2 for server -> client + hardware_type = 1 + hardware_address_length = 6 + hops = 0 + + transaction_id = None + + seconds_elapsed = 0 + bootp_flags = 0 # unicast + + client_ip_address = '0.0.0.0' + your_ip_address = '0.0.0.0' + next_server_ip_address = '0.0.0.0' + relay_agent_ip_address = '0.0.0.0' + + client_mac_address = None + magic_cookie = '99.130.83.99' + + parameter_order = [] + + def __init__(self, configuration): + for i in range(256): + names = ['option_{}'.format(i)] + if i < len(options) and hasattr(configuration, options[i][0]): + names.append(options[i][0]) + for name in names: + if hasattr(configuration, name): + setattr(self, name, getattr(configuration, name)) + + def to_bytes(self): + result = bytearray(236) + + result[0] = self.message_type + result[1] = self.hardware_type + result[2] = self.hardware_address_length + result[3] = self.hops + + result[4:8] = struct.pack('>I', self.transaction_id) + + result[ 8:10] = shortpack(self.seconds_elapsed) + result[10:12] = shortpack(self.bootp_flags) + + result[12:16] = inet_aton(self.client_ip_address) + result[16:20] = inet_aton(self.your_ip_address) + result[20:24] = inet_aton(self.next_server_ip_address) + result[24:28] = inet_aton(self.relay_agent_ip_address) + + result[28:28 + self.hardware_address_length] = macpack(self.client_mac_address) + + result += inet_aton(self.magic_cookie) + + if self.options: + for option in self.options: + value = self.get_option(option) + #print(option, value) + if value is None: + continue + result += bytes([option, len(value)]) + value + result += bytes([255]) + return bytes(result) + + def get_option(self, option): + if option < len(options) and hasattr(self, options[option][0]): + value = getattr(self, options[option][0]) + elif hasattr(self, 'option_{}'.format(option)): + value = getattr(self, 'option_{}'.format(option)) + else: + return None + function = options[option][2] + if function and value is not None: + value = function(value) + return value + + @property + def options(self): + done = list() + # fulfill wishes + if self.parameter_order: + for option in self.parameter_order: + if option < len(options) and hasattr(self, options[option][0]) or hasattr(self, 'option_{}'.format(option)): + # this may break with the specification because we must try to fulfill the wishes + if option not in done: + done.append(option) + # add my stuff + for option, o in enumerate(options): + if o[0] and hasattr(self, o[0]): + if option not in done: + done.append(option) + for option in range(256): + if hasattr(self, 'option_{}'.format(option)): + if option not in done: + done.append(option) + return done + + def __str__(self): + return str(ReadBootProtocolPacket(self.to_bytes())) + +class DelayWorker(object): + + def __init__(self): + self.closed = False + self.queue = queue.Queue() + self.thread = threading.Thread(target = self._delay_response_thread) + self.thread.start() + + def _delay_response_thread(self): + while not self.closed: + if self.closed: + break + try: + p = self.queue.get(timeout=1) + t, func, args, kw = p + now = time.time() + if now < t: + time.sleep(0.01) + self.queue.put(p) + else: + func(*args, **kw) + except queue.Empty: + continue + + def do_after(self, seconds, func, args = (), kw = {}): + self.queue.put((time.time() + seconds, func, args, kw)) + + def close(self): + self.closed = True + +class Transaction(object): + + def __init__(self, server): + self.server = server + self.configuration = server.configuration + self.packets = [] + self.done_time = time.time() + self.configuration.length_of_transaction + self.done = False + self.do_after = self.server.delay_worker.do_after + + def is_done(self): + return self.done or self.done_time < time.time() + + def close(self): + self.done = True + + def receive(self, packet): + # packet from client <-> packet.message_type == 1 + if packet.message_type == 1 and packet.dhcp_message_type == 'DHCPDISCOVER': + self.do_after(self.configuration.dhcp_offer_after_seconds, + self.received_dhcp_discover, (packet,), ) + elif packet.message_type == 1 and packet.dhcp_message_type == 'DHCPREQUEST': + self.do_after(self.configuration.dhcp_acknowledge_after_seconds, + self.received_dhcp_request, (packet,), ) + elif packet.message_type == 1 and packet.dhcp_message_type == 'DHCPINFORM': + self.received_dhcp_inform(packet) + else: + return False + return True + + def received_dhcp_discover(self, discovery): + if self.is_done(): return + self.configuration.debug('discover:\n {}'.format(str(discovery).replace('\n', '\n\t'))) + self.send_offer(discovery) + + def send_offer(self, discovery): + # https://tools.ietf.org/html/rfc2131 + offer = WriteBootProtocolPacket(self.configuration) + offer.parameter_order = discovery.parameter_request_list + mac = discovery.client_mac_address + ip = offer.your_ip_address = self.server.get_ip_address(discovery) + # offer.client_ip_address = + offer.transaction_id = discovery.transaction_id + # offer.next_server_ip_address = + offer.relay_agent_ip_address = discovery.relay_agent_ip_address + offer.client_mac_address = mac + offer.client_ip_address = discovery.client_ip_address or '0.0.0.0' + offer.bootp_flags = discovery.bootp_flags + offer.dhcp_message_type = 'DHCPOFFER' + offer.client_identifier = mac + self.server.broadcast(offer) + + def received_dhcp_request(self, request): + if self.is_done(): return + self.server.client_has_chosen(request) + self.acknowledge(request) + self.close() + + def acknowledge(self, request): + ack = WriteBootProtocolPacket(self.configuration) + ack.parameter_order = request.parameter_request_list + ack.transaction_id = request.transaction_id + # ack.next_server_ip_address = + ack.bootp_flags = request.bootp_flags + ack.relay_agent_ip_address = request.relay_agent_ip_address + mac = request.client_mac_address + ack.client_mac_address = mac + requested_ip_address = request.requested_ip_address + ack.client_ip_address = request.client_ip_address or '0.0.0.0' + ack.your_ip_address = self.server.get_ip_address(request) + ack.dhcp_message_type = 'DHCPACK' + self.server.broadcast(ack) + + def received_dhcp_inform(self, inform): + self.close() + self.server.client_has_chosen(inform) + +class DHCPServerConfiguration(object): + + dhcp_offer_after_seconds = 10 + dhcp_acknowledge_after_seconds = 10 + length_of_transaction = 40 + + bind_address = '' + network = '192.168.173.0' + broadcast_address = '255.255.255.255' + subnet_mask = '255.255.255.0' + router = None # list of ips + # 1 day is 86400 + ip_address_lease_time = 300 # seconds + domain_name_server = None # list of ips + + host_file = 'hosts.csv' + + debug = lambda *args, **kw: None + + def load(self, file): + with open(file) as f: + exec(f.read(), self.__dict__) + + def adjust_if_this_computer_is_a_router(self): + ip_addresses = get_host_ip_addresses() + for ip in reversed(ip_addresses): + if ip.split('.')[-1] == '1': + self.router = [ip] + self.domain_name_server = [ip] + self.network = '.'.join(ip.split('.')[:-1] + ['0']) + self.broadcast_address = '.'.join(ip.split('.')[:-1] + ['255']) + #self.ip_forwarding_enabled = True + #self.non_local_source_routing_enabled = True + #self.perform_mask_discovery = True + + def all_ip_addresses(self): + ips = ip_addresses(self.network, self.subnet_mask) + for i in range(5): + next(ips) + return ips + + def network_filter(self): + return NETWORK(self.network, self.subnet_mask) + +def ip_addresses(network, subnet_mask): + import socket, struct + subnet_mask = struct.unpack('>I', socket.inet_aton(subnet_mask))[0] + network = struct.unpack('>I', socket.inet_aton(network))[0] + network = network & subnet_mask + start = network + 1 + end = (network | (~subnet_mask & 0xffffffff)) + return (socket.inet_ntoa(struct.pack('>I', i)) for i in range(start, end)) + +class ALL(object): + def __eq__(self, other): + return True + def __repr__(self): + return self.__class__.__name__ +ALL = ALL() + +class GREATER(object): + def __init__(self, value): + self.value = value + def __eq__(self, other): + return type(self.value)(other) > self.value + +class NETWORK(object): + def __init__(self, network, subnet_mask): + self.subnet_mask = struct.unpack('>I', inet_aton(subnet_mask))[0] + self.network = struct.unpack('>I', inet_aton(network))[0] + def __eq__(self, other): + ip = struct.unpack('>I', inet_aton(other))[0] + return ip & self.subnet_mask == self.network and \ + ip - self.network and \ + ip - self.network != ~self.subnet_mask & 0xffffffff + +class CASEINSENSITIVE(object): + def __init__(self, s): + self.s = s.lower() + def __eq__(self, other): + return self.s == other.lower() + +class CSVDatabase(object): + + delimiter = ';' + + def __init__(self, file_name): + self.file_name = file_name + self.file('a').close() # create file + + def file(self, mode = 'r'): + return open(self.file_name, mode) + + def get(self, pattern): + pattern = list(pattern) + return [line for line in self.all() if pattern == line] + + def add(self, line): + with self.file('a') as f: + f.write(self.delimiter.join(line) + '\n') + + def delete(self, pattern): + lines = self.all() + lines_to_delete = self.get(pattern) + self.file('w').close() # empty file + for line in lines: + if line not in lines_to_delete: + self.add(line) + + def all(self): + with self.file() as f: + return [list(line.strip().split(self.delimiter)) for line in f] + +class Host(object): + + def __init__(self, mac, ip, hostname, last_used): + self.mac = mac.upper() + self.ip = ip + self.hostname = hostname + self.last_used = int(last_used) + + @classmethod + def from_tuple(cls, line): + mac, ip, hostname, last_used = line + last_used = int(last_used) + return cls(mac, ip, hostname, last_used) + + @classmethod + def from_packet(cls, packet): + return cls(packet.client_mac_address, + packet.requested_ip_address or packet.client_ip_address, + packet.host_name or '', + int(time.time())) + + @staticmethod + def get_pattern(mac = ALL, ip = ALL, hostname = ALL, last_used = ALL): + return [mac, ip, hostname, last_used] + + def to_tuple(self): + return [self.mac, self.ip, self.hostname, str(int(self.last_used))] + + def to_pattern(self): + return self.get_pattern(ip = self.ip, mac = self.mac) + + def __hash__(self): + return hash(self.key) + + def __eq__(self, other): + return self.to_tuple() == other.to_tuple() + + def has_valid_ip(self): + return self.ip and self.ip != '0.0.0.0' + + +class HostDatabase(object): + def __init__(self, file_name): + self.db = CSVDatabase(file_name) + + def get(self, **kw): + pattern = Host.get_pattern(**kw) + return list(map(Host.from_tuple, self.db.get(pattern))) + + def add(self, host): + self.db.add(host.to_tuple()) + + def delete(self, host = None, **kw): + if host is None: + pattern = Host.get_pattern(**kw) + else: + pattern = host.to_pattern() + self.db.delete(pattern) + + def all(self): + return list(map(Host.from_tuple, self.db.all())) + + def replace(self, host): + self.delete(host) + self.add(host) + +def sorted_hosts(hosts): + hosts = list(hosts) + hosts.sort(key = lambda host: (host.hostname.lower(), host.mac.lower(), host.ip.lower())) + return hosts + +class DHCPServer(QObject): + + # 分配新的ip地址[MAC,IP,HOST_NAME] + new_ip_addr_signal = pyqtSignal([str,str,str]) + + # 服务已开启 + server_start_signal = pyqtSignal([]) + + # 服务已结束 + server_end_signal = pyqtSignal([]) + + def __init__(self, configuration = None): + QObject.__init__(self) + if configuration == None: + configuration = DHCPServerConfiguration() + self.configuration = configuration + self.socket = socket(type = SOCK_DGRAM) + self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + self.socket.bind((self.configuration.bind_address, 67)) + self.delay_worker = DelayWorker() + self.closed = False + self.transactions = collections.defaultdict(lambda: Transaction(self)) # id: transaction + self.hosts = HostDatabase(self.configuration.host_file) + self.time_started = time.time() + + def close(self): + self.socket.close() + self.closed = True + self.delay_worker.close() + for transaction in list(self.transactions.values()): + transaction.close() + + def update(self, timeout = 0): + try: + reads = select.select([self.socket], [], [], timeout)[0] + except ValueError: + # ValueError: file descriptor cannot be a negative integer (-1) + return + for socket in reads: + try: + packet = ReadBootProtocolPacket(*socket.recvfrom(4096)) + except OSError: + # OSError: [WinError 10038] An operation was attempted on something that is not a socket + pass + else: + self.received(packet) + for transaction_id, transaction in list(self.transactions.items()): + if transaction.is_done(): + transaction.close() + self.transactions.pop(transaction_id) + + def received(self, packet): + if not self.transactions[packet.transaction_id].receive(packet): + self.configuration.debug('received:\n {}'.format(str(packet).replace('\n', '\n\t'))) + + def client_has_chosen(self, packet): + self.configuration.debug('client_has_chosen:\n {}'.format(str(packet).replace('\n', '\n\t'))) + host = Host.from_packet(packet) + if not host.has_valid_ip(): + return + client=host.to_tuple() + print("client:",client) + self.new_ip_addr_signal.emit(client[0],client[1],client[2]) + self.hosts.replace(host) + + def is_valid_client_address(self, address): + if address is None: + return False + a = address.split('.') + s = self.configuration.subnet_mask.split('.') + n = self.configuration.network.split('.') + return all(s[i] == '0' or a[i] == n[i] for i in range(4)) + + def get_ip_address(self, packet): + mac_address = packet.client_mac_address + requested_ip_address = packet.requested_ip_address + known_hosts = self.hosts.get(mac = CASEINSENSITIVE(mac_address)) + assigned_addresses = set(host.ip for host in self.hosts.get()) + ip = None + if known_hosts: + # 1. choose known ip address + for host in known_hosts: + if self.is_valid_client_address(host.ip): + ip = host.ip + print('known ip:', ip) + if ip is None and self.is_valid_client_address(requested_ip_address) and ip not in assigned_addresses: + # 2. choose valid requested ip address + ip = requested_ip_address + print('valid ip:', ip) + if ip is None: + # 3. choose new, free ip address + chosen = False + network_hosts = self.hosts.get(ip = self.configuration.network_filter()) + for ip in self.configuration.all_ip_addresses(): + if not any(host.ip == ip for host in network_hosts): + chosen = True + break + if not chosen: + # 4. reuse old valid ip address + network_hosts.sort(key = lambda host: host.last_used) + ip = network_hosts[0].ip + assert self.is_valid_client_address(ip) + print('new ip:', ip) + if not any([host.ip == ip for host in known_hosts]): + print('add', mac_address, ip, packet.host_name) + self.hosts.replace(Host(mac_address, ip, packet.host_name or '', time.time())) + return ip + + @property + def server_identifiers(self): + return get_host_ip_addresses() + + def broadcast(self, packet): + self.configuration.debug('broadcasting:\n {}'.format(str(packet).replace('\n', '\n\t'))) + for addr in self.server_identifiers: + broadcast_socket = socket(type = SOCK_DGRAM) + broadcast_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + broadcast_socket.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) + packet.server_identifier = addr + broadcast_socket.bind((addr, 67)) + try: + data = packet.to_bytes() + broadcast_socket.sendto(data, ('255.255.255.255', 68)) + broadcast_socket.sendto(data, (addr, 68)) + finally: + broadcast_socket.close() + + def run(self): + self.server_start_signal.emit() + while not self.closed: + try: + self.update(1) + except KeyboardInterrupt: + break + except: + traceback.print_exc() + self.server_end_signal.emit() + + def run_in_thread(self): + thread = threading.Thread(target = self.run) + thread.start() + return thread + + def debug_clients(self): + for line in self.ips.all(): + line = '\t'.join(line) + if line: + self.configuration.debug(line) + + def get_all_hosts(self): + return sorted_hosts(self.hosts.get()) + + def get_current_hosts(self): + return sorted_hosts(self.hosts.get(last_used = GREATER(self.time_started))) + + + + +def creat_dhcp_server(): + configuration = DHCPServerConfiguration() + configuration.debug = print + configuration.adjust_if_this_computer_is_a_router() + configuration.router #+= ['192.168.0.1'] + configuration.ip_address_lease_time = 60 + configuration.load(os.path.join(os.path.dirname(sys.argv[0]), 'dhcpgui.conf')) + server = DHCPServer(configuration) + for ip in server.configuration.all_ip_addresses(): + assert ip == server.configuration.network_filter() + return server + + + +if __name__ == '__main__': + configuration = DHCPServerConfiguration() + configuration.debug = print + configuration.adjust_if_this_computer_is_a_router() + configuration.router #+= ['192.168.0.1'] + configuration.ip_address_lease_time = 60 + configuration.load(os.path.join(os.path.dirname(sys.argv[0]), 'dhcpgui.conf')) + server = DHCPServer(configuration) + for ip in server.configuration.all_ip_addresses(): + assert ip == server.configuration.network_filter() + server.run() diff --git a/updata/dhcp/listener.py b/updata/dhcp/listener.py new file mode 100644 index 0000000..523adfd --- /dev/null +++ b/updata/dhcp/listener.py @@ -0,0 +1,259 @@ +#!/usr/bin/python3 +from socket import * + +import struct +import base64 +import select + +# see https://en.wikipedia.org/wiki/Dynamic_Host_Configuration_Protocol +# section DHCP options + +def inet_ntoaX(data): + return ['.'.join(map(str, data[i:i + 4])) for i in range(0, len(data), 4)] + +def inet_atonX(ips): + return b''.join(map(inet_aton, ips)) + +dhcp_message_types = { + 1 : 'DHCPDISCOVER', + 2 : 'DHCPOFFER', + 3 : 'DHCPREQUEST', + 4 : 'DHCPDECLINE', + 5 : 'DHCPACK', + 6 : 'DHCPNAK', + 7 : 'DHCPRELEASE', + 8 : 'DHCPINFORM', +} +reversed_dhcp_message_types = dict() +for i, v in dhcp_message_types.items(): + reversed_dhcp_message_types[v] = i + +shortunpack = lambda data: (data[0] << 8) + data[1] +shortpack = lambda i: bytes([i >> 8, i & 255]) + + +def macunpack(data): + s = base64.b16encode(data) + return ':'.join([s[i:i+2].decode('ascii') for i in range(0, 12, 2)]) + +def macpack(mac): + return base64.b16decode(mac.replace(':', '').replace('-', '').encode('ascii')) + +def unpackbool(data): + return data[0] + +def packbool(bool): + return bytes([bool]) + +options = [ +# RFC1497 vendor extensions + ('pad', None, None), + ('subnet_mask', inet_ntoa, inet_aton), + ('time_offset', None, None), + ('router', inet_ntoaX, inet_atonX), + ('time_server', inet_ntoaX, inet_atonX), + ('name_server', inet_ntoaX, inet_atonX), + ('domain_name_server', inet_ntoaX, inet_atonX), + ('log_server', inet_ntoaX, inet_atonX), + ('cookie_server', inet_ntoaX, inet_atonX), + ('lpr_server', inet_ntoaX, inet_atonX), + ('impress_server', inet_ntoaX, inet_atonX), + ('resource_location_server', inet_ntoaX, inet_atonX), + ('host_name', lambda d: d.decode('ASCII'), lambda d: d.encode('ASCII')), + ('boot_file_size', None, None), + ('merit_dump_file', None, None), + ('domain_name', None, None), + ('swap_server', inet_ntoa, inet_aton), + ('root_path', None, None), + ('extensions_path', None, None), +# IP Layer Parameters per Host + ('ip_forwarding_enabled', unpackbool, packbool), + ('non_local_source_routing_enabled', unpackbool, packbool), + ('policy_filer', None, None), + ('maximum_datagram_reassembly_size', shortunpack, shortpack), + ('default_ip_time_to_live', lambda data: data[0], lambda i: bytes([i])), + ('path_mtu_aging_timeout', None, None), + ('path_mtu_plateau_table', None, None), +# IP Layer Parameters per Interface + ('interface_mtu', None, None), + ('all_subnets_are_local', unpackbool, packbool), + ('broadcast_address', inet_ntoa, inet_aton), + ('perform_mask_discovery', unpackbool, packbool), + ('mask_supplier', None, None), + ('perform_router_discovery', None, None), + ('router_solicitation_address', inet_ntoa, inet_aton), + ('static_route', None, None), +# Link Layer Parameters per Interface + ('trailer_encapsulation_option', None, None), + ('arp_cache_timeout', None, None), + ('ethernet_encapsulation', None, None), +# TCP Parameters + ('tcp_default_ttl', None, None), + ('tcp_keep_alive_interval', None, None), + ('tcp_keep_alive_garbage', None, None), +# Application and Service Parameters Part 1 + ('network_information_service_domain', None, None), + ('network_informtaion_servers', inet_ntoaX, inet_atonX), + ('network_time_protocol_servers', inet_ntoaX, inet_atonX), + ('vendor_specific_information', None, None), + ('netbios_over_tcp_ip_name_server', inet_ntoaX, inet_atonX), + ('netbios_over_tcp_ip_datagram_distribution_server', inet_ntoaX, inet_atonX), + ('netbios_over_tcp_ip_node_type', None, None), + ('netbios_over_tcp_ip_scope', None, None), + ('x_window_system_font_server', inet_ntoaX, inet_atonX), + ('x_window_system_display_manager', inet_ntoaX, inet_atonX), +# DHCP Extensions + ('requested_ip_address', inet_ntoa, inet_aton), + ('ip_address_lease_time', lambda d: struct.unpack('>I', d)[0], lambda i: struct.pack('>I', i)), + ('option_overload', None, None), + ('dhcp_message_type', lambda data: dhcp_message_types.get(data[0], data[0]), (lambda name: bytes([reversed_dhcp_message_types.get(name, name)]))), + ('server_identifier', inet_ntoa, inet_aton), + ('parameter_request_list', list, bytes), + ('message', None, None), + ('maximum_dhcp_message_size', shortunpack, shortpack), + ('renewal_time_value', None, None), + ('rebinding_time_value', None, None), + ('vendor_class_identifier', None, None), + ('client_identifier', macunpack, macpack), + ('tftp_server_name', None, None), + ('boot_file_name', None, None), +# Application and Service Parameters Part 2 + ('network_information_service_domain', None, None), + ('network_information_servers', inet_ntoaX, inet_atonX), + ('', None, None), + ('', None, None), + ('mobile_ip_home_agent', inet_ntoaX, inet_atonX), + ('smtp_server', inet_ntoaX, inet_atonX), + ('pop_servers', inet_ntoaX, inet_atonX), + ('nntp_server', inet_ntoaX, inet_atonX), + ('default_www_server', inet_ntoaX, inet_atonX), + ('default_finger_server', inet_ntoaX, inet_atonX), + ('default_irc_server', inet_ntoaX, inet_atonX), + ('streettalk_server', inet_ntoaX, inet_atonX), + ('stda_server', inet_ntoaX, inet_atonX), + ] + +assert options[18][0] == 'extensions_path', options[18][0] +assert options[25][0] == 'path_mtu_plateau_table', options[25][0] +assert options[33][0] == 'static_route', options[33][0] +assert options[50][0] == 'requested_ip_address', options[50][0] +assert options[64][0] == 'network_information_service_domain', options[64][0] +assert options[76][0] == 'stda_server', options[76][0] + + +class ReadBootProtocolPacket(object): + + for i, o in enumerate(options): + locals()[o[0]] = None + locals()['option_{0}'.format(i)] = None + + del i, o + + def __init__(self, data, address = ('0.0.0.0', 0)): + self.data = data + self.address = address + self.host = address[0] + self.port = address[1] + + # wireshark = wikipedia = data[...] + + self.message_type = self.OP = data[0] + self.hardware_type = self.HTYPE = data[1] + self.hardware_address_length = self.HLEN = data[2] + self.hops = self.HOPS = data[3] + + self.XID = self.transaction_id = struct.unpack('>I', data[4:8])[0] + + self.seconds_elapsed = self.SECS = shortunpack(data[8:10]) + self.bootp_flags = self.FLAGS = shortunpack(data[8:10]) + + self.client_ip_address = self.CIADDR = inet_ntoa(data[12:16]) + self.your_ip_address = self.YIADDR = inet_ntoa(data[16:20]) + self.next_server_ip_address = self.SIADDR = inet_ntoa(data[20:24]) + self.relay_agent_ip_address = self.GIADDR = inet_ntoa(data[24:28]) + + self.client_mac_address = self.CHADDR = macunpack(data[28: 28 + self.hardware_address_length]) + index = 236 + self.magic_cookie = self.magic_cookie = inet_ntoa(data[index:index + 4]); index += 4 + self.options = dict() + self.named_options = dict() + while index < len(data): + option = data[index]; index += 1 + if option == 0: + # padding + # Can be used to pad other options so that they are aligned to the word boundary; is not followed by length byte + continue + if option == 255: + # end + break + option_length = data[index]; index += 1 + option_data = data[index: index + option_length]; index += option_length + self.options[option] = option_data + if option < len(options): + option_name, function, _ = options[option] + if function: + option_data = function(option_data) + if option_name: + setattr(self, option_name, option_data) + self.named_options[option_name] = option_data + setattr(self, 'option_{}'.format(option), option_data) + + def __getitem__(self, key): + print(key, dir(self)) + return getattr(self, key, None) + + def __contains__(self, key): + return key in self.__dict__ + + @property + def formatted_named_options(self): + return "\n".join("{}:\t{}".format(name.replace('_', ' '), value) for name, value in sorted(self.named_options.items())) + + def __str__(self): + return """Message Type: {self.message_type} +client MAC address: {self.client_mac_address} +client IP address: {self.client_ip_address} +your IP address: {self.your_ip_address} +next server IP address: {self.next_server_ip_address} +{self.formatted_named_options} +""".format(self = self) + + def __gt__(self, other): + return id(self) < id(other) + +data = base64.b16decode(b'02010600f7b41ad100000000c0a800640000000000000000000000007c7a914bca6c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000638253633501053604c0a800010104ffffff000304c0a800010604c0a80001ff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'.upper()) +assert data[0] == 2 +p = ReadBootProtocolPacket(data) +assert p.message_type == 2 +assert p.hardware_type == 1 +assert p.hardware_address_length == 6 +assert p.hops == 0 +assert p.transaction_id == 4155775697 +assert p.seconds_elapsed == 0 +assert p.bootp_flags == 0 +assert p.client_ip_address == '192.168.0.100' +assert p.your_ip_address == '0.0.0.0' +assert p.next_server_ip_address == '0.0.0.0' +assert p.relay_agent_ip_address == '0.0.0.0' +assert p.client_mac_address.lower() == '7c:7a:91:4b:ca:6c' +assert p.magic_cookie == '99.130.83.99' +assert p.dhcp_message_type == 'DHCPACK' +assert p.options[53] == b'\x05' +assert p.server_identifier == '192.168.0.1' +assert p.subnet_mask == '255.255.255.0' +assert p.router == ['192.168.0.1'] +assert p.domain_name_server == ['192.168.0.1'] +str(p) + +if __name__ == '__main__': + s1 = socket(type = SOCK_DGRAM) + s1.setsockopt(SOL_IP, SO_REUSEADDR, 1) + s1.bind(('', 67)) + #s2 = socket(type = SOCK_DGRAM) + #s2.setsockopt(SOL_IP, SO_REUSEADDR, 1) + #s2.bind(('', 68)) + while 1: + reads = select.select([s1], [], [], 1)[0] + for s in reads: + packet = ReadBootProtocolPacket(*s.recvfrom(4096)) + print(packet) diff --git a/updata/dhcpgui.conf b/updata/dhcpgui.conf new file mode 100644 index 0000000..ff29d8a --- /dev/null +++ b/updata/dhcpgui.conf @@ -0,0 +1,88 @@ +# This config file is Python code. +# It is executed when starting dhcpgui.pyw. +# Most important configuration comes first. + +########### Timings ########### +# Set the time the DHCP server waits in seconds. +dhcp_offer_after_seconds = 1 +dhcp_acknowledge_after_seconds = 1 + +# This is the time in seconds after which the DHCP- +# server forgets that it communicates with a client +length_of_transaction = 40 + +########### Network ########### +# This is the network address +network = '192.168.80.0' + +# This is the subnet mask +subnet_mask = '255.255.255.0' + +# Currently there is no range configuration. +# Assigned IP addresses will range from 5 to 250. + +# This is the address of the router. +# router = None # Do not tell clients about routers. +# router = [] # Tell clients there is no router. +# router = ['192.168.137.1'] # 192.168.137.1 is a router. +router = None + +# This are the addresses of the DNS-server. +# domain_name_server = None # Do not tell clients about DNS-Servers. +# domain_name_server = [] # Tell clients there is no DNS-server. +# domain_name_server = ['192.168.137.1'] # 192.168.137.1 is a DNS-Server. +domain_name_server = None # list of ips + +# This is the time in seconds after which the client +# should have asked for a new IP address. +# 1 day is 86400 seconds. +ip_address_lease_time = 300 + +# This is the broadcast address +# If the network is 192.168.137.0 +# the broadcast address could also be 192.168.137.255 +broadcast_address = '255.255.255.255' + +# Listen to DHCP requests on this interface +# bind_address = '192.168.137.10' # Listen for requests on this specific interface +bind_address = '192.168.80.80' # Listen for requests on all interfaces + +########### Memory ########### +# This is the path to a file to the DHCP-servers memory. +# MAC, IP and host name will be stored there +# in CSV format. +# You can delete the file and the DHCP-server will +# assign different IP-addresses. +# It also contains the entries not issued by the DHCP-server. +host_file = 'hosts.csv' + +########### Options ########### +# You can set all options that a DHCP-server can serve. +# There are 253 of them. +# See the RFC 2132 for a list of options: +# https://tools.ietf.org/html/rfc2132 +# How do you set options: +# +# 1. You can set options by name. +# You can find the name in listener.py. +# Example: subnet_mask +# +# 2. You can set options by number. +# You can find the number in the RFC 2132. +# Named options are preferred over unnamed options. +# Example: option_1 +# Option 1 is the subnet mask. +# Because subnet_mask = '255.255.255.0' is in this +# file, the option_1 = '255.255.255.0' statement +# is ignored. +# +# If you see "3.17. Domain Name" you can use +# "Domain Name".lower().replace(" ", "_") +# to convert it to the variable name. +# +# Some options do not have conversion functions +# assigned to them so you need to use bytes. +# Example: domain_name = b'hello.kitty.tv' +# Again, see listener.py + + diff --git a/updata/updata.py b/updata/updata.py index c19eb18..f344d3e 100644 --- a/updata/updata.py +++ b/updata/updata.py @@ -14,8 +14,11 @@ import encodings.idna import base64 import memory_pic import ctypes -import mysql import select_list +import dhcp.dhcp as dhcp +import mysql +import console_uart + ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID("myappid") STR_RED="\033[1;31m" @@ -103,15 +106,19 @@ class updata_dlg(QObject): QObject.__init__(self) self.app = QApplication(sys.argv) self.widget = QWidget() - self.widget.resize(703, 409) + self.widget.resize(820, 410) self.widget.setWindowTitle("批检仪程序升级") self.widget.setWindowFlags(Qt.WindowType.WindowStaysOnTopHint) self.addrs="" + self.dhcp_server=None self.but_y=60 self.but_y_step=40 self.file_list_init() self.slave_list_init() self.save_but_init() + self.dhcp_but_init() + self.console_but_init() + self.console_but_init() self.cmd_but_init() self.refresh_but_init() self.sstate_but_init() @@ -193,6 +200,23 @@ class updata_dlg(QObject): self.save_but.setToolTip("请先选中要升级的主板和文件,然后点击此按钮发送到设备中。") # self.save_but.setToolTipDuration(1) + # 初始化DHCP服务器按钮 + def dhcp_but_init(self): + self.dhcp_but = QPushButton(self.widget) + self.dhcp_but.setObjectName(u"dhcp_but") + self.dhcp_but.setGeometry(QRect(700, 60, 93, 28)) + self.dhcp_but.setText("打开DHCP") + self.dhcp_but.clicked.connect(self.dhcp_but_clicked) + self.dhcp_but.setToolTip("如果没有搜索到从机,则打开DHCP服务器。") + + # 初始化控制台按钮 + def console_but_init(self): + self.console_but = QPushButton(self.widget) + self.console_but.setObjectName(u"console_but") + self.console_but.setGeometry(QRect(700, 100, 93, 28)) + self.console_but.setText("串口控制台") + self.console_but.clicked.connect(self.console_but_clicked) + self.console_but.setToolTip("通过设备的串口控制台升级程序,这种方式需要使用到U盘。") # 初始化发送命令按钮 def cmd_but_init(self): @@ -379,7 +403,10 @@ class updata_dlg(QObject): elif(i[-4:]==".dtb"): dst_list.append("/run/media/mmcblk2p2/"+i) elif(i[-5:]==".json"): - dst_list.append("/home/root/config/"+"checker_ye_cfg.json") + if(i!="cfg.json"): + dst_list.append("/home/root/config/"+"checker_ye_cfg.json") + else: + dst_list.append("/home/root/config/"+"cfg.json") elif(i[-4:]==".lua"): dst_list.append("/home/root/config/"+"judge.lua") elif(i[-4:]==".axf"): @@ -401,6 +428,35 @@ class updata_dlg(QObject): elif(s=="20通道"): self.addrs="1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20 " print("addrs:",self.addrs) + + def console_but_clicked(self): + print("console_but_clicked.") + dlg=console_uart.console_dlg(self.widget,"串口控制台") + dlg.show() + print("Console end.") + def dhcp_but_clicked(self): + print("dhcp_but clicked.") + if(self.dhcp_server==None): + self.dhcp_but.setEnabled(False) + self.dhcp_server=dhcp.creat_dhcp_server() + self.dhcp_server.new_ip_addr_signal.connect(self.add_slave_slot) + self.dhcp_server.server_start_signal.connect(self.dhcp_start_slot) + self.dhcp_server.server_end_signal.connect(self.dhcp_end_slot) + self.dhcp_server.run_in_thread() + else: + self.dhcp_but.setEnabled(False) + self.dhcp_server.new_ip_addr_signal.disconnect(dlg.add_slave_slot) + self.dhcp_server.close() + def dhcp_start_slot(self): + self.dhcp_but.setText("关闭DHCP") + self.dhcp_but.setEnabled(True) + def dhcp_end_slot(self): + self.dhcp_but.setText("打开DHCP") + self.dhcp_but.setEnabled(True) + self.dhcp_server.server_start_signal.disconnect(self.dhcp_start_slot) + self.dhcp_server.server_end_signal.disconnect(self.dhcp_end_slot) + self.dhcp_server=None + # 发送文件按钮按下 def save_but_clicked(self): print("save_but_clicked.") @@ -528,7 +584,11 @@ class updata_dlg(QObject): # 开始运行 def run(self): self.widget.show() - sys.exit(self.app.exec()) + a=self.app.exec() + print("run end.") + if(self.dhcp_server!=None): + self.dhcp_server.close() + # sys.exit(a) # 扫描文件 def scan_file(self): @@ -557,6 +617,14 @@ class updata_dlg(QObject): list_str.append(i[0]+','+i[1]) self.slave_list.addItems(list_str) + # 添加单个从机地址 + def add_slave_slot(self,mac:str,ip:str,name:str): + for i in range(self.slave_list.count()): + item=self.slave_list.item(i).text() + if(item.split(",")[0]==ip): + return + self.slave_list.addItem(ip+",dhcp_find") + # 扫描指定类型的文件 def find_type(self,types:list): path = self.getpath()+"file\\" @@ -733,4 +801,3 @@ if __name__ == "__main__": lock=locker() dlg=updata_dlg() dlg.run() -