Skip to content

Commit

Permalink
communication with subprocess on both windows and linux
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrj committed Jan 10, 2024
1 parent b7575f9 commit aebc9a1
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 133 deletions.
152 changes: 89 additions & 63 deletions src/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@

from time import sleep, perf_counter,time,strftime,localtime
from threading import Thread
from os import cpu_count,scandir,stat,sep,name as os_name,remove as os_remove
from os import cpu_count,scandir,stat,sep,name as os_name,remove as os_remove,kill,rename

from tempfile import mkdtemp
windows = bool(os_name=='nt')

if windows:
from subprocess import CREATE_NO_WINDOW
from signal import SIGBREAK
else:
from os import getpgid, killpg

Expand All @@ -50,7 +52,10 @@
import sys
from collections import defaultdict
from pathlib import Path as pathlib_Path
from signal import SIGTERM,SIGINT
from signal import SIGTERM,SIGINT,SIGABRT
if windows:
from signal import CTRL_C_EVENT

from pickle import dumps,loads
from zstandard import ZstdCompressor,ZstdDecompressor
from pympler.asizeof import asizeof
Expand All @@ -65,6 +70,10 @@

VERSION_FILE='version.txt'

SCAN_DAT_FILE = 'scaninfo'
SEARCH_DAT_FILE = 'searchinfo'
SIGINT_FILE = 'signal'

def get_dev_labes_dict():
lsblk = subprocess_run(['lsblk','-fJ'],capture_output = True,text = True)
lsblk.dict = json_loads(lsblk.stdout)
Expand Down Expand Up @@ -192,30 +201,44 @@ def get_command(executable,parameters,full_file_path,shell):
return res,' '.join(res)

#'ignore','replace','backslashreplace'
def popen_win(command,shell):
return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=DEVNULL,shell=shell,text=True,universal_newlines=True,creationflags=CREATE_NO_WINDOW,close_fds=False,errors='ignore')
def popen_win(command,shell,stdin=DEVNULL):
return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=stdin,shell=shell,text=True,universal_newlines=True,creationflags=CREATE_NO_WINDOW,close_fds=False,errors='ignore')

def popen_lin(command,shell,stdin=DEVNULL):
return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=stdin,shell=shell,text=True,universal_newlines=True,start_new_session=True,errors='ignore')

uni_popen = (lambda command,shell=False,stdin=DEVNULL : popen_win(command,shell,stdin)) if windows else (lambda command,shell=False,stdin=DEVNULL : popen_lin(command,shell,stdin))

def send_signal(subproc,temp_dir,kind=0):
try:
signal_file = sep.join([temp_dir,SIGINT_FILE])
#print(f'sending signal in file {signal_file}')

def popen_lin(command,shell):
return Popen(command, stdout=PIPE, stderr=STDOUT,stdin=DEVNULL,shell=shell,text=True,universal_newlines=True,start_new_session=True,errors='ignore')
temp_signal_file = signal_file+ '_temp'
with open(temp_signal_file,'w') as tsf:
tsf.write(str(kind))

uni_popen = (lambda command,shell=False : popen_win(command,shell)) if windows else (lambda command,shell=False : popen_lin(command,shell))
rename(temp_signal_file, signal_file)

def kill_subprocess(subproc,signal=SIGTERM):
except Exception as se:
print(f'subprocess signal error: {se}')

def kill_subprocess(subproc,print_func=print):
try:
pid = subproc.pid

if windows:
kill_cmd = ['taskkill', '/F', '/T', '/PID', str(pid)]
print(f'executing: {kill_cmd}')
#print_func( ('info',f'executing: {kill_cmd}') )
print_func( ('info',f'killing pid: {pid}') )
subprocess_run(kill_cmd)
print(f'executing: {kill_cmd} - done')
else:
print(f'killing process group')
killpg(getpgid(pid), signal)
print(f'killing process group done')
print_func( ('info',f'killing process group of pid {pid}') )
killpg(getpgid(pid), SIGTERM)
#print_func( ('info',f'killing process group done') )

except Exception as ke:
print(f'kill_subprocess: {ke}')
print_func( ('error',f'kill_subprocess error: {ke}') )

def compress_with_header_update(header,data,compression,datalabel,zip_file):
t0 = perf_counter()
Expand Down Expand Up @@ -292,7 +315,7 @@ def __init__(self,log,label=None,scan_path=None,file_path=None):
self.find_results = []

self.info_line = ''
self.info_line_current = ''
#self.info_line_current = ''

self.abort_action = False

Expand Down Expand Up @@ -411,15 +434,17 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
if is_file:
self_header_ext_stats[ext]+=1

self.info_line_current = entry_name
#self.info_line_current = entry_name

#print_func(('scan-line',entry_name))

try:
stat_res = stat(entry)
mtime = int(stat_res.st_mtime)
dev=stat_res.st_dev
except Exception as e:
#self.log.error('stat error:%s', e )
print_func(f'stat error:{e}' )
print_func( ('error',f'stat {entry_name} error:{e}') )
#size -1 <=> error, dev,in ==0
is_bind = False
size=-1
Expand All @@ -433,7 +458,7 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
if dev_call:
if dev_call!=dev:
#self.log.info('devices mismatch:%s %s %s %s' % (path,entry_name,dev_call,dev) )
print_func(f'devices mismatch:{path},{entry_name},{dev_call},{dev}' )
print_func( ('info',f'devices mismatch:{path},{entry_name},{dev_call},{dev}') )
is_bind=True
else:
dev_call=dev
Expand Down Expand Up @@ -475,25 +500,20 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
self_header.quant_files += local_folder_files_count
self_header.quant_folders += local_folder_folders_count

print_func(('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,self.info_line_current))
print_func( ('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,path) )
#t_now = perf_counter()
#if t_now>self.progress_update_time+1.0:
#self.progress_update_time = t_now

except Exception as e:
#self.log.error('scandir error:%s',e )
print_func(f'scandir error:{e}' )
print_func( ('error', f'scandir {path} error:{e}') )

self.info_line_current = ''
#self.info_line_current = ''

return (local_folder_size_with_subtree+local_folder_size,subitems)

def scan(self,print_func,abort_list,cde_list,check_dev=True):
#self.info_line = 'Scanning filesystem'

#results_queue_put = results_queue.append
#self.progress_update_time = perf_counter()

self.header.sum_size = 0

self.header.ext_stats=defaultdict(int)
Expand All @@ -509,8 +529,6 @@ def scan(self,print_func,abort_list,cde_list,check_dev=True):
self.header.scanning_time = time_end-time_start

self.filenames = tuple(sorted(list(filenames_set)))
#########################
#self.info_line = 'indexing filesystem names'

self.filenames_helper = {fsname:fsname_index for fsname_index,fsname in enumerate(self.filenames)}

Expand All @@ -521,14 +539,9 @@ def scan(self,print_func,abort_list,cde_list,check_dev=True):
self.customdata_pool_index = 0

if cde_list:
#self.log.info('estimating CD pool')
print_func( ('info','estimating CD pool') )
#self.info_line = 'estimating files pool for custom data extraction'
#print_func = 'estimating files pool for custom data extraction'
print_func( ('info','estimating files pool for custom data extraction') )
self.prepare_customdata_pool_rec(print_func,abort_list,self.scan_data,[])

#self.info_line = ''

def prepare_customdata_pool_rec(self,print_func,abort_list,scan_like_data,parent_path):
self_header = self.header
scan_path = self_header.scan_path
Expand Down Expand Up @@ -648,7 +661,8 @@ def threaded_cde(timeout_semi_list):
full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep)
command,command_info = get_command(executable,parameters,full_file_path,shell)

print_func( ('cde',command_info,size, files_cde_size_extracted,self_header.files_cde_errors_quant_all,files_cde_quant,self_header.files_cde_quant_sum,files_cde_size,self_header.files_cde_size_sum) )
info_line = f'{full_file_path} ({bytes_to_str(size)})'
print_func( ('cde',info_line,size, files_cde_size_extracted,self_header.files_cde_errors_quant_all,files_cde_quant,self_header.files_cde_quant_sum,files_cde_size,self_header.files_cde_size_sum) )

timeout_val=time()+timeout if timeout else None
#####################################
Expand All @@ -659,7 +673,7 @@ def threaded_cde(timeout_semi_list):
except Exception as re:
timeout_semi_list[0]=None
returncode=201
output = str(re)
output = f'Exception: {re}'
else:
subprocess_stdout_readline = subprocess.stdout.readline
subprocess_poll = subprocess.poll
Expand All @@ -682,6 +696,7 @@ def threaded_cde(timeout_semi_list):

output = '\n'.join(output_list).strip()
if not output:
output = 'No output collected.'
returncode=203

#####################################
Expand Down Expand Up @@ -741,7 +756,7 @@ def threaded_cde(timeout_semi_list):
if timeout_semi_list[0]:
timeout_val,subprocess = timeout_semi_list[0]
if any(abort_list) or (timeout_val and time()>timeout_val):
kill_subprocess(subprocess)
kill_subprocess(subprocess,print_func)
self.killed=True
abort_list[1]=False
else:
Expand Down Expand Up @@ -1191,17 +1206,17 @@ def prepare_info(self):

sublist=[]
for ext,ext_stat in sorted(self.header.ext_stats.items(),key = lambda x : x[1],reverse=True):
sublist.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(ext_stat).rjust(12)} {ext.ljust(longest)}')
sublist.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(ext_stat).rjust(12)} {ext}')
info_list.append('')
info_list.append('Files extensions statistics by quantity:')
info_list.append('========================================')
info_list.extend(sublist)

sublist_size=[]
for ext,ext_stat in sorted(self.header.ext_stats_size.items(),key = lambda x : x[1],reverse=True):
sublist_size.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(self.header.ext_stats[ext]).rjust(12)} {ext.ljust(longest)}')
sublist_size.append(f'{bytes_to_str(self.header.ext_stats_size[ext]).rjust(12)} {fnumber(self.header.ext_stats[ext]).rjust(12)} {ext}')
info_list.append('')
info_list.append('Files extensions statistics by sum size:')
info_list.append('Files extensions statistics by space:')
info_list.append('========================================')
info_list.extend(sublist_size)
except Exception as se:
Expand Down Expand Up @@ -1273,6 +1288,7 @@ def __init__(self,db_dir,log):

self.records_to_show=[]
self.abort_action=False
self.abort_action_single=False
self.search_record_nr=0

self.find_res_quant = 0
Expand Down Expand Up @@ -1502,7 +1518,7 @@ def abort(self):
self.abort_action = True

def abort_single(self):
#print('core abort')
#print('core abort single')
self.abort_action_single = True

def threaded_read_records(self,load_errors):
Expand Down Expand Up @@ -1551,16 +1567,17 @@ def find_results_clean(self):
stdout_files_cde_size_sum=0

########################################################################################################################
def create_new_record(self,settings_file,update_callback):
self.log.info(f'create_new_record {settings_file}')
def create_new_record(self,temp_dir,update_callback):
self.log.info(f'create_new_record')

new_file_path = sep.join([self.db_dir,f'rep.{int(time())}.dat'])

#new_record_filename = str(int(time()) + .dat
command = self.record_exe()
command.append('create')
command.append(new_file_path)
command.append(settings_file)
#command.append(settings_file)
command.append(temp_dir)

self.abort_action=False
self.abort_action_single=False
Expand All @@ -1584,7 +1601,7 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
#results_list_append = results_semi_list[0].find_results.append

try:
subprocess = uni_popen(command)
subprocess = uni_popen(command,stdin=PIPE)
except Exception as re:
print('threaded_run run error',re)
info_semi_list[0].append(f'threaded_run run error: {re}')
Expand All @@ -1597,10 +1614,10 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
while True:
if line := subprocess_stdout_readline():
try:
#print(line)
if line[0]!='#':
val = json_loads(line.strip())

#print(f'{val=}')
self.info_line = val
kind = val[0]
if kind == 'stage':
Expand All @@ -1609,6 +1626,8 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
self.stdout_info_line_current = val[1]
elif kind == 'info':
self.stdout_info_line_current = val[1]
elif kind == 'scan-line':
self.stdout_info_line_current = val[1]
else:
if self.stage==0: #scan
self.stdout_sum_size,self.stdout_quant_files,self.stdout_quant_folders,self.stdout_info_line_current = val[1:5]
Expand Down Expand Up @@ -1643,7 +1662,7 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
elif self.stage==4: #end
pass
else:
info_semi_list[0].append(line.strip())
info_semi_list[0]=line.strip()
except Exception as e:
print(f'threaded_run work error:{e} line:{line}')
info_semi_list[0]=f'threaded_run work error:{e} line:{line}'
Expand All @@ -1659,23 +1678,31 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
job = Thread(target=lambda : threaded_run(command,results_semi_list,info_semi_list,processes_semi_list),daemon=True)
job.start()
job_is_alive = job.is_alive
while job_is_alive():
if self.abort_action:
self.info_line = 'Aborting ...'

subprocess=processes_semi_list[0]
if subprocess:
try:
subprocess.kill()
except Exception as ke:
print('killing error:',ke)

break
###########################################
while job_is_alive():
subprocess=processes_semi_list[0]
if subprocess:
if self.abort_action:
self.info_line = 'Aborting ...'
send_signal(subprocess,temp_dir,0)
self.abort_action=False
if self.abort_action_single:
self.info_line = 'Aborting single ...'
send_signal(subprocess,temp_dir,1)
self.abort_action_single=False

#try:
# subprocess.kill()
#except Exception as ke:
# print('killing error:',ke)

#break
sleep(0.01)

self.info_line = f'scanning czy costam'

job.join()
###########################################

if not self.abort_action:
new_record = self.create()
Expand Down Expand Up @@ -1704,6 +1731,7 @@ def record_exe(self):
return(['python3','./src/record.py'])

def find_items_in_records(self,
temp_dir,
range_par,
size_min,size_max,
t_min,t_max,
Expand All @@ -1728,7 +1756,7 @@ def find_items_in_records(self,
find_cd_search_kind,cd_expr,cd_case_sens,
filename_fuzzy_threshold,cd_fuzzy_threshold)

searchinfofile = sep.join([self.db_dir,'searchinfo'])
searchinfofile = sep.join([temp_dir,SEARCH_DAT_FILE])
try:
with open(searchinfofile, "wb") as f:
f.write(ZstdCompressor(level=8,threads=1).compress(dumps(params)))
Expand All @@ -1741,9 +1769,7 @@ def find_items_in_records(self,
for record_nr,record in enumerate(records_to_process):
curr_command_list = record_command_list[record_nr] = self.record_exe()

curr_command_list.extend(['search',record.file_path])

curr_command_list.append(searchinfofile)
curr_command_list.extend(['search',record.file_path,temp_dir])

if t_min:
curr_command_list.extend( ['--timestamp_min',str(t_min) ] )
Expand Down
Loading

0 comments on commit aebc9a1

Please sign in to comment.