Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Multiprocessing support (to avoid CUDA OOM due to memory spikes) #9042

Closed
lmmx opened this issue Aug 16, 2021 · 6 comments
Closed

[FEA] Multiprocessing support (to avoid CUDA OOM due to memory spikes) #9042

lmmx opened this issue Aug 16, 2021 · 6 comments
Labels
feature request New feature or request

Comments

@lmmx
Copy link

lmmx commented Aug 16, 2021

Is your feature request related to a problem? Please describe.
It is possible to use multiprocessing as described in #5515 last year:

import cudf
import pandas as pd
from multiprocessing import get_context

def get_df(idx):
    pdf = pd.DataFrame({
        'a':[1,2],
        'b':[3,4]
    })
    return cudf.from_pandas(pdf)

if __name__ == "__main__":
    ctx = get_context("spawn")

    # Parallelize the method calls
    with ctx.Pool(2) as pool:
        print(pool.map(get_df, [1,2]))

I was trying to load the WIT dataset with cuDF, as `cudf.read_csv(p, sep="\t") and got it to work with the following helper functions:

import multiprocessing as mp
from multiprocessing import Process, Pool, get_context

def append_and_update(result, results_list, pbar):
    results_list.append(result)
    pbar.update()

def spawned_batch_multiprocess_with_return(
    function_list,
    pool_results=None,
    n_cores=mp.cpu_count(),
    show_progress=False,
    tqdm_desc=None,
):
    """
    Run a list of functions on ``n_cores`` (default: all CPU cores),
    with the option to show a progress bar using tqdm (default: shown).

    As described in https://github.com/rapidsai/cudf/issues/5515
    """
    pool_results = pool_results if pool_results else []
    ctx = get_context("spawn")
    pool = ctx.Pool(processes=n_cores)
    if show_progress:
        pbar = tqdm(total=len(function_list), desc=tqdm_desc)
    for f in function_list:
        if show_progress:
            tqdm_cb = partial(
                append_and_update, results_list=pool_results, pbar=pbar
            )
        callback = tqdm_cb if show_progress else pool_results.append
        pool.apply_async(func=f, callback=tqdm_cb)
    pool.close()
    pool.join()
    return pool_results

When I run this with a list of functions to batch multiprocess which are essentially as follows:

def handle_tsv_file(
    tsv_path: Path,
    tsv_number: int,
):
    """
    Open and process a TSV file (gzipped or uncompressed) of the dataset.

    Args:
      tsv_path        : path to the TSV file (gzipped or uncompressed)
      ...
      tsv_number      : The number in the list of input TSVs (for logging)
    """
    time.sleep(tsv_number * 1.5) # stagger by 1.5 second each
    fields = ["mime_type"]
    t0 = time.time()
    try:
        tsv_df = cudf.read_csv(tsv_path, usecols=fields, sep="\t")
        t1 = time.time()
    except Exception as exc:
        print(f"Got an exception {exc}")
        return [-1] # rather than an error
    else:
        print(f"cuDF took {t1-t0}s")
        return [tsv_df]

I find I get a CUDA Out Of Memory error, hence I introduced the time.sleep call into it, which "staggers" the calls sufficiently so as to avoid multiple functions 'unloading' and causing those spikes that give the CUDA OOM errors:

Got an exception std::bad_alloc: CUDA error at: /home/louis/miniconda3/envs/cudf_demo/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

Describe the solution you'd like
I'm wondering if it's possible to avoid these OOM errors? The spikes must surely be identifiable if they're coming from computation being done by cudf, so perhaps there's some way to identify when one is about to occur and sleep a little while internally to avoid unloading/spiking the memory in that way so as to not error out...?

Describe alternatives you've considered

If I use the staggering as described above, then for 10 files my final file is delayed by (10-1) * 1.5 = 13.5 seconds, and since they all take 24 seconds then it finishes at about 38 seconds. 2 seconds staggering finishes at 42 seconds or so. This is approximately what I saw.

For comparison, pandas.read_csv takes 50 seconds to read these files, and parallelises with multiprocessing without a problem. Unfortunately then, my staggered cuDF solution approaches the time taken by a simpler pandas multiprocessing solution, and I don't achieve the 22 to 24 seconds I see with cuDF for the entire dataset.

Additional context
I just wanted to make you aware of this as I'm still coming to know cuDF and perhaps I am missing an existing solution. Or perhaps you've not thought of using it this way, I don't know? It just seems like my staggering approach would perhaps be better done internally, and I'm sure you know more about the appropriate GPU internals than me.

If you don't see much to be constructive about feel free to close, but I'd love to hear if you have other ideas as those extra 22 seconds would be nice to have!

@quasiben
Copy link
Member

I'm curious if there is some reason in particular why you are using mutliprocessing and not dask/dask-cudf? Are you using multiple GPUs or one GPU with mutli processes ?

@lmmx
Copy link
Author

lmmx commented Aug 17, 2021

I saw the dask-cudf library here but the repo had been archived and there are no docs, so as a new user I assumed (as the note says) that it had been merged into cudf itself. I see now that's not the case. (Actually, it appears to be in cudf/python/dask_cudf but not importable from there?)

  • If you read [BUG] #6613 it's mentioned there how the docs are out of date, perhaps it should be made clearer that the way to install dask_cudf is now from conda install -c rapidsai dask-cudf

One GPU/multiprocess, to clarify.

I've now tried and unfortunately find it far slower than I can achieve with staggered multiprocessing (which in turn is not much faster than simple pandas multiprocessing). This is disappointing as the memory spilling approach should indeed prevent exactly the CUDA OOM memory spikes I was referring to.

I timed dask-cudf at 245 seconds for the 10 files (compared to the 24 second benchmark above for each one of those files with cudf), it's clear that this means the GPU processes are being carried out serially not in parallel.

  • Note: following example here
import dask.dataframe as dd
import dask_cudf
from pathlib import Path
import time

input_tsvs = Path("wit_v1.train.all-0000*-of-00010.tsv.gz")

print(f"Data files: {input_tsvs.name}")

def read_tsv(tsv_path, cudf=True):
    fields = ["mime_type"]
    df = dask_cudf.read_csv(tsv_path, sep="\t", usecols=fields, blocksize=None, chunksize=None)
    return df

t0 = time.time()
df = read_tsv(input_tsvs, cudf=True)
pngs = df[df.mime_type == "image/png"]
print(pngs.compute())
t1 = time.time()
print(f"dask-cudf took: {t1-t0}s")

Data files: wit_v1.train.all-0000*-of-00010.tsv.gz
         mime_type
1        image/png
13       image/png
14       image/png
48       image/png
49       image/png
...            ...
3704171  image/png
3704179  image/png
3704182  image/png
3704193  image/png
3704207  image/png

[4229094 rows x 1 columns]
dask-cudf took: 245.1010594367981s

I suspect the problem here may be the scheduler used by compute, but I can't see any particular scheduler being exposed by dask-cudf. Is there some glaring error here I've overlooked (or any docs describing the type of workflow you're suggesting)?

I changed the glob string to an explicit list of file paths and tried different numbers of files:

  • 1 file: 43 seconds (43s per file)
  • 3 files: 88 seconds (29s per file)
  • 10 files: 243 seconds (24s per file)

So you can see that it isn't even achieving the speed of serial execution of cudf.read_csv for fewer files.

I also tried with dask_cuda (made a client and passed compute(scheduler=client.get)) and saw identical results.

I can achieve better times (120 seconds) by changing the client config at initialisation of threads_per_worker but the jit_unspill argument does not seem to work, and I get a CUDA OOM error above ~3 threads per worker. This is not stable enough therefore to automatically handle, and I don't think it's satisfactory to have to manually tune the threads per worker on a per-CSV basis! Again this is still not as good as the naive 'staggered' scheduler I wrote by sleeping for 1.5 seconds between calls to cudf.read_csv (which was ~40s).

@quasiben
Copy link
Member

Until now, RAPIDS/Dask has assumed a one dask worker/one GPU/one thread model of execution. This was done initially to help reason about CUDA context creation but also to handle OOM issues. However, it's come at the expense of performance. That is, we are probably under-utilizing the GPU. I/O is a good example of how we could be leveraging more performance from the GPU if we had multiple processes using the same GPU.

It's thus not too surprising to see you can achieve higher performance with multiple process on the same GPU compared with standard defaults with dask-cudf/dask-cuda. As you noted, you can configure dask-cuda to use multiple threads and should be able to get similar performance: https://docs.rapids.ai/api/dask-cuda/nightly/api.html?highlight=threads#cmdoption-dask-cuda-worker-nthreads . Unfortunately, these methods also run the risk of OOM errors as you are seeing because each thread acts independently and can/will make cuda memory allocations. Again, dask-cuda can help a bit here by spilling but only if Dask knows about the memory.

In trying to better support multiple threads per GPU, @charlesbluca and @pentschev have done some light testing of PTDS rapidsai/dask-cuda#96 rapidsai/dask-cuda#517 -- cuDF is also keen to enable and test PTDS but will require some refactoring which is going on now

I probably should've asked this at the beginning, how big is the dataset and how much memory do you have on your GPU?

@lmmx
Copy link
Author

lmmx commented Aug 17, 2021

Very interesting, thanks. Per-thread default CUDA streams sound great. Would love for these to become reliable but the OOM makes it a no-go (seems daft to be tuning schedulers just to read CSVs). GPU has 24GB, dataset is 25GB WIT (10 gzip compressed TSVs about 2.5GB each) so it’s just barely over the limit:

We are providing WIT as a set of 10 tsv files (compressed). The total dataset size is about ~25GB. If you want to start quick, pick any one file which will be about ~2.5GB and will give you about ~10% of the data which has about ~3.5M+ image text example sets.

Unfortunately contains intra-row newlines so cannot currently partition (hence blocksize=None) though am trying to solve that aspect separately (to contribute a csv module to fsspec-reference-maker).

@quasiben
Copy link
Member

It's not surprising that you are getting OOM issues if you are right at the limit of what your GPU has. Can you load the entire dataset with cudf.read_csv ? If so, can you perform other operations like groupby-aggs /joins/etc? i ask these additional questions, because those operations and others (just like in pandas) require in operation memory allocations (temporary storage and such) so even if you could load all the data doing additional analysis may also cause OOM problems. However, with tools like Dask you can do out-of-core operations and load only the data which is necessary to perform some chunk of an operation

@lmmx
Copy link
Author

lmmx commented Aug 17, 2021

Er, I can't (NotImplementedError: 'read_csv' does not yet support reading multiple files) but now you put it like that I think I see what you're getting at here: that it's not in scope for cudf to be reading out of memory, which I fully get! D'oh. I think I was grasping for a solution like dask-cudf to assist with the memory errors, which is a separate issue, and it's not for cudf to handle memory spilling. Perhaps also a little confused as the archived repo for dask-cudf points back here, so I maybe thought it was in scope for this repo. I'm going to close this one with apologies for the misunderstanding 😅 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants