import serial import serial.tools.list_ports import threading import time import socket # 把tcp封装为串口 class utcp: is_open=False def __init__(self,port:int)->None: self.ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ser.bind(("",port)) self.ser.settimeout(10) self.ser.listen(128) print("wait for mcu connect.") self.client,self.client_addr=self.ser.accept() print("client:",self.client_addr) self.is_open=True def read(self,len:int): return self.client.recv(len) def write(self,data:bytearray): return self.client.send(data) def close(self): self.client.close() self.ser.close() self.is_open=False class port: def __init__(self) -> None: pass def open(self,name:str,bsp:int): if(name!="utcp"): self.ser = serial.Serial(port=name, baudrate=bsp,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,timeout=None) else: self.ser=utcp(9527) def start_recv(self): self.thread_ = threading.Thread(target=self.recv, args=()) self.thread_.start() def recv(self,num:int): d=bytes() try: d=self.ser.read(num) except Exception as a: print("port closed") return # print("recv:",d.hex(" ")) def send(self,data:bytearray): self.ser.write(data) # 测试上位机 def ecode_test(self,times:int): print("检测赋码系统耗时测试:") tick=0 while(tick