From 0824c9578be17074dbc38b043600398d3d963847 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Tue, 29 Oct 2024 20:52:20 +0800 Subject: [PATCH] Support static link libhdfs3 --- CMakeLists.txt | 14 ++++ velox/CMakeLists.txt | 5 +- .../hive/storage_adapters/hdfs/CMakeLists.txt | 11 +++ .../storage_adapters/hdfs/HdfsFileSystem.cpp | 49 ++++++++++++ .../storage_adapters/hdfs/HdfsFileSystem.h | 2 + .../storage_adapters/hdfs/HdfsReadFile.cpp | 78 ++++++++++++++++++ .../hive/storage_adapters/hdfs/HdfsReadFile.h | 18 ++++- .../storage_adapters/hdfs/HdfsWriteFile.cpp | 79 +++++++++++++++++++ .../storage_adapters/hdfs/HdfsWriteFile.h | 32 +++++++- .../hdfs/RegisterHdfsFileSystem.cpp | 6 +- velox/external/hdfs/ArrowHdfsInternal.h | 2 +- .../external/hdfs/{hdfs.h => HdfsInternal.h} | 0 12 files changed, 289 insertions(+), 7 deletions(-) rename velox/external/hdfs/{hdfs.h => HdfsInternal.h} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7fd99b6dc744..12a052a5b578 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -113,6 +113,7 @@ option(VELOX_ENABLE_S3 "Build S3 Connector" OFF) option(VELOX_ENABLE_GCS "Build GCS Connector" OFF) option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF) option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF) +option(VELOX_ENABLE_HDFS3 "Build Hdfs3 Connector" OFF) option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) @@ -266,6 +267,19 @@ if(VELOX_ENABLE_HDFS) set(VELOX_ENABLE_ARROW ON) endif() +if(VELOX_ENABLE_HDFS3) + find_package(libhdfs3) + if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) + set(LIBHDFS3 HDFS::hdfs3) + else() + find_library( + LIBHDFS3 + NAMES libhdfs3.so libhdfs3.dylib + HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) + endif() + add_definitions(-DVELOX_ENABLE_HDFS3) +endif() + if(VELOX_ENABLE_PARQUET) add_definitions(-DVELOX_ENABLE_PARQUET) # Native Parquet reader requires Apache Thrift and Arrow Parquet writer, which diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 06ae8bf1c053..f1694f17bfa6 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -24,7 +24,10 @@ add_subdirectory(row) add_subdirectory(flag_definitions) add_subdirectory(external/date) add_subdirectory(external/md5) -add_subdirectory(external/hdfs) + +if(${VELOX_ENABLE_HDFS}) + add_subdirectory(external/hdfs) +endif() # # examples depend on expression diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt index 44aa7be3489c..de382770382c 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt @@ -34,3 +34,14 @@ if(VELOX_ENABLE_HDFS) add_subdirectory(tests) endif() endif() + +if(VELOX_ENABLE_HDFS3) + velox_sources( + velox_hdfs + PRIVATE + HdfsFileSystem.cpp + HdfsReadFile.cpp + HdfsWriteFile.cpp) + velox_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd) + +endif() diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index 9d3f6e30a67a..86cfcca6fb39 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -18,7 +18,13 @@ #include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" +#ifdef VELOX_ENABLE_HDFS #include "velox/external/hdfs/ArrowHdfsInternal.h" +#endif + +#ifdef VELOX_ENABLE_HDFS3 +#include +#endif namespace facebook::velox::filesystems { std::string_view HdfsFileSystem::kScheme("hdfs://"); @@ -29,6 +35,7 @@ class HdfsFileSystem::Impl { explicit Impl( const config::ConfigBase* config, const HdfsServiceEndpoint& endpoint) { +#ifdef VELOX_ENABLE_HDFS auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&driver_); if (!status.ok()) { LOG(ERROR) << "ConnectLibHdfs failed "; @@ -45,28 +52,58 @@ class HdfsFileSystem::Impl { "Unable to connect to HDFS: {}, got error: {}.", endpoint.identity(), driver_->GetLastExceptionRootCause()); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + auto builder = hdfsNewBuilder(); + hdfsBuilderSetNameNode(builder, endpoint.host.c_str()); + hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data())); + hdfsClient_ = hdfsBuilderConnect(builder); + hdfsFreeBuilder(builder); + VELOX_CHECK_NOT_NULL( + hdfsClient_, + "Unable to connect to HDFS: {}, got error: {}.", + endpoint.identity(), + hdfsGetLastError()); +#endif } ~Impl() { LOG(INFO) << "Disconnecting HDFS file system"; +#ifdef VELOX_ENABLE_HDFS int disconnectResult = driver_->Disconnect(hdfsClient_); if (disconnectResult != 0) { LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: " << errno; } + +#endif + +#ifdef VELOX_ENABLE_HDFS3 + int disconnectResult = hdfsDisconnect(hdfsClient_); + if (disconnectResult != 0) { + LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: " + << errno; + } + +#endif } hdfsFS hdfsClient() { return hdfsClient_; } +#ifdef VELOX_ENABLE_HDFS filesystems::arrow::io::internal::LibHdfsShim* hdfsShim() { return driver_; } +#endif private: hdfsFS hdfsClient_; +#ifdef VELOX_ENABLE_HDFS filesystems::arrow::io::internal::LibHdfsShim* driver_; +#endif }; HdfsFileSystem::HdfsFileSystem( @@ -90,15 +127,27 @@ std::unique_ptr HdfsFileSystem::openFileForRead( path.remove_prefix(index); } +#ifdef VELOX_ENABLE_HDFS return std::make_unique( impl_->hdfsShim(), impl_->hdfsClient(), path); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + return std::make_unique(impl_->hdfsClient(), path); +#endif } std::unique_ptr HdfsFileSystem::openFileForWrite( std::string_view path, const FileOptions& /*unused*/) { +#ifdef VELOX_ENABLE_HDFS return std::make_unique( impl_->hdfsShim(), impl_->hdfsClient(), path); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + return std::make_unique(impl_->hdfsClient(), path); +#endif } bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index d2dbbf17afb5..9b63bf514c50 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -17,9 +17,11 @@ namespace facebook::velox::filesystems { +#ifdef VELOX_ENABLE_HDFS namespace arrow::io::internal { class LibHdfsShim; } +#endif struct HdfsServiceEndpoint { HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort) diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index d48dd373d344..acbd589ea9c4 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -16,10 +16,17 @@ #include "HdfsReadFile.h" #include +#ifdef VELOX_ENABLE_HDFS #include "velox/external/hdfs/ArrowHdfsInternal.h" +#endif + +#ifdef VELOX_ENABLE_HDFS3 +#include +#endif namespace facebook::velox { +#ifdef VELOX_ENABLE_HDFS struct HdfsFile { filesystems::arrow::io::internal::LibHdfsShim* driver_; hdfsFS client_; @@ -60,7 +67,47 @@ struct HdfsFile { return bytesRead; } }; +#endif + +#ifdef VELOX_ENABLE_HDFS3 +struct HdfsFile { + hdfsFS client_; + hdfsFile handle_; + + HdfsFile() : client_(nullptr), handle_(nullptr) {} + ~HdfsFile() { + if (handle_ && hdfsCloseFile(client_, handle_) == -1) { + LOG(ERROR) << "Unable to close file, errno: " << errno; + } + } + + void open(hdfsFS client, const std::string& path) { + client_ = client; + handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0); + VELOX_CHECK_NOT_NULL( + handle_, + "Unable to open file {}. got error: {}", + path, + hdfsGetLastError()); + } + + void seek(uint64_t offset) const { + VELOX_CHECK_EQ( + hdfsSeek(client_, handle_, offset), + 0, + "Cannot seek through HDFS file, error is : {}", + std::string(hdfsGetLastError())); + } + + int32_t read(char* pos, uint64_t length) const { + auto bytesRead = hdfsRead(client_, handle_, pos, length); + VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal."); + return bytesRead; + } +}; +#endif +#ifdef VELOX_ENABLE_HDFS HdfsReadFile::HdfsReadFile( filesystems::arrow::io::internal::LibHdfsShim* driver, hdfsFS hdfs, @@ -80,12 +127,37 @@ HdfsReadFile::HdfsReadFile( VELOX_FAIL(errMsg); } } +#endif + +#ifdef VELOX_ENABLE_HDFS3 +HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) + : hdfsClient_(hdfs), filePath_(path) { + fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data()); + if (fileInfo_ == nullptr) { + auto error = hdfsGetLastError(); + auto errMsg = fmt::format( + "Unable to get file path info for file: {}. got error: {}", + filePath_, + error); + if (std::strstr(error, "FileNotFoundException") != nullptr) { + VELOX_FILE_NOT_FOUND_ERROR(errMsg); + } + VELOX_FAIL(errMsg); + } +} +#endif HdfsReadFile::~HdfsReadFile() { +#ifdef VELOX_ENABLE_HDFS // should call hdfsFreeFileInfo to avoid memory leak if (fileInfo_) { driver_->FreeFileInfo(fileInfo_, 1); } +#endif + +#ifdef VELOX_ENABLE_HDFS3 + hdfsFreeFileInfo(fileInfo_, 1); +#endif } void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) @@ -93,7 +165,13 @@ void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) checkFileReadParameters(offset, length); folly::ThreadLocal file; if (!file->handle_) { +#ifdef VELOX_ENABLE_HDFS file->open(driver_, hdfsClient_, filePath_); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + file->open(hdfsClient_, filePath_); +#endif } file->seek(offset); uint64_t totalBytesRead = 0; diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index b63c2dd933dd..27a980d518dd 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -15,23 +15,37 @@ */ #include "velox/common/file/File.h" -#include "velox/external/hdfs/hdfs.h" +#ifdef VELOX_ENABLE_HDFS +#include "velox/external/hdfs/HdfsInternal.h" +#endif + +#ifdef VELOX_ENABLE_HDFS3 +#include +#endif namespace facebook::velox { +#ifdef VELOX_ENABLE_HDFS namespace filesystems::arrow::io::internal { class LibHdfsShim; } +#endif /** * Implementation of hdfs read file. */ class HdfsReadFile final : public ReadFile { public: +#ifdef VELOX_ENABLE_HDFS explicit HdfsReadFile( filesystems::arrow::io::internal::LibHdfsShim* driver, hdfsFS hdfs, std::string_view path); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + explicit HdfsReadFile(hdfsFS hdfs, std::string_view path); +#endif ~HdfsReadFile() override; std::string_view pread(uint64_t offset, uint64_t length, void* buf) @@ -57,7 +71,9 @@ class HdfsReadFile final : public ReadFile { void preadInternal(uint64_t offset, uint64_t length, char* pos) const; void checkFileReadParameters(uint64_t offset, uint64_t length) const; +#ifdef VELOX_ENABLE_HDFS filesystems::arrow::io::internal::LibHdfsShim* driver_; +#endif hdfsFS hdfsClient_; hdfsFileInfo* fileInfo_; std::string filePath_; diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp index be668a3133e1..d0205a4bc980 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp @@ -15,9 +15,16 @@ */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" +#ifdef VELOX_ENABLE_HDFS #include "velox/external/hdfs/ArrowHdfsInternal.h" +#endif + +#ifdef VELOX_ENABLE_HDFS3 +#include +#endif namespace facebook::velox { +#ifdef VELOX_ENABLE_HDFS HdfsWriteFile::HdfsWriteFile( filesystems::arrow::io::internal::LibHdfsShim* driver, hdfsFS hdfsClient, @@ -45,6 +52,36 @@ HdfsWriteFile::HdfsWriteFile( filePath_, driver_->GetLastExceptionRootCause()); } +#endif + +#ifdef VELOX_ENABLE_HDFS3 +HdfsWriteFile::HdfsWriteFile( + hdfsFS hdfsClient, + std::string_view path, + int bufferSize, + short replication, + int blockSize) + : hdfsClient_(hdfsClient), filePath_(path) { + auto pos = filePath_.rfind("/"); + auto parentDir = filePath_.substr(0, pos + 1); + if (hdfsExists(hdfsClient_, parentDir.c_str()) == -1) { + hdfsCreateDirectory(hdfsClient_, parentDir.c_str()); + } + + hdfsFile_ = hdfsOpenFile( + hdfsClient_, + filePath_.c_str(), + O_WRONLY, + bufferSize, + replication, + blockSize); + VELOX_CHECK_NOT_NULL( + hdfsFile_, + "Failed to open hdfs file: {}, with error: {}", + filePath_, + std::string(hdfsGetLastError())); +} +#endif HdfsWriteFile::~HdfsWriteFile() { if (hdfsFile_) { @@ -53,12 +90,24 @@ HdfsWriteFile::~HdfsWriteFile() { } void HdfsWriteFile::close() { +#ifdef VELOX_ENABLE_HDFS int success = driver_->CloseFile(hdfsClient_, hdfsFile_); VELOX_CHECK_EQ( success, 0, "Failed to close hdfs file: {}", driver_->GetLastExceptionRootCause()); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + int success = hdfsCloseFile(hdfsClient_, hdfsFile_); + VELOX_CHECK_EQ( + success, + 0, + "Failed to close hdfs file: {}", + std::string(hdfsGetLastError())); +#endif + hdfsFile_ = nullptr; } @@ -67,9 +116,17 @@ void HdfsWriteFile::flush() { hdfsFile_, "Cannot flush HDFS file because file handle is null, file path: {}", filePath_); +#ifdef VELOX_ENABLE_HDFS int success = driver_->Flush(hdfsClient_, hdfsFile_); VELOX_CHECK_EQ( success, 0, "Hdfs flush error: {}", driver_->GetLastExceptionRootCause()); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + int success = hdfsFlush(hdfsClient_, hdfsFile_); + VELOX_CHECK_EQ( + success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError())); +#endif } void HdfsWriteFile::append(std::string_view data) { @@ -80,6 +137,7 @@ void HdfsWriteFile::append(std::string_view data) { hdfsFile_, "Cannot append to HDFS file because file handle is null, file path: {}", filePath_); +#ifdef VELOX_ENABLE_HDFS int64_t totalWrittenBytes = driver_->Write( hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); VELOX_CHECK_EQ( @@ -87,13 +145,34 @@ void HdfsWriteFile::append(std::string_view data) { data.size(), "Write failure in HDFSWriteFile::append {}", driver_->GetLastExceptionRootCause()); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + int64_t totalWrittenBytes = + hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); + VELOX_CHECK_EQ( + totalWrittenBytes, + data.size(), + "Write failure in HDFSWriteFile::append {}", + std::string(hdfsGetLastError())); +#endif } uint64_t HdfsWriteFile::size() const { +#ifdef VELOX_ENABLE_HDFS auto fileInfo = driver_->GetPathInfo(hdfsClient_, filePath_.c_str()); uint64_t size = fileInfo->mSize; // should call hdfsFreeFileInfo to avoid memory leak driver_->FreeFileInfo(fileInfo, 1); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + auto fileInfo = hdfsGetPathInfo(hdfsClient_, filePath_.c_str()); + uint64_t size = fileInfo->mSize; + // should call hdfsFreeFileInfo to avoid memory leak + hdfsFreeFileInfo(fileInfo, 1); +#endif + return size; } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h index fb311b1a6c3d..f4087b77d48d 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h @@ -16,18 +16,27 @@ #pragma once #include "velox/common/file/File.h" -#include "velox/external/hdfs/hdfs.h" +#ifdef VELOX_ENABLE_HDFS +#include "velox/external/hdfs/HdfsInternal.h" +#endif + +#ifdef VELOX_ENABLE_HDFS3 +#include +#endif namespace facebook::velox { +#ifdef VELOX_ENABLE_HDFS namespace filesystems::arrow::io::internal { class LibHdfsShim; } +#endif /// Implementation of hdfs write file. Nothing written to the file should be /// read back until it is closed. class HdfsWriteFile : public WriteFile { public: +#ifdef VELOX_ENABLE_HDFS /// The constructor. /// @param hdfsClient The configured hdfs filesystem handle. /// @param path The file path to write. @@ -44,6 +53,25 @@ class HdfsWriteFile : public WriteFile { int bufferSize = 0, short replication = 0, int blockSize = 0); +#endif + +#ifdef VELOX_ENABLE_HDFS3 + /// The constructor. + /// @param hdfsClient The configured hdfs filesystem handle. + /// @param path The file path to write. + /// @param bufferSize Size of buffer for write - pass 0 if you want + /// to use the default configured values. + /// @param replication Block replication - pass 0 if you want to use + /// the default configured values. + /// @param blockSize Size of block - pass 0 if you want to use the + /// default configured values. + HdfsWriteFile( + hdfsFS hdfsClient, + std::string_view path, + int bufferSize = 0, + short replication = 0, + int blockSize = 0); +#endif ~HdfsWriteFile() override; @@ -60,7 +88,9 @@ class HdfsWriteFile : public WriteFile { void close() override; private: +#ifdef VELOX_ENABLE_HDFS filesystems::arrow::io::internal::LibHdfsShim* driver_; +#endif /// The configured hdfs filesystem handle. hdfsFS hdfsClient_; /// The hdfs file handle for write. diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp index 1f23179f0a72..756b3f9bce68 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#ifdef VELOX_ENABLE_HDFS +#if defined(VELOX_ENABLE_HDFS3) || defined(VELOX_ENABLE_HDFS) #include "folly/concurrency/ConcurrentHashMap.h" #include "velox/common/config/Config.h" @@ -25,7 +25,7 @@ namespace facebook::velox::filesystems { -#ifdef VELOX_ENABLE_HDFS +#if defined(VELOX_ENABLE_HDFS3) || defined(VELOX_ENABLE_HDFS) std::mutex mtx; std::function #include -#include "hdfs.h" +#include "HdfsInternal.h" #include diff --git a/velox/external/hdfs/hdfs.h b/velox/external/hdfs/HdfsInternal.h similarity index 100% rename from velox/external/hdfs/hdfs.h rename to velox/external/hdfs/HdfsInternal.h