import os import sys import dataclasses SIGNAL_INDEX=0 TMP_DIR="build" @dataclasses.dataclass class signal_fun_t: name:str text:str struct:str @dataclasses.dataclass class slot_fun_t: name:str text:str struct:str # 找到文件夹中指定后缀的文件 def find_file(path:str,fix:str): file_list=[] if(os.path.exists(path)): for file_name in os.listdir(path): if file_name.endswith(fix): file_list.append(os.path.join(path, file_name)) else: print(f"{path} 不存在") return file_list # 找到文件夹列表中指定后缀的文件 def find_file_list(path_list:list,fix:str): file_list=[] for path in path_list: file_list+=find_file(path,fix) return file_list # 找到 signal 关键字 定义的行 def find_signal_slot_def(file): list_signal=[] list_slot=[] with open(file,encoding="utf-8") as f: list_str=f.readlines() for i in list_str: if(i[0:6]=="signal"): list_signal.append(i) if(i[0:4]=="slot"): list_slot.append(i) return list_signal,list_slot # 截取参数列表中的变量名 def split_par_name(par_str:str): ret_str="" if(par_str.count('*')>0): ret_str=par_str.split("*")[1].strip() else: ret_str=par_str.split(" ")[1] return ret_str # 生成一个结构体描述 def def_struct_str(signal_fun:str,pars:list): s_pars="" for index,item in enumerate(pars): if(index==0): t=item.replace("*"," ").split()[-1] s_pars+=f"void* {t};\n" else: s_pars+=f" {item};\n" struct_str=f""" typedef struct {'{'} slot_list_with_pars slot_; {s_pars}{'}'}{signal_fun}_args; """ return struct_str # 获取参数列表中的变量名 def get_pars_names(pars:list): names=[] for item in pars: t=item.replace("*"," ") names.append(t.split(" ")[-1]) return names # 生成一个信号函数描述 def def_signal_fun_str(signal_fun:str,pars:list): pars_str="" for index,item in enumerate(get_pars_names(pars)): if(index>0): pars_str+=f" pars->{item} = {item};\n" else: first_par=item pars_str+=f"pars->{item} = slot_p->slot_obj;\n" signal_fun_str=f""" void {signal_fun}({','.join(pars)}){'{'} {signal_fun}_args* pars = NULL; slot_list* slot_p = {first_par}->__sig_obj.slot_head; while (slot_p) {'{'} if (slot_p->signal_func == {signal_fun}) {'{'} pars = calloc(1, sizeof({signal_fun}_args)); pars->slot_.func = slot_p->func; {pars_str} if (slot_p->thread) {'{'} // 异步调用 send_slot_fun(slot_p->thread, (slot_list_with_pars*)pars); {'}'} else {'{'} // 同步调用 slot_p->func(pars); free(pars); {'}'} {'}'} slot_p = slot_p->next; {'}'} {'}'} """ return signal_fun_str # 生成一个槽函数描述 def def_slot_fun_str(slot_fun:str,pars:list): pars_str="" pars_names=get_pars_names(pars) for index,item in enumerate(pars_names): if(index{item}," else: pars_str+=f"self->{item}" unsued_pars="" for item in pars_names: unsued_pars+=f" (void){item};\n" slot_fun_str=f""" __attribute__((weak)) void {slot_fun}({','.join(pars)}){'{'} {unsued_pars}{'}'} static void {slot_fun}_caller(void *args){'{'} {slot_fun}_args *self = args; {slot_fun}({pars_str}); {'}'} """ return slot_fun_str # 生成一个信号函数的实现 def def_signal_fun(line:str): # 删除多余空格 line=' '.join(line.split()) # print(line) list_str=line.split('(') fun_name=list_str[0].split(' ')[1] param_str=list_str[1].split(')')[0] params=[] # 有","则至少有两个参数否则可能有一个参数,可能没有 if(param_str.count(',')>0): params=param_str.split(',') for i in range(len(params)): params[i]=params[i].strip() else: t_str=param_str.strip() if(len(t_str)>0)and(t_str!="void"): params.append(t_str) # print(fun_name,params) struct_str=def_struct_str(fun_name,params) fun_str=def_signal_fun_str(fun_name,params) # print(fun_str) return signal_fun_t(fun_name,fun_str,struct_str) # 生成一个槽函数的实现 def def_slot_fun(line:str): # 删除多余空格 line=' '.join(line.split()) # print(line) list_str=line.split('(') fun_name=list_str[0].split(' ')[1] param_str=list_str[1].split(')')[0] params=[] # 有","则至少有两个参数否则可能有一个参数,可能没有 if(param_str.count(',')>0): params=param_str.split(',') for i in range(len(params)): params[i]=params[i].strip() else: t_str=param_str.strip() if(len(t_str)>0)and(t_str!="void"): params.append(t_str) # print(fun_name,params) struct_str=def_struct_str(fun_name,params) fun_str=def_slot_fun_str(fun_name,params) # print(fun_str) return slot_fun_t(fun_name,fun_str,struct_str) # 生成槽函数指针数组 def gen_slot_fun_array(slot_fun_list:list) -> str: item_list=[] for item in slot_fun_list: item_list.append(f" .func = {item.name}_caller,\n .name = \"{item.name}\"\n") table_str=""" // 定义一个数据结构来保存槽封装函数与槽函数的关系 typedef struct { void (*func)(void*); char *name; } func_name; static const func_name g_func_table[] = { """ for item in item_list: table_str+=f" {'{'}\n{item} {'}'},\n" table_str+="};\n" table_str+=""" void* signal_find_slot_func(const char* name) { for (int i = 0; i < sizeof(g_func_table) / sizeof(func_name); i++) { if (strcmp(name, g_func_table[i].name) == 0) { return g_func_table[i].func; } } return 0; } """ return table_str # 遍历路径中的 .h 文件 返回找到的文件和信号声明 def ergodic_signal_fun(path_list:list): signal_def_list=[] slot_def_list=[] list_file=find_file_list(path_list,".h") signal_header=[] for i in list_file: list_signal,list_slot=find_signal_slot_def(i) if(len(list_signal)>0 or len(list_slot)>0): signal_header.append(i) for j in list_signal: signal_def_list.append(def_signal_fun(j)) for k in list_slot: slot_def_list.append(def_slot_fun(k)) return signal_header,signal_def_list,slot_def_list # 判断是否需要重新生成 def check_rebuild(dst:str,src:list): if(not os.path.exists(dst)): return True dst_time=os.path.getmtime(dst) src_time=[] if(len(src)==0): return True for i in src: src_time.append(os.path.getmtime(i)) src_time.sort() if(src_time[-1]>dst_time): return True return False # 创建 moc 文件 def moc_file_create(out_file_path,scan_path_list): list_file,signal_list,slot_list=ergodic_signal_fun(scan_path_list) # 不需要重新生成 if(not check_rebuild(out_file_path,list_file)): return print(f"生成 {out_file_path}") with open(out_file_path,"w+") as f: f.write("#include \"stdlib.h\"\n") f.write("#include \"string.h\"\n") f.write("#include \"mysignal.h\"\n") for i in list_file: f.write("#include \""+i.split("/")[-1]+"\"\n") f.write("\n\n\n\n\n\n") for item in signal_list: f.write(item.struct) f.write(item.text) f.write("\n\n\n\n\n\n") for item in slot_list: f.write(item.struct) f.write(item.text) f.write("\n\n\n\n\n\n") f.write(gen_slot_fun_array(slot_list)) if __name__=="__main__": moc_file_create(f"{TMP_DIR}/mod_test.c",["test"])