Files
python_tools/updata/udp.py

181 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import threading
from PyQt5.QtCore import *
STR_RED="\033[1;31m"
STR_BLUE="\033[1;34m"
STR_YELLOW="\033[1;33m"
STR_END="\033[0m"
class myudp(QObject):
dst_ip_list=[]
# 进度信号ip1~100
rate_signal =pyqtSignal([str,int])
# 结束信号ip成败描述
end_signal = pyqtSignal([str,bool,str])
# 数据信号,有的命令有返回数据
data_signal = pyqtSignal([str,str])
def __init__(self,start,end):
QObject.__init__(self)
self.start=start
self.end=end
def test_comm(self,ip):
# 1.创建socket套接字
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # AF_INET表示使用ipv4,默认不变SOCK_DGRAM表示使用UDP通信协议
# 2.绑定端口port
local_addr = ("", 0) # 默认本机任何ip 指定端口号7878
udp.bind(local_addr) # 绑定端口
udp.settimeout(1.5)
# 3.收发数据
send_data = "whos\n"
udp.sendto(send_data.encode("utf-8"), (ip, 7777)) # 编码成全球统一数据格式用元组表示接收方ip和port
try:
recv_data = udp.recvfrom(1024) # 定义单次最大接收字节数
recv_msg = recv_data[0] # 接收的元组形式的数据有两个元素,第一个为发送信息
send_addr = recv_data[1] # 元组第二个元素为发信息方的ip以及port
# print ("收到的信息为:", recv_msg.decode("utf-8")) # 默认从windows发出的数据解码要用”gbk”保证中文不乱码
# print ("发送方地址为:", str(send_addr)) # 强转为字符串输出地址,保证不乱码
str=recv_msg.decode("utf-8").replace("\n","|")
print("test_comm",str)
list=str.split(",")
str=list[1].replace(":","_")
self.dst_ip_list.append((send_addr[0],str))
except Exception as reason:
pass
# print(str(reason))
# 5.关闭套接字
udp.close()
# 查找从机
def find(self,ip_prefix):
self.dst_ip_list.clear()
threads = []
print("find ip {} to {}".format(self.start,self.end))
for i in range(self.start,self.end):
ip=ip_prefix+".{}".format(i)
# print(ip)
# 需要注意的一点是,当创建的元组中只有一个字符串类型的元素时,该元素后面必须要加一个逗号,,否则 Python 解释器会将它视为字符串
t = threading.Thread(target=self.test_comm, args=(ip,))
threads.append(t)
t.start()
#等待所有线程任务结束。
for t in threads:
t.join()
# 去重
self.dst_ip_list=list(set(self.dst_ip_list))
# 发送命令recv_num返回次数
def send(self,ip,cmd,recv_num,timeout):
resoult=True
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp=udp
self.send_on=True
local_addr = ("", 0)
udp.bind(local_addr)
udp.settimeout(timeout)
udp.sendto(cmd.encode("utf-8"), (ip, 7777))
try:
index=0
while index<recv_num:
if(self.send_on==False):
return False
recv_data = udp.recvfrom(1024)
recv_msg = recv_data[0]
send_addr = recv_data[1]
out=recv_msg.decode("utf-8")
out_list=out.split(",")
ret_list=out_list[0].split(":")
if(ret_list[0]=="ack"):
index+=1
if(ret_list[1]=="1"):
out=("\n"+out).replace("\n",STR_YELLOW+"|"+STR_BLUE)
out=out+STR_END
elif(ret_list[1]=="0"):
out=("\n"+out).replace("\n",STR_YELLOW+"|"+STR_RED)
out=out+STR_END
resoult=False
elif(ret_list[0]=="data"):
index+=1
self.data_signal.emit(ip,out[5:])
elif(ret_list[0]=="rate"):
rate=int(ret_list[1])
self.rate_signal.emit(ip,rate)
else:
out=("\n"+out).replace("\n",STR_YELLOW+"|"+STR_END)
if(ret_list[0]!="rate"):
print(send_addr[0],out)
except Exception as reason:
print(STR_RED,ip,"|"+str(reason),STR_END)
resoult=False
udp.close()
return resoult
# 发送命令列表
def send_list(self,ip,cmd_list):
for i in range(len(cmd_list)):
resoult=self.send(ip,cmd_list[i][0],cmd_list[i][1],cmd_list[i][2])
if(resoult!=True):
err_str="cmd="+cmd_list[i][0]+" failed."
print(STR_RED,ip,"|"+err_str,STR_END)
self.end_signal.emit(ip,False,err_str)
return
# self.rate_signal.emit(ip,(i*100)//len(cmd_list))
self.end_signal.emit(ip,True,"success")
# 广播命令在find之后调用,str 要发送的数据recv_num,返回次数
def bordcast(self,cmd_list):
threads = []
print("bordcast:",cmd_list)
print("ip_list:",self.dst_ip_list)
for i in self.dst_ip_list:
ip=i[0]
t = threading.Thread(target=self.send_list, args=(ip,cmd_list))
threads.append(t)
t.start()
for t in threads:
t.join()
def close(self):
if(self.udp is not None):
self.udp.close()
if(self.send_on is not None):
self.send_on=False
# 定义自动发送的命令列表,命令,返回次数,超时
cmd_list=[]
# cmd_list.append(("local setlog filefilter tcp_client",1,1))
# cmd_list.append(("mcu scheme 1,2,3,4,5,6,7,8,9,10",1,30))
# cmd_list.append(("mcu bootinfo 1,2,3,4,5,6,7,8,9,10",1,5))
# cmd_list.append(("mcu start 10",1,37))
# cmd_list.append(("mcu updata 1,2,3,4,5,6,7,8,9,10",1,900))
# cmd_list.append(("whos",1,2))
# cmd_list.append(("mcu updata 10 /home/root/config/JQChecker_0008.bin",1,900))
# cmd_list.append(("local setcfg hostip 192.168.60.202",1,2))
# cmd_list.append(("local setcfg usetcp true",1,2))
# cmd_list.append(("local setcfg uselua true",1,2))
# cmd_list.append(("local setcfg save",1,2))
cmd_list.append(("mcu start 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20",2,50))
# 扫描从机ip地址
# 返回元组列表iplocal_id
def main():
udp=myudp(1,255)
udp.find("192.168.60")
print(udp.dst_ip_list)
udp.bordcast(cmd_list)
if __name__ == "__main__":
main()