-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmerge_files.py
119 lines (91 loc) · 4.55 KB
/
merge_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import click
import os
import glob
import numpy as np
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed
from process_simtel_file import write_result_to_file
from colorama import Fore, Style
def _merge_chunk(files, n_jobs=1):
    """Read all ``files`` and combine their tables into three sorted frames.

    Parameters
    ----------
    files : iterable of path-like
        Input hdf5 files; each one is read with ``read_file``.
    n_jobs : int
        Number of joblib workers used for reading. With ``1`` the files
        are read one after another in the current process.

    Returns
    -------
    tuple of DataFrame
        ``(runs, array_events, telescope_events)``, each the concatenation
        of the per-file tables, sorted by ``sort_arrays_inplace``.
    """
    results = Parallel(n_jobs=n_jobs)(delayed(read_file)(f) for f in files)
    runs = pd.concat([r[0] for r in results])
    array_events = pd.concat([r[1] for r in results])
    telescope_events = pd.concat([r[2] for r in results])
    sort_arrays_inplace(runs, array_events, telescope_events)
    return runs, array_events, telescope_events


# NOTE: click shows the docstring of ``main`` as the ``--help`` text, so it is
# user-facing output and is kept as-is; implementation notes live in comments.
@click.command()
@click.argument('input_pattern', type=str)
@click.argument('output_file', type=click.Path(dir_okay=False, file_okay=True))
@click.option('--verify/--no-verify', default=False, help='Wether to verify the output file ')
@click.option('-j', '--n_jobs', default=1, help='number of jobs to use for reading data')
@click.option('-c', '--chunk_size', default=50, help='files per chunk')
@click.option('-f', '--hdf_format', default='tables', type=click.Choice(['tables', 'h5py']))
def main(input_pattern, output_file, verify, n_jobs, chunk_size, hdf_format):
    """
    Merge multiple hdf5 files matched by INPUT_PATTERN into one hdf5 file saved in OUTPUT_FILE.
    The hdf5 file will contain three groups. 'runs', 'array_events', 'telescope_events'.
    These files can be put into the classifier tools for learning.
    See https://github.com/fact-project/classifier-tools
    """
    # Reject unusable chunk sizes before anything is deleted: a value of 0
    # would otherwise fail with a ZeroDivisionError only after an existing
    # output file has already been removed.
    if chunk_size < 1:
        raise click.BadParameter('chunk_size must be at least 1', param_hint='--chunk_size')

    input_files = glob.glob(input_pattern)
    print(f'Found {len(input_files)} files.')
    if not input_files:
        print(f'No files found for pattern {input_pattern} Aborting')
        return

    if os.path.exists(output_file):
        # abort=True makes click stop the program when the user declines.
        click.confirm('Output file exists. Overwrite?', abort=True)
        os.remove(output_file)

    # Ceiling division: enough chunks that none holds more than chunk_size
    # files, and never more chunks than files. The former "// chunk_size + 1"
    # produced an empty chunk for chunk_size == 1, which made pd.concat fail.
    n_chunks = (len(input_files) + chunk_size - 1) // chunk_size
    chunks = np.array_split(input_files, n_chunks)

    if hdf_format == 'tables':
        # Keep a single store open and append chunk after chunk.
        with pd.HDFStore(output_file) as hdf_store:
            for chunk in tqdm(chunks):
                runs, array_events, telescope_events = _merge_chunk(chunk, n_jobs)
                hdf_store.append('runs', runs)
                hdf_store.append('array_events', array_events)
                hdf_store.append('telescope_events', telescope_events)
    else:
        # Imported lazily so that fact is only needed for the h5py format.
        import fact.io

        for chunk in tqdm(chunks):
            runs, array_events, telescope_events = _merge_chunk(chunk, n_jobs)
            fact.io.write_data(runs, output_file, key='runs', mode='a')
            fact.io.write_data(telescope_events, output_file, key='telescope_events', mode='a')
            fact.io.write_data(array_events, output_file, key='array_events', mode='a')

    if verify:
        verify_file(output_file, hdf_format)
def sort_arrays_inplace(runs, array_events, telescope_events):
    """Sort the three tables in place by their identifying columns.

    ``telescope_events`` is ordered by run, array event and telescope id,
    ``array_events`` by run and array event id, and ``runs`` by run id.
    The frames are modified directly; nothing is returned.
    """
    sort_plan = (
        (telescope_events, ['run_id', 'array_event_id', 'telescope_id']),
        (array_events, ['run_id', 'array_event_id']),
        (runs, ['run_id']),
    )
    for frame, key_columns in sort_plan:
        frame.sort_values(by=key_columns, inplace=True)
def read_file(f):
    """Load the three tables stored in the hdf5 file ``f``.

    Returns the tuple ``(runs, array_events, telescope_events)``, each read
    with ``pd.read_hdf`` from the key of the same name.
    """
    # Read in the same order as before (telescope events first); the
    # generator is consumed left to right while unpacking.
    table_keys = ('telescope_events', 'array_events', 'runs')
    telescope_events, array_events, run = (pd.read_hdf(f, key) for key in table_keys)
    return run, array_events, telescope_events
def verify_file(input_file_path, format='tables'):
    """Check that a merged file can be read back and has unique keys.

    The three tables 'telescope_events', 'array_events' and 'runs' are
    loaded and re-indexed by their id columns with ``verify_integrity=True``,
    so duplicated ids make the check fail. The outcome is printed in colour;
    on failure the traceback is printed to stdout. Nothing is returned and
    ordinary errors are not re-raised.

    Parameters
    ----------
    input_file_path : path-like
        The hdf5 file to check.
    format : str
        'tables' reads with ``pd.read_hdf``; any other value reads with
        ``fact.io.read_data``.
    """
    try:
        if format == 'tables':
            telescope_events = pd.read_hdf(input_file_path, 'telescope_events')
            array_events = pd.read_hdf(input_file_path, 'array_events')
            runs = pd.read_hdf(input_file_path, 'runs')
        else:
            # Imported lazily so that fact is only needed for this format.
            import fact.io
            telescope_events = fact.io.read_data(input_file_path, key='telescope_events')
            array_events = fact.io.read_data(input_file_path, key='array_events')
            runs = fact.io.read_data(input_file_path, key='runs')

        # verify_integrity=True makes set_index check the new index for
        # duplicates, which is the actual consistency test here.
        runs.set_index('run_id', drop=True, verify_integrity=True, inplace=True)
        telescope_events.set_index(
            ['run_id', 'array_event_id', 'telescope_id'], drop=True, verify_integrity=True, inplace=True
        )
        array_events.set_index(['run_id', 'array_event_id'], drop=True, verify_integrity=True, inplace=True)

        print(Fore.GREEN + Style.BRIGHT + f'File "{input_file_path}" seems fine. ✔ ')
        print(Style.RESET_ALL)
    except Exception:
        # Broad on purpose: any read or integrity error means "broken file".
        # Not a bare except, so Ctrl-C (KeyboardInterrupt) and SystemExit
        # still propagate instead of being reported as a broken file.
        print(Fore.RED + f'File {input_file_path} seems to be broken. \n')
        print(Style.RESET_ALL)
        import sys
        import traceback
        traceback.print_exc(file=sys.stdout)
# Start the command-line interface only when this file is run as a script,
# not when it is imported.
if __name__ == "__main__":
    main()