messages queue fix, cleanup
xy committed Jan 13, 2024
1 parent 9af456f commit fc507af
Showing 2 changed files with 27 additions and 28 deletions.
27 changes: 8 additions & 19 deletions src/core.py
@@ -228,13 +228,11 @@ def kill_subprocess(subproc,print_func=print):

if windows:
kill_cmd = ['taskkill', '/F', '/T', '/PID', str(pid)]
#print_func( ('info',f'executing: {kill_cmd}') )
print_func( ('info',f'killing pid: {pid}') )
subprocess_run(kill_cmd)
else:
print_func( ('info',f'killing process group of pid {pid}') )
killpg(getpgid(pid), SIGTERM)
#print_func( ('info',f'killing process group done') )

except Exception as ke:
print_func( ('error',f'kill_subprocess error: {ke}') )
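
Note (not part of the commit): the surviving logic above is the usual cross-platform way to take down a whole child-process tree. A minimal self-contained sketch of the same pattern, assuming the POSIX child was started in its own session (e.g. start_new_session=True) so signalling its process group does not hit the parent:

    import os
    import signal
    import subprocess
    import sys

    def kill_subprocess_tree(proc: subprocess.Popen) -> None:
        if sys.platform == 'win32':
            # /T terminates the process and all of its children, /F forces it
            subprocess.run(['taskkill', '/F', '/T', '/PID', str(proc.pid)])
        else:
            # signal the whole process group; assumes the child was spawned
            # with start_new_session=True so it leads its own group
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
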
@@ -1263,10 +1261,6 @@ def create(self,label='',scan_path=''):
self.update_sorted()
return new_record

#def load_record(self):
#self.records.add(new_record)
# pass

def read_records_pre(self):
try:
with scandir(self.db_dir) as res:
@@ -1527,7 +1521,6 @@ def create_new_record(self,temp_dir,update_callback):
self.log.info(f'create_new_record')
self_log_info = self.log.info


new_file_path = sep.join([self.db_dir,f'rep.{int(time())}.dat'])

command = self.record_exe()
@@ -1613,7 +1606,6 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
else:
info_semi_list[0]=line_strip
except Exception as e:
print(f'threaded_run work error:{e} line:{line}')
info_semi_list[0]=f'threaded_run work error:{e} line:{line}'
self_log_info(f'threaded_run work error:{e} line:{line}')
else:
@@ -1629,15 +1621,13 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
job.start()
job_is_alive = job.is_alive

aborted=False
###########################################
while job_is_alive():
subprocess=processes_semi_list[0]
if subprocess:
if self.abort_action:
send_signal(subprocess,temp_dir,0)
self.abort_action=False
aborted=True
if self.abort_action_single:
send_signal(subprocess,temp_dir,1)
self.abort_action_single=False
@@ -1646,15 +1636,15 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
job.join()
###########################################

if not aborted:
new_record = self.create()
#unknown whether the abort happened during the scan or during cde
new_record = self.create()

if res:=new_record.load(new_file_path) :
self.log.warning('removing:%s',new_file_path)
self.records.remove(new_record)
print(res)
else:
update_callback(new_record)
if res:=new_record.load(new_file_path) :
self.log.warning('removing:%s',new_file_path)
self.records.remove(new_record)
self_log_info(res)
else:
update_callback(new_record)

return True
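
Reading aid (not part of the commit): after this change threaded_run no longer tracks an aborted flag, because at this point it cannot tell whether the abort hit the scan stage or the cde stage. Instead the record is always created and its load() result decides whether it is kept. A simplified sketch of that flow, with db, log and update_callback as stand-in names:

    def finalize_new_record(db, new_file_path, update_callback, log):
        new_record = db.create()                 # always create; no 'aborted' flag anymore
        if res := new_record.load(new_file_path):
            # load() returned an error message: drop the half-built record again
            log.warning('removing:%s', new_file_path)
            db.records.remove(new_record)
            log.info(res)
        else:
            update_callback(new_record)          # record is valid, hand it on
        return True
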

@@ -1721,7 +1711,6 @@ def find_items_in_records(self,
############################################################

max_processes = cpu_count()
#max_processes = 1

records_to_process_len = len(records_to_process)

28 changes: 19 additions & 9 deletions src/record.py
@@ -126,26 +126,40 @@ def caretaker(signal_file):
sys_stdout_flush = sys.stdout.flush
lines_non_stop=0

def flush_last_data_not_printed(flush):
nonlocal last_data_not_printed
if last_data_not_printed:
print(json_dumps(last_data_not_printed),flush=flush)
last_data_not_printed=None

while True:
now=perf_counter()
now_grater_than_next_time_print = bool(now>next_time_print)

if stdout_data_queue:
data,always=stdout_data_queue_get()

if data==True:
break

if always or now>next_time_print:
if always:
flush_last_data_not_printed(False)

if always or now_grater_than_next_time_print:
print(json_dumps(data),flush=True)
next_time_print=now+print_min_time_period
lines_non_stop+=1
last_data_not_printed=None
else:
last_data_not_printed=data
lines_non_stop+=1

if lines_non_stop<100:
if lines_non_stop<128:
continue
else:
lines_non_stop=0

lines_non_stop=0

if now_grater_than_next_time_print:
flush_last_data_not_printed(True)

if now>next_signal_file_check:
next_signal_file_check=now+signal_file_check_period
@@ -160,10 +174,6 @@ def caretaker(signal_file):
except Exception as pe:
print_info(f'check_abort error:{pe}')

if last_data_not_printed and now>next_time_print:
print(json_dumps(last_data_not_printed),flush=True)
last_data_not_printed=None

sleep(0.01)

sys.exit(0) #thread
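
Reading aid (not part of the commit): the record.py hunk above is the "messages queue fix" from the commit title. The caretaker thread now coalesces queued status messages: urgent ('always') messages are printed immediately, the rest are rate-limited to one print per print_min_time_period, and the newest held-back message is flushed both before an urgent one and once the rate-limit window expires, so the last reported state is never silently dropped. A self-contained sketch of that policy; the deque queue type, the 0.1 s period and the helper names are assumptions rather than values taken from this commit:

    from collections import deque
    from json import dumps as json_dumps
    from time import perf_counter, sleep

    def throttled_printer(stdout_data_queue: deque, print_min_time_period: float = 0.1):
        # Items are (data, always) pairs; (True, _) acts as the stop sentinel.
        next_time_print = 0.0
        last_data_not_printed = None
        lines_non_stop = 0

        def flush_last(flush):
            # emit the newest message that was held back by the rate limit
            nonlocal last_data_not_printed
            if last_data_not_printed is not None:
                print(json_dumps(last_data_not_printed), flush=flush)
                last_data_not_printed = None

        while True:
            now = perf_counter()
            window_open = now > next_time_print

            if stdout_data_queue:
                data, always = stdout_data_queue.popleft()
                if data is True:
                    break                                  # stop sentinel
                if always:
                    flush_last(False)                      # keep ordering before the urgent line
                if always or window_open:
                    print(json_dumps(data), flush=True)
                    next_time_print = now + print_min_time_period
                    last_data_not_printed = None
                else:
                    last_data_not_printed = data           # remember only the newest skipped message
                lines_non_stop += 1
                if lines_non_stop < 128:
                    continue                               # keep draining, but not forever
                lines_non_stop = 0

            if window_open:
                flush_last(True)                           # window expired: emit the held message

            sleep(0.01)

Compared with the pre-change code, the key difference is that the held-back message can no longer be lost when an 'always' message arrives right after it or when the loop keeps draining a busy queue.
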
