diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh
index 9ed2929a70e..ab40a5b9a55 100644
--- a/cpp/src/io/parquet/page_decode.cuh
+++ b/cpp/src/io/parquet/page_decode.cuh
@@ -149,10 +149,21 @@ inline __device__ bool is_bounds_page(page_state_s* const s,
   size_t const begin = start_row;
   size_t const end   = start_row + num_rows;
 
-  // for non-nested schemas, rows cannot span pages, so use a more restrictive test
-  return has_repetition
-           ? ((page_begin <= begin && page_end >= begin) || (page_begin <= end && page_end >= end))
-           : ((page_begin < begin && page_end > begin) || (page_begin < end && page_end > end));
+  // Test for list schemas.
+  auto const is_bounds_page_lists =
+    ((page_begin <= begin and page_end >= begin) or (page_begin <= end and page_end >= end));
+
+  // For non-list schemas, rows cannot span pages, so use a more restrictive test. Make sure to
+  // relax the test for `page_end` if we adjusted the `num_rows` for the last page to compensate
+  // for list row size estimates in `generate_list_column_row_count_estimates()` in chunked
+  // read mode.
+  auto const test_page_end_nonlists =
+    s->page.is_num_rows_adjusted ? page_end >= end : page_end > end;
+
+  auto const is_bounds_page_nonlists =
+    (page_begin < begin and page_end > begin) or (page_begin < end and test_page_end_nonlists);
+
+  return has_repetition ? is_bounds_page_lists : is_bounds_page_nonlists;
 }
 
 /**
diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu
index a8a8c441a84..6aec4ce0ec2 100644
--- a/cpp/src/io/parquet/page_hdr.cu
+++ b/cpp/src/io/parquet/page_hdr.cu
@@ -433,6 +433,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
         // definition levels
         bs->page.chunk_row            = 0;
         bs->page.num_rows             = 0;
+        bs->page.is_num_rows_adjusted = false;
         bs->page.skipped_values       = -1;
         bs->page.skipped_leaf_values  = 0;
         bs->page.str_bytes            = 0;
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 3b4d0e6dc80..ce9d48693ec 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -310,8 +310,10 @@ struct PageInfo {
   // - In the case of a nested schema, you have to decode the repetition and definition
   //   levels to extract actual column values
   int32_t num_input_values;
-  int32_t chunk_row;  // starting row of this page relative to the start of the chunk
-  int32_t num_rows;   // number of rows in this page
+  int32_t chunk_row;          // starting row of this page relative to the start of the chunk
+  int32_t num_rows;           // number of rows in this page
+  bool is_num_rows_adjusted;  // Flag to indicate whether the number of rows of this page has been
+                              // adjusted to compensate for the list row size estimates.
   // the next four are calculated in gpuComputePageStringSizes
   int32_t num_nulls;   // number of null values (V2 header), but recalculated for string cols
   int32_t num_valids;  // number of non-null values, taking into account skip_rows/num_rows
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index f03f1214b9a..bcdae4cbd3b 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -729,7 +729,10 @@ struct set_final_row_count {
     if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; }
     size_t const page_start_row = chunk.start_row + page.chunk_row;
     size_t const chunk_last_row = chunk.start_row + chunk.num_rows;
-    page.num_rows = chunk_last_row - page_start_row;
+    // Mark `is_num_rows_adjusted` to signal string decoders that the `num_rows` of this page has
+    // been adjusted.
+    page.is_num_rows_adjusted = page.num_rows != (chunk_last_row - page_start_row);
+    page.num_rows             = chunk_last_row - page_start_row;
   }
 };
 
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 96512dacb69..659d2ebd89a 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -3771,10 +3771,10 @@ def test_parquet_chunked_reader(
     chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
 ):
     df = pd.DataFrame(
-        {"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000}
+        {"a": [1, 2, 3, None] * 10000, "b": ["av", "qw", None, "xyz"] * 10000}
     )
     buffer = BytesIO()
-    df.to_parquet(buffer)
+    df.to_parquet(buffer, row_group_size=10000)
     actual = read_parquet_chunked(
         [buffer],
         chunk_read_limit=chunk_read_limit,
@@ -3788,6 +3788,108 @@
     assert_eq(expected, actual)
 
 
+@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
+@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
+@pytest.mark.parametrize("num_rows", [997, 2997, None])
+def test_parquet_chunked_reader_structs(
+    chunk_read_limit,
+    pass_read_limit,
+    num_rows,
+):
+    data = [
+        {
+            "a": "g",
+            "b": {
+                "b_a": 10,
+                "b_b": {"b_b_b": None, "b_b_a": 2},
+            },
+            "c": None,
+        },
+        {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]},
+        {"a": "j", "b": None, "c": [8, 10]},
+        {"a": None, "b": {"b_a": None, "b_b": None}, "c": None},
+        None,
+        {
+            "a": None,
+            "b": {"b_a": None, "b_b": {"b_b_b": 1}},
+            "c": [18, 19],
+        },
+        {"a": None, "b": None, "c": None},
+    ] * 1000
+
+    pa_struct = pa.Table.from_pydict({"struct": data})
+    df = cudf.DataFrame.from_arrow(pa_struct)
+    buffer = BytesIO()
+    df.to_parquet(buffer)
+
+    # Number of rows to read
+    nrows = num_rows if num_rows is not None else len(df)
+
+    actual = read_parquet_chunked(
+        [buffer],
+        chunk_read_limit=chunk_read_limit,
+        pass_read_limit=pass_read_limit,
+        nrows=nrows,
+    )
+    expected = cudf.read_parquet(
+        buffer,
+        nrows=nrows,
+    )
+    assert_eq(expected, actual)
+
+
+@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
+@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
+@pytest.mark.parametrize("num_rows", [4997, 9997, None])
+@pytest.mark.parametrize(
+    "str_encoding",
+    [
+        "PLAIN",
+        "DELTA_BYTE_ARRAY",
+        "DELTA_LENGTH_BYTE_ARRAY",
+    ],
+)
+def test_parquet_chunked_reader_string_decoders(
+    chunk_read_limit,
+    pass_read_limit,
+    num_rows,
+    str_encoding,
+):
+    df = pd.DataFrame(
+        {
+            "i64": [1, 2, 3, None] * 10000,
+            "str": ["av", "qw", "asd", "xyz"] * 10000,
+            "list": list(
+                [["ad", "cd"], ["asd", "fd"], None, ["asd", None]] * 10000
+            ),
+        }
+    )
+    buffer = BytesIO()
+    # Write 4 Parquet row groups with the requested string column encoding
+    df.to_parquet(
+        buffer,
+        row_group_size=10000,
+        use_dictionary=False,
+        column_encoding={"str": str_encoding},
+    )
+
+    # Number of rows to read
+    nrows = num_rows if num_rows is not None else len(df)
+
+    # Check with num_rows specified
+    actual = read_parquet_chunked(
+        [buffer],
+        chunk_read_limit=chunk_read_limit,
+        pass_read_limit=pass_read_limit,
+        nrows=nrows,
+    )
+    expected = cudf.read_parquet(
+        buffer,
+        nrows=nrows,
+    )
+    assert_eq(expected, actual)
+
+
 @pytest.mark.parametrize(
     "nrows,skip_rows",
     [
diff --git a/python/cudf_polars/cudf_polars/testing/plugin.py b/python/cudf_polars/cudf_polars/testing/plugin.py
index 2a9104d8c82..080a1af6e19 100644
--- a/python/cudf_polars/cudf_polars/testing/plugin.py
+++ b/python/cudf_polars/cudf_polars/testing/plugin.py
@@ -64,7 +64,6 @@ def pytest_configure(config: pytest.Config) -> None:
     "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394",
     "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception",
     "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception",
-    "tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311",
     "tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU",
     "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394",
     "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394",
diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py
index 61925b21a97..9c58a24c065 100644
--- a/python/cudf_polars/tests/test_scan.py
+++ b/python/cudf_polars/tests/test_scan.py
@@ -112,22 +112,7 @@ def test_scan(
         n_rows=n_rows,
     )
     engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked})
-    if (
-        is_chunked
-        and (columns is None or columns[0] != "a")
-        and (
-            # When we mask with the slice, it happens to remove the
-            # bad row
-            (mask is None and slice is not None)
-            # When we both slice and read a subset of rows it also
-            # removes the bad row
-            or (slice is None and n_rows is not None)
-        )
-    ):
-        # slice read produces wrong result for string column
-        request.applymarker(
-            pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
-        )
+
     if slice is not None:
         q = q.slice(*slice)
     if mask is not None:
@@ -377,13 +362,6 @@ def large_df(df, tmpdir_factory, chunked_slice):
 def test_scan_parquet_chunked(
     request, chunked_slice, large_df, chunk_read_limit, pass_read_limit
 ):
-    if chunked_slice in {"skip_partial", "partial"} and (
-        chunk_read_limit == 0 and pass_read_limit != 0
-    ):
-        request.applymarker(
-            pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
-        )
-
     assert_gpu_result_equal(
         large_df,
         engine=pl.GPUEngine(