From aebc9a1739c21492b06d824f35e56198159b1ada Mon Sep 17 00:00:00 2001 From: piotrj Date: Wed, 10 Jan 2024 21:56:16 +0100 Subject: [PATCH] communication with subprocess on both windows and linux --- src/core.py | 152 +++++++++++++++++++++++++++++--------------------- src/librer.py | 51 +++++++++++------ src/record.py | 122 ++++++++++++++++++++++------------------ 3 files changed, 192 insertions(+), 133 deletions(-) diff --git a/src/core.py b/src/core.py index 50a6953..06ff886 100644 --- a/src/core.py +++ b/src/core.py @@ -31,12 +31,14 @@ from time import sleep, perf_counter,time,strftime,localtime from threading import Thread -from os import cpu_count,scandir,stat,sep,name as os_name,remove as os_remove +from os import cpu_count,scandir,stat,sep,name as os_name,remove as os_remove,kill,rename +from tempfile import mkdtemp windows = bool(os_name=='nt') if windows: from subprocess import CREATE_NO_WINDOW + from signal import SIGBREAK else: from os import getpgid, killpg @@ -50,7 +52,10 @@ import sys from collections import defaultdict from pathlib import Path as pathlib_Path -from signal import SIGTERM,SIGINT +from signal import SIGTERM,SIGINT,SIGABRT +if windows: + from signal import CTRL_C_EVENT + from pickle import dumps,loads from zstandard import ZstdCompressor,ZstdDecompressor from pympler.asizeof import asizeof @@ -65,6 +70,10 @@ VERSION_FILE='version.txt' +SCAN_DAT_FILE = 'scaninfo' +SEARCH_DAT_FILE = 'searchinfo' +SIGINT_FILE = 'signal' + def get_dev_labes_dict(): lsblk = subprocess_run(['lsblk','-fJ'],capture_output = True,text = True) lsblk.dict = json_loads(lsblk.stdout) @@ -192,30 +201,44 @@ def get_command(executable,parameters,full_file_path,shell): return res,' '.join(res) #'ignore','replace','backslashreplace' -def popen_win(command,shell): - return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=DEVNULL,shell=shell,text=True,universal_newlines=True,creationflags=CREATE_NO_WINDOW,close_fds=False,errors='ignore') +def popen_win(command,shell,stdin=DEVNULL): + return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=stdin,shell=shell,text=True,universal_newlines=True,creationflags=CREATE_NO_WINDOW,close_fds=False,errors='ignore') + +def popen_lin(command,shell,stdin=DEVNULL): + return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=stdin,shell=shell,text=True,universal_newlines=True,start_new_session=True,errors='ignore') + +uni_popen = (lambda command,shell=False,stdin=DEVNULL : popen_win(command,shell,stdin)) if windows else (lambda command,shell=False,stdin=DEVNULL : popen_lin(command,shell,stdin)) + +def send_signal(subproc,temp_dir,kind=0): + try: + signal_file = sep.join([temp_dir,SIGINT_FILE]) + #print(f'sending signal in file {signal_file}') -def popen_lin(command,shell): - return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=DEVNULL,shell=shell,text=True,universal_newlines=True,start_new_session=True,errors='ignore') + temp_signal_file = signal_file+ '_temp' + with open(temp_signal_file,'w') as tsf: + tsf.write(str(kind)) -uni_popen = (lambda command,shell=False : popen_win(command,shell)) if windows else (lambda command,shell=False : popen_lin(command,shell)) + rename(temp_signal_file, signal_file) -def kill_subprocess(subproc,signal=SIGTERM): + except Exception as se: + print(f'subprocess signal error: {se}') + +def kill_subprocess(subproc,print_func=print): try: pid = subproc.pid if windows: kill_cmd = ['taskkill', '/F', '/T', '/PID', str(pid)] - print(f'executing: {kill_cmd}') + #print_func( ('info',f'executing: {kill_cmd}') ) + print_func( ('info',f'killing pid: {pid}') ) subprocess_run(kill_cmd) - print(f'executing: {kill_cmd} - done') else: - print(f'killing process group') - killpg(getpgid(pid), signal) - print(f'killing process group done') + print_func( ('info',f'killing process group of pid {pid}') ) + killpg(getpgid(pid), SIGTERM) + #print_func( ('info',f'killing process group done') ) except Exception as ke: - print(f'kill_subprocess: {ke}') + print_func( ('error',f'kill_subprocess error: {ke}') ) def compress_with_header_update(header,data,compression,datalabel,zip_file): t0 = perf_counter() @@ -292,7 +315,7 @@ def __init__(self,log,label=None,scan_path=None,file_path=None): self.find_results = [] self.info_line = '' - self.info_line_current = '' + #self.info_line_current = '' self.abort_action = False @@ -411,7 +434,9 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check if is_file: self_header_ext_stats[ext]+=1 - self.info_line_current = entry_name + #self.info_line_current = entry_name + + #print_func(('scan-line',entry_name)) try: stat_res = stat(entry) @@ -419,7 +444,7 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check dev=stat_res.st_dev except Exception as e: #self.log.error('stat error:%s', e ) - print_func(f'stat error:{e}' ) + print_func( ('error',f'stat {entry_name} error:{e}') ) #size -1 <=> error, dev,in ==0 is_bind = False size=-1 @@ -433,7 +458,7 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check if dev_call: if dev_call!=dev: #self.log.info('devices mismatch:%s %s %s %s' % (path,entry_name,dev_call,dev) ) - print_func(f'devices mismatch:{path},{entry_name},{dev_call},{dev}' ) + print_func( ('info',f'devices mismatch:{path},{entry_name},{dev_call},{dev}') ) is_bind=True else: dev_call=dev @@ -475,25 +500,20 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check self_header.quant_files += local_folder_files_count self_header.quant_folders += local_folder_folders_count - print_func(('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,self.info_line_current)) + print_func( ('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,path) ) #t_now = perf_counter() #if t_now>self.progress_update_time+1.0: #self.progress_update_time = t_now except Exception as e: #self.log.error('scandir error:%s',e ) - print_func(f'scandir error:{e}' ) + print_func( ('error', f'scandir {path} error:{e}') ) - self.info_line_current = '' + #self.info_line_current = '' return (local_folder_size_with_subtree+local_folder_size,subitems) def scan(self,print_func,abort_list,cde_list,check_dev=True): - #self.info_line = 'Scanning filesystem' - - #results_queue_put = results_queue.append - #self.progress_update_time = perf_counter() - self.header.sum_size = 0 self.header.ext_stats=defaultdict(int) @@ -509,8 +529,6 @@ def scan(self,print_func,abort_list,cde_list,check_dev=True): self.header.scanning_time = time_end-time_start self.filenames = tuple(sorted(list(filenames_set))) - ######################### - #self.info_line = 'indexing filesystem names' self.filenames_helper = {fsname:fsname_index for fsname_index,fsname in enumerate(self.filenames)} @@ -521,14 +539,9 @@ def scan(self,print_func,abort_list,cde_list,check_dev=True): self.customdata_pool_index = 0 if cde_list: - #self.log.info('estimating CD pool') - print_func( ('info','estimating CD pool') ) - #self.info_line = 'estimating files pool for custom data extraction' - #print_func = 'estimating files pool for custom data extraction' + print_func( ('info','estimating files pool for custom data extraction') ) self.prepare_customdata_pool_rec(print_func,abort_list,self.scan_data,[]) - #self.info_line = '' - def prepare_customdata_pool_rec(self,print_func,abort_list,scan_like_data,parent_path): self_header = self.header scan_path = self_header.scan_path @@ -648,7 +661,8 @@ def threaded_cde(timeout_semi_list): full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep) command,command_info = get_command(executable,parameters,full_file_path,shell) - print_func( ('cde',command_info,size, files_cde_size_extracted,self_header.files_cde_errors_quant_all,files_cde_quant,self_header.files_cde_quant_sum,files_cde_size,self_header.files_cde_size_sum) ) + info_line = f'{full_file_path} ({bytes_to_str(size)})' + print_func( ('cde',info_line,size, files_cde_size_extracted,self_header.files_cde_errors_quant_all,files_cde_quant,self_header.files_cde_quant_sum,files_cde_size,self_header.files_cde_size_sum) ) timeout_val=time()+timeout if timeout else None ##################################### @@ -659,7 +673,7 @@ def threaded_cde(timeout_semi_list): except Exception as re: timeout_semi_list[0]=None returncode=201 - output = str(re) + output = f'Exception: {re}' else: subprocess_stdout_readline = subprocess.stdout.readline subprocess_poll = subprocess.poll @@ -682,6 +696,7 @@ def threaded_cde(timeout_semi_list): output = '\n'.join(output_list).strip() if not output: + output = 'No output collected.' returncode=203 ##################################### @@ -741,7 +756,7 @@ def threaded_cde(timeout_semi_list): if timeout_semi_list[0]: timeout_val,subprocess = timeout_semi_list[0] if any(abort_list) or (timeout_val and time()>timeout_val): - kill_subprocess(subprocess) + kill_subprocess(subprocess,print_func) self.killed=True abort_list[1]=False else: @@ -1191,7 +1206,7 @@ def prepare_info(self): sublist=[] for ext,ext_stat in sorted(self.header.ext_stats.items(),key = lambda x : x[1],reverse=True): - sublist.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(ext_stat).rjust(12)} {ext.ljust(longest)}') + sublist.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(ext_stat).rjust(12)} {ext}') info_list.append('') info_list.append('Files extensions statistics by quantity:') info_list.append('========================================') @@ -1199,9 +1214,9 @@ def prepare_info(self): sublist_size=[] for ext,ext_stat in sorted(self.header.ext_stats_size.items(),key = lambda x : x[1],reverse=True): - sublist_size.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(self.header.ext_stats[ext]).rjust(12)} {ext.ljust(longest)}') + sublist_size.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(self.header.ext_stats[ext]).rjust(12)} {ext}') info_list.append('') - info_list.append('Files extensions statistics by sum size:') + info_list.append('Files extensions statistics by space:') info_list.append('========================================') info_list.extend(sublist_size) except Exception as se: @@ -1273,6 +1288,7 @@ def __init__(self,db_dir,log): self.records_to_show=[] self.abort_action=False + self.abort_action_single=False self.search_record_nr=0 self.find_res_quant = 0 @@ -1502,7 +1518,7 @@ def abort(self): self.abort_action = True def abort_single(self): - #print('core abort') + #print('core abort single') self.abort_action_single = True def threaded_read_records(self,load_errors): @@ -1551,8 +1567,8 @@ def find_results_clean(self): stdout_files_cde_size_sum=0 ######################################################################################################################## - def create_new_record(self,settings_file,update_callback): - self.log.info(f'create_new_record {settings_file}') + def create_new_record(self,temp_dir,update_callback): + self.log.info(f'create_new_record') new_file_path = sep.join([self.db_dir,f'rep.{int(time())}.dat']) @@ -1560,7 +1576,8 @@ def create_new_record(self,settings_file,update_callback): command = self.record_exe() command.append('create') command.append(new_file_path) - command.append(settings_file) + #command.append(settings_file) + command.append(temp_dir) self.abort_action=False self.abort_action_single=False @@ -1584,7 +1601,7 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list): #results_list_append = results_semi_list[0].find_results.append try: - subprocess = uni_popen(command) + subprocess = uni_popen(command,stdin=PIPE) except Exception as re: print('threaded_run run error',re) info_semi_list[0].append(f'threaded_run run error: {re}') @@ -1597,10 +1614,10 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list): while True: if line := subprocess_stdout_readline(): try: + #print(line) if line[0]!='#': val = json_loads(line.strip()) - #print(f'{val=}') self.info_line = val kind = val[0] if kind == 'stage': @@ -1609,6 +1626,8 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list): self.stdout_info_line_current = val[1] elif kind == 'info': self.stdout_info_line_current = val[1] + elif kind == 'scan-line': + self.stdout_info_line_current = val[1] else: if self.stage==0: #scan self.stdout_sum_size,self.stdout_quant_files,self.stdout_quant_folders,self.stdout_info_line_current = val[1:5] @@ -1643,7 +1662,7 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list): elif self.stage==4: #end pass else: - info_semi_list[0].append(line.strip()) + info_semi_list[0]=line.strip() except Exception as e: print(f'threaded_run work error:{e} line:{line}') info_semi_list[0]=f'threaded_run work error:{e} line:{line}' @@ -1659,23 +1678,31 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list): job = Thread(target=lambda : threaded_run(command,results_semi_list,info_semi_list,processes_semi_list),daemon=True) job.start() job_is_alive = job.is_alive - while job_is_alive(): - if self.abort_action: - self.info_line = 'Aborting ...' - - subprocess=processes_semi_list[0] - if subprocess: - try: - subprocess.kill() - except Exception as ke: - print('killing error:',ke) - break + ########################################### + while job_is_alive(): + subprocess=processes_semi_list[0] + if subprocess: + if self.abort_action: + self.info_line = 'Aborting ...' + send_signal(subprocess,temp_dir,0) + self.abort_action=False + if self.abort_action_single: + self.info_line = 'Aborting single ...' + send_signal(subprocess,temp_dir,1) + self.abort_action_single=False + + #try: + # subprocess.kill() + #except Exception as ke: + # print('killing error:',ke) + + #break sleep(0.01) self.info_line = f'scanning czy costam' - job.join() + ########################################### if not self.abort_action: new_record = self.create() @@ -1704,6 +1731,7 @@ def record_exe(self): return(['python3','./src/record.py']) def find_items_in_records(self, + temp_dir, range_par, size_min,size_max, t_min,t_max, @@ -1728,7 +1756,7 @@ def find_items_in_records(self, find_cd_search_kind,cd_expr,cd_case_sens, filename_fuzzy_threshold,cd_fuzzy_threshold) - searchinfofile = sep.join([self.db_dir,'searchinfo']) + searchinfofile = sep.join([temp_dir,SEARCH_DAT_FILE]) try: with open(searchinfofile, "wb") as f: f.write(ZstdCompressor(level=8,threads=1).compress(dumps(params))) @@ -1741,9 +1769,7 @@ def find_items_in_records(self, for record_nr,record in enumerate(records_to_process): curr_command_list = record_command_list[record_nr] = self.record_exe() - curr_command_list.extend(['search',record.file_path]) - - curr_command_list.append(searchinfofile) + curr_command_list.extend(['search',record.file_path,temp_dir]) if t_min: curr_command_list.extend( ['--timestamp_min',str(t_min) ] ) diff --git a/src/librer.py b/src/librer.py index 034ef20..34e2138 100644 --- a/src/librer.py +++ b/src/librer.py @@ -33,6 +33,7 @@ from pathlib import Path from time import strftime,time,mktime from signal import signal,SIGINT + from tkinter import Tk,Toplevel,PhotoImage,Menu,Label,LabelFrame,Frame,StringVar,BooleanVar,IntVar from tkinter.ttk import Treeview,Checkbutton,Radiobutton,Scrollbar,Button,Menubutton,Entry,Scale,Style from tkinter.filedialog import askdirectory,asksaveasfilename,askopenfilename,askopenfilenames @@ -46,6 +47,7 @@ from psutil import disk_partitions from librer_images import librer_image +from shutil import rmtree from dialogs import * from core import * @@ -54,6 +56,7 @@ if windows: from os import startfile from win32api import GetVolumeInformation + from signal import SIGBREAK #l_debug = logging.debug l_info = logging.info @@ -601,6 +604,7 @@ def help_cascade_post(): self_progress_dialog_on_load.abort_button.pack( anchor='center',padx=5,pady=5) self.action_abort=False + self.action_abort_single=False self_progress_dialog_on_load.abort_button.configure(state='normal') self.status_info.configure(image='',text = 'Checking records to load ...') @@ -735,6 +739,9 @@ def help_cascade_post(): self.any_valid_find_results = False self.external_find_params_change=True + self.temp_dir = mkdtemp() + #print(f'{self.temp_dir=}') + self_main.mainloop() def tree_scrollbar_set(self,v1,v2): @@ -2008,6 +2015,9 @@ def delete_window_wrapper(self): self.status('WM_DELETE_WINDOW NOT exiting ...') def exit(self): + #print(f'removing temp dir:{self.temp_dir}') + rmtree(self.temp_dir) + try: self.status('exiting ...') self.cfg.set(CFG_last_dir,self.last_dir) @@ -2473,7 +2483,7 @@ def find_items(self): gc_disable() gc_collect() - search_thread=Thread(target=lambda : librer_core.find_items_in_records(range_par, + search_thread=Thread(target=lambda : librer_core.find_items_in_records(self.temp_dir,range_par, min_num,max_num, t_min,t_max, find_filename_search_kind,find_name,find_name_case_sens, @@ -2751,7 +2761,11 @@ def tree_select(self): if item: record_item,record_name,subpath_list = self.get_item_record(item) - self.current_record = record = self.item_to_record[record_item] + record = self.item_to_record[record_item] + if record != self.current_record: + self.current_record = record + if not self.cfg.get(CFG_KEY_find_range_all): + self.external_find_params_change = True record_name = self.tree.item(record_item,'text') image=self.tree.item(record_item,'image') @@ -3216,10 +3230,8 @@ def scan(self,compression_level): ################################################################################################################################################# - settings_file = sep.join([DATA_DIR,'scaninfo']) - try: - with open(settings_file, "wb") as f: + with open(sep.join([self.temp_dir,SCAN_DAT_FILE]), "wb") as f: f.write(ZstdCompressor(level=8,threads=1).compress(dumps([new_label,path_to_scan_from_entry,check_dev,cde_list]))) except Exception as e: print(e) @@ -3227,7 +3239,7 @@ def scan(self,compression_level): gc_disable() gc_collect() - creation_thread=Thread(target=lambda : librer_core.create_new_record(settings_file,self.single_record_show),daemon=True) + creation_thread=Thread(target=lambda : librer_core.create_new_record(self.temp_dir,self.single_record_show),daemon=True) creation_thread.start() creation_thread_is_alive = creation_thread.is_alive @@ -3279,6 +3291,10 @@ def scan(self,compression_level): if self.action_abort: librer_core.abort() + self.action_abort=False + elif self.action_abort_single: + librer_core.abort_single() + self.action_abort_single=False if prev_stage!=librer_core.stage: self_progress_dialog_on_scan_update_lab_text(0,'') @@ -3543,7 +3559,9 @@ def threaded_simple_run(self,command_list,shell): sys.exit(0) #thread def abort_single(self): - librer_core.abort_single() + self.status("Abort single pressed ...") + l_info("Abort single pressed ...") + self.action_abort_single=True def kill_test(self): if self.subprocess and self.subprocess!=True: @@ -3939,16 +3957,19 @@ def unload_record(self,record=None): if record: record_item = self.record_to_item[record] - self.tree.delete(*self.tree.get_children(record_item)) - self.tree.insert(record_item,'end',text='dummy') #dummy_sub_item - self.tree.set(record_item,'opened','0') - self.tree.item(record_item, open=False) + self_tree = self.tree + + self_tree.delete(*self_tree.get_children(record_item)) + self_tree.insert(record_item,'end',text='dummy') #dummy_sub_item + self_tree.set(record_item,'opened','0') + self_tree.item(record_item, open=False) record.unload_filestructure() record.unload_customdata() - self.tree.item(record_item, image=self.get_record_raw_icon(record),tags=self.RECORD_RAW) - self.tree.focus(record_item) - self.tree.selection_set(record_item) + self_tree.item(record_item, image=self.get_record_raw_icon(record),tags=self.RECORD_RAW) + self_tree.focus(record_item) + self_tree.see(record_item) + self_tree.selection_set(record_item) self.tree_select() @block_actions_processing @@ -4036,8 +4057,6 @@ def show_homepage(self): Gui(getcwd()) - # signal(SIGINT, lambda a, k : librer_core.handle_sigint()) - except Exception as e_main: print(e_main) l_error(e_main) diff --git a/src/record.py b/src/record.py index d18d3cb..be55c98 100644 --- a/src/record.py +++ b/src/record.py @@ -28,15 +28,15 @@ import sys -from os.path import dirname,join as path_join -from os import name as os_name +from os.path import dirname,join as path_join,exists as path_exists +from os import name as os_name,remove from gc import disable as gc_disable from pathlib import Path as pathlib_Path from argparse import ArgumentParser,RawTextHelpFormatter -from time import sleep,perf_counter +from time import sleep,perf_counter,time from threading import Thread @@ -47,10 +47,13 @@ from json import dumps as json_dumps from collections import deque -from signal import signal,SIGINT,SIGTERM +#from signal import signal,SIGINT,CTRL_C_EVENT,SIGTERM,SIGABRT from core import * +#if windows: +# from signal import SIGBREAK + import logging l_info = logging.info @@ -78,7 +81,7 @@ def parse_args(ver): parser.add_argument('file',type=str,help='record dat file') - parser.add_argument('cmdfile',type=str,help='internal commands file') + parser.add_argument('comm_dir',type=str,help='internal communication dir') file_group = parser.add_argument_group() @@ -145,14 +148,19 @@ def find_params_check(self, return None stdout_data_queue=deque() +stdout_data_last_not_printed=None stdout_data_queue_print_time=0 def print_func(data,always=False): now=perf_counter() global stdout_data_queue_print_time + global stdout_data_last_not_printed if now>stdout_data_queue_print_time or always: stdout_data_queue.append(data) stdout_data_queue_print_time=now+0.1 + stdout_data_last_not_printed=None + else: + stdout_data_last_not_printed=data def print_func_always(data): stdout_data_queue.append(data) @@ -160,15 +168,11 @@ def print_func_always(data): def printer_stop(): stdout_data_queue.append(True) -def print_d(data): - print_func(['D',data],True) - -#def print_i(data): -# print_func(['I',data],True) - def printer(): stdout_data_queue_get = stdout_data_queue.popleft + last_print_time=0 + global stdout_data_last_not_printed try: while True: if stdout_data_queue: @@ -176,56 +180,71 @@ def printer(): if result==True: break print(json_dumps(result),flush=True) + last_print_time=time() else: - #sys.stdout.flush() sleep(0.01) - + if stdout_data_last_not_printed: + if time()>last_print_time+0.5: + print(json_dumps(stdout_data_last_not_printed),flush=True) + stdout_data_last_not_printed=None except Exception as pe: print_info(f'printer error:{pe}') - #print_info('printer finished.') - #sys.stdout.flush() sys.exit(0) #thread -def print_info(*args): - print('#',*args) - abort_list=[False,False] -def handle_sigterm(): - print("Received SIGTERM signal [1]") - abort_list[1]=True +def check_abort(signal_file): + while True: + if path_exists(signal_file): + try: + with open(signal_file,'r') as sf: + got_int = int(sf.read().strip()) + + print_info(f'got abort int:{got_int}') + + global abort_list + abort_list[got_int]=True -def handle_sigint(): - #self.status("Received SIGINT signal") - print("Received SIGINT signal [0]") - abort_list[0]=True + remove(signal_file) + + except Exception as pe: + print_info(f'check_abort error:{pe}') + else: + sleep(0.1) + + sys.exit(0) #thread + +def print_info(*args): + print('#',*args) if __name__ == "__main__": VER_TIMESTAMP = get_ver_timestamp() args=parse_args(VER_TIMESTAMP) - cmdfile=args.cmdfile + comm_dir=args.comm_dir logging.basicConfig(level=logging.INFO,format='%(asctime)s %(levelname)s %(message)s', filename='record-temp.log',filemode='w') - signal(SIGINT, lambda a, k : handle_sigint()) - signal(SIGTERM, lambda a, k : handle_sigterm()) - gc_disable() printer_thread = Thread(target=printer,daemon=True) printer_thread.start() + signal_file = sep.join([comm_dir,SIGINT_FILE]) + + abort_thread = Thread(target=lambda : check_abort(signal_file),daemon=True) + abort_thread.start() + if args.command == 'search': ##################################################################### record = LibrerRecord('nowy','sciezka','./record.log') record.load(args.file) print_info(f'record label:{record.header.label}') - print_info(f'{cmdfile=}') - if cmdfile: + print_info(f'{comm_dir=}') + if comm_dir: try: - with open(cmdfile,"rb") as f: + with open(sep.join([comm_dir,SEARCH_DAT_FILE]),"rb") as f: params = loads(ZstdDecompressor().decompress(f.read())) print_info(f'{params=}') except Exception as e: @@ -347,16 +366,14 @@ def handle_sigint(): print_info(f'finished. times:{t1-t0},{t2-t1}') ################################################################### elif args.command == 'create': - #print_i(f'{cmdfile=}') - #print_func(f'{cmdfile=}') - if cmdfile: + if comm_dir: try: - with open(cmdfile,"rb") as f: + with open(sep.join([comm_dir,SCAN_DAT_FILE]),"rb") as f: create_list = loads(ZstdDecompressor().decompress(f.read())) label,path_to_scan,check_dev,cde_list = create_list except Exception as e: - print_i(e) + print_info(e) exit(2) else: new_record = LibrerRecord(logging,label=label,scan_path=path_to_scan) @@ -365,24 +382,21 @@ def handle_sigint(): print_func(['stage',0],True) new_record.scan(print_func,abort_list,tuple(cde_list),check_dev) except Exception as fe: - print_i(f'scan error:{fe}') + print_info(f'scan error:{fe}') else: - if cde_list and not abort_list[0]: - ########################### - - try: - print_func(['stage',1],True) - new_record.extract_customdata(print_func,abort_list) - except Exception as fe: - print_i(f'scan error:{fe}') - else: - pass - print_func(['stage',2],True) - new_record.pack_data(print_func) - print_func(['stage',3],True) - new_record.save(print_func,file_path=args.file,compression_level=9) - print_func(['stage',4],True) - + if not abort_list[0]: + if cde_list : + try: + print_func(['stage',1],True) + new_record.extract_customdata(print_func,abort_list) + except Exception as cde: + print_info(f'cde error:{cde}') + + print_func(['stage',2],True) + new_record.pack_data(print_func) + print_func(['stage',3],True) + new_record.save(print_func,file_path=args.file,compression_level=9) + print_func(['stage',4],True) ##################################################################### else: