Error: "'ProxifyHostFile' object has no attribute 'fast'" upon JIT unspill #704
Thanks for the bug report @lmmx, much appreciated. I think this is because of two issues.
JIT-unspill doesn't support spill-to-disk at the moment, but it is on our to-do list: #657.
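For orientation, a minimal sketch of how JIT-unspill is switched on, assuming dask-cuda's LocalCUDACluster keyword arguments (the memory limit value is illustrative, not from this thread). With jit_unspill=True, device objects are wrapped in proxies and spilled device-to-host as needed; spilling onward to disk is the part tracked in #657:

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # jit_unspill=True enables proxy-based (JIT) spilling of device objects;
    # device_memory_limit sets when device->host spilling should happen.
    # Host->disk spilling is NOT yet supported under JIT-unspill (see #657).
    cluster = LocalCUDACluster(
        n_workers=1,
        jit_unspill=True,
        device_memory_limit="8GB",  # illustrative value
    )
    client = Client(cluster)
```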
When I use the warn_spill_to_disk branch, there is a high load (monitored via htop). The dataset is only 25GB in total: each gzipped TSV is 2.5GB compressed and ~6.5GB uncompressed (2.6x larger), so the entire dataset uncompressed is likely ~65GB. Since blocksize is None, the datasets are not partitioned when being spread amongst workers. The same result is seen with 10 threads per worker. It doesn't seem right to me either that there is more CPU memory being used than the uncompressed dataset takes up on disk...

cd dev
git clone -b warn_spill_to_disk git@github.com:madsbk/dask-cuda.git
pip install -e dask-cuda
cd -
python dask_test_cudf_multiple_cluster.py
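On the partitioning point above: gzip-compressed files cannot be split by blocksize, so each file ends up as one large partition. One hedged workaround is to repartition after reading; a sketch reusing the glob pattern and path layout from the script further down (the repartition target size is an assumption, not something suggested in this thread):

```python
import dask_cudf

# Compressed files cannot be split by blocksize, so read_csv yields one
# partition per .tsv.gz file; repartitioning afterwards gives smaller chunks
# that spread more evenly across workers/threads.
df = dask_cudf.read_csv(
    "csv_test/wit_v1.train.all-0000*-of-00010.tsv.gz",
    sep="\t",
    usecols=["mime_type"],
    blocksize=None,
)
df = df.repartition(partition_size="256MB")  # illustrative target size
```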
The htop profile looks like this (memory is the flatter line):
Edit: I've run that again after changing the console logger format (at L172) in the worker code, to show logs with timings:

self.logger = logging.getLogger("distributed.worker")
log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
console = logging.StreamHandler()
console.setFormatter(log_format)
self.logger.addHandler(console)
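For reference, a rough alternative to patching distributed's source might be to install the same handler from user code on each worker via Client.run (a sketch; the helper name is made up, and the handler has to be added inside each worker process):

```python
import logging

def add_timestamped_handler():
    # Runs inside each worker process: attach a timestamped console handler
    # to the "distributed.worker" logger, mirroring the in-place edit above.
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("[%(asctime)s] [%(levelname)s] - %(message)s")
    )
    logging.getLogger("distributed.worker").addHandler(handler)

# assuming `client` is the Client connected to the cluster:
# client.run(add_timestamped_handler)
```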
I think it is because of how the result is being computed. The following works for me without any spilling (15GB CPU and 8GB GPU peak memory usage):

from dask.sizeof import sizeof
import dask.dataframe as dd
from distributed.client import wait
from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client
from pathlib import Path
import time

if __name__ == "__main__":
    cluster = LocalCUDACluster(threads_per_worker=1, n_workers=1, jit_unspill=True)  # runs on 1 local GPU
    client = Client(cluster)
    store_p = Path.home() / "repos" / "dask-cuda" / "csv_test"
    # input_tsv_list = [store_p / "wit_v1.train.all-1percent_sample.tsv"]
    input_tsvs = [
        store_p / f"wit_v1.train.all-0000{i}-of-00010.tsv.gz"
        for i in range(10)
    ]
    # input_tsvs = store_p / f"wit_v1.train.all-0000*-of-00010.tsv.gz"
    print(f"Data files: {[i.name for i in input_tsvs]}")

    def read_tsv(tsv_path):
        fields = ["mime_type"]
        df = dask_cudf.read_csv(tsv_path, sep="\t", usecols=fields, blocksize=None, chunksize=None)
        return df

    t0 = time.time()
    df = read_tsv(input_tsvs)
    pngs = df[df.mime_type == "image/png"]
    pngs = pngs.persist()
    wait(pngs)
    print(f"dask-cudf persist took: {time.time()-t0}s")
    pngs = pngs.compute()
    print("sizeof(pngs): ", sizeof(pngs))
    print(f"dask-cudf took: {time.time()-t0}s")
Ah yes, I was already aware of that, but my point was to parallelise the work being done here (to achieve the speedup obtained by reading the files in parallel). This issue originated when I was trying to use simple multiprocessing (I now notice I didn't link back to that as the source of this issue, apologies). Thanks for the wait/persist pattern example, I knew I was missing something when trying to use that... I guess I'll revisit and try again when JIT unspilling is finished up (is there an expected release date?). Thanks for your help -- if you want to try anything else let me know. Will leave this for you to close.
Have you tried that? It's worth noting that JIT-unspill is still a work in progress.
There is no expected release date for spilling to disk, but I don't think it would work in any case: Dask and Dask-CUDA are not able to spill data used by running tasks; only data being staged on workers can be spilled. I did a talk at the Dask Summit 2021 which explains the inner workings of spilling that you might find interesting: https://www.youtube.com/watch?v=mHWk7y2p-NM
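For completeness, the spilling that does exist today is configured when the cluster is created; a hedged sketch of the relevant LocalCUDACluster parameters (all values and the spill path are illustrative, not from this thread):

```python
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Classic (non-JIT) spilling: data *staged* on a worker moves device -> host
    # once device_memory_limit is exceeded, and host -> disk once memory_limit
    # is exceeded; data actively used by a running task is not spilled.
    cluster = LocalCUDACluster(
        device_memory_limit="8GB",          # device-to-host spilling threshold (illustrative)
        memory_limit="32GB",                # host-to-disk spilling threshold (illustrative)
        local_directory="/tmp/dask-spill",  # where spill files are written (illustrative path)
    )
```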
The code is using it already. Yep, I get that it's all WIP, appreciate it.
That's fair enough. I'm going to focus on improving the ability to partition this [in dask], which may help split up the work staging as a result. I came across it yesterday 😄 As far as I could understand it at the time, it was very neat 😎 Cheers all
Thanks @lmmx |
I'm trying to achieve the speed of cuDF on individual files, for a list of 10 files, and cannot do so. When I tried to run them in parallel using multiprocessing, I got a CUDA OOM error and was directed to use dask-cudf, which uses 'memory spilling', and I've read more about how that works.
Unfortunately, I get the error in the title ("'ProxifyHostFile' object has no attribute 'fast'"), and the only guidance to troubleshoot it I can find is to "use unproxy()", which I don't think I can do here (as it's internal to the dask/distributed code).

What happened:
The code succeeds with, say, 3 threads per worker (the number of threads per worker essentially dictates a sliding scale of how parallel the CSV reads are) and achieves a faster time than 1 thread per worker (completely serial), but when a higher number is used, e.g. 15 threads per worker, JIT spilling is supposed to take place, and instead the error occurs because the spilling does not happen correctly.
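To make the failing configuration concrete, the only change relative to the working script above is the thread count on the cluster (a sketch; the thread counts are from this description, everything else is assumed to match the MCVE):

```python
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Works (partially parallel reads), and is faster than 1 thread per worker:
    # cluster = LocalCUDACluster(threads_per_worker=3, n_workers=1, jit_unspill=True)

    # Fails with "'ProxifyHostFile' object has no attribute 'fast'" once JIT
    # spilling should kick in under the higher parallelism:
    cluster = LocalCUDACluster(threads_per_worker=15, n_workers=1, jit_unspill=True)
```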
What you expected to happen:
The CUDA OOM error is supposed to be avoided thanks to JIT spilling. When JIT spilling fails here, I get the CUDA OOM error I was trying to avoid.
Minimal Complete Verifiable Example:
The source files here are the 10 TSVs comprising the WIT dataset, available here
Anything else we need to know?:
Environment:
conda list | grep dask