From 03ac8455557838b57255d619bb792da0149e53c8 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 18 Nov 2024 12:39:05 -0500 Subject: [PATCH 1/7] Reading multi-source compressed JSONL files (#17161) Fixes #17068 Fixes #12299 This PR introduces a new datasource for compressed inputs which enables batching and byte range reading of multi-source JSONL files using the reallocate-and-retry policy. Moreover, instead of using a 4:1 compression ratio heuristic, the device buffer size is estimated accurately for GZIP, ZIP, and SNAPPY compression types. For remaining types, the files are first decompressed then batched. ~~TODO: Reuse existing JSON tests but with an additional compression parameter to verify correctness.~~ ~~Handled by #17219, which implements compressed JSON writer required for the above test.~~ Multi-source compressed input tests added! Authors: - Shruti Shivakumar (https://github.com/shrshi) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Kyle Edwards (https://github.com/KyleFromNVIDIA) - Karthikeyan (https://github.com/karthikeyann) URL: https://github.com/rapidsai/cudf/pull/17161 --- cpp/CMakeLists.txt | 1 + cpp/src/io/comp/comp.cpp | 119 +++++++++ cpp/src/io/comp/comp.hpp | 43 ++++ cpp/src/io/comp/gpuinflate.cu | 8 +- cpp/src/io/comp/io_uncomp.hpp | 34 ++- cpp/src/io/comp/uncomp.cpp | 315 +++++++++++++---------- cpp/src/io/json/read_json.cu | 306 +++++++++++++--------- cpp/src/io/json/read_json.hpp | 10 +- cpp/src/io/orc/orc.cpp | 2 +- cpp/tests/io/json/json_chunked_reader.cu | 131 ++++++++-- cpp/tests/io/json/json_test.cpp | 58 +++++ cpp/tests/io/json/json_utils.cuh | 10 +- cpp/tests/large_strings/json_tests.cu | 47 +++- 13 files changed, 771 insertions(+), 313 deletions(-) create mode 100644 cpp/src/io/comp/comp.cpp create mode 100644 cpp/src/io/comp/comp.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6752ce12d83..506f6c185f5 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -464,6 +464,7 @@ add_library( 
src/io/avro/avro_gpu.cu src/io/avro/reader_impl.cu src/io/comp/brotli_dict.cpp + src/io/comp/comp.cpp src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp new file mode 100644 index 00000000000..2176dbb2373 --- /dev/null +++ b/cpp/src/io/comp/comp.cpp @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "comp.hpp" + +#include "io/utilities/hostdevice_vector.hpp" +#include "nvcomp_adapter.hpp" + +#include +#include +#include +#include +#include +#include + +#include // compress + +namespace cudf::io::detail { + +namespace { + +/** + * @brief GZIP host compressor (includes header) + */ +std::vector compress_gzip(host_span src) +{ + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + zs.avail_in = src.size(); + zs.next_in = reinterpret_cast(const_cast(src.data())); + + std::vector dst; + zs.avail_out = 0; + zs.next_out = nullptr; + + int windowbits = 15; + int gzip_encoding = 16; + int ret = deflateInit2( + &zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY); + CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed."); + + uint32_t estcomplen = deflateBound(&zs, src.size()); + dst.resize(estcomplen); + zs.avail_out = estcomplen; + zs.next_out = dst.data(); + + ret = deflate(&zs, Z_FINISH); + CUDF_EXPECTS(ret 
== Z_STREAM_END, "GZIP DEFLATE compression failed due to insufficient space!"); + dst.resize(std::distance(dst.data(), zs.next_out)); + + ret = deflateEnd(&zs); + CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression failed at deallocation"); + + return dst; +} + +/** + * @brief SNAPPY device compressor + */ +std::vector compress_snappy(host_span src, + rmm::cuda_stream_view stream) +{ + auto const d_src = + cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref()); + rmm::device_uvector d_dst(src.size(), stream); + + cudf::detail::hostdevice_vector> inputs(1, stream); + inputs[0] = d_src; + inputs.host_to_device_async(stream); + + cudf::detail::hostdevice_vector> outputs(1, stream); + outputs[0] = d_dst; + outputs.host_to_device_async(stream); + + cudf::detail::hostdevice_vector hd_status(1, stream); + hd_status[0] = {}; + hd_status.host_to_device_async(stream); + + nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream); + + stream.synchronize(); + hd_status.device_to_host_sync(stream); + CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, + "snappy compression failed"); + std::vector dst(d_dst.size()); + cudf::detail::cuda_memcpy(host_span{dst}, device_span{d_dst}, stream); + return dst; +} + +} // namespace + +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + switch (compression) { + case compression_type::GZIP: return compress_gzip(src); + case compression_type::SNAPPY: return compress_snappy(src, stream); + default: CUDF_FAIL("Unsupported compression type"); + } +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp new file mode 100644 index 00000000000..652abbbeda6 --- /dev/null +++ b/cpp/src/io/comp/comp.hpp @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace CUDF_EXPORT cudf { +namespace io::detail { + +/** + * @brief Compresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Decompressed host buffer + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Vector containing the Compressed output + */ +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream); + +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index fff1cf0c96a..090ea1430b5 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -980,27 +980,27 @@ __device__ int parse_gzip_header(uint8_t const* src, size_t src_size) { uint8_t flags = src[3]; hdr_len = 10; - if (flags & GZIPHeaderFlag::fextra) // Extra fields present + if (flags & detail::GZIPHeaderFlag::fextra) // Extra fields present { int xlen = src[hdr_len] | (src[hdr_len + 1] << 8); hdr_len += xlen; if (hdr_len >= src_size) return -1; } - if (flags & GZIPHeaderFlag::fname) // Original file name present + if (flags & detail::GZIPHeaderFlag::fname) // Original file name present { // Skip zero-terminated string do { if (hdr_len >= src_size) return -1; } while (src[hdr_len++] != 0); } - if (flags & 
GZIPHeaderFlag::fcomment) // Comment present + if (flags & detail::GZIPHeaderFlag::fcomment) // Comment present { // Skip zero-terminated string do { if (hdr_len >= src_size) return -1; } while (src[hdr_len++] != 0); } - if (flags & GZIPHeaderFlag::fhcrc) // Header CRC present + if (flags & detail::GZIPHeaderFlag::fhcrc) // Header CRC present { hdr_len += 2; } diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index 1c9578fa5c0..ca722a9b7ee 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ using cudf::host_span; -namespace cudf { -namespace io { +namespace CUDF_EXPORT cudf { +namespace io::detail { /** * @brief Decompresses a system memory buffer. @@ -36,13 +36,35 @@ namespace io { * * @return Vector containing the Decompressed output */ -std::vector decompress(compression_type compression, host_span src); +[[nodiscard]] std::vector decompress(compression_type compression, + host_span src); +/** + * @brief Decompresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * @param dst Destination host span to place decompressed buffer + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Size of decompressed output + */ size_t decompress(compression_type compression, host_span src, host_span dst, rmm::cuda_stream_view stream); +/** + * @brief Without actually decompressing the compressed input buffer passed, return the size of + * decompressed output. If the decompressed size cannot be extracted apriori, return zero. 
+ * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * + * @return Size of decompressed output + */ +size_t get_uncompressed_size(compression_type compression, host_span src); + /** * @brief GZIP header flags * See https://tools.ietf.org/html/rfc1952 @@ -55,5 +77,5 @@ constexpr uint8_t fname = 0x08; // Original file name present constexpr uint8_t fcomment = 0x10; // Comment present }; // namespace GZIPHeaderFlag -} // namespace io -} // namespace cudf +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index fb8c308065d..b3d43fa786a 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -28,13 +28,12 @@ #include // memset -using cudf::host_span; - -namespace cudf { -namespace io { +namespace cudf::io::detail { #pragma pack(push, 1) +namespace { + struct gz_file_header_s { uint8_t id1; // 0x1f uint8_t id2; // 0x8b @@ -261,7 +260,7 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz strm.avail_out = dst.size(); strm.total_out = 0; auto zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers - CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream"); + CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream: inflateInit2 failed"); do { if (strm.avail_out == 0) { dst.resize(strm.total_out + (1 << 30)); @@ -273,125 +272,7 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz strm.total_out == dst.size()); dst.resize(strm.total_out); inflateEnd(&strm); - CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); -} - -std::vector decompress(compression_type compression, host_span src) -{ - CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - - auto raw = src.data(); - uint8_t const* comp_data = nullptr; - size_t comp_len = 0; - size_t uncomp_len = 0; - - switch 
(compression) { - case compression_type::AUTO: - case compression_type::GZIP: { - gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src.size())) { - compression = compression_type::GZIP; - comp_data = gz.comp_data; - comp_len = gz.comp_len; - uncomp_len = gz.isize; - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::ZIP: { - zip_archive_s za; - if (OpenZipArchive(&za, raw, src.size())) { - size_t cdfh_ofs = 0; - for (int i = 0; i < za.eocd->num_entries; i++) { - auto const* cdfh = reinterpret_cast( - reinterpret_cast(za.cdfh) + cdfh_ofs); - int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; - if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { - // Bad cdir - break; - } - // For now, only accept with non-zero file sizes and DEFLATE - if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { - size_t lfh_ofs = cdfh->hdr_ofs; - auto const* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { - if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { - size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; - size_t file_end = file_start + lfh->comp_size; - if (file_end <= src.size()) { - // Pick the first valid file of non-zero size (only 1 file expected in archive) - compression = compression_type::ZIP; - comp_data = raw + file_start; - comp_len = lfh->comp_size; - uncomp_len = lfh->uncomp_size; - break; - } - } - } - } - cdfh_ofs += cdfh_len; - } - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - case compression_type::BZIP2: - if (src.size() > 4) { - auto const* fhdr = reinterpret_cast(raw); - // Check for BZIP2 file signature "BZh1" to "BZh9" - if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && 
- fhdr->blksz >= '1' && fhdr->blksz <= '9') { - compression = compression_type::BZIP2; - comp_data = raw; - comp_len = src.size(); - uncomp_len = 0; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - default: CUDF_FAIL("Unsupported compressed stream type"); - } - - CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); - - if (uncomp_len <= 0) { - uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume - // ~4:1 compression for initial size - } - - if (compression == compression_type::GZIP || compression == compression_type::ZIP) { - // INFLATE - std::vector dst(uncomp_len); - cpu_inflate_vector(dst, comp_data, comp_len); - return dst; - } - if (compression == compression_type::BZIP2) { - size_t src_ofs = 0; - size_t dst_ofs = 0; - int bz_err = 0; - std::vector dst(uncomp_len); - do { - size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); - if (bz_err == BZ_OUTBUFF_FULL) { - // TBD: We could infer the compression ratio based on produced/consumed byte counts - // in order to minimize realloc events and over-allocation - dst_ofs = dst_len; - dst_len = uncomp_len + (uncomp_len / 2); - dst.resize(dst_len); - uncomp_len = dst_len; - } else if (bz_err == 0) { - uncomp_len = dst_len; - dst.resize(uncomp_len); - } - } while (bz_err == BZ_OUTBUFF_FULL); - CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); - return dst; - } - - CUDF_FAIL("Unsupported compressed stream type"); + CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream: Z_STREAM_END not encountered"); } /** @@ -536,14 +417,130 @@ size_t decompress_zstd(host_span src, CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed"); // Copy temporary output to `dst` - cudf::detail::cuda_memcpy_async( - dst.subspan(0, hd_stats[0].bytes_written), - device_span{d_dst.data(), 
hd_stats[0].bytes_written}, - stream); + cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written), + device_span{d_dst.data(), hd_stats[0].bytes_written}, + stream); return hd_stats[0].bytes_written; } +struct source_properties { + compression_type compression = compression_type::NONE; + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; +}; + +source_properties get_source_properties(compression_type compression, host_span src) +{ + auto raw = src.data(); + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; + + switch (compression) { + case compression_type::AUTO: + case compression_type::GZIP: { + gz_archive_s gz; + auto const parse_succeeded = ParseGZArchive(&gz, src.data(), src.size()); + CUDF_EXPECTS(parse_succeeded, "Failed to parse GZIP header while fetching source properties"); + compression = compression_type::GZIP; + comp_data = gz.comp_data; + comp_len = gz.comp_len; + uncomp_len = gz.isize; + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::ZIP: { + zip_archive_s za; + if (OpenZipArchive(&za, raw, src.size())) { + size_t cdfh_ofs = 0; + for (int i = 0; i < za.eocd->num_entries; i++) { + auto const* cdfh = reinterpret_cast( + reinterpret_cast(za.cdfh) + cdfh_ofs); + int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; + if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { + // Bad cdir + break; + } + // For now, only accept with non-zero file sizes and DEFLATE + if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { + size_t lfh_ofs = cdfh->hdr_ofs; + auto const* lfh = reinterpret_cast(raw + lfh_ofs); + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { + if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { + size_t 
file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; + size_t file_end = file_start + lfh->comp_size; + if (file_end <= src.size()) { + // Pick the first valid file of non-zero size (only 1 file expected in archive) + compression = compression_type::ZIP; + comp_data = raw + file_start; + comp_len = lfh->comp_size; + uncomp_len = lfh->uncomp_size; + break; + } + } + } + } + cdfh_ofs += cdfh_len; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::BZIP2: { + if (src.size() > 4) { + auto const* fhdr = reinterpret_cast(raw); + // Check for BZIP2 file signature "BZh1" to "BZh9" + if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && + fhdr->blksz >= '1' && fhdr->blksz <= '9') { + compression = compression_type::BZIP2; + comp_data = raw; + comp_len = src.size(); + uncomp_len = 0; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::SNAPPY: { + uncomp_len = 0; + auto cur = src.begin(); + auto const end = src.end(); + // Read uncompressed length (varint) + { + uint32_t l = 0, c; + do { + c = *cur++; + auto const lo7 = c & 0x7f; + if (l >= 28 && c > 0xf) { + uncomp_len = 0; + break; + } + uncomp_len |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + CUDF_EXPECTS(uncomp_len != 0 and cur < end, "Error in retrieving SNAPPY source properties"); + } + comp_data = raw; + comp_len = src.size(); + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + default: CUDF_FAIL("Unsupported compressed stream type"); + } + + return source_properties{compression, comp_data, comp_len, uncomp_len}; +} + +} // namespace + +size_t get_uncompressed_size(compression_type compression, host_span src) +{ + return get_source_properties(compression, src).uncomp_len; +} + size_t decompress(compression_type compression, host_span src, host_span dst, @@ -558,5 +555,63 @@ size_t decompress(compression_type compression, } } -} 
// namespace io -} // namespace cudf +std::vector decompress(compression_type compression, host_span src) +{ + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto srcprops = get_source_properties(compression, src); + CUDF_EXPECTS(srcprops.comp_data != nullptr and srcprops.comp_len > 0, + "Unsupported compressed stream type"); + + if (srcprops.uncomp_len <= 0) { + srcprops.uncomp_len = + srcprops.comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume + // ~4:1 compression for initial size + } + + if (compression == compression_type::GZIP) { + // INFLATE + std::vector dst(srcprops.uncomp_len); + decompress_gzip(src, dst); + return dst; + } + if (compression == compression_type::ZIP) { + std::vector dst(srcprops.uncomp_len); + cpu_inflate_vector(dst, srcprops.comp_data, srcprops.comp_len); + return dst; + } + if (compression == compression_type::BZIP2) { + size_t src_ofs = 0; + size_t dst_ofs = 0; + int bz_err = 0; + std::vector dst(srcprops.uncomp_len); + do { + size_t dst_len = srcprops.uncomp_len - dst_ofs; + bz_err = cpu_bz2_uncompress( + srcprops.comp_data, srcprops.comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); + if (bz_err == BZ_OUTBUFF_FULL) { + // TBD: We could infer the compression ratio based on produced/consumed byte counts + // in order to minimize realloc events and over-allocation + dst_ofs = dst_len; + dst_len = srcprops.uncomp_len + (srcprops.uncomp_len / 2); + dst.resize(dst_len); + srcprops.uncomp_len = dst_len; + } else if (bz_err == 0) { + srcprops.uncomp_len = dst_len; + dst.resize(srcprops.uncomp_len); + } + } while (bz_err == BZ_OUTBUFF_FULL); + CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); + return dst; + } + if (compression == compression_type::SNAPPY) { + std::vector dst(srcprops.uncomp_len); + decompress_snappy(src, dst); + return dst; + } + + CUDF_FAIL("Unsupported compressed stream 
type"); +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 279f5e71351..82d8152ca1c 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,70 @@ namespace cudf::io::json::detail { namespace { +class compressed_host_buffer_source final : public datasource { + public: + explicit compressed_host_buffer_source(std::unique_ptr const& src, + compression_type comptype) + : _comptype{comptype}, _dbuf_ptr{src->host_read(0, src->size())} + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || + comptype == compression_type::SNAPPY) { + _decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer); + } else { + _decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer); + _decompressed_ch_buffer_size = _decompressed_buffer.size(); + } + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = cudf::io::detail::decompress(_comptype, ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) { + std::memcpy(dst, decompressed_hbuf.data() + offset, count); + return count; + } + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + std::memcpy(dst, _decompressed_buffer.data() + offset, count); + return count; + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if 
(_decompressed_buffer.empty()) { + auto decompressed_hbuf = cudf::io::detail::decompress(_comptype, ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) + return std::make_unique>>( + std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + return std::make_unique(_decompressed_buffer.data() + offset, count); + } + + [[nodiscard]] bool supports_device_read() const override { return false; } + + [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; } + + private: + std::unique_ptr _dbuf_ptr; + compression_type _comptype; + size_t _decompressed_ch_buffer_size; + std::vector _decompressed_buffer; +}; + // Return total size of sources enclosing the passed range std::size_t sources_size(host_span> const sources, std::size_t range_offset, @@ -126,13 +191,12 @@ datasource::owning_buffer get_record_range_raw_input( { CUDF_FUNC_RANGE(); - std::size_t const total_source_size = sources_size(sources, 0, 0); - auto constexpr num_delimiter_chars = 1; - auto const delimiter = reader_opts.get_delimiter(); - auto const num_extra_delimiters = num_delimiter_chars * sources.size(); - compression_type const reader_compression = reader_opts.get_compression(); - std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); - std::size_t chunk_size = reader_opts.get_byte_range_size(); + std::size_t const total_source_size = sources_size(sources, 0, 0); + auto constexpr num_delimiter_chars = 1; + auto const delimiter = reader_opts.get_delimiter(); + auto const num_extra_delimiters = num_delimiter_chars * sources.size(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t chunk_size = reader_opts.get_byte_range_size(); CUDF_EXPECTS(total_source_size ? 
chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting", @@ -143,22 +207,16 @@ datasource::owning_buffer get_record_range_raw_input( int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced; std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); - // The allocation for single source compressed input is estimated by assuming a ~4:1 - // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea - // of subchunks. - auto constexpr header_size = 4096; std::size_t buffer_size = - reader_compression != compression_type::NONE - ? total_source_size * estimated_compression_ratio + header_size - : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; rmm::device_buffer buffer(buffer_size, stream); device_span bufspan(reinterpret_cast(buffer.data()), buffer.size()); // Offset within buffer indicating first read position std::int64_t buffer_offset = 0; - auto readbufspan = ingest_raw_input( - bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream); + auto readbufspan = + ingest_raw_input(bufspan, sources, chunk_offset, chunk_size, delimiter, stream); auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = @@ -179,7 +237,6 @@ datasource::owning_buffer get_record_range_raw_input( buffer_offset += readbufspan.size(); readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset), sources, - reader_compression, next_subchunk_start, size_per_subchunk, delimiter, @@ -196,11 +253,9 @@ datasource::owning_buffer get_record_range_raw_input( // Our buffer_size estimate is insufficient to read until the end of the line! We need to // allocate more memory and try again! 
num_subchunks_prealloced *= 2; - buffer_size = reader_compression != compression_type::NONE - ? 2 * buffer_size - : std::min(total_source_size, - buffer_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + buffer_size = std::min(total_source_size, + buffer_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; buffer.resize(buffer_size, stream); bufspan = device_span(reinterpret_cast(buffer.data()), buffer.size()); } @@ -258,111 +313,11 @@ table_with_metadata read_batch(host_span> sources, return device_parse_nested_json(buffer, reader_opts, stream, mr); } -} // anonymous namespace - -device_span ingest_raw_input(device_span buffer, - host_span> sources, - compression_type compression, - std::size_t range_offset, - std::size_t range_size, - char delimiter, - rmm::cuda_stream_view stream) +table_with_metadata read_json_impl(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { - CUDF_FUNC_RANGE(); - // We append a line delimiter between two files to make sure the last line of file i and the first - // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line - // delimiter. 
- auto constexpr num_delimiter_chars = 1; - - if (compression == compression_type::NONE) { - auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); - std::vector prefsum_source_sizes(sources.size()); - std::vector> h_buffers; - std::size_t bytes_read = 0; - std::transform_inclusive_scan(sources.begin(), - sources.end(), - prefsum_source_sizes.begin(), - std::plus{}, - [](std::unique_ptr const& s) { return s->size(); }); - auto upper = - std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); - std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - - auto const total_bytes_to_read = - std::min(range_size, prefsum_source_sizes.back() - range_offset); - range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; - for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; - i++) { - if (sources[i]->is_empty()) continue; - auto data_size = - std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); - auto destination = reinterpret_cast(buffer.data()) + bytes_read + - (num_delimiter_chars * delimiter_map.size()); - if (sources[i]->is_device_read_preferred(data_size)) { - bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); - bytes_read += h_buffer->size(); - } - range_offset = 0; - delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); - } - // Removing delimiter inserted after last non-empty source is read - if (!delimiter_map.empty()) { delimiter_map.pop_back(); } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - 
static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator(delimiter); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - delimiter_map, stream, cudf::get_current_device_resource_ref()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - buffer.data()); - } - stream.synchronize(); - return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); - } - // TODO: allow byte range reading from multiple compressed files. - auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); - auto hbuffer = std::vector(remaining_bytes_to_read); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); - auto uncomp_data = decompress(compression, hbuffer); - auto ret_buffer = buffer.first(uncomp_data.size()); - cudf::detail::cuda_memcpy( - ret_buffer, - host_span{reinterpret_cast(uncomp_data.data()), uncomp_data.size()}, - stream); - return ret_buffer; -} - -table_with_metadata read_json(host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { - CUDF_EXPECTS(reader_opts.is_enabled_lines(), - "Specifying a byte range is supported only for JSON Lines"); - } - - if (sources.size() > 1) { - CUDF_EXPECTS(reader_opts.get_compression() == compression_type::NONE, - "Multiple compressed inputs are not supported"); - CUDF_EXPECTS(reader_opts.is_enabled_lines(), - "Multiple inputs are supported only for JSON Lines format"); - } - /* * The batched JSON reader 
enforces that the size of each batch is at most INT_MAX * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by @@ -462,4 +417,101 @@ table_with_metadata read_json(host_span> sources, {partial_tables[0].metadata.schema_info}}; } +} // anonymous namespace + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + std::size_t range_offset, + std::size_t range_size, + char delimiter, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + // We append a line delimiter between two files to make sure the last line of file i and the first + // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line + // delimiter. + auto constexpr num_delimiter_chars = 1; + + auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); + std::vector prefsum_source_sizes(sources.size()); + std::vector> h_buffers; + std::size_t bytes_read = 0; + std::transform_inclusive_scan(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + std::plus{}, + [](std::unique_ptr const& s) { return s->size(); }); + auto upper = + std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + + auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset); + range_offset -= start_source ? 
prefsum_source_sizes[start_source - 1] : 0; + for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) { + if (sources[i]->is_empty()) continue; + auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); + auto destination = reinterpret_cast(buffer.data()) + bytes_read + + (num_delimiter_chars * delimiter_map.size()); + if (sources[i]->is_device_read_preferred(data_size)) { + bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); + } else { + h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); + bytes_read += h_buffer->size(); + } + range_offset = 0; + delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); + } + // Removing delimiter inserted after last non-empty source is read + if (!delimiter_map.empty()) { delimiter_map.pop_back(); } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1 && !delimiter_map.empty()) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator(delimiter); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + delimiter_map, stream, cudf::get_current_device_resource_ref()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + buffer.data()); + } + stream.synchronize(); + return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); +} + +table_with_metadata read_json(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + 
if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "Specifying a byte range is supported only for JSON Lines"); + } + + if (sources.size() > 1) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "Multiple inputs are supported only for JSON Lines format"); + } + + if (reader_opts.get_compression() == compression_type::NONE) + return read_json_impl(sources, reader_opts, stream, mr); + + std::vector> compressed_sources; + for (size_t i = 0; i < sources.size(); i++) { + compressed_sources.emplace_back( + std::make_unique(sources[i], reader_opts.get_compression())); + } + // in read_json_impl, we need the compressed source size to actually be the + // uncompressed source size for correct batching + return read_json_impl(compressed_sources, reader_opts, stream, mr); +} + } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 4def69cc629..ac980938522 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -32,10 +32,9 @@ namespace CUDF_EXPORT cudf { namespace io::json::detail { // Some magic numbers -constexpr int num_subchunks = 10; // per chunk_size -constexpr size_t min_subchunk_size = 10000; -constexpr int estimated_compression_ratio = 4; -constexpr int max_subchunks_prealloced = 3; +constexpr int num_subchunks = 10; // per chunk_size +constexpr size_t min_subchunk_size = 10000; +constexpr int max_subchunks_prealloced = 3; /** * @brief Read from array of data sources into RMM buffer. 
The size of the returned device span @@ -45,15 +44,14 @@ constexpr int max_subchunks_prealloced = 3; * * @param buffer Device span buffer to which data is read * @param sources Array of data sources - * @param compression Compression format of source * @param range_offset Number of bytes to skip from source start * @param range_size Number of bytes to read from source + * @param delimiter Delimiter character for JSONL inputs * @param stream CUDA stream used for device memory operations and kernel launches * @returns A subspan of the input device span containing data read */ device_span ingest_raw_input(device_span buffer, host_span> sources, - compression_type compression, size_t range_offset, size_t range_size, char delimiter, diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 1fe5e5aa41e..7046b3b3f91 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -460,7 +460,7 @@ host_span OrcDecompressor::decompress_blocks(host_span @@ -31,36 +32,66 @@ /** * @brief Base test fixture for JSON reader tests */ -struct JsonReaderTest : public cudf::test::BaseFixture {}; +struct JsonReaderTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonReaderTest, + JsonReaderTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); cudf::test::TempDirTestEnvironment* const temp_env = static_cast( ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); -TEST_F(JsonReaderTest, ByteRange_SingleSource) +TEST_P(JsonReaderTest, ByteRange_SingleSource) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; + std::vector cdata; 
+ if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = cudf::io::json_reader_options::builder( cudf::io::source_info{json_string.c_str(), json_string.size()}) .compression(cudf::io::compression_type::NONE) .lines(true); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::host_span(cdata.data(), cdata.size())}) + .compression(comptype) + .lines(true); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); - auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto cdatasources = cudf::io::datasource::create(cjson_lines_options.get_source().host_buffers()); // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); @@ -77,37 +108,54 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource) } } -TEST_F(JsonReaderTest, ReadCompleteFiles) +TEST_P(JsonReaderTest, ReadCompleteFiles) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, 
"b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json"; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + + auto cfilename = temp_env->get_temp_dir() + "cParseInRangeIntegers.json"; { - std::ofstream outfile(filename, std::ofstream::out); - outfile << json_string; + std::ofstream outfile(cfilename, std::ofstream::out); + std::copy(cdata.begin(), cdata.end(), std::ostreambuf_iterator(outfile)); } constexpr int num_sources = 5; - std::vector filepaths(num_sources, filename); + std::vector cfilepaths(num_sources, cfilename); - cudf::io::json_reader_options in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths}) + cudf::io::json_reader_options cin_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepaths}) .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); - cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + cudf::io::table_with_metadata result = cudf::io::read_json(cin_options); std::vector part_tables; - for (auto filepath : filepaths) { - cudf::io::json_reader_options part_in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) + for (auto cfilepath : cfilepaths) { + cudf::io::json_reader_options part_cin_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepath}) .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); - part_tables.push_back(cudf::io::read_json(part_in_options)); + 
part_tables.push_back(cudf::io::read_json(part_cin_options)); } auto part_table_views = std::vector(part_tables.size()); @@ -120,42 +168,69 @@ TEST_F(JsonReaderTest, ReadCompleteFiles) CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result.tbl->view(), expected_result->view()); } -TEST_F(JsonReaderTest, ByteRange_MultiSource) +TEST_P(JsonReaderTest, ByteRange_MultiSource) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json"; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + + auto cfilename = temp_env->get_temp_dir() + "cParseInRangeIntegers.json"; { - std::ofstream outfile(filename, std::ofstream::out); - outfile << json_string; + std::ofstream outfile(cfilename, std::ofstream::out); + std::copy(cdata.begin(), cdata.end(), std::ostreambuf_iterator(outfile)); } constexpr int num_sources = 5; - std::vector filepaths(num_sources, filename); + std::vector cfilepaths(num_sources, cfilename); + std::vector> hostbufs( + num_sources, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths}) - .lines(true) + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(hostbufs.data(), hostbufs.size())}) .compression(cudf::io::compression_type::NONE) + 
.lines(true); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepaths}) + .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); - auto file_paths = json_lines_options.get_source().filepaths(); - std::vector> datasources; - for (auto& fp : file_paths) { - datasources.emplace_back(cudf::io::datasource::create(fp)); + std::vector> cdatasources; + for (auto& fp : cfilepaths) { + cdatasources.emplace_back(cudf::io::datasource::create(fp)); } + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 26937c9298a..3c8db99c3c7 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -14,6 +14,9 @@ * limitations under the License. 
*/ +#include "io/comp/comp.hpp" +#include "io/comp/io_uncomp.hpp" + #include #include #include @@ -3252,4 +3255,59 @@ TEST_F(JsonReaderTest, JsonNestedDtypeFilterWithOrder) } } +struct JsonCompressedIOTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonCompressedIOTest, + JsonCompressedIOTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); + +TEST_P(JsonCompressedIOTest, BasicJsonLines) +{ + cudf::io::compression_type const comptype = GetParam(); + std::string data = to_records_orient( + {{{"0", "1"}, {"1", "1.1"}}, {{"0", "2"}, {"1", "2.2"}}, {{"0", "3"}, {"1", "3.3"}}}, "\n"); + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(data.data()), data.size()), + cudf::get_default_stream()); + auto decomp_out_buffer = cudf::io::detail::decompress( + comptype, cudf::host_span(cdata.data(), cdata.size())); + std::string const expected = R"({"0":1, "1":1.1} +{"0":2, "1":2.2} +{"0":3, "1":3.3})"; + EXPECT_EQ( + expected, + std::string(reinterpret_cast(decomp_out_buffer.data()), decomp_out_buffer.size())); + } else + cdata = std::vector(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::host_span(cdata.data(), cdata.size())}) + .dtypes(std::vector{dtype(), dtype()}) + .compression(comptype) + .lines(true); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 2); + EXPECT_EQ(result.tbl->num_rows(), 3); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT32); + EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64); + 
+ EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + EXPECT_EQ(result.metadata.schema_info[1].name, "1"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int_wrapper{{1, 2, 3}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2, 3.3}}); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh index c31bb2d24e0..629a89fa777 100644 --- a/cpp/tests/io/json/json_utils.cuh +++ b/cpp/tests/io/json/json_utils.cuh @@ -32,7 +32,9 @@ template std::vector split_byte_range_reading( cudf::host_span> sources, + cudf::host_span> csources, cudf::io::json_reader_options const& reader_opts, + cudf::io::json_reader_options const& creader_opts, IndexType chunk_size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) @@ -49,7 +51,6 @@ std::vector split_byte_range_reading( rmm::device_uvector buffer(total_source_size, stream); auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer, sources, - reader_opts.get_compression(), reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size(), reader_opts.get_delimiter(), @@ -95,10 +96,11 @@ std::vector split_byte_range_reading( record_ranges.emplace_back(prev, total_source_size); std::vector tables; + auto creader_opts_chunk = creader_opts; for (auto const& [chunk_start, chunk_end] : record_ranges) { - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); - tables.push_back(cudf::io::json::detail::read_json(sources, reader_opts_chunk, stream, mr)); + creader_opts_chunk.set_byte_range_offset(chunk_start); + creader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(cudf::io::json::detail::read_json(csources, creader_opts_chunk, stream, mr)); } // assume all records have same number of columns, and inferred same type. (or schema is passed) // TODO a step before to merge all columns, types and infer final schema. 
diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index a212d7d654a..0703fa72f67 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -15,6 +15,7 @@ */ #include "../io/json/json_utils.cuh" +#include "io/comp/comp.hpp" #include "large_strings_fixture.hpp" #include @@ -25,10 +26,19 @@ #include #include -struct JsonLargeReaderTest : public cudf::test::StringsLargeTest {}; +struct JsonLargeReaderTest : public cudf::test::StringsLargeTest, + public testing::WithParamInterface {}; -TEST_F(JsonLargeReaderTest, MultiBatch) +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonLargeReaderTest, + JsonLargeReaderTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::NONE)); + +TEST_P(JsonLargeReaderTest, MultiBatch) { + cudf::io::compression_type const comptype = GetParam(); + std::string json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } @@ -48,11 +58,26 @@ TEST_F(JsonLargeReaderTest, MultiBatch) json_string += json_string; } + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + constexpr int num_sources = 2; std::vector> hostbufs( num_sources, cudf::host_span(reinterpret_cast(json_string.data()), json_string.size())); + std::vector> chostbufs( + num_sources, + cudf::host_span(reinterpret_cast(cdata.data()), cdata.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = @@ -62,14 +87,20 @@ TEST_F(JsonLargeReaderTest, MultiBatch) .lines(true) .compression(cudf::io::compression_type::NONE) 
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(chostbufs.data(), chostbufs.size())}) + .lines(true) + .compression(comptype) + .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); + + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto cdatasources = cudf::io::datasource::create(cjson_lines_options.get_source().host_buffers()); - std::vector> datasources; - for (auto& hb : hostbufs) { - datasources.emplace_back(cudf::io::datasource::create(hb)); - } // Test for different chunk sizes std::vector chunk_sizes{batch_size_upper_bound / 4, batch_size_upper_bound / 2, @@ -79,7 +110,9 @@ TEST_F(JsonLargeReaderTest, MultiBatch) for (auto chunk_size : chunk_sizes) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); From d514517001598158a31eba1590b01bf2b14d61f6 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Nov 2024 10:13:28 -0800 Subject: [PATCH 2/7] Test the full matrix for polars and dask wheels on nightlies (#17320) This PR ensures that we have nightly coverage of more of the CUDA/Python/arch versions that we claim to support for dask-cudf and cudf-polars wheels. In addition, this PR ensures that we do not attempt to run the dbgen executable in the Polars repository on systems with too old of a glibc to support running them. 
Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17320 --- .github/workflows/test.yaml | 6 ------ ci/run_cudf_polars_polars_tests.sh | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index a7f8c6ca0a9..3be07480b15 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -121,8 +121,6 @@ jobs: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: - # This selects "ARCH=amd64 + the latest supported Python + CUDA". - matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: nightly branch: ${{ inputs.branch }} date: ${{ inputs.date }} @@ -153,8 +151,6 @@ jobs: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: - # This selects "ARCH=amd64 + the latest supported Python + CUDA". - matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: nightly branch: ${{ inputs.branch }} date: ${{ inputs.date }} @@ -164,8 +160,6 @@ jobs: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: - # This selects "ARCH=amd64 + the latest supported Python + CUDA". 
- matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: nightly branch: ${{ inputs.branch }} date: ${{ inputs.date }} diff --git a/ci/run_cudf_polars_polars_tests.sh b/ci/run_cudf_polars_polars_tests.sh index 95f78f17f2f..37616989f00 100755 --- a/ci/run_cudf_polars_polars_tests.sh +++ b/ci/run_cudf_polars_polars_tests.sh @@ -14,6 +14,24 @@ DESELECTED_TESTS=( "tests/docs/test_user_guide.py" # No dot binary in CI image ) +if [[ $(arch) == "aarch64" ]]; then + # The binary used for TPC-H generation is compiled for x86_64, not aarch64. + DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh") + # The connectorx package is not available on arm + DESELECTED_TESTS+=("tests/unit/io/database/test_read.py::test_read_database") + # The necessary timezone information cannot be found in our CI image. + DESELECTED_TESTS+=("tests/unit/io/test_parquet.py::test_parametric_small_page_mask_filtering") + DESELECTED_TESTS+=("tests/unit/testing/test_assert_series_equal.py::test_assert_series_equal_parametric") + DESELECTED_TESTS+=("tests/unit/operations/test_join.py::test_join_4_columns_with_validity") +else + # Ensure that we don't run dbgen when it uses newer symbols than supported by the glibc version in the CI image. + glibc_minor_version=$(ldd --version | head -1 | grep -o "[0-9]\.[0-9]\+" | tail -1 | cut -d '.' -f2) + latest_glibc_symbol_found=$(nm py-polars/tests/benchmark/data/pdsh/dbgen/dbgen | grep GLIBC | grep -o "[0-9]\.[0-9]\+" | sort --version-sort | tail -1 | cut -d "." 
-f 2) + if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then + DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh") + fi +fi + DESELECTED_TESTS=$(printf -- " --deselect %s" "${DESELECTED_TESTS[@]}") python -m pytest \ --import-mode=importlib \ From 43f2f68b5e15cc541efe78fa226a0dd3a1fd6885 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:36:11 -0800 Subject: [PATCH 3/7] Fix reading Parquet string cols when `nrows` and `input_pass_limit` > 0 (#17321) This PR fixes reading string columns in Parquet using chunked parquet reader when `nrows` and `input_pass_limit` are > 0. Closes #17311 Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Ed Seidl (https://github.com/etseidl) - Lawrence Mitchell (https://github.com/wence-) - Bradley Dice (https://github.com/bdice) - https://github.com/nvdbaranec - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17321 --- cpp/src/io/parquet/page_decode.cuh | 19 +++- cpp/src/io/parquet/page_hdr.cu | 1 + cpp/src/io/parquet/parquet_gpu.hpp | 6 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 5 +- python/cudf/cudf/tests/test_parquet.py | 106 +++++++++++++++++- .../cudf_polars/cudf_polars/testing/plugin.py | 1 - python/cudf_polars/tests/test_scan.py | 24 +--- 7 files changed, 129 insertions(+), 33 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 9ed2929a70e..ab40a5b9a55 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -149,10 +149,21 @@ inline __device__ bool is_bounds_page(page_state_s* const s, size_t const begin = start_row; size_t const end = start_row + num_rows; - // for non-nested schemas, rows cannot span pages, so use a more restrictive test - return has_repetition - ? 
((page_begin <= begin && page_end >= begin) || (page_begin <= end && page_end >= end)) - : ((page_begin < begin && page_end > begin) || (page_begin < end && page_end > end)); + // Test for list schemas. + auto const is_bounds_page_lists = + ((page_begin <= begin and page_end >= begin) or (page_begin <= end and page_end >= end)); + + // For non-list schemas, rows cannot span pages, so use a more restrictive test. Make sure to + // relax the test for `page_end` if we adjusted the `num_rows` for the last page to compensate + // for list row size estimates in `generate_list_column_row_count_estimates()` when chunked + // read mode. + auto const test_page_end_nonlists = + s->page.is_num_rows_adjusted ? page_end >= end : page_end > end; + + auto const is_bounds_page_nonlists = + (page_begin < begin and page_end > begin) or (page_begin < end and test_page_end_nonlists); + + return has_repetition ? is_bounds_page_lists : is_bounds_page_nonlists; } /** diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index a8a8c441a84..6aec4ce0ec2 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -433,6 +433,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, // definition levels bs->page.chunk_row = 0; bs->page.num_rows = 0; + bs->page.is_num_rows_adjusted = false; bs->page.skipped_values = -1; bs->page.skipped_leaf_values = 0; bs->page.str_bytes = 0; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 3b4d0e6dc80..ce9d48693ec 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -310,8 +310,10 @@ struct PageInfo { // - In the case of a nested schema, you have to decode the repetition and definition // levels to extract actual column values int32_t num_input_values; - int32_t chunk_row; // starting row of this page relative to the start of the chunk - int32_t num_rows; // number of rows in this page + int32_t chunk_row; // 
starting row of this page relative to the start of the chunk + int32_t num_rows; // number of rows in this page + bool is_num_rows_adjusted; // Flag to indicate if the number of rows of this page have been + // adjusted to compensate for the list row size estimates. // the next four are calculated in gpuComputePageStringSizes int32_t num_nulls; // number of null values (V2 header), but recalculated for string cols int32_t num_valids; // number of non-null values, taking into account skip_rows/num_rows diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index f03f1214b9a..bcdae4cbd3b 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -729,7 +729,10 @@ struct set_final_row_count { if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; } size_t const page_start_row = chunk.start_row + page.chunk_row; size_t const chunk_last_row = chunk.start_row + chunk.num_rows; - page.num_rows = chunk_last_row - page_start_row; + // Mark `is_num_rows_adjusted` to signal string decoders that the `num_rows` of this page has + // been adjusted. 
+ page.is_num_rows_adjusted = page.num_rows != (chunk_last_row - page_start_row); + page.num_rows = chunk_last_row - page_start_row; } }; diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 96512dacb69..659d2ebd89a 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3771,10 +3771,10 @@ def test_parquet_chunked_reader( chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups ): df = pd.DataFrame( - {"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000} + {"a": [1, 2, 3, None] * 10000, "b": ["av", "qw", None, "xyz"] * 10000} ) buffer = BytesIO() - df.to_parquet(buffer) + df.to_parquet(buffer, row_group_size=10000) actual = read_parquet_chunked( [buffer], chunk_read_limit=chunk_read_limit, @@ -3788,6 +3788,108 @@ def test_parquet_chunked_reader( assert_eq(expected, actual) +@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("num_rows", [997, 2997, None]) +def test_parquet_chunked_reader_structs( + chunk_read_limit, + pass_read_limit, + num_rows, +): + data = [ + { + "a": "g", + "b": { + "b_a": 10, + "b_b": {"b_b_b": None, "b_b_a": 2}, + }, + "c": None, + }, + {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]}, + {"a": "j", "b": None, "c": [8, 10]}, + {"a": None, "b": {"b_a": None, "b_b": None}, "c": None}, + None, + { + "a": None, + "b": {"b_a": None, "b_b": {"b_b_b": 1}}, + "c": [18, 19], + }, + {"a": None, "b": None, "c": None}, + ] * 1000 + + pa_struct = pa.Table.from_pydict({"struct": data}) + df = cudf.DataFrame.from_arrow(pa_struct) + buffer = BytesIO() + df.to_parquet(buffer) + + # Number of rows to read + nrows = num_rows if num_rows is not None else len(df) + + actual = read_parquet_chunked( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + nrows=nrows, + ) + expected = 
cudf.read_parquet( + buffer, + nrows=nrows, + ) + assert_eq(expected, actual) + + +@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("num_rows", [4997, 9997, None]) +@pytest.mark.parametrize( + "str_encoding", + [ + "PLAIN", + "DELTA_BYTE_ARRAY", + "DELTA_LENGTH_BYTE_ARRAY", + ], +) +def test_parquet_chunked_reader_string_decoders( + chunk_read_limit, + pass_read_limit, + num_rows, + str_encoding, +): + df = pd.DataFrame( + { + "i64": [1, 2, 3, None] * 10000, + "str": ["av", "qw", "asd", "xyz"] * 10000, + "list": list( + [["ad", "cd"], ["asd", "fd"], None, ["asd", None]] * 10000 + ), + } + ) + buffer = BytesIO() + # Write 4 Parquet row groups with string column encoded + df.to_parquet( + buffer, + row_group_size=10000, + use_dictionary=False, + column_encoding={"str": str_encoding}, + ) + + # Number of rows to read + nrows = num_rows if num_rows is not None else len(df) + + # Check with num_rows specified + actual = read_parquet_chunked( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + nrows=nrows, + ) + expected = cudf.read_parquet( + buffer, + nrows=nrows, + ) + assert_eq(expected, actual) + + @pytest.mark.parametrize( "nrows,skip_rows", [ diff --git a/python/cudf_polars/cudf_polars/testing/plugin.py b/python/cudf_polars/cudf_polars/testing/plugin.py index 2a9104d8c82..080a1af6e19 100644 --- a/python/cudf_polars/cudf_polars/testing/plugin.py +++ b/python/cudf_polars/cudf_polars/testing/plugin.py @@ -64,7 +64,6 @@ def pytest_configure(config: pytest.Config) -> None: "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception", 
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception", - "tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311", "tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394", diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 61925b21a97..9c58a24c065 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -112,22 +112,7 @@ def test_scan( n_rows=n_rows, ) engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked}) - if ( - is_chunked - and (columns is None or columns[0] != "a") - and ( - # When we mask with the slice, it happens to remove the - # bad row - (mask is None and slice is not None) - # When we both slice and read a subset of rows it also - # removes the bad row - or (slice is None and n_rows is not None) - ) - ): - # slice read produces wrong result for string column - request.applymarker( - pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") - ) + if slice is not None: q = q.slice(*slice) if mask is not None: @@ -377,13 +362,6 @@ def large_df(df, tmpdir_factory, chunked_slice): def test_scan_parquet_chunked( request, chunked_slice, large_df, chunk_read_limit, pass_read_limit ): - if chunked_slice in {"skip_partial", "partial"} and ( - chunk_read_limit == 0 and pass_read_limit != 0 - ): - request.applymarker( - pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") - ) - assert_gpu_result_equal( large_df, engine=pl.GPUEngine( From 
18b40dca3862df81170a8077dc66d5cb20d1e60a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:43:25 -0800 Subject: [PATCH 4/7] Remove cudf._lib.hash in favor of inlining pylibcudf (#17345) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Lawrence Mitchell (https://github.com/wence-) - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17345 --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/__init__.py | 1 - python/cudf/cudf/_lib/hash.pyx | 47 -------------------------- python/cudf/cudf/core/dataframe.py | 23 ++++++++++--- python/cudf/cudf/core/indexed_frame.py | 47 ++++++++++++++++++++++---- 5 files changed, 58 insertions(+), 61 deletions(-) delete mode 100644 python/cudf/cudf/_lib/hash.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index bdc98064f6c..2fc82a57a6f 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -22,7 +22,6 @@ set(cython_sources datetime.pyx filling.pyx groupby.pyx - hash.pyx interop.pyx join.pyx json.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 6f9f0c84e26..cd86767f0cd 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -9,7 +9,6 @@ datetime, filling, groupby, - hash, interop, join, json, diff --git a/python/cudf/cudf/_lib/hash.pyx b/python/cudf/cudf/_lib/hash.pyx deleted file mode 100644 index 89309b36371..00000000000 --- a/python/cudf/cudf/_lib/hash.pyx +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
- -import pylibcudf as plc - -from cudf.core.buffer import acquire_spill_lock - -from pylibcudf.table cimport Table - -from cudf._lib.column cimport Column - - -@acquire_spill_lock() -def hash_partition(list source_columns, list columns_to_hash, - int num_partitions): - plc_table, offsets = plc.partitioning.hash_partition( - plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]), - columns_to_hash, - num_partitions - ) - return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets - - -@acquire_spill_lock() -def hash(list source_columns, str method, int seed=0): - cdef Table ctbl = Table( - [c.to_pylibcudf(mode="read") for c in source_columns] - ) - if method == "murmur3": - return Column.from_pylibcudf(plc.hashing.murmurhash3_x86_32(ctbl, seed)) - elif method == "xxhash64": - return Column.from_pylibcudf(plc.hashing.xxhash_64(ctbl, seed)) - elif method == "md5": - return Column.from_pylibcudf(plc.hashing.md5(ctbl)) - elif method == "sha1": - return Column.from_pylibcudf(plc.hashing.sha1(ctbl)) - elif method == "sha224": - return Column.from_pylibcudf(plc.hashing.sha224(ctbl)) - elif method == "sha256": - return Column.from_pylibcudf(plc.hashing.sha256(ctbl)) - elif method == "sha384": - return Column.from_pylibcudf(plc.hashing.sha384(ctbl)) - elif method == "sha512": - return Column.from_pylibcudf(plc.hashing.sha512(ctbl)) - else: - raise ValueError( - f"Unsupported hashing algorithm {method}." 
- ) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index bf1c39b23da..9be5aabb4e2 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -26,6 +26,8 @@ from pandas.io.formats.printing import pprint_thing from typing_extensions import Self, assert_never +import pylibcudf as plc + import cudf import cudf.core.common from cudf import _lib as libcudf @@ -43,6 +45,7 @@ from cudf.core import column, df_protocol, indexing_utils, reshape from cudf.core._compat import PANDAS_LT_300 from cudf.core.abc import Serializable +from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ( CategoricalColumn, ColumnBase, @@ -4962,7 +4965,9 @@ def apply_chunks( ) @_performance_tracking - def partition_by_hash(self, columns, nparts, keep_index=True): + def partition_by_hash( + self, columns, nparts: int, keep_index: bool = True + ) -> list[DataFrame]: """Partition the dataframe by the hashed value of data in *columns*. Parameters @@ -4986,13 +4991,21 @@ def partition_by_hash(self, columns, nparts, keep_index=True): else: cols = [*self._columns] - output_columns, offsets = libcudf.hash.hash_partition( - cols, key_indices, nparts - ) + with acquire_spill_lock(): + plc_table, offsets = plc.partitioning.hash_partition( + plc.Table([col.to_pylibcudf(mode="read") for col in cols]), + key_indices, + nparts, + ) + output_columns = [ + libcudf.column.Column.from_pylibcudf(col) + for col in plc_table.columns() + ] + outdf = self._from_columns_like_self( output_columns, self._column_names, - self._index_names if keep_index else None, + self._index_names if keep_index else None, # type: ignore[arg-type] ) # Slice into partitions. 
Notice, `hash_partition` returns the start # offset of each partition thus we skip the first offset diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index e031f2a4e8e..9130779c3e9 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -21,7 +21,7 @@ import pandas as pd from typing_extensions import Self -import pylibcudf +import pylibcudf as plc import cudf import cudf._lib as libcudf @@ -2817,7 +2817,20 @@ def memory_usage(self, index=True, deep=False): """ raise NotImplementedError - def hash_values(self, method="murmur3", seed=None): + def hash_values( + self, + method: Literal[ + "murmur3", + "xxhash64", + "md5", + "sha1", + "sha224", + "sha256", + "sha384", + "sha512", + ] = "murmur3", + seed: int | None = None, + ) -> cudf.Series: """Compute the hash of values in this column. Parameters @@ -2894,11 +2907,31 @@ def hash_values(self, method="murmur3", seed=None): "Provided seed value has no effect for the hash method " f"`{method}`. Only {seed_hash_methods} support seeds." ) - # Note that both Series and DataFrame return Series objects from this - # calculation, necessitating the unfortunate circular reference to the - # child class here. 
+ with acquire_spill_lock(): + plc_table = plc.Table( + [c.to_pylibcudf(mode="read") for c in self._columns] + ) + if method == "murmur3": + plc_column = plc.hashing.murmurhash3_x86_32(plc_table, seed) + elif method == "xxhash64": + plc_column = plc.hashing.xxhash_64(plc_table, seed) + elif method == "md5": + plc_column = plc.hashing.md5(plc_table) + elif method == "sha1": + plc_column = plc.hashing.sha1(plc_table) + elif method == "sha224": + plc_column = plc.hashing.sha224(plc_table) + elif method == "sha256": + plc_column = plc.hashing.sha256(plc_table) + elif method == "sha384": + plc_column = plc.hashing.sha384(plc_table) + elif method == "sha512": + plc_column = plc.hashing.sha512(plc_table) + else: + raise ValueError(f"Unsupported hashing algorithm {method}.") + result = libcudf.column.Column.from_pylibcudf(plc_column) return cudf.Series._from_column( - libcudf.hash.hash([*self._columns], method, seed), + result, index=self.index, ) @@ -6270,7 +6303,7 @@ def rank( if method not in {"average", "min", "max", "first", "dense"}: raise KeyError(method) - method_enum = pylibcudf.aggregation.RankMethod[method.upper()] + method_enum = plc.aggregation.RankMethod[method.upper()] if na_option not in {"keep", "top", "bottom"}: raise ValueError( "na_option must be one of 'keep', 'top', or 'bottom'" From ba21673b93c7ba83f2b0dc76f2294535f684f120 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:46:14 -0800 Subject: [PATCH 5/7] Remove cudf._lib.concat in favor of inlining pylibcudf (#17344) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17344 --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/__init__.py | 1 - python/cudf/cudf/_lib/concat.pyx | 35 --------------------- python/cudf/cudf/_lib/utils.pxd | 2 +- 
python/cudf/cudf/_lib/utils.pyx | 2 +- python/cudf/cudf/core/column/categorical.py | 4 +-- python/cudf/cudf/core/column/column.py | 9 +++++- python/cudf/cudf/core/dataframe.py | 29 ++++++++++++++--- 8 files changed, 36 insertions(+), 47 deletions(-) delete mode 100644 python/cudf/cudf/_lib/concat.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 2fc82a57a6f..13beec3c7f7 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -16,7 +16,6 @@ set(cython_sources aggregation.pyx binaryop.pyx column.pyx - concat.pyx copying.pyx csv.pyx datetime.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index cd86767f0cd..a63bc1a3d1c 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -3,7 +3,6 @@ from . import ( binaryop, - concat, copying, csv, datetime, diff --git a/python/cudf/cudf/_lib/concat.pyx b/python/cudf/cudf/_lib/concat.pyx deleted file mode 100644 index e6c2d136f0d..00000000000 --- a/python/cudf/cudf/_lib/concat.pyx +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
- -from libcpp cimport bool - -from cudf._lib.column cimport Column -from cudf._lib.utils cimport data_from_pylibcudf_table - -import pylibcudf - -from cudf.core.buffer import acquire_spill_lock - - -@acquire_spill_lock() -def concat_columns(object columns): - return Column.from_pylibcudf( - pylibcudf.concatenate.concatenate( - [col.to_pylibcudf(mode="read") for col in columns] - ) - ) - - -@acquire_spill_lock() -def concat_tables(object tables, bool ignore_index=False): - plc_tables = [] - for table in tables: - cols = table._columns - if not ignore_index: - cols = table._index._columns + cols - plc_tables.append(pylibcudf.Table([c.to_pylibcudf(mode="read") for c in cols])) - - return data_from_pylibcudf_table( - pylibcudf.concatenate.concatenate(plc_tables), - column_names=tables[0]._column_names, - index_names=None if ignore_index else tables[0]._index_names - ) diff --git a/python/cudf/cudf/_lib/utils.pxd b/python/cudf/cudf/_lib/utils.pxd index 623c5064a1a..f273aeb4270 100644 --- a/python/cudf/cudf/_lib/utils.pxd +++ b/python/cudf/cudf/_lib/utils.pxd @@ -10,7 +10,7 @@ from pylibcudf.libcudf.table.table cimport table, table_view cdef data_from_unique_ptr( unique_ptr[table] c_tbl, column_names, index_names=*) -cdef data_from_pylibcudf_table(tbl, column_names, index_names=*) +cpdef data_from_pylibcudf_table(tbl, column_names, index_names=*) cpdef data_from_pylibcudf_io(tbl_with_meta, column_names = *, index_names = *) cdef data_from_table_view( table_view tv, object owner, object column_names, object index_names=*) diff --git a/python/cudf/cudf/_lib/utils.pyx b/python/cudf/cudf/_lib/utils.pyx index 292de82e4c4..2ccc6ca34dc 100644 --- a/python/cudf/cudf/_lib/utils.pyx +++ b/python/cudf/cudf/_lib/utils.pyx @@ -309,7 +309,7 @@ cdef data_from_unique_ptr( ) -cdef data_from_pylibcudf_table(tbl, column_names, index_names=None): +cpdef data_from_pylibcudf_table(tbl, column_names, index_names=None): return _data_from_columns( columns_from_pylibcudf_table(tbl), 
column_names, diff --git a/python/cudf/cudf/core/column/categorical.py b/python/cudf/cudf/core/column/categorical.py index b7d5e8658a0..7354b917f90 100644 --- a/python/cudf/cudf/core/column/categorical.py +++ b/python/cudf/cudf/core/column/categorical.py @@ -1204,9 +1204,7 @@ def _concat( elif newsize == 0: codes_col = column.column_empty(0, head.codes.dtype, masked=True) else: - # Filter out inputs that have 0 length, then concatenate. - codes = [o for o in codes if len(o)] - codes_col = libcudf.concat.concat_columns(objs) + codes_col = column.concat_columns(codes) # type: ignore[arg-type] codes_col = as_unsigned_codes( len(cats), diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 03dcf6bec1e..f6eaea4b783 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -19,6 +19,7 @@ from pandas.core.arrays.arrow.extension_types import ArrowIntervalType from typing_extensions import Self +import pylibcudf as plc import rmm import cudf @@ -2300,4 +2301,10 @@ def concat_columns(objs: "MutableSequence[ColumnBase]") -> ColumnBase: return column_empty(0, head.dtype, masked=True) # Filter out inputs that have 0 length, then concatenate. 
- return libcudf.concat.concat_columns([o for o in objs if len(o)]) + objs_with_len = [o for o in objs if len(o)] + with acquire_spill_lock(): + return Column.from_pylibcudf( + plc.concatenate.concatenate( + [col.to_pylibcudf(mode="read") for col in objs_with_len] + ) + ) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 9be5aabb4e2..bd78d5dd9f1 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -1787,11 +1787,32 @@ def _concat( ) # Concatenate the Tables - out = cls._from_data( - *libcudf.concat.concat_tables( - tables, ignore_index=ignore_index or are_all_range_index + ignore = ignore_index or are_all_range_index + index_names = None if ignore else tables[0]._index_names + column_names = tables[0]._column_names + with acquire_spill_lock(): + plc_tables = [ + plc.Table( + [ + c.to_pylibcudf(mode="read") + for c in ( + table._columns + if ignore + else itertools.chain( + table._index._columns, table._columns + ) + ) + ] + ) + for table in tables + ] + + concatted = libcudf.utils.data_from_pylibcudf_table( + plc.concatenate.concatenate(plc_tables), + column_names=column_names, + index_names=index_names, ) - ) + out = cls._from_data(*concatted) # If ignore_index is True, all input frames are empty, and at # least one input frame has an index, assign a new RangeIndex From 02c35bfe71abcc8f5889fbc54c4f4902d1d2a29b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:48:07 -0800 Subject: [PATCH 6/7] Remove cudf._lib.quantiles in favor of inlining pylibcudf (#17347) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17347 --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/__init__.py | 1 - python/cudf/cudf/_lib/utils.pxd | 2 +- 
python/cudf/cudf/_lib/utils.pyx | 2 +- python/cudf/cudf/core/column/decimal.py | 14 -------------- python/cudf/cudf/core/column/numerical_base.py | 16 ++++++++++++---- python/cudf/cudf/core/frame.py | 13 +++++++++---- 7 files changed, 23 insertions(+), 26 deletions(-) diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 13beec3c7f7..b274f771479 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -31,7 +31,6 @@ set(cython_sources orc.pyx parquet.pyx partitioning.pyx - quantiles.pyx reduce.pyx replace.pyx reshape.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index a63bc1a3d1c..8455db5d2b5 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -18,7 +18,6 @@ orc, parquet, partitioning, - quantiles, reduce, replace, reshape, diff --git a/python/cudf/cudf/_lib/utils.pxd b/python/cudf/cudf/_lib/utils.pxd index f273aeb4270..6db3036d514 100644 --- a/python/cudf/cudf/_lib/utils.pxd +++ b/python/cudf/cudf/_lib/utils.pxd @@ -18,5 +18,5 @@ cdef table_view table_view_from_columns(columns) except * cdef table_view table_view_from_table(tbl, ignore_index=*) except* cdef columns_from_unique_ptr(unique_ptr[table] c_tbl) cdef columns_from_table_view(table_view tv, object owners) -cdef columns_from_pylibcudf_table(tbl) +cpdef columns_from_pylibcudf_table(tbl) cdef _data_from_columns(columns, column_names, index_names=*) diff --git a/python/cudf/cudf/_lib/utils.pyx b/python/cudf/cudf/_lib/utils.pyx index 2ccc6ca34dc..244d7fdc006 100644 --- a/python/cudf/cudf/_lib/utils.pyx +++ b/python/cudf/cudf/_lib/utils.pyx @@ -229,7 +229,7 @@ cdef columns_from_unique_ptr( return columns -cdef columns_from_pylibcudf_table(tbl): +cpdef columns_from_pylibcudf_table(tbl): """Convert a pylibcudf table into list of columns. 
Parameters diff --git a/python/cudf/cudf/core/column/decimal.py b/python/cudf/cudf/core/column/decimal.py index 540aa02b842..ce7aa91f775 100644 --- a/python/cudf/cudf/core/column/decimal.py +++ b/python/cudf/cudf/core/column/decimal.py @@ -3,7 +3,6 @@ from __future__ import annotations import warnings -from collections.abc import Sequence from decimal import Decimal from typing import TYPE_CHECKING, cast @@ -217,19 +216,6 @@ def normalize_binop_value(self, other): ) return NotImplemented - def _decimal_quantile( - self, q: float | Sequence[float], interpolation: str, exact: bool - ) -> ColumnBase: - quant = [float(q)] if not isinstance(q, (Sequence, np.ndarray)) else q - # get sorted indices and exclude nulls - indices = libcudf.sort.order_by( - [self], [True], "first", stable=True - ).slice(self.null_count, len(self)) - result = libcudf.quantiles.quantile( - self, quant, interpolation, indices, exact - ) - return result._with_type_metadata(self.dtype) - def as_numerical_column( self, dtype: Dtype ) -> "cudf.core.column.NumericalColumn": diff --git a/python/cudf/cudf/core/column/numerical_base.py b/python/cudf/cudf/core/column/numerical_base.py index f6ab91f2f01..6d639337401 100644 --- a/python/cudf/cudf/core/column/numerical_base.py +++ b/python/cudf/cudf/core/column/numerical_base.py @@ -7,9 +7,11 @@ import numpy as np +import pylibcudf as plc + import cudf from cudf import _lib as libcudf -from cudf.core.buffer import Buffer +from cudf.core.buffer import Buffer, acquire_spill_lock from cudf.core.column import ColumnBase from cudf.core.missing import NA from cudf.core.mixins import Scannable @@ -145,9 +147,15 @@ def quantile( indices = libcudf.sort.order_by( [self], [True], "first", stable=True ).slice(self.null_count, len(self)) - result = libcudf.quantiles.quantile( - self, q, interpolation, indices, exact - ) + with acquire_spill_lock(): + plc_column = plc.quantiles.quantile( + self.to_pylibcudf(mode="read"), + q, + 
plc.types.Interpolation[interpolation.upper()], + indices.to_pylibcudf(mode="read"), + exact, + ) + result = type(self).from_pylibcudf(plc_column) # type: ignore[assignment] if return_scalar: scalar_result = result.element_indexing(0) if interpolation in {"lower", "higher", "nearest"}: diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 2b4a17f9559..30868924bcd 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -799,15 +799,20 @@ def _quantile_table( null_precedence = [plc.types.NullOrder[key] for key in null_precedence] - return self._from_columns_like_self( - libcudf.quantiles.quantile_table( - [*self._columns], + with acquire_spill_lock(): + plc_table = plc.quantiles.quantiles( + plc.Table( + [c.to_pylibcudf(mode="read") for c in self._columns] + ), q, interpolation, is_sorted, column_order, null_precedence, - ), + ) + columns = libcudf.utils.columns_from_pylibcudf_table(plc_table) + return self._from_columns_like_self( + columns, column_names=self._column_names, ) From 302e625bf87dce4059eb7c383dced848ad9d8f4c Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:49:31 -0800 Subject: [PATCH 7/7] Remove cudf._lib.labeling in favor of inlining pylibcudf (#17346) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17346 --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/__init__.py | 1 - python/cudf/cudf/_lib/labeling.pyx | 24 --------------- python/cudf/cudf/core/column/datetime.py | 39 ++++++++++++++---------- python/cudf/cudf/core/cut.py | 22 ++++++++++--- python/cudf/cudf/core/resample.py | 32 +++++++++++-------- 6 files changed, 61 insertions(+), 58 deletions(-) delete mode 100644 python/cudf/cudf/_lib/labeling.pyx diff --git 
a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index b274f771479..2958c286d20 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -24,7 +24,6 @@ set(cython_sources interop.pyx join.pyx json.pyx - labeling.pyx lists.pyx merge.pyx null_mask.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 8455db5d2b5..19dc4488560 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -11,7 +11,6 @@ interop, join, json, - labeling, merge, null_mask, nvtext, diff --git a/python/cudf/cudf/_lib/labeling.pyx b/python/cudf/cudf/_lib/labeling.pyx deleted file mode 100644 index 524bfd3b2e8..00000000000 --- a/python/cudf/cudf/_lib/labeling.pyx +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. - -from libcpp cimport bool as cbool - -import pylibcudf as plc - -from cudf._lib.column cimport Column -from cudf.core.buffer import acquire_spill_lock - - -# Note that the parameter input shadows a Python built-in in the local scope, -# but I'm not too concerned about that since there's no use-case for actual -# input in this context. 
-@acquire_spill_lock() -def label_bins(Column input, Column left_edges, cbool left_inclusive, - Column right_edges, cbool right_inclusive): - plc_column = plc.labeling.label_bins( - input.to_pylibcudf(mode="read"), - left_edges.to_pylibcudf(mode="read"), - plc.labeling.Inclusive.YES if left_inclusive else plc.labeling.Inclusive.NO, - right_edges.to_pylibcudf(mode="read"), - plc.labeling.Inclusive.YES if right_inclusive else plc.labeling.Inclusive.NO, - ) - return Column.from_pylibcudf(plc_column) diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py index d8fa236d53c..16124cf0a7d 100644 --- a/python/cudf/cudf/core/column/datetime.py +++ b/python/cudf/cudf/core/column/datetime.py @@ -14,9 +14,10 @@ import pandas as pd import pyarrow as pa +import pylibcudf as plc + import cudf from cudf import _lib as libcudf -from cudf._lib.labeling import label_bins from cudf._lib.search import search_sorted from cudf.core._compat import PANDAS_GE_220 from cudf.core._internals import unary @@ -25,7 +26,7 @@ get_compatible_timezone, get_tz_data, ) -from cudf.core.buffer import Buffer +from cudf.core.buffer import Buffer, acquire_spill_lock from cudf.core.column import ColumnBase, as_column, column, string from cudf.core.column.timedelta import _unit_to_nanoseconds_conversion from cudf.utils.dtypes import _get_base_dtype @@ -819,13 +820,16 @@ def _find_ambiguous_and_nonexistent( # The end of an ambiguous time period is what Clock 2 reads at # the moment of transition: ambiguous_end = clock_2.apply_boolean_mask(cond) - ambiguous = label_bins( - self, - left_edges=ambiguous_begin, - left_inclusive=True, - right_edges=ambiguous_end, - right_inclusive=False, - ).notnull() + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + self.to_pylibcudf(mode="read"), + ambiguous_begin.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES, + ambiguous_end.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.NO, + ) + ambiguous = 
libcudf.column.Column.from_pylibcudf(plc_column) + ambiguous = ambiguous.notnull() # At the start of a non-existent time period, Clock 2 reads less # than Clock 1 (which has been turned forward): @@ -835,13 +839,16 @@ def _find_ambiguous_and_nonexistent( # The end of the non-existent time period is what Clock 1 reads # at the moment of transition: nonexistent_end = clock_1.apply_boolean_mask(cond) - nonexistent = label_bins( - self, - left_edges=nonexistent_begin, - left_inclusive=True, - right_edges=nonexistent_end, - right_inclusive=False, - ).notnull() + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + self.to_pylibcudf(mode="read"), + nonexistent_begin.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES, + nonexistent_end.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.NO, + ) + nonexistent = libcudf.column.Column.from_pylibcudf(plc_column) + nonexistent = nonexistent.notnull() return ambiguous, nonexistent diff --git a/python/cudf/cudf/core/cut.py b/python/cudf/cudf/core/cut.py index c9b1fa2669c..a4d12cfc7f0 100644 --- a/python/cudf/cudf/core/cut.py +++ b/python/cudf/cudf/core/cut.py @@ -6,8 +6,12 @@ import numpy as np import pandas as pd +import pylibcudf as plc + import cudf +from cudf._lib.column import Column from cudf.api.types import is_list_like +from cudf.core.buffer import acquire_spill_lock from cudf.core.column import as_column from cudf.core.column.categorical import CategoricalColumn, as_unsigned_codes from cudf.core.index import IntervalIndex, interval_range @@ -256,9 +260,19 @@ def cut( # the input arr must be changed to the same type as the edges input_arr = input_arr.astype(left_edges.dtype) # get the indexes for the appropriate number - index_labels = cudf._lib.labeling.label_bins( - input_arr, left_edges, left_inclusive, right_edges, right_inclusive - ) + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + input_arr.to_pylibcudf(mode="read"), + left_edges.to_pylibcudf(mode="read"), + 
plc.labeling.Inclusive.YES + if left_inclusive + else plc.labeling.Inclusive.NO, + right_edges.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if right_inclusive + else plc.labeling.Inclusive.NO, + ) + index_labels = Column.from_pylibcudf(plc_column) if labels is False: # if labels is false we return the index labels, we return them @@ -283,7 +297,7 @@ def cut( # should allow duplicate categories. return interval_labels[index_labels] - index_labels = as_unsigned_codes(len(interval_labels), index_labels) + index_labels = as_unsigned_codes(len(interval_labels), index_labels) # type: ignore[arg-type] col = CategoricalColumn( data=None, diff --git a/python/cudf/cudf/core/resample.py b/python/cudf/cudf/core/resample.py index e0aee28bfeb..d95d252559f 100644 --- a/python/cudf/cudf/core/resample.py +++ b/python/cudf/cudf/core/resample.py @@ -22,9 +22,11 @@ import numpy as np import pandas as pd +import pylibcudf as plc + import cudf -import cudf._lib.labeling -import cudf.core.index +from cudf._lib.column import Column +from cudf.core.buffer import acquire_spill_lock from cudf.core.groupby.groupby import ( DataFrameGroupBy, GroupBy, @@ -48,7 +50,7 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs): func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs ) if len(self.grouping.bin_labels) != len(result): - index = cudf.core.index.Index( + index = cudf.Index( self.grouping.bin_labels, name=self.grouping.names[0] ) return result._align_to_index( @@ -125,7 +127,7 @@ class SeriesResampler(_Resampler, SeriesGroupBy): class _ResampleGrouping(_Grouping): - bin_labels: cudf.core.index.Index + bin_labels: cudf.Index def __init__(self, obj, by=None, level=None): self._freq = getattr(by, "freq", None) @@ -170,7 +172,7 @@ def deserialize(cls, header, frames): out.names = names out._named_columns = _named_columns out._key_columns = key_columns - out.bin_labels = cudf.core.index.Index.deserialize( + out.bin_labels = cudf.Index.deserialize( 
header["__bin_labels"], frames[-header["__bin_labels_count"] :] ) out._freq = header["_freq"] @@ -268,13 +270,19 @@ def _handle_frequency_grouper(self, by): cast_bin_labels = bin_labels.astype(result_type) # bin the key column: - bin_numbers = cudf._lib.labeling.label_bins( - cast_key_column, - left_edges=cast_bin_labels[:-1]._column, - left_inclusive=(closed == "left"), - right_edges=cast_bin_labels[1:]._column, - right_inclusive=(closed == "right"), - ) + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + cast_key_column.to_pylibcudf(mode="read"), + cast_bin_labels[:-1]._column.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if closed == "left" + else plc.labeling.Inclusive.NO, + cast_bin_labels[1:]._column.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if closed == "right" + else plc.labeling.Inclusive.NO, + ) + bin_numbers = Column.from_pylibcudf(plc_column) if label == "right": cast_bin_labels = cast_bin_labels[1:]