Files
c_soft/create_signal_fun.py
2025-06-26 17:54:49 +08:00

286 lines
7.7 KiB
Python

import os
import sys
import dataclasses
SIGNAL_INDEX=0
TMP_DIR="build"
@dataclasses.dataclass
class signal_fun_t:
name:str
text:str
struct:str
@dataclasses.dataclass
class slot_fun_t:
name:str
text:str
struct:str
# 找到文件夹中指定后缀的文件
def find_file(path:str,fix:str):
file_list=[]
if(os.path.exists(path)):
for file_name in os.listdir(path):
if file_name.endswith(fix):
file_list.append(os.path.join(path, file_name))
else:
print(f"{path} 不存在")
return file_list
# 找到文件夹列表中指定后缀的文件
def find_file_list(path_list:list,fix:str):
file_list=[]
for path in path_list:
file_list+=find_file(path,fix)
return file_list
# 找到 signal 关键字 定义的行
def find_signal_slot_def(file):
list_signal=[]
list_slot=[]
with open(file,encoding="utf-8") as f:
list_str=f.readlines()
for i in list_str:
if(i[0:6]=="signal"):
list_signal.append(i)
if(i[0:4]=="slot"):
list_slot.append(i)
return list_signal,list_slot
# 截取参数列表中的变量名
def split_par_name(par_str:str):
ret_str=""
if(par_str.count('*')>0):
ret_str=par_str.split("*")[1].strip()
else:
ret_str=par_str.split(" ")[1]
return ret_str
# 生成一个结构体描述
def def_struct_str(signal_fun:str,pars:list):
s_pars=""
for index,item in enumerate(pars):
if(index==0):
t=item.replace("*"," ").split()[-1]
s_pars+=f"void* {t};\n"
else:
s_pars+=f" {item};\n"
struct_str=f"""
typedef struct {'{'}
slot_list_with_pars slot_;
{s_pars}{'}'}{signal_fun}_args;
"""
return struct_str
# 获取参数列表中的变量名
def get_pars_names(pars:list):
names=[]
for item in pars:
t=item.replace("*"," ")
names.append(t.split(" ")[-1])
return names
# 生成一个信号函数描述
def def_signal_fun_str(signal_fun:str,pars:list):
pars_str=""
for index,item in enumerate(get_pars_names(pars)):
if(index>0):
pars_str+=f" pars->{item} = {item};\n"
else:
first_par=item
pars_str+=f"pars->{item} = slot_p->slot_obj;\n"
signal_fun_str=f"""
void {signal_fun}({','.join(pars)}){'{'}
{signal_fun}_args* pars = NULL;
slot_list* slot_p = {first_par}->__sig_obj.slot_head;
while (slot_p) {'{'}
if (slot_p->signal_func == {signal_fun}) {'{'}
pars = calloc(1, sizeof({signal_fun}_args));
pars->slot_.func = slot_p->func;
{pars_str} if (slot_p->thread) {'{'}
// 异步调用
send_slot_fun(slot_p->thread, (slot_list_with_pars*)pars);
{'}'} else {'{'}
// 同步调用
slot_p->func(pars);
free(pars);
{'}'}
{'}'}
slot_p = slot_p->next;
{'}'}
{'}'}
"""
return signal_fun_str
# 生成一个槽函数描述
def def_slot_fun_str(slot_fun:str,pars:list):
pars_str=""
pars_names=get_pars_names(pars)
for index,item in enumerate(pars_names):
if(index<len(pars)-1):
pars_str+=f"self->{item},"
else:
pars_str+=f"self->{item}"
unsued_pars=""
for item in pars_names:
unsued_pars+=f" (void){item};\n"
slot_fun_str=f"""
__attribute__((weak)) void {slot_fun}({','.join(pars)}){'{'}
{unsued_pars}{'}'}
static void {slot_fun}_caller(void *args){'{'}
{slot_fun}_args *self = args;
{slot_fun}({pars_str});
{'}'}
"""
return slot_fun_str
# 生成一个信号函数的实现
def def_signal_fun(line:str):
# 删除多余空格
line=' '.join(line.split())
# print(line)
list_str=line.split('(')
fun_name=list_str[0].split(' ')[1]
param_str=list_str[1].split(')')[0]
params=[]
# 有","则至少有两个参数否则可能有一个参数,可能没有
if(param_str.count(',')>0):
params=param_str.split(',')
for i in range(len(params)):
params[i]=params[i].strip()
else:
t_str=param_str.strip()
if(len(t_str)>0)and(t_str!="void"):
params.append(t_str)
# print(fun_name,params)
struct_str=def_struct_str(fun_name,params)
fun_str=def_signal_fun_str(fun_name,params)
# print(fun_str)
return signal_fun_t(fun_name,fun_str,struct_str)
# 生成一个槽函数的实现
def def_slot_fun(line:str):
# 删除多余空格
line=' '.join(line.split())
# print(line)
list_str=line.split('(')
fun_name=list_str[0].split(' ')[1]
param_str=list_str[1].split(')')[0]
params=[]
# 有","则至少有两个参数否则可能有一个参数,可能没有
if(param_str.count(',')>0):
params=param_str.split(',')
for i in range(len(params)):
params[i]=params[i].strip()
else:
t_str=param_str.strip()
if(len(t_str)>0)and(t_str!="void"):
params.append(t_str)
# print(fun_name,params)
struct_str=def_struct_str(fun_name,params)
fun_str=def_slot_fun_str(fun_name,params)
# print(fun_str)
return slot_fun_t(fun_name,fun_str,struct_str)
# 生成槽函数指针数组
def gen_slot_fun_array(slot_fun_list:list) -> str:
item_list=[]
for item in slot_fun_list:
item_list.append(f" .func = {item.name}_caller,\n .name = \"{item.name}\"\n")
table_str="""
// 定义一个数据结构来保存槽封装函数与槽函数的关系
typedef struct {
void (*func)(void*);
char *name;
} func_name;
static const func_name g_func_table[] = {
"""
for item in item_list:
table_str+=f" {'{'}\n{item} {'}'},\n"
table_str+="};\n"
table_str+="""
void* signal_find_slot_func(const char* name) {
for (int i = 0; i < sizeof(g_func_table) / sizeof(func_name); i++) {
if (strcmp(name, g_func_table[i].name) == 0) {
return g_func_table[i].func;
}
}
return 0;
}
"""
return table_str
# 遍历路径中的 .h 文件 返回找到的文件和信号声明
def ergodic_signal_fun(path_list:list):
signal_def_list=[]
slot_def_list=[]
list_file=find_file_list(path_list,".h")
signal_header=[]
for i in list_file:
list_signal,list_slot=find_signal_slot_def(i)
if(len(list_signal)>0 or len(list_slot)>0):
signal_header.append(i)
for j in list_signal:
signal_def_list.append(def_signal_fun(j))
for k in list_slot:
slot_def_list.append(def_slot_fun(k))
return signal_header,signal_def_list,slot_def_list
# 判断是否需要重新生成
def check_rebuild(dst:str,src:list):
if(not os.path.exists(dst)):
return True
dst_time=os.path.getmtime(dst)
src_time=[]
if(len(src)==0):
return True
for i in src:
src_time.append(os.path.getmtime(i))
src_time.sort()
if(src_time[-1]>dst_time):
return True
return False
# 创建 moc 文件
def moc_file_create(out_file_path,scan_path_list):
list_file,signal_list,slot_list=ergodic_signal_fun(scan_path_list)
# 不需要重新生成
if(not check_rebuild(out_file_path,list_file)):
return
print(f"生成 {out_file_path}")
with open(out_file_path,"w+") as f:
f.write("#include \"stdlib.h\"\n")
f.write("#include \"string.h\"\n")
f.write("#include \"mysignal.h\"\n")
for i in list_file:
f.write("#include \""+i.split("/")[-1]+"\"\n")
f.write("\n\n\n\n\n\n")
for item in signal_list:
f.write(item.struct)
f.write(item.text)
f.write("\n\n\n\n\n\n")
for item in slot_list:
f.write(item.struct)
f.write(item.text)
f.write("\n\n\n\n\n\n")
f.write(gen_slot_fun_array(slot_list))
if __name__=="__main__":
moc_file_create(f"{TMP_DIR}/mod_test.c",["test"])