"""Store files as BLOBs in a MySQL table and fetch them back to disk."""

try:
    import pymysql as mysql
except ImportError:
    # Keep the pure helpers (md5, get_date, ...) importable without the driver;
    # connect() reports the missing dependency instead.
    mysql = None
import datetime
from datetime import datetime, timedelta, timezone
import hashlib
import os

# Beijing time is 8 hours ahead of UTC.
_BEIJING_TZ = timezone(timedelta(hours=8))


def connect(host='124.70.178.159', user='admin', passwd='Rc5345750.', port=3306):
    """Open a MySQL connection.

    Args:
        host: Server address.
        user: Login name.
        passwd: Login password.
        port: Server TCP port.

    Returns:
        The pymysql connection, or None when the driver is missing or the
        connection attempt fails (the reason is printed).
    """
    # NOTE(review): the default credentials are hard-coded in source; they are
    # kept only for backward compatibility and should move to configuration.
    if mysql is None:
        print("can not connect service.")
        print("pymysql is not installed.")
        return None
    try:
        db = mysql.connect(host=host, user=user, passwd=passwd, port=port)
    except Exception as e:
        print("can not connect service.")
        print(str(e))
        return None
    print("connect mysql success.")
    return db


def get_date():
    """Return the current Beijing (UTC+8) date formatted as ``YYYYMMDD``."""
    return datetime.now(_BEIJING_TZ).strftime("%Y%m%d")


def get_time():
    """Return the current Beijing (UTC+8) time as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(_BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")


def md5(data: bytearray):
    """Return the hexadecimal MD5 digest of *data* (any bytes-like object)."""
    return hashlib.md5(data).hexdigest()


def get_computer_sn():
    """Return the BIOS serial number reported by ``wmic``.

    Raises:
        IndexError: If the command prints fewer than three lines, e.g. when
            ``wmic`` is not available on this machine.
    """
    with os.popen("wmic bios get serialnumber") as pipe:
        lines = pipe.readlines()
    # The value is read from the third line of the command output.
    return lines[2].strip()


def _quote_identifier(name: str) -> str:
    """Return *name* quoted as a MySQL identifier (embedded backticks doubled)."""
    return "`" + name.replace("`", "``") + "`"


class sql:
    """File store backed by one MySQL table, with a local download folder."""

    def __init__(self) -> None:
        """Create the local download folder if it does not exist yet."""
        self.download_path = "download/"
        os.makedirs(self.download_path, exist_ok=True)

    def init(self, table_name: str):
        """Connect, select the ``andy_data`` database and ensure the table exists.

        Args:
            table_name: Table used by every later operation.

        Returns:
            True on success, False when no connection could be opened.
        """
        self.db = connect()
        self.table_name = table_name
        if self.db is None:
            return False
        self.cur = self.db.cursor()
        self.cur.execute("use andy_data")
        self.cur.execute("select version()")
        print(self.cur.fetchone())
        self.create_table(self.table_name)
        return True

    def create_table(self, table_name: str):
        """Create the storage table *table_name* if it does not exist."""
        cmd = """CREATE TABLE IF NOT EXISTS {d1}(
            `id` INT UNSIGNED AUTO_INCREMENT,
            `time` VARCHAR(30) NOT NULL,
            `name` VARCHAR(256) NOT NULL,
            `md5` VARCHAR(33) NOT NULL,
            `data` MEDIUMBLOB NOT NULL,
            `info` VARCHAR(512),
            PRIMARY KEY ( `id` )
            )ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(
            d1=_quote_identifier(table_name))
        self.cur.execute(cmd)

    def insert(self, file_name: str, info: str = ""):
        """Store the content of *file_name* as a new row.

        Nothing is inserted when the name contains no ``.`` or when the most
        recent row already holds the same MD5. Database errors are printed
        and the transaction is rolled back.

        Args:
            file_name: Path of the file to read and store.
            info: Optional free-text note saved with the row.
        """
        if len(file_name.split('.')) < 2:
            print("file name without type suffix,will not insert.")
            return
        with open(file_name, "rb") as f:
            payload = f.read()
        digest = md5(payload)
        rows = self.show()
        # Only the latest row is compared, so older duplicates are still allowed.
        if rows and rows[-1][3] == digest:
            print("the same file was saved,will not insert.")
            return
        cmd = "INSERT INTO {d1} (time,name,md5,data,info) VALUES (%s,%s,%s,%s,%s);".format(
            d1=_quote_identifier(self.table_name))
        try:
            self.db.begin()
            self.cur.execute(cmd, [get_time(), file_name, digest, payload, info])
            self.db.commit()
            print("insert file success.")
        except Exception as e:
            self.db.rollback()
            print(str(e))

    def show(self):
        """Return ``(id, time, name, md5)`` for every stored row, ordered by id."""
        cmd = "select id,time,name,md5 from {d1} order by id;".format(
            d1=_quote_identifier(self.table_name))
        self.cur.execute(cmd)
        return self.cur.fetchall()

    def download(self, id: int):
        """Write the file stored under *id* into the download folder.

        The local name is ``<stem> -<id><ext>`` so that exists() can find it
        again by id.

        Returns:
            The path of the local file (an already downloaded, unchanged copy
            is reused), or "" when no row has this id.
        """
        found, cached = self.exists(id)
        if found:
            print("the same file is exists,will not download.")
            return self.download_path + cached
        cmd = "select name,data from {d1} WHERE id=%s;".format(
            d1=_quote_identifier(self.table_name))
        self.cur.execute(cmd, (id,))
        row = self.cur.fetchone()
        if row is None:
            print("can not find the file with id:", id)
            return ""
        # Stored names may use Windows separators; keep only the base name.
        base = row[0].replace('\\', '/').split('/')[-1]
        stem, ext = os.path.splitext(base)
        name = self.download_path + stem + ' -' + str(id) + ext
        with open(name, 'wb') as f:
            f.write(row[1])
        return name

    def get_md5(self, id: int):
        """Return the MD5 stored for row *id*, or None when there is no such row."""
        cmd = "select md5 from {d1} WHERE id=%s;".format(
            d1=_quote_identifier(self.table_name))
        self.cur.execute(cmd, (id,))
        row = self.cur.fetchone()
        return row[0] if row else None

    def scan_files(self):
        """Return the entry names in the download folder, creating it if needed."""
        path = self.download_path
        os.makedirs(path, exist_ok=True)
        return os.listdir(path)

    def exists(self, id: int):
        """Check whether an unchanged local copy of row *id* is present.

        Returns:
            ``(True, file_name)`` when a file whose name ends in ``-<id>``
            (before the extension) matches the stored MD5, else ``(False, "")``.
            *file_name* is relative to the download folder.
        """
        for file_name in self.scan_files():
            stem = os.path.splitext(file_name)[0]
            try:
                file_id = int(stem.split('-')[-1])
            except ValueError:
                # Not a file produced by download(); ignore it.
                continue
            if file_id != id:
                continue
            full_path = self.download_path + file_name
            if not os.path.isfile(full_path):
                continue
            with open(full_path, "rb") as f:
                local_digest = md5(f.read())
            if local_digest == self.get_md5(id):
                return True, file_name
        return False, ""


if __name__ == "__main__":
    s = sql()
    if s.init("test_data"):
        s.insert("file\\check_result.csv")
        s.show()
        file = s.download(1)
        print("save file:", file)