Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gluten use the velox arrow 8.0 #381

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 5 additions & 48 deletions third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,57 +24,11 @@ if(VELOX_ENABLE_ARROW)
else()
set(THRIFT_SOURCE "BUNDLED")
endif()
# Use external arrow & parquet only if <package>_DIR is defined.
if(DEFINED Arrow_HOME)
find_package(Arrow PATHS "${Arrow_HOME}/arrow_install" NO_DEFAULT_PATH)
find_package(Parquet PATHS "${Arrow_HOME}/arrow_install" NO_DEFAULT_PATH)
if(Arrow_FOUND AND Parquet_FOUND)
add_library(arrow INTERFACE)
add_library(parquet INTERFACE)

if(TARGET Arrow::arrow_static)
target_link_libraries(arrow INTERFACE Arrow::arrow_static)
else()
target_link_libraries(arrow INTERFACE Arrow::arrow_shared)
endif()

if(TARGET Parquet::parquet_static)
target_link_libraries(parquet INTERFACE Parquet::parquet_static)
else()
target_link_libraries(parquet INTERFACE Parquet::parquet_shared)
endif()

message(STATUS "Using pre-builded arrow")
endif()

if(Thrift_FOUND)
add_library(thrift INTERFACE)
target_link_libraries(thrift INTERFACE thrift::thrift)
message(STATUS "Using system thrift")
else()
add_library(thrift STATIC IMPORTED GLOBAL)
if(NOT Thrift_FOUND)
set(THRIFT_ROOT ${Arrow_HOME}/arrow_ep/cpp/build/thrift_ep-install)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
set(THRIFT_LIB ${THRIFT_ROOT}/lib/libthriftd.a)
else()
set(THRIFT_LIB ${THRIFT_ROOT}/lib/libthrift.a)
endif()

file(MAKE_DIRECTORY ${THRIFT_ROOT}/include)
set(THRIFT_INCLUDE_DIR ${THRIFT_ROOT}/include)
endif()

set_property(TARGET thrift PROPERTY INTERFACE_INCLUDE_DIRECTORIES
${THRIFT_INCLUDE_DIR})
set_property(TARGET thrift PROPERTY IMPORTED_LOCATION ${THRIFT_LIB})
message(STATUS "Using pre-builded thrift")
endif()
return()
endif()
set(ARROW_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/arrow_ep")
set(ARROW_CMAKE_ARGS
-DARROW_PARQUET=ON
-DARROW_FILESYSTEM=ON
-DARROW_WITH_LZ4=ON
-DARROW_WITH_SNAPPY=ON
-DARROW_WITH_ZLIB=ON
Expand All @@ -86,7 +40,8 @@ if(VELOX_ENABLE_ARROW)
-DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX}/install
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DARROW_BUILD_STATIC=ON
-DThrift_SOURCE=${THRIFT_SOURCE})
-DThrift_SOURCE=${THRIFT_SOURCE}
-DSnappy_SOURCE=AUTO)
set(ARROW_LIBDIR ${ARROW_PREFIX}/install/${CMAKE_INSTALL_LIBDIR})

add_library(thrift STATIC IMPORTED GLOBAL)
Expand Down Expand Up @@ -119,6 +74,8 @@ if(VELOX_ENABLE_ARROW)
arrow_ep
PREFIX ${ARROW_PREFIX}
URL ${VELOX_ARROW_SOURCE_URL}
PATCH_COMMAND patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/arrow_patches/memorypool.patch &&
patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/arrow_patches/custom-codec.patch
URL_HASH ${VELOX_ARROW_BUILD_SHA256_CHECKSUM}
SOURCE_SUBDIR cpp
CMAKE_ARGS ${ARROW_CMAKE_ARGS}
Expand Down
222 changes: 222 additions & 0 deletions third_party/arrow_patches/custom-codec.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
From 6d7aa045030a7714a28b96a48ceaba6ec49fe0d1 Mon Sep 17 00:00:00 2001
From: Rong Ma <[email protected]>
Date: Mon, 16 Jan 2023 19:05:02 +0800
Subject: [PATCH] Add custom codec

---
cpp/src/arrow/ipc/metadata_internal.cc | 2 ++
cpp/src/arrow/ipc/options.cc | 6 ++++--
cpp/src/arrow/ipc/options.h | 3 +++
cpp/src/arrow/ipc/reader.cc | 2 ++
cpp/src/arrow/util/compression.cc | 14 ++++++++++++++
cpp/src/arrow/util/compression.h | 11 +++++++++++
cpp/src/arrow/util/type_fwd.h | 3 ++-
cpp/src/generated/Message_generated.h | 13 ++++++++-----
format/Message.fbs | 5 ++++-
9 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index d2f2b20d1..f9a67134d 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -936,6 +936,8 @@ static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
codec = flatbuf::CompressionType::LZ4_FRAME;
} else if (options.codec->compression_type() == Compression::ZSTD) {
codec = flatbuf::CompressionType::ZSTD;
+ } else if (options.codec->compression_type() == Compression::CUSTOM) {
+ codec = flatbuf::CompressionType::CUSTOM;
} else {
return Status::Invalid("Unsupported IPC compression codec: ",
options.codec->name());
diff --git a/cpp/src/arrow/ipc/options.cc b/cpp/src/arrow/ipc/options.cc
index e5b14a47f..b038dd6e3 100644
--- a/cpp/src/arrow/ipc/options.cc
+++ b/cpp/src/arrow/ipc/options.cc
@@ -29,8 +29,10 @@ IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }
namespace internal {

Status CheckCompressionSupported(Compression::type codec) {
- if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD)) {
- return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
+ if (std::none_of(
+ kSupportedCodec.cbegin(), kSupportedCodec.cend(),
+ [&codec](const Compression::type& supported) { return codec == supported; })) {
+ return Status::Invalid("Only LZ4_FRAME, ZSTD and CUSTOM compression allowed");
}
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 2af9d8e9c..037c2fe9c 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -159,6 +159,9 @@ struct ARROW_EXPORT IpcReadOptions {

namespace internal {

+static const std::vector<Compression::type> kSupportedCodec = {
+ Compression::LZ4_FRAME, Compression::ZSTD, Compression::CUSTOM};
+
Status CheckCompressionSupported(Compression::type codec);

} // namespace internal
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 7875cd3cd..74f4b2919 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -598,6 +598,8 @@ Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out)
*out = Compression::LZ4_FRAME;
} else if (compression->codec() == flatbuf::CompressionType::ZSTD) {
*out = Compression::ZSTD;
+ } else if (compression->codec() == flatbuf::CompressionType::CUSTOM) {
+ *out = Compression::CUSTOM;
} else {
return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
}
diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index c67cb4539..a0da548bf 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -98,6 +98,8 @@ Result<Compression::type> Codec::GetCompressionType(const std::string& name) {
return Compression::ZSTD;
} else if (name == "bz2") {
return Compression::BZ2;
+ } else if (name == "custom") {
+ return Compression::CUSTOM;
} else {
return Status::Invalid("Unrecognized compression type: ", name);
}
@@ -201,6 +203,12 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
codec = internal::MakeBZ2Codec(compression_level);
#endif
break;
+ case Compression::CUSTOM:
+ if (codec_factory == nullptr) {
+ return Status::Invalid("Custom codec is not registered.");
+ }
+ codec = codec_factory(compression_level);
+ break;
default:
break;
}
@@ -254,10 +262,16 @@ bool Codec::IsAvailable(Compression::type codec_type) {
#else
return false;
#endif
+ case Compression::CUSTOM:
+ return codec_factory != nullptr;
default:
return false;
}
}

+void RegisterCustomCodec(const CodecFactory& factory) {
+ std::call_once(custom_codec_registered, [&factory]() { codec_factory = factory; });
+}
+
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 0832e82a6..b8767186b 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -18,8 +18,10 @@
#pragma once

#include <cstdint>
+#include <functional>
#include <limits>
#include <memory>
+#include <mutex>
#include <string>

#include "arrow/result.h"
@@ -198,5 +200,14 @@ class ARROW_EXPORT Codec {
virtual Status Init();
};

+typedef std::function<std::unique_ptr<Codec>(int compression_level)> CodecFactory;
+
+static CodecFactory codec_factory;
+static std::once_flag custom_codec_registered;
+
+/// Register a factory that is used to create user-defined codec.
+ARROW_EXPORT
+void RegisterCustomCodec(const CodecFactory& codec_factory);
+
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/type_fwd.h b/cpp/src/arrow/util/type_fwd.h
index ca107c2c6..d8f00c140 100644
--- a/cpp/src/arrow/util/type_fwd.h
+++ b/cpp/src/arrow/util/type_fwd.h
@@ -49,7 +49,8 @@ struct Compression {
LZ4_FRAME,
LZO,
BZ2,
- LZ4_HADOOP
+ LZ4_HADOOP,
+ CUSTOM
};
};

diff --git a/cpp/src/generated/Message_generated.h b/cpp/src/generated/Message_generated.h
index 1c51c6eaf..5308808c9 100644
--- a/cpp/src/generated/Message_generated.h
+++ b/cpp/src/generated/Message_generated.h
@@ -32,29 +32,32 @@ struct MessageBuilder;
enum class CompressionType : int8_t {
LZ4_FRAME = 0,
ZSTD = 1,
+ CUSTOM = 2,
MIN = LZ4_FRAME,
- MAX = ZSTD
+ MAX = CUSTOM
};

-inline const CompressionType (&EnumValuesCompressionType())[2] {
+inline const CompressionType (&EnumValuesCompressionType())[3] {
static const CompressionType values[] = {
CompressionType::LZ4_FRAME,
- CompressionType::ZSTD
+ CompressionType::ZSTD,
+ CompressionType::CUSTOM
};
return values;
}

inline const char * const *EnumNamesCompressionType() {
- static const char * const names[3] = {
+ static const char * const names[4] = {
"LZ4_FRAME",
"ZSTD",
+ "CUSTOM",
nullptr
};
return names;
}

inline const char *EnumNameCompressionType(CompressionType e) {
- if (flatbuffers::IsOutRange(e, CompressionType::LZ4_FRAME, CompressionType::ZSTD)) return "";
+ if (flatbuffers::IsOutRange(e, CompressionType::LZ4_FRAME, CompressionType::CUSTOM)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesCompressionType()[index];
}
diff --git a/format/Message.fbs b/format/Message.fbs
index 170ea8fbc..98a7653d6 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -49,7 +49,10 @@ enum CompressionType:byte {
LZ4_FRAME,

// Zstandard
- ZSTD
+ ZSTD,
+
+ // Pluggable custom codec
+ CUSTOM
}

/// Provided for forward compatibility in case we need to support different
--
2.25.1

22 changes: 22 additions & 0 deletions third_party/arrow_patches/memorypool.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 4a7671e15..1c38e8f95 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -181,12 +181,16 @@ class RecordBatchSerializer {
std::shared_ptr<Buffer>* out) {
// Convert buffer to uncompressed-length-prefixed compressed buffer
int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
- ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
+ ARROW_ASSIGN_OR_RAISE(
+ auto result,
+ AllocateResizableBuffer(maximum_length + sizeof(int64_t), options_.memory_pool));

int64_t actual_length;
ARROW_ASSIGN_OR_RAISE(actual_length,
codec->Compress(buffer.size(), buffer.data(), maximum_length,
result->mutable_data() + sizeof(int64_t)));
+ RETURN_NOT_OK(
+ result->Resize(actual_length + sizeof(int64_t), /* shrink_to_fit= */ true));
*reinterpret_cast<int64_t*>(result->mutable_data()) =
bit_util::ToLittleEndian(buffer.size());
*out = SliceBuffer(std::move(result), /*offset=*/0, actual_length + sizeof(int64_t));