Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into wence/fea/polars-1.14
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar authored Nov 19, 2024
2 parents 2b98347 + 302e625 commit 398b613
Show file tree
Hide file tree
Showing 39 changed files with 1,096 additions and 544 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ jobs:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
Expand Down Expand Up @@ -153,8 +151,6 @@ jobs:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
Expand All @@ -164,8 +160,6 @@ jobs:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
Expand Down
18 changes: 18 additions & 0 deletions ci/run_cudf_polars_polars_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@ DESELECTED_TESTS=(
"tests/unit/test_polars_import.py::test_fork_safety" # test started to hang in polars-1.14
)

# Extend DESELECTED_TESTS with tests that cannot run on the current machine.
# The aarch64 branch deselects a fixed list; the other branch deselects the
# PDS-H benchmark only when the dbgen binary needs a newer glibc than is installed.
if [[ $(arch) == "aarch64" ]]; then
# The binary used for TPC-H generation is compiled for x86_64, not aarch64.
DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh")
# The connectorx package is not available on arm
DESELECTED_TESTS+=("tests/unit/io/database/test_read.py::test_read_database")
# The necessary timezone information cannot be found in our CI image.
DESELECTED_TESTS+=("tests/unit/io/test_parquet.py::test_parametric_small_page_mask_filtering")
DESELECTED_TESTS+=("tests/unit/testing/test_assert_series_equal.py::test_assert_series_equal_parametric")
DESELECTED_TESTS+=("tests/unit/operations/test_join.py::test_join_4_columns_with_validity")
else
# Ensure that we don't run dbgen when it uses newer symbols than supported by the glibc version in the CI image.
# Minor component of the last "<digit>.<digits>" match on the first line of `ldd --version`.
glibc_minor_version=$(ldd --version | head -1 | grep -o "[0-9]\.[0-9]\+" | tail -1 | cut -d '.' -f2)
# Minor component of the highest (version-sorted) GLIBC symbol version that `nm` lists for the dbgen binary.
latest_glibc_symbol_found=$(nm py-polars/tests/benchmark/data/pdsh/dbgen/dbgen | grep GLIBC | grep -o "[0-9]\.[0-9]\+" | sort --version-sort | tail -1 | cut -d "." -f 2)
# Only the minor components are compared; the major component is not checked here.
if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then
DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh")
fi
fi

DESELECTED_TESTS=$(printf -- " --deselect %s" "${DESELECTED_TESTS[@]}")
python -m pytest \
--import-mode=importlib \
Expand Down
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ add_library(
src/io/avro/avro_gpu.cu
src/io/avro/reader_impl.cu
src/io/comp/brotli_dict.cpp
src/io/comp/comp.cpp
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
Expand Down
119 changes: 119 additions & 0 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "comp.hpp"

#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress

namespace cudf::io::detail {

namespace {

/**
 * @brief GZIP host compressor (includes header)
 *
 * Compresses the whole input with a single `deflate(Z_FINISH)` call into a buffer sized by
 * `deflateBound`, using window bits `15 | 16` so that zlib emits a gzip wrapper.
 *
 * Fails through `CUDF_EXPECTS`/`CUDF_FAIL` when the input or the worst-case output does not fit
 * zlib's 32-bit `avail_in`/`avail_out` counters, or when zlib reports an error. The zlib stream
 * state is released before any post-initialization failure is reported.
 *
 * @param src Uncompressed host buffer
 *
 * @return Vector containing the gzip stream, trimmed to the number of bytes produced
 */
std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src)
{
  z_stream zs;
  zs.zalloc = Z_NULL;
  zs.zfree  = Z_NULL;
  zs.opaque = Z_NULL;

  // avail_in is a 32-bit uInt; reject inputs that would otherwise be silently truncated.
  zs.avail_in = static_cast<uInt>(src.size());
  CUDF_EXPECTS(zs.avail_in == src.size(), "GZIP DEFLATE compression input is too large.");
  // zlib takes a non-const pointer even though deflate only reads from the input.
  zs.next_in   = const_cast<Bytef*>(src.data());
  zs.avail_out = 0;
  zs.next_out  = nullptr;

  constexpr int windowbits    = 15;
  constexpr int gzip_encoding = 16;  // added to the window bits to request a gzip wrapper
  int const init_ret          = deflateInit2(
    &zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY);
  CUDF_EXPECTS(init_ret == Z_OK, "GZIP DEFLATE compression initialization failed.");

  // deflateBound returns uLong while avail_out is uInt; make sure the bound is representable.
  auto const estcomplen   = deflateBound(&zs, zs.avail_in);
  auto const out_capacity = static_cast<uInt>(estcomplen);
  if (out_capacity != estcomplen) {
    deflateEnd(&zs);
    CUDF_FAIL("GZIP DEFLATE compression output bound is too large.");
  }

  std::vector<uint8_t> dst(estcomplen);
  zs.avail_out = out_capacity;
  zs.next_out  = dst.data();

  int const deflate_ret      = deflate(&zs, Z_FINISH);
  auto const compressed_size = zs.total_out;

  // Release zlib's internal state before any check below can throw, so a failed compression
  // does not leak the stream.
  int const end_ret = deflateEnd(&zs);

  CUDF_EXPECTS(deflate_ret == Z_STREAM_END,
               "GZIP DEFLATE compression failed due to insufficient space!");
  CUDF_EXPECTS(end_ret == Z_OK, "GZIP DEFLATE compression failed at deallocation");

  dst.resize(compressed_size);
  return dst;
}

/**
 * @brief SNAPPY device compressor
 *
 * Copies the input to device memory, compresses it as a single-chunk batch through the nvcomp
 * adapter, and copies the compressed bytes back to a host vector.
 *
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Vector containing the compressed output, sized to the number of bytes written
 */
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
                                          rmm::cuda_stream_view stream)
{
  auto const d_src =
    cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());

  // Snappy output can be larger than the input for incompressible data, so size the destination
  // for the worst case rather than for the input size.
  auto const max_dst_size =
    nvcomp::compress_max_output_chunk_size(nvcomp::compression_type::SNAPPY, src.size());
  rmm::device_uvector<uint8_t> d_dst(max_dst_size, stream);

  cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
  inputs[0] = d_src;
  inputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
  outputs[0] = d_dst;
  outputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
  hd_status[0] = {};
  hd_status.host_to_device_async(stream);

  nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

  stream.synchronize();
  hd_status.device_to_host_sync(stream);
  CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
               "snappy compression failed");

  // Return only the bytes the compressor produced, not the whole worst-case buffer.
  std::vector<uint8_t> dst(hd_status[0].bytes_written);
  cudf::detail::cuda_memcpy(
    host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst.data(), dst.size()}, stream);
  return dst;
}

} // namespace

// Entry point declared in comp.hpp: routes the host buffer to the compressor that matches the
// requested compression type. Only GZIP and SNAPPY are handled; anything else is rejected.
std::vector<std::uint8_t> compress(compression_type compression,
                                   host_span<uint8_t const> src,
                                   rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();

  // GZIP is compressed on the host and does not use the stream.
  if (compression == compression_type::GZIP) { return compress_gzip(src); }

  // SNAPPY is compressed on the device using the caller's stream.
  if (compression == compression_type::SNAPPY) { return compress_snappy(src, stream); }

  CUDF_FAIL("Unsupported compression type");
}

} // namespace cudf::io::detail
43 changes: 43 additions & 0 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <string>
#include <vector>

namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
* @brief Compresses a system memory buffer.
*
* @param compression Type of compression to apply to the input data
* @param src Uncompressed host buffer
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Vector containing the compressed output
*/
std::vector<uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream);

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
8 changes: 4 additions & 4 deletions cpp/src/io/comp/gpuinflate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -980,27 +980,27 @@ __device__ int parse_gzip_header(uint8_t const* src, size_t src_size)
{
uint8_t flags = src[3];
hdr_len = 10;
if (flags & GZIPHeaderFlag::fextra) // Extra fields present
if (flags & detail::GZIPHeaderFlag::fextra) // Extra fields present
{
int xlen = src[hdr_len] | (src[hdr_len + 1] << 8);
hdr_len += xlen;
if (hdr_len >= src_size) return -1;
}
if (flags & GZIPHeaderFlag::fname) // Original file name present
if (flags & detail::GZIPHeaderFlag::fname) // Original file name present
{
// Skip zero-terminated string
do {
if (hdr_len >= src_size) return -1;
} while (src[hdr_len++] != 0);
}
if (flags & GZIPHeaderFlag::fcomment) // Comment present
if (flags & detail::GZIPHeaderFlag::fcomment) // Comment present
{
// Skip zero-terminated string
do {
if (hdr_len >= src_size) return -1;
} while (src[hdr_len++] != 0);
}
if (flags & GZIPHeaderFlag::fhcrc) // Header CRC present
if (flags & detail::GZIPHeaderFlag::fhcrc) // Header CRC present
{
hdr_len += 2;
}
Expand Down
34 changes: 28 additions & 6 deletions cpp/src/io/comp/io_uncomp.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,8 +25,8 @@

using cudf::host_span;

namespace cudf {
namespace io {
namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
* @brief Decompresses a system memory buffer.
Expand All @@ -36,13 +36,35 @@ namespace io {
*
* @return Vector containing the Decompressed output
*/
std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src);
[[nodiscard]] std::vector<uint8_t> decompress(compression_type compression,
host_span<uint8_t const> src);

/**
* @brief Decompresses a system memory buffer.
*
* @param compression Type of compression of the input data
* @param src Compressed host buffer
* @param dst Destination host span to place decompressed buffer
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Size of decompressed output
*/
size_t decompress(compression_type compression,
host_span<uint8_t const> src,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream);

/**
* @brief Returns the size of the decompressed output without actually decompressing the
* compressed input buffer. If the decompressed size cannot be determined a priori, returns zero.
*
* @param compression Type of compression of the input data
* @param src Compressed host buffer
*
* @return Size of decompressed output
*/
size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src);

/**
* @brief GZIP header flags
* See https://tools.ietf.org/html/rfc1952
Expand All @@ -55,5 +77,5 @@ constexpr uint8_t fname = 0x08; // Original file name present
constexpr uint8_t fcomment = 0x10; // Comment present
}; // namespace GZIPHeaderFlag

} // namespace io
} // namespace cudf
} // namespace io::detail
} // namespace CUDF_EXPORT cudf
Loading

0 comments on commit 398b613

Please sign in to comment.