Implement cudf-polars chunked parquet reading (#16944)
This PR provides access to the libcudf chunked parquet reader through the `cudf-polars` GPU engine, inspired by the cuDF Python implementation.

Closes #16818

Authors:
  - https://github.com/brandon-b-miller
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16944
brandon-b-miller authored Nov 15, 2024
1 parent d475dca commit aa8c0c4
Showing 11 changed files with 297 additions and 66 deletions.
25 changes: 25 additions & 0 deletions docs/cudf/source/cudf_polars/engine_options.md
@@ -0,0 +1,25 @@
# GPUEngine Configuration Options

The `polars.GPUEngine` object may be configured in several different ways.

## Parquet Reader Options
Reading large parquet files can use a large amount of memory, especially when the files are compressed. This may lead to out-of-memory errors for some workflows. To mitigate this, the chunked parquet reader may be selected. When enabled, parquet files are read in chunks, limiting peak memory usage at the cost of a small drop in performance.

To configure the parquet reader, pass a dictionary of options to the `parquet_options` keyword of the `GPUEngine` constructor. Valid keys and values are:
- `chunked` indicates that chunked parquet reading is to be used. By default, chunked reading is turned on.
- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk. By default, the maximum chunk size is unlimited.
- [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression. The default pass read limit is 16GiB.

For example, to select the chunked reader with custom values for `pass_read_limit` and `chunk_read_limit`:
```python
engine = GPUEngine(
    parquet_options={
        'chunked': True,
        'chunk_read_limit': int(1e9),
        'pass_read_limit': int(4e9)
    }
)
result = query.collect(engine=engine)
```
Note that passing `chunked: False` disables chunked reading entirely, and thus `chunk_read_limit` and `pass_read_limit` will have no effect.
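The defaults described above (chunked reading on, unlimited chunk size, 16 GiB pass read limit) can be sketched as a plain dictionary that user options override. This is an illustration of the documented behavior only; the helper name `resolve_parquet_options` and the 0-means-unlimited convention are assumptions, not cudf-polars API:

```python
# Sketch of the documented defaults and their override behavior.
DEFAULT_PARQUET_OPTIONS = {
    "chunked": True,                  # chunked reading is on by default
    "chunk_read_limit": 0,            # 0 => no per-chunk size limit (assumed convention)
    "pass_read_limit": 16 * 1024**3,  # 16 GiB decompression budget
}

def resolve_parquet_options(user_options=None):
    """Merge user-supplied options over the documented defaults."""
    options = dict(DEFAULT_PARQUET_OPTIONS)
    options.update(user_options or {})
    return options
```

For instance, supplying only `pass_read_limit` would leave chunked reading enabled with its default chunk size.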
6 changes: 6 additions & 0 deletions docs/cudf/source/cudf_polars/index.rst
@@ -39,3 +39,9 @@ Launch on Google Colab
:target: https://colab.research.google.com/github/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/polars_gpu_engine_demo.ipynb

Try out the GPU engine for Polars in a free GPU notebook environment. Sign in with your Google account and `launch the demo on Colab <https://colab.research.google.com/github/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/polars_gpu_engine_demo.ipynb>`__.

.. toctree::
   :maxdepth: 1
   :caption: Engine Config Options:

   engine_options
40 changes: 34 additions & 6 deletions python/cudf_polars/cudf_polars/callback.py
@@ -129,6 +129,7 @@ def set_device(device: int | None) -> Generator[int, None, None]:

def _callback(
    ir: IR,
    config: GPUEngine,
    with_columns: list[str] | None,
    pyarrow_predicate: str | None,
    n_rows: int | None,
@@ -145,7 +146,30 @@ def _callback(
        set_device(device),
        set_memory_resource(memory_resource),
    ):
        return ir.evaluate(cache={}).to_polars()
        return ir.evaluate(cache={}, config=config).to_polars()


def validate_config_options(config: dict) -> None:
    """
    Validate the configuration options for the GPU engine.

    Parameters
    ----------
    config
        Configuration options to validate.

    Raises
    ------
    ValueError
        If the configuration contains unsupported options.
    """
    if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}):
        raise ValueError(
            f"Engine configuration contains unsupported settings: {unsupported}"
        )
    assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset(
        config.get("parquet_options", {})
    )


def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
@@ -174,10 +198,8 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
    device = config.device
    memory_resource = config.memory_resource
    raise_on_fail = config.config.get("raise_on_fail", False)
    if unsupported := (config.config.keys() - {"raise_on_fail"}):
        raise ValueError(
            f"Engine configuration contains unsupported settings {unsupported}"
        )
    validate_config_options(config.config)

    with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
        translator = Translator(nt)
        ir = translator.translate_ir()
@@ -200,5 +222,11 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
            raise exception
        else:
            nt.set_udf(
                partial(_callback, ir, device=device, memory_resource=memory_resource)
                partial(
                    _callback,
                    ir,
                    config,
                    device=device,
                    memory_resource=memory_resource,
                )
            )
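The reworked `partial` call binds `ir` and `config` positionally and `device`/`memory_resource` as keywords, leaving the remaining parameters for polars to supply when it invokes the UDF. A minimal illustration of that binding pattern, using a simplified stand-in for `_callback` with fewer parameters than the real function:

```python
from functools import partial

def callback(ir, config, with_columns, *, device, memory_resource):
    # Stand-in for _callback: return the received arguments so the
    # binding is easy to inspect.
    return (ir, config, with_columns, device, memory_resource)

# Bind the first two positional arguments plus the keyword-only ones,
# leaving `with_columns` to be supplied later by the caller.
bound = partial(callback, "ir-node", "engine-config", device=0, memory_resource=None)
```

Calling `bound(["a", "b"])` then fills in only the unbound `with_columns` slot.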
