[HELP] processing waveforms and memory issues with long periods #588

Open
HGeoAle opened this issue Sep 18, 2024 · 10 comments

HGeoAle commented Sep 18, 2024

EQcorrscan Version: 0.5.0
ObsPy Version: 1.4.1
NumPy Version: 1.26.4

I'm working on applying the FMF matched filter to a swarm of about 100 events over a month of continuous data. My goal is to create a .dt.cc file with correlations and an events catalog with magnitudes. To do this, I need to process the data and pass the whole processed stream through several steps: lag calculation, relative magnitudes estimation, and writing correlations.

Right now, I load the entire stream into memory to get snippets for each detection using detection.extract_stream(). This approach is faster but requires a lot of RAM. I know I could load only the necessary data for each detection, which would save memory, but that would involve a lot of iterations and potentially redundant data loads, making it time-consuming. Unfortunately, I can't use that strategy for lag_calc, since it needs continuous data rather than snippets.

I'm looking for a more efficient way to handle this processing, ideally one that mirrors the 'client_detect' processing. Is there a way to pass the waveforms in smaller segments to ease the memory pressure? Alternatively, something like detection.extract_stream() that works with a WaveBank (or a similar client) could be a solution.
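
(For illustration, a rough sketch of that per-detection, client-backed idea is below, assuming a Party of detections in memory; the WaveBank path, padding, snippet length and prepick values are assumptions for the example, not values from this workflow.)

from obsplus import WaveBank

bank = WaveBank("waveforms")  # hypothetical path to a local waveform bank
pad = 10.0      # seconds of raw data to read around each detection (illustrative)
length = 4.0    # snippet length in seconds (illustrative)
prepick = 0.5   # seconds before the pick (illustrative)

snippets = {}
for family in party:
    for detection in family:
        t0 = detection.detect_time
        # Read only the short window needed for this detection, not the full stream
        st = bank.get_waveforms(starttime=t0 - pad, endtime=t0 + length + pad)
        snippets[detection.id] = detection.extract_stream(
            stream=st, length=length, prepick=prepick)

This trades repeated small reads for memory, as noted above.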

I’m also hitting memory issues when running write_correlations. This function is pretty memory-intensive, and I’ve crashed the process more often than I’d like while trying to run it.

The call looks something like this:

write_correlations(new_catalog, stream_dict, dt_length, dt_prepick, shift_len,
                   event_id_mapper=id_mapper, lowcut=None, highcut=None,
                   max_sep=8, min_link=6, min_cc=0.6, interpolate=False,
                   all_horiz=False, max_workers=None, parallel_process=False,
                   weight_by_square=True)

Any tips on how to make this function run more smoothly or alternative processing strategies would be super helpful!
What is your setup?

Operating System: Rocky Linux release 8.10 (Green Obsidian)
Python Version: 3.11.8
EQcorrscan Version: 0.5.0

flixha (Collaborator) commented Sep 18, 2024

Hi Hugo,

Since you write that you have about 100 events, the limitation is, as you say, the long continuous streams of data. To avoid holding the long streams in memory when you use extract_stream, you can use EQcorrscan's functionality to create a tribe in which all your events (detection templates and picked detections) are handled as templates. Then you can use tribe = Tribe().construct(method="from_client", ...) to read the waveforms with the client functionality. You can write the tribe to a file that you read in from your script, and then run write_correlations with the shortest possible traces in memory:

from obspy import Catalog

from eqcorrscan import Tribe
from eqcorrscan.utils.catalog_to_dd import (
    write_correlations, write_event, write_phase)

tribe = Tribe().read('tribe.tgz')
# Prepare dicts for write_correlations
stream_dict = dict(zip([template.event.resource_id.id for template in tribe],
                       [template.st for template in tribe]))

cat = Catalog([template.event for template in tribe])
event_id_mapper = dict(zip(
    [event.resource_id.id for event in cat],
    range(1, len(cat) + 1)))  # consecutive integer IDs

write_event(cat, event_id_mapper=event_id_mapper)
write_phase(cat, event_id_mapper=event_id_mapper)

event_id_mapper_out = write_correlations(
    cat, stream_dict, extract_len=extract_len, pre_pick=pre_pick,
    shift_len=shift_len,
    include_master=False,
    lowcut=None, highcut=None, min_cc=min_cc, min_link=min_link)

If you can decide on lowcut and highcut at template creation, then you can also skip all waveform processing in write_correlations.
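
(A minimal sketch of building such a tribe with filtering applied at construction time, so that write_correlations can then skip waveform processing; the client, the catalog name cat and all parameter values below are illustrative assumptions, not part of the workflow above.)

from obspy.clients.fdsn import Client
from eqcorrscan import Tribe

client = Client("IRIS")  # illustrative; a client-like object such as a WaveBank can also work
tribe = Tribe().construct(
    method="from_client", client_id=client, catalog=cat,  # cat: your picked catalog (assumed)
    lowcut=2.0, highcut=15.0, samp_rate=50.0, filt_order=4,
    length=4.0, prepick=0.5, swin="all", process_len=3600,
    min_snr=3.0, parallel=True)
tribe.write("tribe.tgz")  # read back later with Tribe().read('tribe.tgz')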

I worked on creating differential arrival time (dt.cc) files for large problems (up to ~300k events in one run) with write_correlations last year. If you need to scale up beyond thousands of events, running write_correlations in parallel costs more memory the more events are clustered. I have some extra functionality implemented in write_correlations, e.g., the parameters max_sep, max_neighbors, min_max_sep, max_max_sep, use_shared_memory, prepare_sub_stream_dicts and write_dt_from_workers, to limit the number of events that are correlated against the master event by each worker. I have not gotten that merged into EQcorrscan's main branch yet, but I could do that when I get the time, especially if it's relevant for someone.

HGeoAle (Author) commented Sep 21, 2024

Hi Felix,

Thank you very much for the information; it’s been really helpful! I just implemented your suggestions and can now access the waveform snippets more efficiently when needed.

I still have one question. Previously, I was using the multi_process() function to process my stream of waveforms, aiming to replicate the processing done when constructing the tribe and detecting events. However, since I can now use the client option and let the tribe construction handle the processing as you described, I realize I can streamline this.

That said, I still need the picked detections to construct the tribe. To get these picks, I have to run party.lag_calc() with a stream. Currently, I load the full waveform into memory to pass it to lag_calc(). While this isn't an issue at the moment, I foresee it becoming a bottleneck as I scale up.

The documentation mentions that this stream can be gappy. I assume the solution would be to load a processed stream that includes only a few seconds of data around the templates and detections, depending on their lengths and shift lengths. So my questions are: is multi_process() the right way to process the raw waveforms in a manner that mirrors what is done in tribe.construct() and party.client_detect()? And is passing this "gappy" processed stream to lag_calc() the best approach?

Thanks again for your help!

Best,
Hugo

calum-chamberlain (Member) commented

Generally you should let party.lag_calc handle the processing of your streams for you, which ensures the same processing is done as at the detection stage. If you are giving lag_calc a processed stream you should probably also be giving that processed stream to tribe.detect.
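
A minimal sketch of that pattern, assuming the party and an unprocessed stream are already in memory; the file name and parameter values are illustrative:

from obspy import read

st = read("continuous_data.mseed")  # hypothetical raw, unprocessed continuous data

# lag_calc applies the same filtering and resampling that were used at detection time
repicked_catalog = party.lag_calc(
    stream=st,
    pre_processed=False,  # let lag_calc process the stream itself
    shift_len=0.2,        # illustrative maximum allowed pick shift (s)
    min_cc=0.4)           # illustrative minimum correlation to keep a pick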

There are lots of ways to do this, and without knowing more about the limitations of your system and the scale of your problem I can't recommend the "best" way to do this.

HGeoAle (Author) commented Nov 4, 2024

Update:

I've been working on processing earthquake swarms, which, before template matching, typically consist of 200-2,000 events, with durations ranging from three weeks to two months. After applying template matching, the number of events increases significantly, ranging from 1,700 to 8,000.

I’m running my pipeline on our university's computing cluster, where I primarily use GPU nodes with 198 GB of virtual memory to handle the large data volumes efficiently. The pipeline workflow is as follows:
tribe construction -> detection -> declustering -> lag_calc -> relative magnitudes -> write correlations
For any retrieval of event waveform segments, I leverage the tribe-construction functionality to process and trim the "templates" I need.

To optimize memory usage, I implemented a version of lag_calc that uses the waveform bank client. The code is modeled after the chunk-based loading and processing used in template construction and client detection. This setup has allowed me to successfully process many of my largest datasets without running into memory issues, as it avoids loading the entire waveform dataset at once.
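
(For reference, a rough sketch of such a chunked, client-backed lag_calc pattern is shown below, assuming the party is already in memory; it is not the actual module, and the WaveBank path, chunk length, padding and lag_calc parameters are illustrative assumptions.)

from obspy import Catalog
from obsplus import WaveBank
from eqcorrscan.core.match_filter import Family, Party

bank = WaveBank("waveforms")   # hypothetical local waveform bank
chunk_len = 86400.0            # process one day at a time (illustrative)
pad = 300.0                    # extra data around each chunk for processing (illustrative)

repicked = Catalog()
chunk_start = min(d.detect_time for f in party for d in f)
last_detect = max(d.detect_time for f in party for d in f)
while chunk_start <= last_detect:
    chunk_end = chunk_start + chunk_len
    # Build a sub-party holding only the detections that fall in this chunk
    sub_families = []
    for family in party:
        dets = [d for d in family
                if chunk_start <= d.detect_time < chunk_end]
        if dets:
            sub_families.append(Family(template=family.template, detections=dets))
    if sub_families:
        # Read only this chunk of continuous data from the bank
        st = bank.get_waveforms(starttime=chunk_start - pad, endtime=chunk_end + pad)
        repicked += Party(families=sub_families).lag_calc(
            stream=st, pre_processed=False, shift_len=0.2, min_cc=0.4)
    chunk_start = chunk_end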

However, I am currently facing a challenge when writing correlations for the largest datasets. The process fails when the node runs out of memory and crashes without producing an error message. The issue persists even when I bypass waveform processing, as the tribe construction functionality has already handled that.

Given the tectonic nature of the swarm, the inter-event distances are quite small, making it difficult to filter the correlations effectively using only epicentral distance. I believe a solution similar to what Felix suggested—a more memory-efficient write_correlations function that can handle a high number of events—would be essential.

Any guidance or suggestions on how to approach this issue would be greatly appreciated!

calum-chamberlain (Member) commented

Kia ora Hugo,

Great, thank you for the update! Nice work on updating the lag-calc. I have similarly been implementing a wavebank/client backed lag-calc, but haven't gone the whole way that you have. It would be great if you have time or the inclination to make a pull request to add that code to EQcorrscan.

For write-correlations - yes, this is an issue. I have been working on a more memory efficient (but slower) implementation here for some near-real-time detection and analysis work. You would be welcome to try to use that and see if it meets your needs.

The Correlator object is designed to be a consumer of events and will correlate any "new" event added to it with all old events - this effectively generates a triangular matrix of the correlations. All correlations are written as it goes. There are some bugs with not removing event-pairs when there are not enough links that I need to fix, but generally this is working.

Usage is something like:

import logging

from correlator import Correlator
from obspy import read_events
from obsplus import WaveBank  # provides the client-like WaveBank used below

logging.basicConfig(level="INFO")

cat = read_events("...")

correlator = Correlator(
        minlink=8,
        min_cc=0.2,
        maxsep=10.0,
        shift_len=0.2,
        pre_pick=0.4,
        length=2.0,
        lowcut=1.0,
        highcut=10.0,
        interpolate=False,
        client=WaveBank("waveforms"),  # Or any client-like object - if using a wavebank, the wavebank needs to exist
        max_event_links=500,  # Limit to correlate to only the n nearest events, can be set to None to run everything
        outfile="dt.cc",
        weight_by_square=False)
correlator.add_events(cat)

This will write correlations to the dt.cc file as it goes. Event waveforms are processed once per event and stored on disk then read in on-demand when needed for new event correlations.

Any comments greatly appreciated!

HGeoAle (Author) commented Nov 4, 2024

Thank you, Calum. I will implement this soon and let you know how it goes.

About the pull request: I don't think my programming skills suit that, but here is the module that I wrote. If you have any observations, they are most welcome.
https://www.dropbox.com/scl/fi/gzdcc5u0njnbnq1p0kg2t/client_lag_calc.py?rlkey=ri4yp1eeg89zu9hg015az9hso&st=3a3fom4z&dl=0

calum-chamberlain (Member) commented

I suggest testing the above code with a small subset of events first by the way - make sure it is actually doing what you want it to.

Thanks for sharing that link to your code. I think that is somewhat similar to the client_lag_calc implementation here (added to develop very recently in #589). I think our approaches were slightly different, particularly in that I am getting data for all channels in a party, rather than on a per-family basis. If you have time to look and/or test, it would be great to know whether this new client_lag_calc method meets your needs, or if you have suggestions for issues you ran into or improvements that could/should be made.

HGeoAle (Author) commented Nov 9, 2024

Hi, following up on this:

I tried to run the Correlator, but there is a problem. It needs to import the following:

from eqcorrscan.utils.clustering import dist_array_km

which leads to an import error:

ImportError: cannot import name 'dist_array_km' from 'eqcorrscan.utils.clustering'

I checked and I do not have this function; I also could not find it in the master branch of the GitHub release.
I thought about skipping the function altogether to check whether the rest works, but it is needed by the add_events() method.
Any guidance?

calum-chamberlain (Member) commented

Ah, sorry @HGeoAle - that is in the develop branch of EQcorrscan. There are some things in that branch that should help speed up the calculation of cross-correlations as well. You should just be able to pull the develop branch and pip install that, e.g.:

cd EQcorrscan  # To wherever you have the EQcorrscan git repository - clone it from GitHub if you need to.
git checkout develop
git pull origin develop
pip install -e .  # Install in editable mode - changes to the source take effect without reinstalling.

dikadissss commented Dec 24, 2024

Hi @flixha, have you pushed the extra write_correlations functionality you implemented to the main branch yet?

Hi @HGeoAle, are you using the Correlator from EQcorrscan?
