Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speedup 02: 3x speed up for prep_data_for_correlation with custom copy and trace-selection #525

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* core.match_filter.template
- new quick_group_templates function for 50x quicker template grouping.
* utils.pre_processing
- `_prep_data_for_correlation`: 3x speedup for filling NaN-traces in templates
- New function `quick_trace_select` for a very efficient selection of traces
  by seed ID without wildcards (4x speedup).
* utils.catalog_to_dd._prepare_stream
Expand Down
10 changes: 5 additions & 5 deletions eqcorrscan/core/match_filter/matched_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from eqcorrscan.utils.correlate import get_stream_xcorr
from eqcorrscan.utils.findpeaks import multi_find_peaks
from eqcorrscan.utils.pre_processing import (
dayproc, shortproc, _prep_data_for_correlation)
dayproc, shortproc, _prep_data_for_correlation, _quick_copy_stream)

Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -339,8 +339,8 @@ def _group_process(template_group, parallel, cores, stream, daylong,
kwargs.update({'endtime': _endtime})
else:
_endtime = kwargs['starttime'] + 86400
chunk_stream = stream.slice(starttime=kwargs['starttime'],
endtime=_endtime).copy()
chunk_stream = _quick_copy_stream(
stream.slice(starttime=kwargs['starttime'], endtime=_endtime))
Logger.debug(f"Processing chunk {i} between {kwargs['starttime']} "
f"and {_endtime}")
if len(chunk_stream) == 0:
Expand Down Expand Up @@ -688,8 +688,8 @@ def match_filter(template_names, template_list, st, threshold,
if copy_data:
# Copy the stream here because we will muck about with it
Logger.info("Copying data to keep your input safe")
stream = st.copy()
templates = [t.copy() for t in template_list]
stream = _quick_copy_stream(st)
templates = [_quick_copy_stream(t) for t in template_list]
_template_names = template_names.copy() # This can be a shallow copy
else:
stream, templates, _template_names = st, template_list, template_names
Expand Down
93 changes: 87 additions & 6 deletions eqcorrscan/utils/pre_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import numpy as np
import logging
import datetime as dt
import copy

from collections import Counter, defaultdict
from multiprocessing import Pool, cpu_count
Expand Down Expand Up @@ -728,6 +729,77 @@ def _fill_gaps(tr):
return gaps, tr


def _quick_copy_trace(trace, deepcopy_data=True):
    """
    Create a fast copy of a trace by writing attributes straight into the
    new trace's (and its header's) ``__dict__``, bypassing obspy's
    ``__init__`` / ``__setattr__`` machinery.
    Speedup: from 37 us to 12 us per trace - 3x faster.

    :type trace: :class:`obspy.core.trace.Trace`
    :param trace: Trace to quickly copy
    :type deepcopy_data: bool
    :param deepcopy_data:
        Whether to deepcopy trace data (with `deepcopy_data=False` expect up to
        20 % speedup, but use only when you know that data trace contents will
        not change or affect results). Warning: do not use this option to copy
        traces with processing history or response information.

    :rtype: :class:`obspy.core.trace.Trace`
    :return: copied trace
    """
    copied = Trace()
    copied_dict = copied.__dict__
    for attr, attr_value in trace.__dict__.items():
        if attr == 'stats':
            # Copy header fields one-by-one into the fresh Stats' __dict__,
            # avoiding the (slow) Stats(header) re-validation path.
            stats_dict = copied.stats.__dict__
            for stats_key, stats_value in attr_value.__dict__.items():
                if isinstance(stats_value, UTCDateTime):
                    # Rebuild times from the private nanosecond integer —
                    # far cheaper than UTCDateTime's general constructor.
                    stats_dict[stats_key] = UTCDateTime(
                        ns=stats_value.__dict__['_UTCDateTime__ns'])
                else:
                    stats_dict[stats_key] = stats_value
        else:
            # Data (and any other attribute) is deepcopied for safety unless
            # the caller explicitly opted out, e.g. for NaN filler traces.
            copied_dict[attr] = (
                copy.deepcopy(attr_value) if deepcopy_data else attr_value)
    return copied


def _quick_copy_stream(stream, deepcopy_data=True):
    """
    Create a fast copy of a stream by quick-copying each trace.
    Speedup for simple trace:
    from 112 us to 44 (35) us per 3-trace stream - 2.8x (3.2x) faster

    Warning: use `deepcopy_data=False` (saves extra ~20 % time) only when the
    changing the data in the stream later does not change results
    (e.g., for NaN-trace or when data array will not be changed).

    This is what takes longest (1 empty trace, total time to copy 27 us):
        copy header: 18 us (vs create new empty header: 683 ns)
    Two points that can speed up copying / creation:
        1. circumvent trace.__init__ and trace.__set_attr__ by setting value
           directly in trace's __dict__
        2. when setting trace header, circumvent that Stats(header) is called
           when header is already a Stats instance

    :type stream: :class:`obspy.core.stream.Stream`
    :param stream: Stream to quickly copy
    :type deepcopy_data: bool
    :param deepcopy_data:
        Whether to deepcopy data (with `deepcopy_data=False` expect up to 20 %
        speedup, but use only when you know that data trace contents will not
        change or affect results).

    :rtype: :class:`obspy.core.stream.Stream`
    :return: copied stream
    """
    # Delegate the per-trace work to _quick_copy_trace; wrap the result in a
    # fresh Stream container.
    return Stream([
        _quick_copy_trace(tr, deepcopy_data=deepcopy_data) for tr in stream])


def _stream_quick_select(stream, seed_id):
"""
4x quicker selection of traces in stream by full Seed-ID. Does not support
Expand Down Expand Up @@ -849,12 +921,13 @@ def _prep_data_for_correlation(stream, templates, template_names=None,

# Initialize nan template for speed.
nan_channel = np.full(template_length, np.nan, dtype=np.float32)
nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS'])
nan_template = Stream()
for _seed_id in seed_ids:
net, sta, loc, chan = _seed_id[0].split('.')
nan_template += Trace(header=Stats({
'network': net, 'station': sta, 'location': loc,
'channel': chan, 'starttime': UTCDateTime(),
'channel': chan, 'starttime': UTCDateTime(ns=0),
'npts': template_length, 'sampling_rate': samp_rate}))

# Remove templates with no matching channels
Expand Down Expand Up @@ -889,8 +962,8 @@ def _prep_data_for_correlation(stream, templates, template_names=None,
net, sta, loc, chan = earliest_templ_trace_id.split('.')
nan_template += Trace(header=Stats({
'network': net, 'station': sta, 'location': loc,
'channel': chan, 'starttime': UTCDateTime(),
'npts': template_length, 'sampling_rate': samp_rate}))
'channel': chan, 'starttime': UTCDateTime(ns=0),
'sampling_rate': samp_rate}))
stream_nan_data = np.full(
stream_length, np.nan, dtype=np.float32)
out_stream += Trace(
Expand All @@ -909,7 +982,7 @@ def _prep_data_for_correlation(stream, templates, template_names=None,
for template_name in incomplete_templates:
template = _out[template_name]
template_starttime = min(tr.stats.starttime for tr in template)
out_template = nan_template.copy()
out_template = _quick_copy_stream(nan_template, deepcopy_data=False)

# Select traces very quickly: assume that trace order does not change,
# make dict of trace-ids and list of indices and use indices to select
Expand All @@ -925,9 +998,17 @@ def _prep_data_for_correlation(stream, templates, template_names=None,
template_channel = Stream([
template.traces[idx] for idx in stream_trace_id_dict[seed_id]])
if len(template_channel) <= channel_index:
out_template[channel_number].data = nan_channel
out_template[channel_number].stats.starttime = \
# out_template[channel_number].data = nan_channel # quicker:
out_template[channel_number].__dict__['data'] = copy.deepcopy(
nan_channel)
out_template[channel_number].stats.__dict__['npts'] = \
template_length
out_template[channel_number].stats.__dict__['starttime'] = \
template_starttime
out_template[channel_number].stats.__dict__['endtime'] = \
UTCDateTime(ns=int(
round(template_starttime.ns
+ (template_length * samp_rate) * 1e9)))
else:
out_template[channel_number] = template_channel[channel_index]
# If a template-trace matches a NaN-trace in the stream , then set
Expand Down