Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speedup 07: 2.5x speedup with parallel parallel median / MAD calculation #531

Merged
merged 8 commits into from
Jan 3, 2023

Conversation

flixha
Copy link
Collaborator

@flixha flixha commented Dec 12, 2022

What does this PR do?

Implements 2.5x speedup for MAD threshold calculation in core.match_filter.matched_filter. With this PR, MAD thresholds can be calculated in parallel; with joblib this is already worth it with a relatively small dataset (> 15 cccsum or 2e7 values in cccsums).

This has a noticeable effect for larger datasets, e.g., for 2000 templates, this reduced MAD-calculation from 50 s to 20 s with 30 cores.

Why was it initiated? Any relevant Issues?

  • Computing median is not a cheap operation for big arrays, even in numpy.
  • I checked two other ways for speeding this up:
    • full vectorization with numpy --> 30 % slower than serial list-comprehension
    • parallelization with Pool --> 50% slower than serial

This PR contributes to the summary issue in #522

Examples for comparing serial and parallel MAD:

import numpy as np
from joblib import delayed, Parallel
from multiprocessing import Pool

def _mad(cccsum):
    """
    Internal helper to compute MAD-thresholds in parallel.
    """
    return np.median(np.abs(cccsum))

median_cores = 20
threshold = 8
cccsums = [np.random.randn(2000000) for j in range(100) ]

thresholds_ser = [threshold * np.median(np.abs(cccsum)) for cccsum in cccsums]

pool = Pool(processes=median_cores)
results = [pool.apply_async(_mad, ([cccsum]),) for cccsum in cccsums]
pool.close()
thresholds_pool = [threshold * p.get() for p in results]

medians = Parallel(n_jobs=median_cores)(delayed(
    _mad)(cccsum) for cccsum in cccsums)
thresholds_joblib = [threshold * median for median in medians]

for item1, item2 in zip(thresholds_ser, thresholds_joblib):
    if item1 != item2:
        print('not equal')

%timeit pool = Pool(processes=median_cores); results = [pool.apply_async(_mad, ([cccsum]),) for cccsum in cccsums]; pool.close(); pool_tresholds = [threshold * p.get() for p in results]
%timeit [threshold * np.median(np.abs(cccsum)) for cccsum in cccsums]
%timeit Parallel(n_jobs=median_cores)(delayed(_mad)(cccsum) for cccsum in cccsums)

PR Checklist

  • develop base branch selected?
  • This PR is not directly related to an existing issue (which has no PR yet).
  • [] All tests still pass.
    - [ ] Any new features or fixed regressions are be covered via new tests.
    - [ ] Any new or changed features have are fully documented.
  • Significant changes have been added to CHANGES.md.
    - [ ] First time contributors have added your name to CONTRIBUTORS.md.

@calum-chamberlain
Copy link
Member

Fails are coming from the need for the joblib dependency. I'm not averse to adding joblib as a dependency, but if we can avoid it then that would be great. Do you know why joblib has such an advantage over direct multiprocessing? Because the mad function is just calling numpy funcs this could also be a use for multithreading given that numpy should release the GIL.

This would also be fun to write a quick and simple c-func for to compare speed.

@calum-chamberlain
Copy link
Member

Timings from me:

  • 2.54 s +/- 7.96 ms for serial loop
  • 2.79 s +/- 115 ms for numpy (%timeit np.median(np.abs(cccsums), axis=1) * threshold - note that cccsums should be a numpy array already I think?)
  • 466 ms +/- 3 ms for ThreadPoolExecutor from concurrent.futures (%timeit executor = ThreadPoolExecutor(); mads = executor.map(_mad, cccsums); executor.shutdown(); mads = [threshold * mad for mad in mads]
  • 4.07 s +/- 78.2 ms for ProcessPoolExecutor (equivalent of multiprocessing): (%timeit executor = ProcessPoolExecutor(); mads = executor.map(_mad, cccsums); executor.shutdown(); mads = [threshold * mad for mad in mads])
  • 1.18 s +/- 32 ms for joblib parallel: note missing multiplication by threshold which shouldn't add much.

Based on this and the additional dependency I'm going to suggest a change to using a ThreadPoolExecutor for this. The joy of numpy releasing the GIL!

@flixha
Copy link
Collaborator Author

flixha commented Jan 3, 2023

I felt that I was searching for alternative solutions and how to best parallelize numpy operations for quite a while - but obviously missed out on how much threadpools actually help because numpy releases the GIL. Your suggestion is of course much cleaner in not needing joblib, and being another 2.5 times faster it's a lot better. So a total speedup of 5x sounds very good, and closer to what I was expecting should be possible here :-). Thanks a lot!

And while the joblib-parallelization quickly saturates with the number of cores because it needs to transfer a lot of memory, the threaded execution continues to scale with more cores - so for a cccsums of 2000000 x 500 elements I got a speedup of 8x over joblib (0.75 s vs 6.1 s), or 15x over serial with 40 cores - awesome!

@flixha flixha merged commit b2ecb31 into eqcorrscan:develop Jan 3, 2023
@calum-chamberlain
Copy link
Member

Great, thanks for merging this! I would love to take advantage of numpy releasing the GIL in more places (and this is one of my internal arguments for redesigning EQcorrscan objects so that they immediately work on numpy array rather than obspy streams, but that is something for when I have more time...).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants