Skip to content

Commit

Permalink
Support static link libhdfs3
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Nov 13, 2024
1 parent 73cba3d commit 0824c95
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 7 deletions.
14 changes: 14 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ option(VELOX_ENABLE_S3 "Build S3 Connector" OFF)
option(VELOX_ENABLE_GCS "Build GCS Connector" OFF)
option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF)
option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF)
option(VELOX_ENABLE_HDFS3 "Build Hdfs3 Connector" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
Expand Down Expand Up @@ -266,6 +267,19 @@ if(VELOX_ENABLE_HDFS)
set(VELOX_ENABLE_ARROW ON)
endif()

# Optional HDFS connector backed by the libhdfs3 C++ client (as opposed to
# VELOX_ENABLE_HDFS, which goes through the Arrow libhdfs shim). Prefer an
# installed libhdfs3 CMake package; otherwise fall back to locating the shared
# library on disk.
if(VELOX_ENABLE_HDFS3)
  find_package(libhdfs3)
  if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
    set(LIBHDFS3 HDFS::hdfs3)
  else()
    # NOTE(review): the HINTS path points into a hawq checkout's local build
    # tree — presumably a developer-machine layout; confirm it is intended to
    # work in CI as well. REQUIRED makes configuration fail if nothing is found.
    find_library(
      LIBHDFS3
      NAMES libhdfs3.so libhdfs3.dylib
      HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
  endif()
  # Compile-time switch consumed by the hdfs storage adapter sources.
  add_definitions(-DVELOX_ENABLE_HDFS3)
endif()

if(VELOX_ENABLE_PARQUET)
add_definitions(-DVELOX_ENABLE_PARQUET)
# Native Parquet reader requires Apache Thrift and Arrow Parquet writer, which
Expand Down
5 changes: 4 additions & 1 deletion velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ add_subdirectory(row)
add_subdirectory(flag_definitions)
add_subdirectory(external/date)
add_subdirectory(external/md5)
add_subdirectory(external/hdfs)

# The vendored Arrow libhdfs shim is only needed by the VELOX_ENABLE_HDFS
# connector; the HDFS3 path links libhdfs3 directly and does not use it.
# Use if(VELOX_ENABLE_HDFS) rather than if(${VELOX_ENABLE_HDFS}): expanding
# the variable inside if() double-evaluates it and errors out if the variable
# is ever undefined (and interacts badly with CMP0054).
if(VELOX_ENABLE_HDFS)
  add_subdirectory(external/hdfs)
endif()
#

# examples depend on expression
Expand Down
11 changes: 11 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,14 @@ if(VELOX_ENABLE_HDFS)
add_subdirectory(tests)
endif()
endif()

# Compile the hdfs adapter sources against libhdfs3 when the HDFS3 connector
# is enabled. ${LIBHDFS3} is set by the top-level CMakeLists (either the
# HDFS::hdfs3 imported target or a find_library result).
# NOTE(review): velox_sources() appends to the velox_hdfs target, which
# appears to be created under VELOX_ENABLE_HDFS above — confirm an
# HDFS3-only build (HDFS OFF) still defines that target.
if(VELOX_ENABLE_HDFS3)
  velox_sources(
    velox_hdfs
    PRIVATE
    HdfsFileSystem.cpp
    HdfsReadFile.cpp
    HdfsWriteFile.cpp)
  velox_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd)

endif()
49 changes: 49 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#ifdef VELOX_ENABLE_HDFS
#include "velox/external/hdfs/ArrowHdfsInternal.h"
#endif

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#endif

namespace facebook::velox::filesystems {
std::string_view HdfsFileSystem::kScheme("hdfs://");
Expand All @@ -29,6 +35,7 @@ class HdfsFileSystem::Impl {
explicit Impl(
const config::ConfigBase* config,
const HdfsServiceEndpoint& endpoint) {
#ifdef VELOX_ENABLE_HDFS
auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&driver_);
if (!status.ok()) {
LOG(ERROR) << "ConnectLibHdfs failed ";
Expand All @@ -45,28 +52,58 @@ class HdfsFileSystem::Impl {
"Unable to connect to HDFS: {}, got error: {}.",
endpoint.identity(),
driver_->GetLastExceptionRootCause());
#endif

#ifdef VELOX_ENABLE_HDFS3
auto builder = hdfsNewBuilder();
hdfsBuilderSetNameNode(builder, endpoint.host.c_str());
hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
hdfsClient_ = hdfsBuilderConnect(builder);
hdfsFreeBuilder(builder);
VELOX_CHECK_NOT_NULL(
hdfsClient_,
"Unable to connect to HDFS: {}, got error: {}.",
endpoint.identity(),
hdfsGetLastError());
#endif
}

// Disconnects the HDFS client owned by this Impl. Failures are logged, never
// thrown — destructors must not throw.
~Impl() {
  LOG(INFO) << "Disconnecting HDFS file system";
#ifdef VELOX_ENABLE_HDFS
  // Arrow libhdfs shim path: disconnect through the dynamically loaded driver.
  // Scoped so this translation unit still compiles if both HDFS flavors are
  // ever defined at once (the unscoped originals redeclared disconnectResult).
  {
    const int disconnectResult = driver_->Disconnect(hdfsClient_);
    if (disconnectResult != 0) {
      LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: "
                   << errno;
    }
  }
#endif

#ifdef VELOX_ENABLE_HDFS3
  // libhdfs3 path: disconnect through the C API directly.
  {
    const int disconnectResult = hdfsDisconnect(hdfsClient_);
    if (disconnectResult != 0) {
      LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: "
                   << errno;
    }
  }
#endif
}

hdfsFS hdfsClient() {
return hdfsClient_;
}

#ifdef VELOX_ENABLE_HDFS
filesystems::arrow::io::internal::LibHdfsShim* hdfsShim() {
return driver_;
}
#endif

private:
hdfsFS hdfsClient_;
#ifdef VELOX_ENABLE_HDFS
filesystems::arrow::io::internal::LibHdfsShim* driver_;
#endif
};

HdfsFileSystem::HdfsFileSystem(
Expand All @@ -90,15 +127,27 @@ std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
path.remove_prefix(index);
}

#ifdef VELOX_ENABLE_HDFS
return std::make_unique<HdfsReadFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
#endif

#ifdef VELOX_ENABLE_HDFS3
return std::make_unique<HdfsReadFile>(impl_->hdfsClient(), path);
#endif
}

// Creates a WriteFile for `path` using whichever HDFS client flavor this
// build enables. NOTE(review): exactly one of VELOX_ENABLE_HDFS /
// VELOX_ENABLE_HDFS3 is presumably defined per build; with neither defined
// this function falls off the end without returning — confirm the build
// system enforces that.
std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
    std::string_view path,
    const FileOptions& /*unused*/) {
#ifdef VELOX_ENABLE_HDFS
  // Arrow libhdfs shim path: the write file needs both the shim and client.
  return std::make_unique<HdfsWriteFile>(
      impl_->hdfsShim(), impl_->hdfsClient(), path);
#endif

#ifdef VELOX_ENABLE_HDFS3
  // libhdfs3 path: the raw C API client handle alone is sufficient.
  return std::make_unique<HdfsWriteFile>(impl_->hdfsClient(), path);
#endif
}

bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

namespace facebook::velox::filesystems {

#ifdef VELOX_ENABLE_HDFS
namespace arrow::io::internal {
class LibHdfsShim;
}
#endif

struct HdfsServiceEndpoint {
HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort)
Expand Down
78 changes: 78 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>
#ifdef VELOX_ENABLE_HDFS
#include "velox/external/hdfs/ArrowHdfsInternal.h"
#endif

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#endif

namespace facebook::velox {

#ifdef VELOX_ENABLE_HDFS
struct HdfsFile {
filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS client_;
Expand Down Expand Up @@ -60,7 +67,47 @@ struct HdfsFile {
return bytesRead;
}
};
#endif

#ifdef VELOX_ENABLE_HDFS3
// RAII wrapper around a libhdfs3 read handle: the file is closed when the
// wrapper is destroyed. Fields are read directly by HdfsReadFile, so their
// names are part of this struct's interface.
struct HdfsFile {
  hdfsFS client_{nullptr};
  hdfsFile handle_{nullptr};

  HdfsFile() = default;

  ~HdfsFile() {
    if (handle_ != nullptr && hdfsCloseFile(client_, handle_) == -1) {
      LOG(ERROR) << "Unable to close file, errno: " << errno;
    }
  }

  // Opens `path` read-only on `client`; throws via VELOX_CHECK on failure.
  void open(hdfsFS client, const std::string& path) {
    client_ = client;
    handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
    VELOX_CHECK_NOT_NULL(
        handle_,
        "Unable to open file {}. got error: {}",
        path,
        hdfsGetLastError());
  }

  // Positions the read pointer at `offset` bytes from the start of the file.
  void seek(uint64_t offset) const {
    VELOX_CHECK_EQ(
        hdfsSeek(client_, handle_, offset),
        0,
        "Cannot seek through HDFS file, error is : {}",
        std::string(hdfsGetLastError()));
  }

  // Reads up to `length` bytes into `pos`; returns the count actually read
  // (may be short). Throws if libhdfs3 reports an error (negative return).
  int32_t read(char* pos, uint64_t length) const {
    const int32_t bytesRead = hdfsRead(client_, handle_, pos, length);
    VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
    return bytesRead;
  }
};
#endif

#ifdef VELOX_ENABLE_HDFS
HdfsReadFile::HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
Expand All @@ -80,20 +127,51 @@ HdfsReadFile::HdfsReadFile(
VELOX_FAIL(errMsg);
}
}
#endif

#ifdef VELOX_ENABLE_HDFS3
// Constructs a read file for `path`, eagerly fetching file metadata so that
// size/existence errors surface at open time rather than on first read.
HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
    : hdfsClient_(hdfs), filePath_(path) {
  fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data());
  if (fileInfo_ != nullptr) {
    return;
  }
  // Metadata lookup failed: map a missing file to the dedicated error type
  // so callers can distinguish "not found" from other HDFS failures.
  const auto lastError = hdfsGetLastError();
  auto message = fmt::format(
      "Unable to get file path info for file: {}. got error: {}",
      filePath_,
      lastError);
  if (std::strstr(lastError, "FileNotFoundException") != nullptr) {
    VELOX_FILE_NOT_FOUND_ERROR(message);
  }
  VELOX_FAIL(message);
}
#endif

// Frees the cached hdfsFileInfo (if any). The HDFS client handle itself is
// owned by HdfsFileSystem::Impl and is not released here.
HdfsReadFile::~HdfsReadFile() {
#ifdef VELOX_ENABLE_HDFS
  // should call hdfsFreeFileInfo to avoid memory leak
  if (fileInfo_) {
    driver_->FreeFileInfo(fileInfo_, 1);
  }
#endif

#ifdef VELOX_ENABLE_HDFS3
  // Guard against a null fileInfo_ for symmetry with the shim path above;
  // hdfsFreeFileInfo on a null pointer is not a documented no-op.
  if (fileInfo_) {
    hdfsFreeFileInfo(fileInfo_, 1);
  }
#endif
}

void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
folly::ThreadLocal<HdfsFile> file;
if (!file->handle_) {
#ifdef VELOX_ENABLE_HDFS
file->open(driver_, hdfsClient_, filePath_);
#endif

#ifdef VELOX_ENABLE_HDFS3
file->open(hdfsClient_, filePath_);
#endif
}
file->seek(offset);
uint64_t totalBytesRead = 0;
Expand Down
18 changes: 17 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,37 @@
*/

#include "velox/common/file/File.h"
#include "velox/external/hdfs/hdfs.h"
#ifdef VELOX_ENABLE_HDFS
#include "velox/external/hdfs/HdfsInternal.h"
#endif

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#endif

namespace facebook::velox {

#ifdef VELOX_ENABLE_HDFS
namespace filesystems::arrow::io::internal {
class LibHdfsShim;
}
#endif

/**
* Implementation of hdfs read file.
*/
class HdfsReadFile final : public ReadFile {
public:
#ifdef VELOX_ENABLE_HDFS
explicit HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
std::string_view path);
#endif

#ifdef VELOX_ENABLE_HDFS3
explicit HdfsReadFile(hdfsFS hdfs, std::string_view path);
#endif
~HdfsReadFile() override;

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
Expand All @@ -57,7 +71,9 @@ class HdfsReadFile final : public ReadFile {
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;
void checkFileReadParameters(uint64_t offset, uint64_t length) const;

#ifdef VELOX_ENABLE_HDFS
filesystems::arrow::io::internal::LibHdfsShim* driver_;
#endif
hdfsFS hdfsClient_;
hdfsFileInfo* fileInfo_;
std::string filePath_;
Expand Down
Loading

0 comments on commit 0824c95

Please sign in to comment.