From 36cd6f2639648454ab6111531aade7cdf1907b32 Mon Sep 17 00:00:00 2001 From: flixha Date: Thu, 1 Dec 2022 13:54:04 +0100 Subject: [PATCH 1/9] speed up prep_data_for_correlation with custom copy and trace-selection --- eqcorrscan/utils/pre_processing.py | 87 ++++++++++++++++++++++++++---- 1 file changed, 78 insertions(+), 9 deletions(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index e0f515ff3..cfb497638 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -12,8 +12,11 @@ import numpy as np import logging import datetime as dt +from timeit import default_timer +import copy -from collections import Counter +from collections import Counter, defaultdict +from joblib import Parallel, delayed from multiprocessing import Pool, cpu_count from obspy import Stream, Trace, UTCDateTime @@ -728,6 +731,59 @@ def _fill_gaps(tr): return gaps, tr +def _stream_quick_select(stream, seed_id): + """ + 4x quicker selection of traces in stream by full Seed-ID. Does not support + wildcards or selection by network/station/location/channel alone. + """ + net, sta, loc, chan = seed_id.split('.') + stream = Stream( + [tr for tr in stream + if (tr.stats.network == net and + tr.stats.station == sta and + tr.stats.location == loc and + tr.stats.channel == chan)]) + return stream + + +def _quick_copy_trace(trace): + """ + Function to quickly copy a trace. Sets values in the traces' and trace + header's dict directly, circumventing obspy's init functions. 
+ Speedup: from 37 us to 11 us per trace - 3.36x faster + """ + new_trace = Trace() + for key, value in trace.__dict__.items(): + if key == 'stats': + new_stats = Stats() + for key_2, value_2 in value.__dict__.items(): + new_stats.__dict__[key_2] = value_2 + new_trace.__dict__[key] = new_stats + else: # data needs to be deepcopied (and anything else, to be safe) + new_trace.__dict__[key] = copy.deepcopy(value) + return new_trace + + +def _quick_copy_stream(stream): + """ + Function to quickly copy a stream that consists only of empty data traces. + Speedup: from 112 us to 35 us per 3-trace stream - 3.2x faster + + This is what takes longest (1 empty trace, total time to copy 27 us): + copy header: 18 us (vs create new empty header: 683 ns) + + Two points that can speed up copying / creation: + 1. circumvent trace.__init__ and trace.__set_attr__ by setting value + directly in trace's __dict__ + 2. when setting trace header, circumvent that Stats(header) is called + when header is already a Stats instance + """ + new_traces = list() + for trace in stream: + new_traces.append(_quick_copy_trace(trace)) + return Stream(new_traces) + + def _prep_data_for_correlation(stream, templates, template_names=None, force_stream_epoch=True): """ @@ -839,8 +895,8 @@ def _prep_data_for_correlation(stream, templates, template_names=None, net, sta, loc, chan = _seed_id[0].split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, - 'channel': chan, 'starttime': UTCDateTime(), - 'npts': template_length, 'sampling_rate': samp_rate})) + 'channel': chan, 'npts': template_length, + 'sampling_rate': samp_rate})) # Remove templates with no matching channels filt = np.ones(len(template_names)).astype(bool) @@ -874,8 +930,7 @@ def _prep_data_for_correlation(stream, templates, template_names=None, net, sta, loc, chan = earliest_templ_trace_id.split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, - 'channel': chan, 
'starttime': UTCDateTime(), - 'npts': template_length, 'sampling_rate': samp_rate})) + 'channel': chan, 'sampling_rate': samp_rate})) stream_nan_data = np.full( stream_length, np.nan, dtype=np.float32) out_stream += Trace( @@ -894,13 +949,27 @@ def _prep_data_for_correlation(stream, templates, template_names=None, for template_name in incomplete_templates: template = _out[template_name] template_starttime = min(tr.stats.starttime for tr in template) - out_template = nan_template.copy() + # out_template = nan_template.copy() + out_template = _quick_copy_stream(nan_template) + + # Select traces very quickly: assume that trace order does not change, + # make dict of trace-ids and list of indices and use indices to select + stream_trace_id_dict = defaultdict(list) + for n, tr in enumerate(template.traces): + stream_trace_id_dict[tr.id].append(n) + for channel_number, _seed_id in enumerate(seed_ids): seed_id, channel_index = _seed_id - template_channel = template.select(id=seed_id) + # Select all traces with same seed_id, based on indices for + # corresponding traces stored in stream_trace_id_dict + # Much quicker than: template_channel = template.select(id=seed_id) + template_channel = Stream([ + template.traces[idx] for idx in stream_trace_id_dict[seed_id]]) if len(template_channel) <= channel_index: - out_template[channel_number].data = nan_channel - out_template[channel_number].stats.starttime = \ + # Direct changing of trace dict is very quick: + out_template[channel_number].__dict__['data'] = nan_channel + out_template[channel_number].__dict__['npts'] = template_length + out_template[channel_number].__dict__['starttime'] = \ template_starttime else: out_template[channel_number] = template_channel[channel_index] From 4b998caef98c6e9d11efc86a193ab9fb7e06b881 Mon Sep 17 00:00:00 2001 From: flixha Date: Mon, 12 Dec 2022 10:04:08 +0100 Subject: [PATCH 2/9] add changelog, remove unused imports --- CHANGES.md | 4 ++++ eqcorrscan/utils/pre_processing.py | 2 -- 2 files 
changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 024c576b4..d3601fa62 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,8 @@ ## Current +* utils.pre_processing: + - _prep_data_for_correlation: 3x speedup for filling NaN-traces in templates + - new functions _quick_copy_trace and _quick_copy_stream for 3x quicker + trace / stream copy. ## 0.4.4 * core.match_filter diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index cfb497638..07977805b 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -12,11 +12,9 @@ import numpy as np import logging import datetime as dt -from timeit import default_timer import copy from collections import Counter, defaultdict -from joblib import Parallel, delayed from multiprocessing import Pool, cpu_count from obspy import Stream, Trace, UTCDateTime From e5cc9059532879e0ded81528a25770f18c362c67 Mon Sep 17 00:00:00 2001 From: flixha Date: Thu, 1 Dec 2022 15:33:59 +0100 Subject: [PATCH 3/9] quicker to define empty starttimes with ns=0; need to copy nan-numpy array --- eqcorrscan/utils/pre_processing.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 07977805b..3a4080a0b 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -755,8 +755,11 @@ def _quick_copy_trace(trace): if key == 'stats': new_stats = Stats() for key_2, value_2 in value.__dict__.items(): - new_stats.__dict__[key_2] = value_2 - new_trace.__dict__[key] = new_stats + if isinstance(value_2, UTCDateTime): + new_stats.__dict__[key_2] = UTCDateTime(ns=value_2.ns) + else: + new_stats.__dict__[key_2] = value_2 # copy.deepcopy(value_2) + new_trace.__dict__[key] = new_stats else: # data needs to be deepcopied (and anything else, to be safe) new_trace.__dict__[key] = copy.deepcopy(value) return new_trace @@ -781,7 +784,7 @@ 
def _quick_copy_stream(stream): new_traces.append(_quick_copy_trace(trace)) return Stream(new_traces) - +# @profile def _prep_data_for_correlation(stream, templates, template_names=None, force_stream_epoch=True): """ @@ -893,8 +896,8 @@ def _prep_data_for_correlation(stream, templates, template_names=None, net, sta, loc, chan = _seed_id[0].split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, - 'channel': chan, 'npts': template_length, - 'sampling_rate': samp_rate})) + 'channel': chan, 'starttime': UTCDateTime(ns=0), + 'npts': template_length, 'sampling_rate': samp_rate})) # Remove templates with no matching channels filt = np.ones(len(template_names)).astype(bool) @@ -928,7 +931,8 @@ def _prep_data_for_correlation(stream, templates, template_names=None, net, sta, loc, chan = earliest_templ_trace_id.split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, - 'channel': chan, 'sampling_rate': samp_rate})) + 'channel': chan, 'starttime': UTCDateTime(ns=0), + 'sampling_rate': samp_rate})) stream_nan_data = np.full( stream_length, np.nan, dtype=np.float32) out_stream += Trace( @@ -964,8 +968,9 @@ def _prep_data_for_correlation(stream, templates, template_names=None, template_channel = Stream([ template.traces[idx] for idx in stream_trace_id_dict[seed_id]]) if len(template_channel) <= channel_index: - # Direct changing of trace dict is very quick: - out_template[channel_number].__dict__['data'] = nan_channel + # out_template[channel_number].data = nan_channel # quicker: + out_template[channel_number].__dict__['data'] = copy.deepcopy( + nan_channel) out_template[channel_number].__dict__['npts'] = template_length out_template[channel_number].__dict__['starttime'] = \ template_starttime From 931491a8eb2a82b26fef0b25b97ce3a75892d4e6 Mon Sep 17 00:00:00 2001 From: flixha Date: Fri, 2 Dec 2022 14:43:31 +0100 Subject: [PATCH 4/9] slightly more optimization for quick trace copy --- 
eqcorrscan/utils/pre_processing.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 3a4080a0b..62237d225 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -748,20 +748,24 @@ def _quick_copy_trace(trace): """ Function to quickly copy a trace. Sets values in the traces' and trace header's dict directly, circumventing obspy's init functions. - Speedup: from 37 us to 11 us per trace - 3.36x faster + Speedup: from 37 us to 12 us per trace - 3x faster + + Warning: do not use top copy traces with processing history or response + information. """ new_trace = Trace() for key, value in trace.__dict__.items(): if key == 'stats': - new_stats = Stats() + new_stats = new_trace.stats for key_2, value_2 in value.__dict__.items(): if isinstance(value_2, UTCDateTime): new_stats.__dict__[key_2] = UTCDateTime(ns=value_2.ns) else: - new_stats.__dict__[key_2] = value_2 # copy.deepcopy(value_2) - new_trace.__dict__[key] = new_stats + new_stats.__dict__[key_2] = value_2 else: # data needs to be deepcopied (and anything else, to be safe) - new_trace.__dict__[key] = copy.deepcopy(value) + # This can not yet handle copy of complex stats like response + # instance , processing history, etc. + new_trace.__dict__[key] = value # for scalars and strings return new_trace @@ -770,6 +774,9 @@ def _quick_copy_stream(stream): Function to quickly copy a stream that consists only of empty data traces. Speedup: from 112 us to 35 us per 3-trace stream - 3.2x faster + Warning: do not use top copy streams / traces with processing history or + response information. 
+ This is what takes longest (1 empty trace, total time to copy 27 us): copy header: 18 us (vs create new empty header: 683 ns) @@ -784,7 +791,7 @@ def _quick_copy_stream(stream): new_traces.append(_quick_copy_trace(trace)) return Stream(new_traces) -# @profile + def _prep_data_for_correlation(stream, templates, template_names=None, force_stream_epoch=True): """ From 4df0d673b22288e5c65b272aba2d01b4c26d104c Mon Sep 17 00:00:00 2001 From: flixha Date: Mon, 12 Dec 2022 10:15:51 +0100 Subject: [PATCH 5/9] add deepcopy option and documentation to quick copy functions --- eqcorrscan/utils/pre_processing.py | 53 +++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 62237d225..007ff1cca 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -744,14 +744,22 @@ def _stream_quick_select(stream, seed_id): return stream -def _quick_copy_trace(trace): +def _quick_copy_trace(trace, deepcopy_data=True): """ Function to quickly copy a trace. Sets values in the traces' and trace header's dict directly, circumventing obspy's init functions. Speedup: from 37 us to 12 us per trace - 3x faster - Warning: do not use top copy traces with processing history or response - information. + :type trace: :class:`obspy.core.trace.Trace` + :param trace: Trace to quickly copy + :type deepcopy_data: bool + :param deepcopy_data: + Whether to deepcopy trace data (with `deepcopy_data=False` expect up to + 20 % speedup, but use only when you know that data trace contents will + not change or affect results). Warning: do not use this option to copy + traces with processing history or response information. 
+ :rtype: :class:`obspy.core.trace.Trace` + return: trace """ new_trace = Trace() for key, value in trace.__dict__.items(): @@ -762,33 +770,47 @@ def _quick_copy_trace(trace): new_stats.__dict__[key_2] = UTCDateTime(ns=value_2.ns) else: new_stats.__dict__[key_2] = value_2 - else: # data needs to be deepcopied (and anything else, to be safe) - # This can not yet handle copy of complex stats like response - # instance , processing history, etc. - new_trace.__dict__[key] = value # for scalars and strings + elif deepcopy_data: + # data needs to be deepcopied (and anything else, to be safe) + new_trace.__dict__[key] = copy.deepcopy(value) + else: # No deepcopy, e.g. for NaN-traces with no effect on results + new_trace.__dict__[key] = value return new_trace -def _quick_copy_stream(stream): +def _quick_copy_stream(stream, deepcopy_data=True): """ - Function to quickly copy a stream that consists only of empty data traces. - Speedup: from 112 us to 35 us per 3-trace stream - 3.2x faster + Function to quickly copy a stream. + Speedup for simple trace: + from 112 us to 44 (35) us per 3-trace stream - 2.8x (3.2x) faster - Warning: do not use top copy streams / traces with processing history or - response information. + Warning: use `deepcopy_data=False` (saves extra ~20 % time) only when the + changing the data in the stream later does not change results + (e.g., for NaN-trace or when data array will not be changed). This is what takes longest (1 empty trace, total time to copy 27 us): copy header: 18 us (vs create new empty header: 683 ns) - Two points that can speed up copying / creation: 1. circumvent trace.__init__ and trace.__set_attr__ by setting value directly in trace's __dict__ 2. 
when setting trace header, circumvent that Stats(header) is called when header is already a Stats instance + + :type stream: :class:`obspy.core.stream.Stream` + :param stream: Stream to quickly copy + :type deepcopy_data: bool + :param deepcopy_data: + Whether to deepcopy data (with `deepcopy_data=False` expect up to 20 % + speedup, but use only when you know that data trace contents will not + change or affect results). + + :rtype: :class:`obspy.core.stream.Stream` + :return: stream """ new_traces = list() for trace in stream: - new_traces.append(_quick_copy_trace(trace)) + new_traces.append( + _quick_copy_trace(trace, deepcopy_data=deepcopy_data)) return Stream(new_traces) @@ -898,6 +920,7 @@ def _prep_data_for_correlation(stream, templates, template_names=None, # Initialize nan template for speed. nan_channel = np.full(template_length, np.nan, dtype=np.float32) + nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS']) nan_template = Stream() for _seed_id in seed_ids: net, sta, loc, chan = _seed_id[0].split('.') @@ -959,7 +982,7 @@ def _prep_data_for_correlation(stream, templates, template_names=None, template = _out[template_name] template_starttime = min(tr.stats.starttime for tr in template) # out_template = nan_template.copy() - out_template = _quick_copy_stream(nan_template) + out_template = _quick_copy_stream(nan_template, deepcopy_data=False) # Select traces very quickly: assume that trace order does not change, # make dict of trace-ids and list of indices and use indices to select From 8fa548d54eb943562f3ed578542a9cc71e3f7ae9 Mon Sep 17 00:00:00 2001 From: flixha Date: Fri, 2 Dec 2022 16:46:43 +0100 Subject: [PATCH 6/9] use quick stream copy function for continuous data and templates --- eqcorrscan/core/match_filter/matched_filter.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/eqcorrscan/core/match_filter/matched_filter.py b/eqcorrscan/core/match_filter/matched_filter.py index 7c6c50dd9..38996005e 100644 --- 
a/eqcorrscan/core/match_filter/matched_filter.py +++ b/eqcorrscan/core/match_filter/matched_filter.py @@ -23,7 +23,7 @@ from eqcorrscan.utils.correlate import get_stream_xcorr from eqcorrscan.utils.findpeaks import multi_find_peaks from eqcorrscan.utils.pre_processing import ( - dayproc, shortproc, _prep_data_for_correlation) + dayproc, shortproc, _prep_data_for_correlation, _quick_copy_stream) Logger = logging.getLogger(__name__) @@ -324,8 +324,8 @@ def _group_process(template_group, parallel, cores, stream, daylong, kwargs.update({'endtime': _endtime}) else: _endtime = kwargs['starttime'] + 86400 - chunk_stream = stream.slice(starttime=kwargs['starttime'], - endtime=_endtime).copy() + chunk_stream = _quick_copy_stream( + stream.slice(starttime=kwargs['starttime'], endtime=_endtime)) Logger.debug(f"Processing chunk {i} between {kwargs['starttime']} " f"and {_endtime}") if len(chunk_stream) == 0: @@ -666,8 +666,8 @@ def match_filter(template_names, template_list, st, threshold, if copy_data: # Copy the stream here because we will muck about with it Logger.info("Copying data to keep your input safe") - stream = st.copy() - templates = [t.copy() for t in template_list] + stream = _quick_copy_stream(st) + templates = [_quick_copy_stream(t) for t in template_list] _template_names = template_names.copy() # This can be a shallow copy else: stream, templates, _template_names = st, template_list, template_names From 59302a8de5a6eec9ef2786dbae598687597e814d Mon Sep 17 00:00:00 2001 From: flixha Date: Mon, 12 Dec 2022 10:28:01 +0100 Subject: [PATCH 7/9] remove stream_quick_select from this PR --- eqcorrscan/utils/pre_processing.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 007ff1cca..65fe2bb75 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -729,21 +729,6 @@ def _fill_gaps(tr): return gaps, tr -def _stream_quick_select(stream, 
seed_id): - """ - 4x quicker selection of traces in stream by full Seed-ID. Does not support - wildcards or selection by network/station/location/channel alone. - """ - net, sta, loc, chan = seed_id.split('.') - stream = Stream( - [tr for tr in stream - if (tr.stats.network == net and - tr.stats.station == sta and - tr.stats.location == loc and - tr.stats.channel == chan)]) - return stream - - def _quick_copy_trace(trace, deepcopy_data=True): """ Function to quickly copy a trace. Sets values in the traces' and trace From 928fe8f69713a2f08895c4ae8e6028052b7ae6d7 Mon Sep 17 00:00:00 2001 From: flixha Date: Tue, 13 Dec 2022 10:24:37 +0100 Subject: [PATCH 8/9] 5 % more speedup with quicker .ns - retrieval --- eqcorrscan/utils/pre_processing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 65fe2bb75..8f7128751 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -752,7 +752,8 @@ def _quick_copy_trace(trace, deepcopy_data=True): new_stats = new_trace.stats for key_2, value_2 in value.__dict__.items(): if isinstance(value_2, UTCDateTime): - new_stats.__dict__[key_2] = UTCDateTime(ns=value_2.ns) + new_stats.__dict__[key_2] = UTCDateTime( + ns=value_2.__dict__['_UTCDateTime__ns']) else: new_stats.__dict__[key_2] = value_2 elif deepcopy_data: From 9fc78dee501997ae99ec91fb2247e670148aa2c8 Mon Sep 17 00:00:00 2001 From: flixha Date: Tue, 3 Jan 2023 16:00:12 +0100 Subject: [PATCH 9/9] properly set stats-dict for nan-channels --- eqcorrscan/utils/pre_processing.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 8f7128751..8526f0e7d 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -987,9 +987,14 @@ def _prep_data_for_correlation(stream, templates, template_names=None, # 
out_template[channel_number].data = nan_channel # quicker: out_template[channel_number].__dict__['data'] = copy.deepcopy( nan_channel) - out_template[channel_number].__dict__['npts'] = template_length - out_template[channel_number].__dict__['starttime'] = \ + out_template[channel_number].stats.__dict__['npts'] = \ + template_length + out_template[channel_number].stats.__dict__['starttime'] = \ template_starttime + out_template[channel_number].stats.__dict__['endtime'] = \ + UTCDateTime(ns=int( + round(template_starttime.ns + + (template_length * samp_rate) * 1e9))) else: out_template[channel_number] = template_channel[channel_index] # If a template-trace matches a NaN-trace in the stream , then set