diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md
new file mode 100644
index 00000000000..4c930c7392d
--- /dev/null
+++ b/docs/cudf/source/cudf_polars/engine_options.md
@@ -0,0 +1,24 @@
+# GPUEngine Configuration Options
+
+The `polars.GPUEngine` object may be configured in several different ways.
+
+## Parquet Reader Options
+Reading large parquet files can consume a significant amount of memory, especially when the files are compressed. This may lead to out-of-memory errors in some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks, limiting peak memory usage at the cost of a small drop in performance.
+
+To configure the parquet reader, pass a dictionary of options to the `parquet_options` keyword of the `GPUEngine` object. Valid keys and values are:
+- `chunked` controls whether the chunked parquet reader is used. Chunked reading is enabled by default.
+- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk. By default, the chunk size is unlimited.
+- [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression. The default pass read limit is 16GiB.
+
+For example, to select the chunked reader with custom values for `pass_read_limit` and `chunk_read_limit`:
+```python
+engine = GPUEngine(
+    parquet_options={
+        'chunked': True,
+        'chunk_read_limit': int(1e9),
+        'pass_read_limit': int(4e9)
+    }
+)
+result = query.collect(engine=engine)
+```
+Note that passing `chunked: False` disables chunked reading entirely, in which case `chunk_read_limit` and `pass_read_limit` have no effect.
diff --git a/docs/cudf/source/cudf_polars/index.rst b/docs/cudf/source/cudf_polars/index.rst
index 0a3a0d86b2c..6fd98a6b5da 100644
--- a/docs/cudf/source/cudf_polars/index.rst
+++ b/docs/cudf/source/cudf_polars/index.rst
@@ -39,3 +39,9 @@ Launch on Google Colab
    :target: https://colab.research.google.com/github/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/polars_gpu_engine_demo.ipynb
 
 Try out the GPU engine for Polars in a free GPU notebook environment. Sign in with your Google account and `launch the demo on Colab `__.
+
+.. toctree::
+   :maxdepth: 1
+   :caption: Engine Config Options
+
+   engine_options
diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py
index d085f21e0ad..c446ce0384e 100644
--- a/python/cudf_polars/cudf_polars/callback.py
+++ b/python/cudf_polars/cudf_polars/callback.py
@@ -129,6 +129,7 @@ def set_device(device: int | None) -> Generator[int, None, None]:
 
 def _callback(
     ir: IR,
+    config: GPUEngine,
     with_columns: list[str] | None,
     pyarrow_predicate: str | None,
     n_rows: int | None,
@@ -145,7 +146,45 @@ def _callback(
         set_device(device),
         set_memory_resource(memory_resource),
     ):
-        return ir.evaluate(cache={}).to_polars()
+        return ir.evaluate(cache={}, config=config).to_polars()
+
+
+def validate_config_options(config: dict) -> None:
+    """
+    Validate the configuration options for the GPU engine.
+
+    Parameters
+    ----------
+    config
+        Configuration options to validate.
+
+    Raises
+    ------
+    ValueError
+        If the configuration contains unsupported options.
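+
+    Examples
+    --------
+    Unknown top-level keys are rejected with a ``ValueError`` (a sketch;
+    the exact message formatting is an implementation detail):
+
+    >>> validate_config_options({"raise_on_fail": True})
+    >>> validate_config_options({"invalid_option": 1})
+    Traceback (most recent call last):
+        ...
+    ValueError: Engine configuration contains unsupported settings: {'invalid_option'}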
+    """
+    if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}):
+        raise ValueError(
+            f"Engine configuration contains unsupported settings: {unsupported}"
+        )
+    if unsupported := (
+        config.get("parquet_options", {}).keys()
+        - {"chunked", "chunk_read_limit", "pass_read_limit"}
+    ):
+        raise ValueError(
+            f"Parquet configuration contains unsupported settings: {unsupported}"
+        )
 
 
 def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
@@ -174,10 +213,8 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
     device = config.device
     memory_resource = config.memory_resource
     raise_on_fail = config.config.get("raise_on_fail", False)
-    if unsupported := (config.config.keys() - {"raise_on_fail"}):
-        raise ValueError(
-            f"Engine configuration contains unsupported settings {unsupported}"
-        )
+    validate_config_options(config.config)
+
     with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
         translator = Translator(nt)
         ir = translator.translate_ir()
@@ -200,5 +237,11 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
             raise exception
     else:
         nt.set_udf(
-            partial(_callback, ir, device=device, memory_resource=memory_resource)
+            partial(
+                _callback,
+                ir,
+                config,
+                device=device,
+                memory_resource=memory_resource,
+            )
         )
diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index 98e8a83b04e..e44a0e0857a 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -37,6 +37,8 @@
     from collections.abc import Callable, Hashable, MutableMapping, Sequence
     from typing import Literal
 
+    from polars import GPUEngine
+
     from cudf_polars.typing import Schema
 
 
@@ -180,7 +182,9 @@ def get_hashable(self) -> Hashable:
         translation phase should fail earlier.
         """
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    def evaluate(
+        self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine
+    ) -> DataFrame:
         """
         Evaluate the node (recursively) and return a dataframe.
 
@@ -189,6 +193,8 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         cache
             Mapping from cached node ids to constructed DataFrames. Used to
             implement evaluation of the `Cache` node.
+        config
+            GPU engine configuration.
 
         Notes
         -----
@@ -208,8 +214,9 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         translation phase should fail earlier.
         """
         return self.do_evaluate(
+            config,
             *self._non_child_args,
-            *(child.evaluate(cache=cache) for child in self.children),
+            *(child.evaluate(cache=cache, config=config) for child in self.children),
         )
 
 
@@ -294,6 +301,9 @@ class Scan(IR):
     predicate: expr.NamedExpr | None
     """Mask to apply to the read dataframe."""
 
+    PARQUET_DEFAULT_CHUNK_SIZE: int = 0  # unlimited
+    PARQUET_DEFAULT_PASS_LIMIT: int = 16 * 1024**3  # 16GiB
+
     def __init__(
         self,
         schema: Schema,
@@ -339,7 +349,7 @@ def __init__(
             # TODO: polars has this implemented for parquet,
             # maybe we can do this too?
raise NotImplementedError("slice pushdown for negative slices") - if self.typ == "csv" and self.skip_rows != 0: # pragma: no cover + if self.typ in {"csv"} and self.skip_rows != 0: # pragma: no cover # This comes from slice pushdown, but that # optimization doesn't happen right now raise NotImplementedError("skipping rows in CSV reader") @@ -413,6 +423,7 @@ def get_hashable(self) -> Hashable: @classmethod def do_evaluate( cls, + config: GPUEngine, schema: Schema, typ: str, reader_options: dict[str, Any], @@ -498,25 +509,80 @@ def do_evaluate( colnames[0], ) elif typ == "parquet": - filters = None - if predicate is not None and row_index is None: - # Can't apply filters during read if we have a row index. - filters = to_parquet_filter(predicate.value) - tbl_w_meta = plc.io.parquet.read_parquet( - plc.io.SourceInfo(paths), - columns=with_columns, - filters=filters, - nrows=n_rows, - skip_rows=skip_rows, - ) - df = DataFrame.from_table( - tbl_w_meta.tbl, - # TODO: consider nested column names? - tbl_w_meta.column_names(include_children=False), - ) - if filters is not None: - # Mask must have been applied. - return df + parquet_options = config.config.get("parquet_options", {}) + if parquet_options.get("chunked", True): + reader = plc.io.parquet.ChunkedParquetReader( + plc.io.SourceInfo(paths), + columns=with_columns, + # We handle skip_rows != 0 by reading from the + # up to n_rows + skip_rows and slicing off the + # first skip_rows entries. + # TODO: Remove this workaround once + # https://github.com/rapidsai/cudf/issues/16186 + # is fixed + nrows=n_rows + skip_rows, + skip_rows=0, + chunk_read_limit=parquet_options.get( + "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE + ), + pass_read_limit=parquet_options.get( + "pass_read_limit", cls.PARQUET_DEFAULT_PASS_LIMIT + ), + ) + chk = reader.read_chunk() + rows_left_to_skip = skip_rows + + def slice_skip(tbl: plc.Table): + nonlocal rows_left_to_skip + if rows_left_to_skip > 0: + table_rows = tbl.num_rows() + chunk_skip = min(rows_left_to_skip, table_rows) + # TODO: Check performance impact of skipping this + # call and creating an empty table manually when the + # slice would be empty (chunk_skip == table_rows). + (tbl,) = plc.copying.slice(tbl, [chunk_skip, table_rows]) + rows_left_to_skip -= chunk_skip + return tbl + + tbl = slice_skip(chk.tbl) + # TODO: Nested column names + names = chk.column_names(include_children=False) + concatenated_columns = tbl.columns() + while reader.has_next(): + tbl = slice_skip(reader.read_chunk().tbl) + + for i in range(tbl.num_columns()): + concatenated_columns[i] = plc.concatenate.concatenate( + [concatenated_columns[i], tbl._columns[i]] + ) + # Drop residual columns to save memory + tbl._columns[i] = None + + df = DataFrame.from_table( + plc.Table(concatenated_columns), + names=names, + ) + else: + filters = None + if predicate is not None and row_index is None: + # Can't apply filters during read if we have a row index. + filters = to_parquet_filter(predicate.value) + tbl_w_meta = plc.io.parquet.read_parquet( + plc.io.SourceInfo(paths), + columns=with_columns, + filters=filters, + nrows=n_rows, + skip_rows=skip_rows, + ) + df = DataFrame.from_table( + tbl_w_meta.tbl, + # TODO: consider nested column names? + tbl_w_meta.column_names(include_children=False), + ) + if filters is not None: + # Mask must have been applied. 
+ return df + elif typ == "ndjson": json_schema: list[plc.io.json.NameAndType] = [ (name, typ, []) for name, typ in schema.items() @@ -591,14 +657,16 @@ def __init__(self, schema: Schema, key: int, value: IR): @classmethod def do_evaluate( - cls, key: int, df: DataFrame + cls, config: GPUEngine, key: int, df: DataFrame ) -> DataFrame: # pragma: no cover; basic evaluation never calls this """Evaluate and return a dataframe.""" # Our value has already been computed for us, so let's just # return it. return df - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: + def evaluate( + self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine + ) -> DataFrame: """Evaluate and return a dataframe.""" # We must override the recursion scheme because we don't want # to recurse if we're in the cache. @@ -606,7 +674,9 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return cache[self.key] except KeyError: (value,) = self.children - return cache.setdefault(self.key, value.evaluate(cache=cache)) + return cache.setdefault( + self.key, value.evaluate(cache=cache, config=config) + ) class DataFrameScan(IR): @@ -652,6 +722,7 @@ def get_hashable(self) -> Hashable: @classmethod def do_evaluate( cls, + config: GPUEngine, schema: Schema, df: Any, projection: tuple[str, ...] | None, @@ -699,6 +770,7 @@ def __init__( @classmethod def do_evaluate( cls, + config: GPUEngine, exprs: tuple[expr.NamedExpr, ...], should_broadcast: bool, # noqa: FBT001 df: DataFrame, @@ -733,7 +805,10 @@ def __init__( @classmethod def do_evaluate( - cls, exprs: tuple[expr.NamedExpr, ...], df: DataFrame + cls, + config: GPUEngine, + exprs: tuple[expr.NamedExpr, ...], + df: DataFrame, ) -> DataFrame: # pragma: no cover; not exposed by polars yet """Evaluate and return a dataframe.""" columns = broadcast(*(e.evaluate(df) for e in exprs)) @@ -824,6 +899,7 @@ def check_agg(agg: expr.Expr) -> int: @classmethod def do_evaluate( cls, + config: GPUEngine, keys_in: Sequence[expr.NamedExpr], agg_requests: Sequence[expr.NamedExpr], maintain_order: bool, # noqa: FBT001 @@ -945,6 +1021,7 @@ def __init__( @classmethod def do_evaluate( cls, + config: GPUEngine, predicate: plc.expressions.Expression, zlice: tuple[int, int] | None, suffix: str, @@ -1117,6 +1194,7 @@ def _reorder_maps( @classmethod def do_evaluate( cls, + config: GPUEngine, left_on_exprs: Sequence[expr.NamedExpr], right_on_exprs: Sequence[expr.NamedExpr], options: tuple[ @@ -1240,6 +1318,7 @@ def __init__( @classmethod def do_evaluate( cls, + config: GPUEngine, exprs: Sequence[expr.NamedExpr], should_broadcast: bool, # noqa: FBT001 df: DataFrame, @@ -1302,6 +1381,7 @@ def __init__( @classmethod def do_evaluate( cls, + config: GPUEngine, keep: plc.stream_compaction.DuplicateKeepOption, subset: frozenset[str] | None, zlice: tuple[int, int] | None, @@ -1391,6 +1471,7 @@ def __init__( @classmethod def do_evaluate( cls, + config: GPUEngine, by: Sequence[expr.NamedExpr], order: Sequence[plc.types.Order], null_order: Sequence[plc.types.NullOrder], @@ -1446,7 +1527,9 @@ def __init__(self, schema: Schema, offset: int, length: int, df: IR): self.children = (df,) @classmethod - def do_evaluate(cls, offset: int, length: int, df: DataFrame) -> DataFrame: + def do_evaluate( + cls, config: GPUEngine, offset: int, length: int, df: DataFrame + ) -> DataFrame: """Evaluate and return a dataframe.""" return df.slice((offset, length)) @@ -1466,7 +1549,9 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR): self.children = (df,) 
@classmethod - def do_evaluate(cls, mask_expr: expr.NamedExpr, df: DataFrame) -> DataFrame: + def do_evaluate( + cls, config: GPUEngine, mask_expr: expr.NamedExpr, df: DataFrame + ) -> DataFrame: """Evaluate and return a dataframe.""" (mask,) = broadcast(mask_expr.evaluate(df), target_length=df.num_rows) return df.filter(mask) @@ -1484,7 +1569,7 @@ def __init__(self, schema: Schema, df: IR): self.children = (df,) @classmethod - def do_evaluate(cls, schema: Schema, df: DataFrame) -> DataFrame: + def do_evaluate(cls, config: GPUEngine, schema: Schema, df: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" # This can reorder things. columns = broadcast( @@ -1560,7 +1645,9 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): self._non_child_args = (name, self.options) @classmethod - def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: + def do_evaluate( + cls, config: GPUEngine, name: str, options: Any, df: DataFrame + ) -> DataFrame: """Evaluate and return a dataframe.""" if name == "rechunk": # No-op in our data model @@ -1639,7 +1726,9 @@ def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR) raise NotImplementedError("Schema mismatch") @classmethod - def do_evaluate(cls, zlice: tuple[int, int] | None, *dfs: DataFrame) -> DataFrame: + def do_evaluate( + cls, config: GPUEngine, zlice: tuple[int, int] | None, *dfs: DataFrame + ) -> DataFrame: """Evaluate and return a dataframe.""" # TODO: only evaluate what we need if we have a slice? return DataFrame.from_table( @@ -1688,7 +1777,7 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table: ) @classmethod - def do_evaluate(cls, *dfs: DataFrame) -> DataFrame: + def do_evaluate(cls, config: GPUEngine, *dfs: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" max_rows = max(df.num_rows for df in dfs) # Horizontal concatenation extends shorter tables with nulls diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index 2207545aa60..1821cfedfb8 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -23,6 +23,7 @@ def assert_gpu_result_equal( lazydf: pl.LazyFrame, *, + engine: GPUEngine | None = None, collect_kwargs: dict[OptimizationArgs, bool] | None = None, polars_collect_kwargs: dict[OptimizationArgs, bool] | None = None, cudf_collect_kwargs: dict[OptimizationArgs, bool] | None = None, @@ -41,6 +42,8 @@ def assert_gpu_result_equal( ---------- lazydf frame to collect. + engine + Custom GPU engine configuration. collect_kwargs Common keyword arguments to pass to collect for both polars CPU and cudf-polars. @@ -76,12 +79,14 @@ def assert_gpu_result_equal( NotImplementedError If GPU collection failed in some way. 
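+
+    Examples
+    --------
+    A usage sketch for the new ``engine`` parameter (assumes a working
+    GPU is available):
+
+    >>> import polars as pl
+    >>> q = pl.LazyFrame({"a": [1, 2, 3]}).select(pl.col("a").sum())
+    >>> assert_gpu_result_equal(q, engine=pl.GPUEngine(raise_on_fail=True))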
""" + if engine is None: + engine = GPUEngine(raise_on_fail=True) + final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs( collect_kwargs, polars_collect_kwargs, cudf_collect_kwargs ) expect = lazydf.collect(**final_polars_collect_kwargs) - engine = GPUEngine(raise_on_fail=True) got = lazydf.collect(**final_cudf_collect_kwargs, engine=engine) assert_frame_equal( expect, diff --git a/python/cudf_polars/cudf_polars/testing/plugin.py b/python/cudf_polars/cudf_polars/testing/plugin.py index 080a1af6e19..2a9104d8c82 100644 --- a/python/cudf_polars/cudf_polars/testing/plugin.py +++ b/python/cudf_polars/cudf_polars/testing/plugin.py @@ -64,6 +64,7 @@ def pytest_configure(config: pytest.Config) -> None: "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception", + "tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311", "tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394", diff --git a/python/cudf_polars/tests/dsl/test_to_ast.py b/python/cudf_polars/tests/dsl/test_to_ast.py index f6c24da0180..795ba991c62 100644 --- a/python/cudf_polars/tests/dsl/test_to_ast.py +++ b/python/cudf_polars/tests/dsl/test_to_ast.py @@ -63,7 +63,7 @@ def test_compute_column(expr, df): ir = Translator(q._ldf.visit()).translate_ir() assert isinstance(ir, ir_nodes.Select) - table = ir.children[0].evaluate(cache={}) + table = ir.children[0].evaluate(cache={}, config=pl.GPUEngine()) name_to_index = {c.name: i for i, c in enumerate(table.columns)} def compute_column(e): diff --git a/python/cudf_polars/tests/dsl/test_traversal.py b/python/cudf_polars/tests/dsl/test_traversal.py index 8958c2a0f84..8849629e0fd 100644 --- a/python/cudf_polars/tests/dsl/test_traversal.py +++ b/python/cudf_polars/tests/dsl/test_traversal.py @@ -124,7 +124,7 @@ def replace_df(node, rec): new = mapper(orig) - result = new.evaluate(cache={}).to_polars() + result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars() expect = pl.DataFrame({"a": [2, 1], "b": [-4, -3]}) @@ -153,7 +153,7 @@ def replace_scan(node, rec): orig = Translator(q._ldf.visit()).translate_ir() new = mapper(orig) - result = new.evaluate(cache={}).to_polars() + result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars() expect = q.collect() @@ -224,6 +224,6 @@ def _(node: ir.Select, fn: IRTransformer): new_ir = rewriter(qir) - got = new_ir.evaluate(cache={}).to_polars() + got = new_ir.evaluate(cache={}, config=pl.GPUEngine()).to_polars() assert_frame_equal(expect, got) diff --git a/python/cudf_polars/tests/expressions/test_sort.py b/python/cudf_polars/tests/expressions/test_sort.py index 6170281ad54..49e075e0338 100644 --- a/python/cudf_polars/tests/expressions/test_sort.py +++ b/python/cudf_polars/tests/expressions/test_sort.py @@ -68,7 +68,11 @@ def 
test_setsorted(descending, nulls_last, with_nulls): assert_gpu_result_equal(q) - df = Translator(q._ldf.visit()).translate_ir().evaluate(cache={}) + df = ( + Translator(q._ldf.visit()) + .translate_ir() + .evaluate(cache={}, config=pl.GPUEngine()) + ) a = df.column_map["a"] diff --git a/python/cudf_polars/tests/test_parquet_filters.py b/python/cudf_polars/tests/test_parquet_filters.py index 545a89250fc..5c5f927e4f4 100644 --- a/python/cudf_polars/tests/test_parquet_filters.py +++ b/python/cudf_polars/tests/test_parquet_filters.py @@ -5,7 +5,8 @@ import pytest import polars as pl -from polars.testing import assert_frame_equal + +from cudf_polars.testing.asserts import assert_gpu_result_equal @pytest.fixture(scope="module") @@ -50,11 +51,9 @@ def pq_file(tmp_path_factory, df): ], ) @pytest.mark.parametrize("selection", [["c", "b"], ["a"], ["a", "c"], ["b"], "c"]) -def test_scan_by_hand(expr, selection, pq_file): - df = pq_file.collect() +@pytest.mark.parametrize("chunked", [False, True], ids=["unchunked", "chunked"]) +def test_scan_by_hand(expr, selection, pq_file, chunked): q = pq_file.filter(expr).select(*selection) - # Not using assert_gpu_result_equal because - # https://github.com/pola-rs/polars/issues/19238 - got = q.collect(engine=pl.GPUEngine(raise_on_fail=True)) - expect = df.filter(expr).select(*selection) - assert_frame_equal(got, expect) + assert_gpu_result_equal( + q, engine=pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": chunked}) + ) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 792b136acd8..61925b21a97 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -13,18 +13,20 @@ assert_ir_translation_raises, ) +NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False}) + @pytest.fixture( params=[(None, None), ("row-index", 0), ("index", 10)], - ids=["no-row-index", "zero-offset-row-index", "offset-row-index"], + ids=["no_row_index", "zero_offset_row_index", "offset_row_index"], ) def row_index(request): return request.param @pytest.fixture( - params=[None, 2, 3], - ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], + params=[None, 3], + ids=["all_rows", "some_rows"], ) def n_rows(request): return request.param @@ -51,21 +53,15 @@ def columns(request, row_index): @pytest.fixture( - params=[None, pl.col("c").is_not_null()], ids=["no-mask", "c-is-not-null"] + params=[None, pl.col("c").is_not_null()], ids=["no_mask", "c_is_not_null"] ) def mask(request): return request.param @pytest.fixture( - params=[ - None, - (1, 1), - ], - ids=[ - "no-slice", - "slice-second", - ], + params=[None, (1, 1)], + ids=["no_slice", "slice_second"], ) def slice(request): # For use in testing that we handle @@ -92,12 +88,16 @@ def make_source(df, path, format): ("csv", pl.scan_csv), ("ndjson", pl.scan_ndjson), ("parquet", pl.scan_parquet), + ("chunked_parquet", pl.scan_parquet), ], ) def test_scan( tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request ): name, offset = row_index + is_chunked = format == "chunked_parquet" + if is_chunked: + format = "parquet" make_source(df, tmp_path / "file", format) request.applymarker( pytest.mark.xfail( @@ -111,13 +111,30 @@ def test_scan( row_index_offset=offset, n_rows=n_rows, ) + engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked}) + if ( + is_chunked + and (columns is None or columns[0] != "a") + and ( + # When we mask with the slice, it happens to remove the + # bad row + 
(mask is None and slice is not None) + # When we both slice and read a subset of rows it also + # removes the bad row + or (slice is None and n_rows is not None) + ) + ): + # slice read produces wrong result for string column + request.applymarker( + pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") + ) if slice is not None: q = q.slice(*slice) if mask is not None: q = q.filter(mask) if columns is not None: q = q.select(*columns) - assert_gpu_result_equal(q) + assert_gpu_result_equal(q, engine=engine) def test_negative_slice_pushdown_raises(tmp_path): @@ -153,7 +170,7 @@ def test_scan_row_index_projected_out(tmp_path): q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a")) - assert_gpu_result_equal(q) + assert_gpu_result_equal(q, engine=NO_CHUNK_ENGINE) def test_scan_csv_column_renames_projection_schema(tmp_path): @@ -323,6 +340,63 @@ def test_scan_parquet_only_row_index_raises(df, tmp_path): assert_ir_translation_raises(q, NotImplementedError) +@pytest.fixture( + scope="module", params=["no_slice", "skip_to_end", "skip_partial", "partial"] +) +def chunked_slice(request): + return request.param + + +@pytest.fixture(scope="module") +def large_df(df, tmpdir_factory, chunked_slice): + # Something big enough that we get more than a single chunk, + # empirically determined + df = pl.concat([df] * 1000) + df = pl.concat([df] * 10) + df = pl.concat([df] * 10) + path = str(tmpdir_factory.mktemp("data") / "large.pq") + make_source(df, path, "parquet") + n_rows = len(df) + q = pl.scan_parquet(path) + if chunked_slice == "no_slice": + return q + elif chunked_slice == "skip_to_end": + return q.slice(int(n_rows * 0.6), n_rows) + elif chunked_slice == "skip_partial": + return q.slice(int(n_rows * 0.6), int(n_rows * 0.2)) + else: + return q.slice(0, int(n_rows * 0.6)) + + +@pytest.mark.parametrize( + "chunk_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"chunk_{x}" +) +@pytest.mark.parametrize( + "pass_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"pass_{x}" +) +def test_scan_parquet_chunked( + request, chunked_slice, large_df, chunk_read_limit, pass_read_limit +): + if chunked_slice in {"skip_partial", "partial"} and ( + chunk_read_limit == 0 and pass_read_limit != 0 + ): + request.applymarker( + pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") + ) + + assert_gpu_result_equal( + large_df, + engine=pl.GPUEngine( + raise_on_fail=True, + parquet_options={ + "chunked": True, + "chunk_read_limit": chunk_read_limit, + "pass_read_limit": pass_read_limit, + }, + ), + ) + + def test_scan_hf_url_raises(): q = pl.scan_csv("hf://datasets/scikit-learn/iris/Iris.csv") assert_ir_translation_raises(q, NotImplementedError)