输入通道改为实际使用通道

编译后自动添加到数据库
This commit is contained in:
ranchuan
2023-09-11 17:46:30 +08:00
parent 7baa63de05
commit 9f7ac06880
4 changed files with 189 additions and 11 deletions

View File

@@ -6,7 +6,7 @@
#define BUILD_DATE "2023-09-09 15:39:19"
#define BUILD_DATE "2023-09-11 10:28:07"
#define SOFT_VERSION "0.01"

View File

@@ -2,6 +2,7 @@ import shutil
import sys
import os
import prebuild as time
import mysql
@@ -12,6 +13,11 @@ BOOT_FILE_SRC = "./Objects/boot/coder_2channel_boot"
APP_FILE_DST = "./python/file/coder_2channel_app"
BOOT_FILE_DST = "./Objects/coder_2channel_boot"
# 定义app和boot的数据库目录
SQL_APP_PATH = "coder_stm32f1_app"
SQL_BOOT_PATH = "coder_stm32f1_boot"
# 找到指定后缀的文件
def find_type(fix:str):
@@ -67,8 +73,7 @@ def crc32(data:bytearray):
# app扇区地址为 0x08020000
# 运行地址为 0x08020000
# 运行地址为 0x08004000
# 添加一个128byte为本程序添加的数据头
@@ -142,6 +147,9 @@ def main():
with open(dst,"wb") as f:
f.write(data)
print(dst+' create app file success.')
sql=mysql.sql()
if(sql.init(SQL_APP_PATH)==True):
sql.insert(dst)
boot=BOOT_FILE_SRC+".bin"
boot_dst=BOOT_FILE_DST+"_"+date+".bin"
if os.path.exists(boot):
@@ -156,6 +164,9 @@ def main():
with open(boot_dst,"wb") as f:
f.write(d)
print(boot_dst+" create boot file success.")
sql=mysql.sql()
if(sql.init(SQL_BOOT_PATH)==True):
sql.insert(boot_dst)
else:
print("please build bootloader to create boot file")
if __name__=="__main__":

166
source/mysql.py Normal file
View File

@@ -0,0 +1,166 @@
import pymysql as mysql
import datetime
from datetime import datetime, timedelta
import hashlib
import os
def connect():
try:
db=mysql.connect(host='124.70.178.159',user='admin',passwd='Rc5345750.',port=3306)
print("connect mysql success.")
return db
except Exception as e:
print("can not connect service.")
return None
# 获取北京时间
def get_date():
now_time = datetime.utcnow()
utc_time = now_time + timedelta(hours=8) # UTC只是比北京时间提前了8个小时
utc_time = utc_time.strftime("%Y%m%d")
return utc_time
# 获取北京时间
def get_time():
now_time = datetime.utcnow()
utc_time = now_time + timedelta(hours=8) # UTC只是比北京时间提前了8个小时
utc_time = utc_time.strftime("%Y-%m-%d %H:%M:%S")
return utc_time
# 获取数据md5
def md5(data:bytearray):
m=hashlib.md5(data).hexdigest()
# print(m)
return m
# 获得主板sn号
def get_computer_sn():
sn = os.popen("wmic bios get serialnumber").readlines()
return sn[2].strip()
class sql:
def __init__(self) -> None:
self.download_path="download/"
if not os.path.exists(self.download_path):
os.makedirs(self.download_path)
# 初始化返回True成功
def init(self,table_name:str):
self.db=connect()
self.table_name=table_name
if(self.db!=None):
self.cur=self.db.cursor()
self.cur.execute("use andy_data")
self.cur.execute("select version()")
a=self.cur.fetchone()
print(a)
self.create_table(self.table_name)
return True
return False
# 创建表
def create_table(self,table_name:str):
cmd="""CREATE TABLE IF NOT EXISTS `{d1}`(
`id` INT UNSIGNED AUTO_INCREMENT,
`time` VARCHAR(30) NOT NULL,
`name` VARCHAR(256) NOT NULL,
`md5` VARCHAR(33) NOT NULL,
`data` MEDIUMBLOB NOT NULL,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(d1=table_name)
self.cur.execute(cmd)
# 插入数据
def insert(self,file_name:str):
s=file_name.split('.')
if(len(s)<2):
print("file name without type suffix,will not insert.")
return
with open(file_name,"rb") as f:
d=f.read()
md=md5(d)
lis=self.show()
if(len(lis)>0):
if(lis[-1][3]==md):
print("the same file was saved,will not insert.")
return
try:
cmd="INSERT INTO {d1} (time,name,md5,data) VALUES (%s,%s,%s,%s);".format(d1=self.table_name)
self.db.begin()
self.cur.execute(cmd,([get_time(),file_name,md,d]))
self.db.commit()
print("insert file success.")
except Exception as e:
self.db.rollback()
print(str(e))
# 查看数据
def show(self):
cmd= "select id,time,name,md5 from {d1};".format(d1=self.table_name)
self.cur.execute(cmd)
a=self.cur.fetchall()
# for i in a:
# print(i[0],i[1],i[2],i[3])
return a
# 下载指定文件,返回文件路径
def download(self,id:int):
ack,name=self.exists(id)
if(ack==True):
print("the same file is exists,will not download.")
return name
cmd="select name,data from {d1} WHERE id={d2};".format(d1=self.table_name,d2=id)
self.cur.execute(cmd)
a=self.cur.fetchall()
for i in a:
ss=i[0].replace('\\','/')
ss=ss.split('/')[-1].split('.')
name=self.download_path+ss[0]+' -'+str(id)+'.'+ss[1]
with open(name,'+bw') as f:
f.write(i[1])
return name
print("can not find the file with id:",id)
return ""
# 获取md5
def get_md5(self,id:int):
cmd="select md5 from {d1} WHERE id={d2};".format(d1=self.table_name,d2=id)
self.cur.execute(cmd)
a=self.cur.fetchall()[0]
return a[0]
# 扫描文件
def scan_files(self):
path = self.download_path
if not os.path.exists(path):
os.makedirs(path)
list=os.listdir(path)
return list
# 判断文件是否存在
def exists(self,id:int):
for i in self.scan_files():
s=i.split('.')[-2].split('-')[-1]
if(int(s)==id):
with open(self.download_path+i,"rb") as f:
md=md5(f.read())
if(md==self.get_md5(id)):
return True,i
return False,""
if __name__ == "__main__":
s=sql()
if(s.init("test_data")==True):
s.insert("file\\check_result.csv")
s.show()
file=s.download(1)
print("save file:",file)

View File

@@ -22,17 +22,17 @@
// 定义输入通道别名
//#define INPUT_STOP_KEY 0
//#define INPUT_CHECK_KEY 1
//#define INPUT_MARK_END 2
//#define INPUT_POS_LEFT 6
//#define INPUT_POS_RIGHT 7
// 调试用输入通道别名
#define INPUT_STOP_KEY 0
#define INPUT_CHECK_KEY 1
#define INPUT_MARK_END 2
#define INPUT_POS_LEFT 3
#define INPUT_POS_RIGHT 4
#define INPUT_POS_LEFT 6
#define INPUT_POS_RIGHT 7
// 调试用输入通道别名
//#define INPUT_STOP_KEY 0
//#define INPUT_CHECK_KEY 1
//#define INPUT_MARK_END 2
//#define INPUT_POS_LEFT 3
//#define INPUT_POS_RIGHT 4
// 定义输出通道别名
@@ -188,6 +188,7 @@ static void process_stop(void *arg)
{
self_def *s=arg;
s->stop_state=1;
SET_OUTPUT(OUTPUT_SHELL_BAFFLE,1);
SET_LED_ERR();
DBG_LOG("stop key pressed.");
}