diff --git a/.github/workflows/linux-build.yml b/.github/workflows/linux-build.yml index 1ca5cce9c7ecf..a3b2f7b92e73c 100644 --- a/.github/workflows/linux-build.yml +++ b/.github/workflows/linux-build.yml @@ -134,6 +134,7 @@ jobs: LIBHDFS3_CONF: "${{ github.workspace }}/scripts/hdfs-client.xml" working-directory: _build/release run: | + export CLASSPATH=`/usr/local/hadoop/bin/hdfs classpath --glob` ctest -j 8 --output-on-failure --no-tests=error ubuntu-debug: diff --git a/CMakeLists.txt b/CMakeLists.txt index 8b0d3a67c21c1..2ff1428a47bff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -250,11 +250,9 @@ if(VELOX_ENABLE_ABFS) endif() if(VELOX_ENABLE_HDFS) - find_library( - LIBHDFS3 - NAMES libhdfs3.so libhdfs3.dylib - HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) - add_definitions(-DVELOX_ENABLE_HDFS3) + add_definitions(-DVELOX_ENABLE_HDFS) + # JVM libhdfs requires arrow dependency. + set(VELOX_ENABLE_ARROW ON) endif() if(VELOX_ENABLE_PARQUET) diff --git a/NOTICE.txt b/NOTICE.txt index 58655beb3ca76..8b812aa41ab2e 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -9,3 +9,103 @@ This product includes software from the QT project (BSD, 3-clause). This product includes software from HowardHinnant's date library (MIT License). * https://github.com/HowardHinnant/date/tree/master + +This product includes software from the The Arrow project. +* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.h +* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.cc +Which contain the following NOTICE file: +------- + Apache Arrow + Copyright 2016-2024 The Apache Software Foundation + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + This product includes software from the SFrame project (BSD, 3-clause). + * Copyright (C) 2015 Dato, Inc. + * Copyright (c) 2009 Carnegie Mellon University. + This product includes software from the Feather project (Apache 2.0) + https://github.com/wesm/feather + This product includes software from the DyND project (BSD 2-clause) + https://github.com/libdynd + This product includes software from the LLVM project + * distributed under the University of Illinois Open Source + This product includes software from the google-lint project + * Copyright (c) 2009 Google Inc. All rights reserved. + This product includes software from the mman-win32 project + * Copyright https://code.google.com/p/mman-win32/ + * Licensed under the MIT License; + This product includes software from the LevelDB project + * Copyright (c) 2011 The LevelDB Authors. All rights reserved. + * Use of this source code is governed by a BSD-style license that can be + * Moved from Kudu http://github.com/cloudera/kudu + This product includes software from the CMake project + * Copyright 2001-2009 Kitware, Inc. + * Copyright 2012-2014 Continuum Analytics, Inc. + * All rights reserved. + This product includes software from https://github.com/matthew-brett/multibuild (BSD 2-clause) + * Copyright (c) 2013-2016, Matt Terry and Matthew Brett; all rights reserved. + This product includes software from the Ibis project (Apache 2.0) + * Copyright (c) 2015 Cloudera, Inc. + * https://github.com/cloudera/ibis + This product includes software from Dremio (Apache 2.0) + * Copyright (C) 2017-2018 Dremio Corporation + * https://github.com/dremio/dremio-oss + This product includes software from Google Guava (Apache 2.0) + * Copyright (C) 2007 The Guava Authors + * https://github.com/google/guava + This product include software from CMake (BSD 3-Clause) + * CMake - Cross Platform Makefile Generator + * Copyright 2000-2019 Kitware, Inc. and Contributors + The web site includes files generated by Jekyll. + -------------------------------------------------------------------------------- + This product includes code from Apache Kudu, which includes the following in + its NOTICE file: + Apache Kudu + Copyright 2016 The Apache Software Foundation + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + Portions of this software were developed at + Cloudera, Inc (http://www.cloudera.com/). + -------------------------------------------------------------------------------- + This product includes code from Apache ORC, which includes the following in + its NOTICE file: + Apache ORC + Copyright 2013-2019 The Apache Software Foundation + This product includes software developed by The Apache Software + Foundation (http://www.apache.org/). + This product includes software developed by Hewlett-Packard: + (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P +------- + +This product includes software from the The Hadoop project. +* https://github.com/apache/hadoop/blob/release-3.3.0-RC0/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h +Which contains the following NOTICE file: +---- + Apache Hadoop + Copyright 2006 and onwards The Apache Software Foundation. + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + Export Control Notice + --------------------- + This distribution includes cryptographic software. The country in + which you currently reside may have restrictions on the import, + possession, use, and/or re-export to another country, of + encryption software. BEFORE using any encryption software, please + check your country's laws, regulations and policies concerning the + import, possession, or use, and re-export of encryption software, to + see if this is permitted. See for more + information. + The U.S. Government Department of Commerce, Bureau of Industry and + Security (BIS), has classified this software as Export Commodity + Control Number (ECCN) 5D002.C.1, which includes information security + software using or performing cryptographic functions with asymmetric + algorithms. The form and manner of this Apache Software Foundation + distribution makes it eligible for export under the License Exception + ENC Technology Software Unrestricted (TSU) exception (see the BIS + Export Administration Regulations, Section 740.13) for both object + code and source code. + The following provides more details on the included cryptographic software: + This software uses the SSL libraries from the Jetty project written + by mortbay.org. + Hadoop Yarn Server Web Proxy uses the BouncyCastle Java + cryptography APIs written by the Legion of the Bouncy Castle Inc. +---- diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 00c969ccce7b9..06ae8bf1c0533 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -24,6 +24,7 @@ add_subdirectory(row) add_subdirectory(flag_definitions) add_subdirectory(external/date) add_subdirectory(external/md5) +add_subdirectory(external/hdfs) # # examples depend on expression diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt index 6c1e84aec4040..44aa7be3489ca 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt @@ -23,7 +23,12 @@ if(VELOX_ENABLE_HDFS) HdfsFileSystem.cpp HdfsReadFile.cpp HdfsWriteFile.cpp) - velox_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd) + velox_link_libraries( + velox_hdfs + velox_external_hdfs + velox_dwio_common + Folly::folly + xsimd) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index 4e7b9ddc0ec52..4c52f486017fa 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -14,7 +14,6 @@ * limitations under the License. */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" -#include #include #include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" @@ -29,21 +28,27 @@ class HdfsFileSystem::Impl { explicit Impl( const config::ConfigBase* config, const HdfsServiceEndpoint& endpoint) { - auto builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, endpoint.host.c_str()); - hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data())); - hdfsClient_ = hdfsBuilderConnect(builder); - hdfsFreeBuilder(builder); + auto status = ConnectLibHdfs(&driver_); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver_->NewBuilder(); + driver_->BuilderSetNameNode(builder, endpoint.host.c_str()); + driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data())); + driver_->BuilderSetForceNewInstance(builder); + hdfsClient_ = driver_->BuilderConnect(builder); VELOX_CHECK_NOT_NULL( hdfsClient_, "Unable to connect to HDFS: {}, got error: {}.", endpoint.identity(), - hdfsGetLastError()); + driver_->GetLastExceptionRootCause()); } ~Impl() { LOG(INFO) << "Disconnecting HDFS file system"; - int disconnectResult = hdfsDisconnect(hdfsClient_); + int disconnectResult = driver_->Disconnect(hdfsClient_); if (disconnectResult != 0) { LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: " << errno; @@ -54,8 +59,13 @@ class HdfsFileSystem::Impl { return hdfsClient_; } + LibHdfsShim* hdfsShim() { + return driver_; + } + private: hdfsFS hdfsClient_; + LibHdfsShim* driver_; }; HdfsFileSystem::HdfsFileSystem( @@ -79,13 +89,15 @@ std::unique_ptr HdfsFileSystem::openFileForRead( path.remove_prefix(index); } - return std::make_unique(impl_->hdfsClient(), path); + return std::make_unique( + impl_->hdfsShim(), impl_->hdfsClient(), path); } std::unique_ptr HdfsFileSystem::openFileForWrite( std::string_view path, const FileOptions& /*unused*/) { - return std::make_unique(impl_->hdfsClient(), path); + return std::make_unique( + impl_->hdfsShim(), impl_->hdfsClient(), path); } bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index 25602a470f169..a1ff04bfeb500 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -14,8 +14,12 @@ * limitations under the License. */ #include "velox/common/file/FileSystems.h" +#include "velox/external/hdfs/ArrowHdfsInternal.h" namespace facebook::velox::filesystems { + +using filesystems::arrow::io::internal::LibHdfsShim; + struct HdfsServiceEndpoint { HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort) : host(hdfsHost), port(hdfsPort) {} diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index dedc2bb4a4c9a..9cd32ae0eb995 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -16,20 +16,23 @@ #include "HdfsReadFile.h" #include -#include namespace facebook::velox { -HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) - : hdfsClient_(hdfs), filePath_(path) { - fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data()); +HdfsReadFile::HdfsReadFile( + LibHdfsShim* driver, + hdfsFS hdfs, + const std::string_view path) + : driver_(driver), hdfsClient_(hdfs), filePath_(path) { + fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data()); if (fileInfo_ == nullptr) { - auto error = hdfsGetLastError(); + auto error = fmt::format( + "FileNotFoundException: Path {} does not exist.", filePath_); auto errMsg = fmt::format( "Unable to get file path info for file: {}. got error: {}", filePath_, error); - if (std::strstr(error, "FileNotFoundException") != nullptr) { + if (error.find("FileNotFoundException") != std::string::npos) { VELOX_FILE_NOT_FOUND_ERROR(errMsg); } VELOX_FAIL(errMsg); @@ -38,14 +41,16 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) HdfsReadFile::~HdfsReadFile() { // should call hdfsFreeFileInfo to avoid memory leak - hdfsFreeFileInfo(fileInfo_, 1); + if (fileInfo_) { + driver_->FreeFileInfo(fileInfo_, 1); + } } void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) const { checkFileReadParameters(offset, length); if (!file_->handle_) { - file_->open(hdfsClient_, filePath_); + file_->open(driver_, hdfsClient_, filePath_); } file_->seek(offset); uint64_t totalBytesRead = 0; diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index 1d531956f0eee..c16c2ef4462d4 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -14,42 +14,47 @@ * limitations under the License. */ -#include #include "velox/common/file/File.h" +#include "velox/external/hdfs/ArrowHdfsInternal.h" +#include "velox/external/hdfs/hdfs.h" namespace facebook::velox { +using filesystems::arrow::io::internal::LibHdfsShim; + struct HdfsFile { + LibHdfsShim* driver_; hdfsFS client_; hdfsFile handle_; - HdfsFile() : client_(nullptr), handle_(nullptr) {} + HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {} ~HdfsFile() { - if (handle_ && hdfsCloseFile(client_, handle_) == -1) { + if (handle_ && driver_->CloseFile(client_, handle_) == -1) { LOG(ERROR) << "Unable to close file, errno: " << errno; } } - void open(hdfsFS client, const std::string& path) { + void open(LibHdfsShim* driver, hdfsFS client, const std::string& path) { + driver_ = driver; client_ = client; - handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0); + handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0); VELOX_CHECK_NOT_NULL( handle_, "Unable to open file {}. got error: {}", path, - hdfsGetLastError()); + driver_->GetLastExceptionRootCause()); } void seek(uint64_t offset) const { VELOX_CHECK_EQ( - hdfsSeek(client_, handle_, offset), + driver_->Seek(client_, handle_, offset), 0, "Cannot seek through HDFS file, error is : {}", - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); } int32_t read(char* pos, uint64_t length) const { - auto bytesRead = hdfsRead(client_, handle_, pos, length); + auto bytesRead = driver_->Read(client_, handle_, pos, length); VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal."); return bytesRead; } @@ -60,7 +65,10 @@ struct HdfsFile { */ class HdfsReadFile final : public ReadFile { public: - explicit HdfsReadFile(hdfsFS hdfs, std::string_view path); + explicit HdfsReadFile( + LibHdfsShim* driver, + hdfsFS hdfs, + std::string_view path); ~HdfsReadFile() override; std::string_view pread(uint64_t offset, uint64_t length, void* buf) @@ -86,6 +94,7 @@ class HdfsReadFile final : public ReadFile { void preadInternal(uint64_t offset, uint64_t length, char* pos) const; void checkFileReadParameters(uint64_t offset, uint64_t length) const; + LibHdfsShim* driver_; hdfsFS hdfsClient_; hdfsFileInfo* fileInfo_; std::string filePath_; diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp index 60f98a88c972b..4a7c5d50cf886 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp @@ -15,23 +15,23 @@ */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" -#include namespace facebook::velox { HdfsWriteFile::HdfsWriteFile( + LibHdfsShim* driver, hdfsFS hdfsClient, std::string_view path, int bufferSize, short replication, int blockSize) - : hdfsClient_(hdfsClient), filePath_(path) { + : driver_(driver), hdfsClient_(hdfsClient), filePath_(path) { auto pos = filePath_.rfind("/"); auto parentDir = filePath_.substr(0, pos + 1); - if (hdfsExists(hdfsClient_, parentDir.c_str()) == -1) { - hdfsCreateDirectory(hdfsClient_, parentDir.c_str()); + if (driver_->Exists(hdfsClient_, parentDir.c_str()) == -1) { + driver_->MakeDirectory(hdfsClient_, parentDir.c_str()); } - hdfsFile_ = hdfsOpenFile( + hdfsFile_ = driver_->OpenFile( hdfsClient_, filePath_.c_str(), O_WRONLY, @@ -42,7 +42,7 @@ HdfsWriteFile::HdfsWriteFile( hdfsFile_, "Failed to open hdfs file: {}, with error: {}", filePath_, - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); } HdfsWriteFile::~HdfsWriteFile() { @@ -52,12 +52,12 @@ HdfsWriteFile::~HdfsWriteFile() { } void HdfsWriteFile::close() { - int success = hdfsCloseFile(hdfsClient_, hdfsFile_); + int success = driver_->CloseFile(hdfsClient_, hdfsFile_); VELOX_CHECK_EQ( success, 0, "Failed to close hdfs file: {}", - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); hdfsFile_ = nullptr; } @@ -66,9 +66,9 @@ void HdfsWriteFile::flush() { hdfsFile_, "Cannot flush HDFS file because file handle is null, file path: {}", filePath_); - int success = hdfsFlush(hdfsClient_, hdfsFile_); + int success = driver_->Flush(hdfsClient_, hdfsFile_); VELOX_CHECK_EQ( - success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError())); + success, 0, "Hdfs flush error: {}", driver_->GetLastExceptionRootCause()); } void HdfsWriteFile::append(std::string_view data) { @@ -79,20 +79,20 @@ void HdfsWriteFile::append(std::string_view data) { hdfsFile_, "Cannot append to HDFS file because file handle is null, file path: {}", filePath_); - int64_t totalWrittenBytes = - hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); + int64_t totalWrittenBytes = driver_->Write( + hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); VELOX_CHECK_EQ( totalWrittenBytes, data.size(), "Write failure in HDFSWriteFile::append {}", - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); } uint64_t HdfsWriteFile::size() const { - auto fileInfo = hdfsGetPathInfo(hdfsClient_, filePath_.c_str()); + auto fileInfo = driver_->GetPathInfo(hdfsClient_, filePath_.c_str()); uint64_t size = fileInfo->mSize; // should call hdfsFreeFileInfo to avoid memory leak - hdfsFreeFileInfo(fileInfo, 1); + driver_->FreeFileInfo(fileInfo, 1); return size; } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h index 7ed1819cd61f8..7c8436bdbf321 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h @@ -15,11 +15,14 @@ */ #pragma once -#include #include "velox/common/file/File.h" +#include "velox/external/hdfs/ArrowHdfsInternal.h" +#include "velox/external/hdfs/hdfs.h" namespace facebook::velox { +using filesystems::arrow::io::internal::LibHdfsShim; + /// Implementation of hdfs write file. Nothing written to the file should be /// read back until it is closed. class HdfsWriteFile : public WriteFile { @@ -34,6 +37,7 @@ class HdfsWriteFile : public WriteFile { /// @param blockSize Size of block - pass 0 if you want to use the /// default configured values. HdfsWriteFile( + LibHdfsShim* driver, hdfsFS hdfsClient, std::string_view path, int bufferSize = 0, @@ -55,6 +59,7 @@ class HdfsWriteFile : public WriteFile { void close() override; private: + LibHdfsShim* driver_; /// The configured hdfs filesystem handle. hdfsFS hdfsClient_; /// The hdfs file handle for write. diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp index bdff4a7a4fdc7..1f23179f0a725 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#ifdef VELOX_ENABLE_HDFS3 +#ifdef VELOX_ENABLE_HDFS #include "folly/concurrency/ConcurrentHashMap.h" #include "velox/common/config/Config.h" @@ -25,7 +25,7 @@ namespace facebook::velox::filesystems { -#ifdef VELOX_ENABLE_HDFS3 +#ifdef VELOX_ENABLE_HDFS std::mutex mtx; std::function #include -#include #include #include #include "HdfsMiniCluster.h" #include "gtest/gtest.h" +#include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" #include "velox/core/QueryConfig.h" #include "velox/exec/tests/utils/TempFilePath.h" +#include + using namespace facebook::velox; +using filesystems::arrow::io::internal::LibHdfsShim; + constexpr int kOneMB = 1 << 20; static const std::string destinationPath = "/test_file.txt"; static const std::string hdfsPort = "7878"; static const std::string localhost = "localhost"; static const std::string fullDestinationPath = "hdfs://" + localhost + ":" + hdfsPort + destinationPath; -static const std::string simpleDestinationPath = "hdfs:///" + destinationPath; +static const std::string simpleDestinationPath = "hdfs://" + destinationPath; static const std::unordered_map configurationValues( {{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}}); @@ -54,6 +58,7 @@ class HdfsFileSystemTest : public testing::Test { if (!miniCluster->isRunning()) { miniCluster->start(); } + filesystems::registerHdfsFileSystem(); } static void TearDownTestSuite() { @@ -118,6 +123,7 @@ void checkReadErrorMessages( } catch (VeloxException const& error) { EXPECT_THAT(error.message(), testing::HasSubstr(errorMessage)); } + try { auto buf = std::make_unique(8); readFile->pread(10 + kOneMB, endpoint, buf.get()); @@ -127,9 +133,9 @@ void checkReadErrorMessages( } } -void verifyFailures(hdfsFS hdfs) { - HdfsReadFile readFile(hdfs, destinationPath); - HdfsReadFile readFile2(hdfs, destinationPath); +void verifyFailures(LibHdfsShim* driver, hdfsFS hdfs) { + HdfsReadFile readFile(driver, hdfs, destinationPath); + HdfsReadFile readFile2(driver, hdfs, destinationPath); auto startPoint = 10 + kOneMB; auto size = 15 + kOneMB; auto endpoint = 10 + 2 * kOneMB; @@ -138,43 +144,43 @@ void verifyFailures(hdfsFS hdfs) { "(%d vs. %d) Cannot read HDFS file beyond its size: %d, offset: %d, end point: %d") % size % endpoint % size % startPoint % endpoint) .str(); - auto serverAddress = (boost::format("%s:%s") % localhost % hdfsPort).str(); + auto readFailErrorMessage = (boost::format( - "Unable to open file %s. got error: HdfsIOException: InputStreamImpl: cannot open file: %s.\t" - "Caused by: Hdfs::HdfsRpcException: HdfsFailoverException: Failed to invoke RPC call \"getBlockLocations\" on server \"%s\"\t\t" - "Caused by: HdfsNetworkConnectException: Connect to \"%s\" failed") % - destinationPath % destinationPath % serverAddress % serverAddress) - .str(); - auto builderErrorMessage = - (boost::format( - "Unable to connect to HDFS: %s, got error: Hdfs::HdfsRpcException: HdfsFailoverException: " - "Failed to invoke RPC call \"getFsStats\" on server \"%s\"\tCaused by: " - "HdfsNetworkConnectException: Connect to \"%s\" failed") % - serverAddress % serverAddress % serverAddress) + "Unable to open file %s. got error: ConnectException: Connection refused") % + destinationPath) .str(); + checkReadErrorMessages(&readFile, offsetErrorMessage, kOneMB); HdfsFileSystemTest::miniCluster->stop(); + // Sleep for 10 seconds after stopping the miniCluster to ensure consistency + // in connection status. + // This prevents a scenario where the first pread call in the + // checkReadErrorMessages method might succeed, while a subsequent call fails, + // leading to a mismatch in readFailErrorMessage. + sleep(10); checkReadErrorMessages(&readFile2, readFailErrorMessage, 1); - try { - auto config = std::make_shared( - std::unordered_map(configurationValues)); - filesystems::HdfsFileSystem hdfsFileSystem( - config, - filesystems::HdfsFileSystem::getServiceEndpoint( - simpleDestinationPath, config.get())); - FAIL() << "expected VeloxException"; - } catch (VeloxException const& error) { - EXPECT_THAT(error.message(), testing::HasSubstr(builderErrorMessage)); - } } TEST_F(HdfsFileSystemTest, read) { - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, 7878); - auto hdfs = hdfsBuilderConnect(builder); - HdfsReadFile readFile(hdfs, destinationPath); + LibHdfsShim* driver; + auto status = ConnectLibHdfs(&driver); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver->NewBuilder(); + driver->BuilderSetNameNode(builder, localhost.c_str()); + driver->BuilderSetNameNodePort(builder, 7878); + driver->BuilderSetForceNewInstance(builder); + + auto hdfs = driver->BuilderConnect(builder); + VELOX_CHECK_NOT_NULL( + hdfs, + "Unable to connect to HDFS: {}, got error", + std::string(localhost.c_str()) + ":7878"); + HdfsReadFile readFile(driver, hdfs, destinationPath); readData(&readFile); } @@ -223,6 +229,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) { auto config = std::make_shared( std::unordered_map(configurationValues)); auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config); + VELOX_ASSERT_RUNTIME_THROW_CODE( hdfsFileSystem->openFileForRead( "hdfs://localhost:7777/path/that/does/not/exist"), @@ -270,11 +277,24 @@ TEST_F(HdfsFileSystemTest, missingPort) { TEST_F(HdfsFileSystemTest, missingFileViaReadFile) { try { - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, std::stoi(hdfsPort)); - auto hdfs = hdfsBuilderConnect(builder); - HdfsReadFile readFile(hdfs, "/path/that/does/not/exist"); + LibHdfsShim* driver; + auto status = ConnectLibHdfs(&driver); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver->NewBuilder(); + driver->BuilderSetNameNode(builder, localhost.c_str()); + driver->BuilderSetNameNodePort(builder, stoi(hdfsPort)); + driver->BuilderSetForceNewInstance(builder); + + auto hdfs = driver->BuilderConnect(builder); + VELOX_CHECK_NOT_NULL( + hdfs, + "Unable to connect to HDFS: {}, got error", + std::string(localhost.c_str()) + hdfsPort); + HdfsReadFile readFile(driver, hdfs, "/path/that/does/not/exist"); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT( @@ -287,13 +307,13 @@ TEST_F(HdfsFileSystemTest, missingFileViaReadFile) { TEST_F(HdfsFileSystemTest, schemeMatching) { try { auto fs = std::dynamic_pointer_cast( - filesystems::getFileSystem("/", nullptr)); + filesystems::getFileSystem("file://", nullptr)); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT( error.message(), testing::HasSubstr( - "No registered file system matched with file path '/'")); + "No registered file system matched with file path 'file://'")); } auto fs = std::dynamic_pointer_cast( filesystems::getFileSystem(fullDestinationPath, nullptr)); @@ -326,11 +346,25 @@ TEST_F(HdfsFileSystemTest, removeNotSupported) { TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) { startThreads = false; - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, 7878); - auto hdfs = hdfsBuilderConnect(builder); - HdfsReadFile readFile(hdfs, destinationPath); + + LibHdfsShim* driver; + auto status = ConnectLibHdfs(&driver); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver->NewBuilder(); + driver->BuilderSetNameNode(builder, localhost.c_str()); + driver->BuilderSetNameNodePort(builder, 7878); + driver->BuilderSetForceNewInstance(builder); + + auto hdfs = driver->BuilderConnect(builder); + VELOX_CHECK_NOT_NULL( + hdfs, + "Unable to connect to HDFS: {}, got error", + std::string(localhost.c_str()) + ":7878"); + std::vector threads; std::mt19937 generator(std::random_device{}()); std::vector sleepTimesInMicroseconds = {0, 500, 50000}; @@ -338,13 +372,14 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) { 0, sleepTimesInMicroseconds.size() - 1); for (int i = 0; i < 25; i++) { auto thread = std::thread( - [&readFile, &distribution, &generator, &sleepTimesInMicroseconds] { + [&driver, &hdfs, &distribution, &generator, &sleepTimesInMicroseconds] { int index = distribution(generator); while (!HdfsFileSystemTest::startThreads) { std::this_thread::yield(); } std::this_thread::sleep_for( std::chrono::microseconds(sleepTimesInMicroseconds[index])); + HdfsReadFile readFile(driver, hdfs, destinationPath); readData(&readFile); }); threads.emplace_back(std::move(thread)); @@ -440,9 +475,22 @@ TEST_F(HdfsFileSystemTest, writeWithParentDirNotExist) { } TEST_F(HdfsFileSystemTest, readFailures) { - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, stoi(hdfsPort)); - auto hdfs = hdfsBuilderConnect(builder); - verifyFailures(hdfs); + LibHdfsShim* driver; + auto status = ConnectLibHdfs(&driver); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver->NewBuilder(); + driver->BuilderSetNameNode(builder, localhost.c_str()); + driver->BuilderSetNameNodePort(builder, stoi(hdfsPort)); + driver->BuilderSetForceNewInstance(builder); + + auto hdfs = driver->BuilderConnect(builder); + VELOX_CHECK_NOT_NULL( + hdfs, + "Unable to connect to HDFS: {}, got error", + std::string(localhost.c_str()) + hdfsPort); + verifyFailures(driver, hdfs); } diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp index 2765d73142e2b..df46db159c7bb 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp @@ -104,7 +104,7 @@ TEST_F(InsertIntoHdfsTest, insertIntoHdfsTest) { plan = PlanBuilder().tableScan(rowType_).planNode(); auto splits = HiveConnectorTestBase::makeHiveConnectorSplits( - fmt::format("{}/{}", outputDirectory, writeFileName), + fmt::format("{}{}", outputDirectory, writeFileName), 1, dwio::common::FileFormat::DWRF); auto copy = AssertQueryBuilder(plan).split(splits[0]).copyResults(pool()); diff --git a/velox/external/hdfs/ArrowHdfsInternal.cpp b/velox/external/hdfs/ArrowHdfsInternal.cpp new file mode 100644 index 0000000000000..c9a0c7dd81f0f --- /dev/null +++ b/velox/external/hdfs/ArrowHdfsInternal.cpp @@ -0,0 +1,641 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This shim interface to libhdfs (for runtime shared library loading) has been +// adapted from the SFrame project, released under the ASF-compatible 3-clause +// BSD license +// +// Using this required having the $JAVA_HOME and $HADOOP_HOME environment +// variables set, so that libjvm and libhdfs can be located easily + +// Copyright (C) 2015 Dato, Inc. +// All rights reserved. +// +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. + +#include "ArrowHdfsInternal.h" + +#include +#include +#include +#include // IWYU pragma: keep +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace facebook::velox::filesystems::arrow { + +using ::arrow::internal::GetEnvVarNative; +using ::arrow::internal::PlatformFilename; +#ifdef _WIN32 +using internal::WinErrorMessage; +#endif + +namespace io { +namespace internal { + +namespace { + +void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) { + if (handle == NULL) { + printf("handle is null\n"); + return NULL; + } + +#ifndef _WIN32 + return dlsym(handle, symbol); +#else + + void* ret = reinterpret_cast(GetProcAddress(, symbol)); + if (ret == NULL) { + printf("ret is null\n"); + // logstream(LOG_INFO) << "GetProcAddress error: " + // << get_last_err_str(GetLastError()) << std::endl; + } + return ret; +#endif +} + +#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ + do { \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } \ + if (!SHIM->SYMBOL_NAME) \ + return ::arrow::Status::IOError("Getting symbol " #SYMBOL_NAME \ + "failed"); \ + } while (0) + +#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } + +LibraryHandle libjvm_handle = nullptr; + +// Helper functions for dlopens +::arrow::Result> get_potential_libjvm_paths(); +::arrow::Result> get_potential_libhdfs_paths(); +::arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name); + +::arrow::Result> MakeFilenameVector( + const std::vector& names) { + std::vector filenames(names.size()); + for (size_t i = 0; i < names.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(filenames[i], PlatformFilename::FromString(names[i])); + } + return filenames; +} + +void AppendEnvVarFilename( + const char* var_name, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + filenames->emplace_back(std::move(*maybe_env_var)); + } +} + +void AppendEnvVarFilename( + const char* var_name, + const char* suffix, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + auto maybe_env_var_with_suffix = + PlatformFilename(std::move(*maybe_env_var)).Join(suffix); + if (maybe_env_var_with_suffix.ok()) { + filenames->emplace_back(std::move(*maybe_env_var_with_suffix)); + } + } +} + +void InsertEnvVarFilename( + const char* var_name, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + filenames->emplace( + filenames->begin(), PlatformFilename(std::move(*maybe_env_var))); + } +} + +::arrow::Result> get_potential_libhdfs_paths() { + std::vector potential_paths; + std::string file_name; + +// OS-specific file name +#ifdef _WIN32 + file_name = "hdfs.dll"; +#elif __APPLE__ + file_name = "libhdfs.dylib"; +#else + file_name = "libhdfs.so"; +#endif + + // Common paths + ARROW_ASSIGN_OR_RAISE(auto search_paths, MakeFilenameVector({"", "."})); + + // Path from environment variable + AppendEnvVarFilename("HADOOP_HOME", "lib/native", &search_paths); + AppendEnvVarFilename("ARROW_LIBHDFS_DIR", &search_paths); + + // All paths with file name + for (const auto& path : search_paths) { + ARROW_ASSIGN_OR_RAISE(auto full_path, path.Join(file_name)); + potential_paths.push_back(std::move(full_path)); + } + + return potential_paths; +} + +::arrow::Result> get_potential_libjvm_paths() { + std::vector potential_paths; + + std::vector search_prefixes; + std::vector search_suffixes; + std::string file_name; + +// From heuristics +#ifdef _WIN32 + ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""})); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, MakeFilenameVector({"/jre/bin/server", "/bin/server"})); + file_name = "jvm.dll"; +#elif __APPLE__ + ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""})); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, MakeFilenameVector({"/jre/lib/server", "/lib/server"})); + file_name = "libjvm.dylib"; + +// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are +// expecting users to set an environment variable +#else +#if defined(__aarch64__) + const std::string prefix_arch{"arm64"}; + const std::string suffix_arch{"aarch64"}; +#else + const std::string prefix_arch{"amd64"}; + const std::string suffix_arch{"amd64"}; +#endif + ARROW_ASSIGN_OR_RAISE( + search_prefixes, + MakeFilenameVector({ + "/usr/lib/jvm/default-java", // ubuntu / debian distros + "/usr/lib/jvm/java", // rhel6 + "/usr/lib/jvm", // centos6 + "/usr/lib64/jvm", // opensuse 13 + "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros + "/usr/local/lib/jvm/java", // alt rhel6 + "/usr/local/lib/jvm", // alt centos6 + "/usr/local/lib64/jvm", // alt opensuse 13 + "/usr/local/lib/jvm/java-8-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-8-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/local/lib/jvm/java-7-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/local/lib/jvm/java-6-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-6-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/lib/jvm/default", // alt centos + "/usr/java/latest" // alt centos + })); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, + MakeFilenameVector( + {"", + "/lib/server", + "/jre/lib/" + suffix_arch + "/server", + "/lib/" + suffix_arch + "/server"})); + file_name = "libjvm.so"; +#endif + + // From direct environment variable + InsertEnvVarFilename("JAVA_HOME", &search_prefixes); + + // Generate cross product between search_prefixes, search_suffixes, and + // file_name + for (auto& prefix : search_prefixes) { + for (auto& suffix : search_suffixes) { + ARROW_ASSIGN_OR_RAISE(auto path, prefix.Join(suffix).Join(file_name)); + potential_paths.push_back(std::move(path)); + } + } + + return potential_paths; +} + +#ifndef _WIN32 +::arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name) { + std::string error_message = "unknown error"; + LibraryHandle handle; + + for (const auto& p : potential_paths) { + handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL); + + if (handle != NULL) { + return handle; + } else { + const char* err_msg = dlerror(); + if (err_msg != NULL) { + error_message = err_msg; + } + } + } + + return ::arrow::Status::IOError("Unable to load ", name, ": ", error_message); +} + +#else +::arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name) { + std::string error_message; + LibraryHandle handle; + + for (const auto& p : potential_paths) { + handle = LoadLibraryW(p.ToNative().c_str()); + if (handle != NULL) { + return handle; + } else { + error_message = WinErrorMessage(GetLastError()); + } + } + + return ::arrow::Status::IOError("Unable to load ", name, ": ", error_message); +} +#endif // _WIN32 + +LibHdfsShim libhdfs_shim; + +} // namespace + +::arrow::Status LibHdfsShim::GetRequiredSymbols() { + GET_SYMBOL_REQUIRED(this, hdfsNewBuilder); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect); + GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory); + GET_SYMBOL_REQUIRED(this, hdfsDelete); + GET_SYMBOL_REQUIRED(this, hdfsDisconnect); + GET_SYMBOL_REQUIRED(this, hdfsExists); + GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo); + GET_SYMBOL_REQUIRED(this, hdfsGetCapacity); + GET_SYMBOL_REQUIRED(this, hdfsGetUsed); + GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo); + GET_SYMBOL_REQUIRED(this, hdfsListDirectory); + GET_SYMBOL_REQUIRED(this, hdfsChown); + GET_SYMBOL_REQUIRED(this, hdfsChmod); + + // File methods + GET_SYMBOL_REQUIRED(this, hdfsCloseFile); + GET_SYMBOL_REQUIRED(this, hdfsFlush); + GET_SYMBOL_REQUIRED(this, hdfsOpenFile); + GET_SYMBOL_REQUIRED(this, hdfsRead); + GET_SYMBOL_REQUIRED(this, hdfsSeek); + GET_SYMBOL_REQUIRED(this, hdfsTell); + GET_SYMBOL_REQUIRED(this, hdfsWrite); + + return ::arrow::Status::OK(); +} + +::arrow::Status ConnectLibHdfs(LibHdfsShim** driver) { + static std::mutex lock; + std::lock_guard guard(lock); + + LibHdfsShim* shim = &libhdfs_shim; + + static bool shim_attempted = false; + if (!shim_attempted) { + shim_attempted = true; + + shim->Initialize(); + + ARROW_ASSIGN_OR_RAISE( + auto libjvm_potential_paths, get_potential_libjvm_paths()); + ARROW_ASSIGN_OR_RAISE( + libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm")); + + ARROW_ASSIGN_OR_RAISE( + auto libhdfs_potential_paths, get_potential_libhdfs_paths()); + ARROW_ASSIGN_OR_RAISE( + shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs")); + } else if (shim->handle == nullptr) { + return ::arrow::Status::IOError("Prior attempt to load libhdfs failed"); + } + + *driver = shim; + return shim->GetRequiredSymbols(); +} + +/////////////////////////////////////////////////////////////////////////// +// HDFS thin wrapper methods + +hdfsBuilder* LibHdfsShim::NewBuilder(void) { + return this->hdfsNewBuilder(); +} + +void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { + this->hdfsBuilderSetNameNode(bld, nn); +} + +void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) { + this->hdfsBuilderSetNameNodePort(bld, port); +} + +void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) { + this->hdfsBuilderSetUserName(bld, userName); +} + +void LibHdfsShim::BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, + const char* kerbTicketCachePath) { + this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); +} + +void LibHdfsShim::BuilderSetForceNewInstance(hdfsBuilder* bld) { + this->hdfsBuilderSetForceNewInstance(bld); +} + +hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) { + return this->hdfsBuilderConnect(bld); +} + +int LibHdfsShim::BuilderConfSetStr( + hdfsBuilder* bld, + const char* key, + const char* val) { + return this->hdfsBuilderConfSetStr(bld, key, val); +} + +int LibHdfsShim::Disconnect(hdfsFS fs) { + return this->hdfsDisconnect(fs); +} + +hdfsFile LibHdfsShim::OpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize) { // NOLINT + return this->hdfsOpenFile( + fs, path, flags, bufferSize, replication, blocksize); +} + +int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) { + return this->hdfsCloseFile(fs, file); +} + +int LibHdfsShim::Exists(hdfsFS fs, const char* path) { + return this->hdfsExists(fs, path); +} + +int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { + return this->hdfsSeek(fs, file, desiredPos); +} + +tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { + return this->hdfsTell(fs, file); +} + +tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { + return this->hdfsRead(fs, file, buffer, length); +} + +bool LibHdfsShim::HasPread() { + GET_SYMBOL(this, hdfsPread); + return this->hdfsPread != nullptr; +} + +tSize LibHdfsShim::Pread( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length) { + GET_SYMBOL(this, hdfsPread); + DCHECK(this->hdfsPread); + return this->hdfsPread(fs, file, position, buffer, length); +} + +tSize LibHdfsShim::Write( + hdfsFS fs, + hdfsFile file, + const void* buffer, + tSize length) { + return this->hdfsWrite(fs, file, buffer, length); +} + +int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { + return this->hdfsFlush(fs, file); +} + +int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { + GET_SYMBOL(this, hdfsAvailable); + if (this->hdfsAvailable) + return this->hdfsAvailable(fs, file); + else + return 0; +} + +int LibHdfsShim::Copy( + hdfsFS srcFS, + const char* src, + hdfsFS dstFS, + const char* dst) { + GET_SYMBOL(this, hdfsCopy); + if (this->hdfsCopy) + return this->hdfsCopy(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Move( + hdfsFS srcFS, + const char* src, + hdfsFS dstFS, + const char* dst) { + GET_SYMBOL(this, hdfsMove); + if (this->hdfsMove) + return this->hdfsMove(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { + return this->hdfsDelete(fs, path, recursive); +} + +int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { + GET_SYMBOL(this, hdfsRename); + if (this->hdfsRename) + return this->hdfsRename(fs, oldPath, newPath); + else + return 0; +} + +char* LibHdfsShim::GetWorkingDirectory( + hdfsFS fs, + char* buffer, + size_t bufferSize) { + GET_SYMBOL(this, hdfsGetWorkingDirectory); + if (this->hdfsGetWorkingDirectory) { + return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); + } else { + return NULL; + } +} + +int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) { + GET_SYMBOL(this, hdfsSetWorkingDirectory); + if (this->hdfsSetWorkingDirectory) { + return this->hdfsSetWorkingDirectory(fs, path); + } else { + return 0; + } +} + +int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) { + return this->hdfsCreateDirectory(fs, path); +} + +int LibHdfsShim::SetReplication( + hdfsFS fs, + const char* path, + int16_t replication) { + GET_SYMBOL(this, hdfsSetReplication); + if (this->hdfsSetReplication) { + return this->hdfsSetReplication(fs, path, replication); + } else { + return 0; + } +} + +hdfsFileInfo* +LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) { + return this->hdfsListDirectory(fs, path, numEntries); +} + +hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) { + return this->hdfsGetPathInfo(fs, path); +} + +void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { + this->hdfsFreeFileInfo(hdfsFileInfo, numEntries); +} + +char*** LibHdfsShim::GetHosts( + hdfsFS fs, + const char* path, + tOffset start, + tOffset length) { + GET_SYMBOL(this, hdfsGetHosts); + if (this->hdfsGetHosts) { + return this->hdfsGetHosts(fs, path, start, length); + } else { + return NULL; + } +} + +void LibHdfsShim::FreeHosts(char*** blockHosts) { + GET_SYMBOL(this, hdfsFreeHosts); + if (this->hdfsFreeHosts) { + this->hdfsFreeHosts(blockHosts); + } +} + +tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { + GET_SYMBOL(this, hdfsGetDefaultBlockSize); + if (this->hdfsGetDefaultBlockSize) { + return this->hdfsGetDefaultBlockSize(fs); + } else { + return 0; + } +} + +tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { + return this->hdfsGetCapacity(fs); +} + +tOffset LibHdfsShim::GetUsed(hdfsFS fs) { + return this->hdfsGetUsed(fs); +} + +int LibHdfsShim::Chown( + hdfsFS fs, + const char* path, + const char* owner, + const char* group) { + return this->hdfsChown(fs, path, owner, group); +} + +int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT + return this->hdfsChmod(fs, path, mode); +} + +int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { + GET_SYMBOL(this, hdfsUtime); + if (this->hdfsUtime) { + return this->hdfsUtime(fs, path, mtime, atime); + } else { + return 0; + } +} + +char* LibHdfsShim::GetLastExceptionRootCause() { + GET_SYMBOL(this, hdfsGetLastExceptionRootCause); + if (this->hdfsGetLastExceptionRootCause) { + return this->hdfsGetLastExceptionRootCause(); + } else { + return strdup("GetLastExceptionRootCause return null"); + } +} + +} // namespace internal +} // namespace io +} // namespace facebook::velox::filesystems::arrow diff --git a/velox/external/hdfs/ArrowHdfsInternal.h b/velox/external/hdfs/ArrowHdfsInternal.h new file mode 100644 index 0000000000000..1ee57516fd171 --- /dev/null +++ b/velox/external/hdfs/ArrowHdfsInternal.h @@ -0,0 +1,260 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache Arrow. + +#pragma once + +#include +#include + +#include "hdfs.h" + +#include + +#include "arrow/util/visibility.h" +#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep + +using std::size_t; + +struct hdfsBuilder; + +namespace facebook::velox::filesystems::arrow { + +namespace io { +namespace internal { + +#ifndef _WIN32 +typedef void* LibraryHandle; +#else +typedef HINSTANCE LibraryHandle; +#endif + +// NOTE(wesm): cpplint does not like use of short and other imprecise C types +struct LibHdfsShim { + LibraryHandle handle; + + hdfsBuilder* (*hdfsNewBuilder)(void); + void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); + void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port); + void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName); + void (*hdfsBuilderSetKerbTicketCachePath)( + hdfsBuilder* bld, + const char* kerbTicketCachePath); + void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld); + hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld); + int (*hdfsBuilderConfSetStr)( + hdfsBuilder* bld, + const char* key, + const char* val); + + int (*hdfsDisconnect)(hdfsFS fs); + + hdfsFile (*hdfsOpenFile)( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); // NOLINT + + int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file); + int (*hdfsExists)(hdfsFS fs, const char* path); + int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos); + tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file); + tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + tSize (*hdfsPread)( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length); + tSize ( + *hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + int (*hdfsFlush)(hdfsFS fs, hdfsFile file); + int (*hdfsAvailable)(hdfsFS fs, hdfsFile file); + int (*hdfsCopy)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsMove)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsDelete)(hdfsFS fs, const char* path, int recursive); + int (*hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath); + char* (*hdfsGetWorkingDirectory)(hdfsFS fs, char* buffer, size_t bufferSize); + int (*hdfsSetWorkingDirectory)(hdfsFS fs, const char* path); + int (*hdfsCreateDirectory)(hdfsFS fs, const char* path); + int (*hdfsSetReplication)(hdfsFS fs, const char* path, int16_t replication); + hdfsFileInfo* ( + *hdfsListDirectory)(hdfsFS fs, const char* path, int* numEntries); + hdfsFileInfo* (*hdfsGetPathInfo)(hdfsFS fs, const char* path); + void (*hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries); + char*** (*hdfsGetHosts)( + hdfsFS fs, + const char* path, + tOffset start, + tOffset length); + void (*hdfsFreeHosts)(char*** blockHosts); + tOffset (*hdfsGetDefaultBlockSize)(hdfsFS fs); + tOffset (*hdfsGetCapacity)(hdfsFS fs); + tOffset (*hdfsGetUsed)(hdfsFS fs); + int (*hdfsChown)( + hdfsFS fs, + const char* path, + const char* owner, + const char* group); + int (*hdfsChmod)(hdfsFS fs, const char* path, short mode); // NOLINT + int (*hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime); + char* (*hdfsGetLastExceptionStackTrace)(); + char* (*hdfsGetLastExceptionRootCause)(); + + void Initialize() { + this->handle = nullptr; + this->hdfsNewBuilder = nullptr; + this->hdfsBuilderSetNameNode = nullptr; + this->hdfsBuilderSetNameNodePort = nullptr; + this->hdfsBuilderSetUserName = nullptr; + this->hdfsBuilderSetKerbTicketCachePath = nullptr; + this->hdfsBuilderSetForceNewInstance = nullptr; + this->hdfsBuilderConfSetStr = nullptr; + this->hdfsBuilderConnect = nullptr; + this->hdfsDisconnect = nullptr; + this->hdfsOpenFile = nullptr; + this->hdfsCloseFile = nullptr; + this->hdfsExists = nullptr; + this->hdfsSeek = nullptr; + this->hdfsTell = nullptr; + this->hdfsRead = nullptr; + this->hdfsPread = nullptr; + this->hdfsWrite = nullptr; + this->hdfsFlush = nullptr; + this->hdfsAvailable = nullptr; + this->hdfsCopy = nullptr; + this->hdfsMove = nullptr; + this->hdfsDelete = nullptr; + this->hdfsRename = nullptr; + this->hdfsGetWorkingDirectory = nullptr; + this->hdfsSetWorkingDirectory = nullptr; + this->hdfsCreateDirectory = nullptr; + this->hdfsSetReplication = nullptr; + this->hdfsListDirectory = nullptr; + this->hdfsGetPathInfo = nullptr; + this->hdfsFreeFileInfo = nullptr; + this->hdfsGetHosts = nullptr; + this->hdfsFreeHosts = nullptr; + this->hdfsGetDefaultBlockSize = nullptr; + this->hdfsGetCapacity = nullptr; + this->hdfsGetUsed = nullptr; + this->hdfsChown = nullptr; + this->hdfsChmod = nullptr; + this->hdfsUtime = nullptr; + this->hdfsGetLastExceptionStackTrace = nullptr; + this->hdfsGetLastExceptionRootCause = nullptr; + } + + hdfsBuilder* NewBuilder(void); + + void BuilderSetNameNode(hdfsBuilder* bld, const char* nn); + + void BuilderSetNameNodePort(hdfsBuilder* bld, tPort port); + + void BuilderSetUserName(hdfsBuilder* bld, const char* userName); + + void BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, + const char* kerbTicketCachePath); + + void BuilderSetForceNewInstance(hdfsBuilder* bld); + + int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val); + + hdfsFS BuilderConnect(hdfsBuilder* bld); + + int Disconnect(hdfsFS fs); + + hdfsFile OpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); // NOLINT + + int CloseFile(hdfsFS fs, hdfsFile file); + + int Exists(hdfsFS fs, const char* path); + + int Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + + tOffset Tell(hdfsFS fs, hdfsFile file); + + tSize Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + + bool HasPread(); + + tSize + Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + + tSize Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + + int Flush(hdfsFS fs, hdfsFile file); + + int Available(hdfsFS fs, hdfsFile file); + + int Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Delete(hdfsFS fs, const char* path, int recursive); + + int Rename(hdfsFS fs, const char* oldPath, const char* newPath); + + char* GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize); + + int SetWorkingDirectory(hdfsFS fs, const char* path); + + int MakeDirectory(hdfsFS fs, const char* path); + + int SetReplication(hdfsFS fs, const char* path, int16_t replication); + + hdfsFileInfo* ListDirectory(hdfsFS fs, const char* path, int* numEntries); + + hdfsFileInfo* GetPathInfo(hdfsFS fs, const char* path); + + void FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries); + + char*** GetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length); + + void FreeHosts(char*** blockHosts); + + tOffset GetDefaultBlockSize(hdfsFS fs); + tOffset GetCapacity(hdfsFS fs); + + tOffset GetUsed(hdfsFS fs); + + int Chown(hdfsFS fs, const char* path, const char* owner, const char* group); + + int Chmod(hdfsFS fs, const char* path, short mode); // NOLINT + + int Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime); + + char* GetLastExceptionRootCause(); + + ::arrow::Status GetRequiredSymbols(); +}; + +// TODO(wesm): Remove these exports when we are linking statically +ARROW_EXPORT ::arrow::Status ConnectLibHdfs(LibHdfsShim** driver); + +} // namespace internal +} // namespace io +} // namespace facebook::velox::filesystems::arrow diff --git a/velox/external/hdfs/CMakeLists.txt b/velox/external/hdfs/CMakeLists.txt new file mode 100644 index 0000000000000..a35f728224c7c --- /dev/null +++ b/velox/external/hdfs/CMakeLists.txt @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(${VELOX_ENABLE_HDFS}) + velox_add_library(velox_external_hdfs ArrowHdfsInternal.cpp) + velox_link_libraries( + velox_external_hdfs + PRIVATE + arrow) +endif() diff --git a/velox/external/hdfs/hdfs.h b/velox/external/hdfs/hdfs.h new file mode 100644 index 0000000000000..d9c3a058ec221 --- /dev/null +++ b/velox/external/hdfs/hdfs.h @@ -0,0 +1,1078 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache Hadoop. + +#ifndef LIBHDFS_HDFS_H +#define LIBHDFS_HDFS_H + +#include /* for EINTERNAL, etc. */ +#include /* for O_RDONLY, O_WRONLY */ +#include /* for uint64_t, etc. */ +#include /* for time_t */ + +/* + * Support export of DLL symbols during libhdfs build, and import of DLL symbols + * during client application build. A client application may optionally define + * symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but + * the compiler can produce more efficient code with it. + */ +#ifdef WIN32 +#ifdef LIBHDFS_DLL_EXPORT +#define LIBHDFS_EXTERNAL __declspec(dllexport) +#elif LIBHDFS_DLL_IMPORT +#define LIBHDFS_EXTERNAL __declspec(dllimport) +#else +#define LIBHDFS_EXTERNAL +#endif +#else +#ifdef LIBHDFS_DLL_EXPORT +#define LIBHDFS_EXTERNAL __attribute__((visibility("default"))) +#elif LIBHDFS_DLL_IMPORT +#define LIBHDFS_EXTERNAL __attribute__((visibility("default"))) +#else +#define LIBHDFS_EXTERNAL +#endif +#endif + +#ifndef O_RDONLY +#define O_RDONLY 1 +#endif + +#ifndef O_WRONLY +#define O_WRONLY 2 +#endif + +#ifndef EINTERNAL +#define EINTERNAL 255 +#endif + +#define ELASTIC_BYTE_BUFFER_POOL_CLASS \ + "org/apache/hadoop/io/ElasticByteBufferPool" + +/** All APIs set errno to meaningful values */ + +#ifdef __cplusplus +extern "C" { +#endif +/** + * Some utility decls used in libhdfs. + */ +struct hdfsBuilder; +typedef int32_t tSize; /// size of data for read/write io ops +typedef time_t tTime; /// time type in seconds +typedef int64_t tOffset; /// offset within the file +typedef uint16_t tPort; /// port +typedef enum tObjectKind { + kObjectKindFile = 'F', + kObjectKindDirectory = 'D', +} tObjectKind; +struct hdfsStreamBuilder; + +/** + * The C reflection of org.apache.org.hadoop.FileSystem . + */ +struct hdfs_internal; +typedef struct hdfs_internal* hdfsFS; + +struct hdfsFile_internal; +typedef struct hdfsFile_internal* hdfsFile; + +struct hadoopRzOptions; + +struct hadoopRzBuffer; + +/** + * Determine if a file is open for read. + * + * @param file The HDFS file + * @return 1 if the file is open for read; 0 otherwise + */ +LIBHDFS_EXTERNAL +int hdfsFileIsOpenForRead(hdfsFile file); + +/** + * Determine if a file is open for write. + * + * @param file The HDFS file + * @return 1 if the file is open for write; 0 otherwise + */ +LIBHDFS_EXTERNAL +int hdfsFileIsOpenForWrite(hdfsFile file); + +struct hdfsReadStatistics { + uint64_t totalBytesRead; + uint64_t totalLocalBytesRead; + uint64_t totalShortCircuitBytesRead; + uint64_t totalZeroCopyBytesRead; +}; + +/** + * Get read statistics about a file. This is only applicable to files + * opened for reading. + * + * @param file The HDFS file + * @param stats (out parameter) on a successful return, the read + * statistics. Unchanged otherwise. You must free the + * returned statistics with hdfsFileFreeReadStatistics. + * @return 0 if the statistics were successfully returned, + * -1 otherwise. On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support read statistics. + */ +LIBHDFS_EXTERNAL +int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics** stats); + +/** + * @param stats HDFS read statistics for a file. + * + * @return the number of remote bytes read. + */ +LIBHDFS_EXTERNAL +int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics* stats); + +/** + * Clear the read statistics for a file. + * + * @param file The file to clear the read statistics of. + * + * @return 0 on success; the error code otherwise. + * EINVAL: the file is not open for reading. + * ENOTSUP: the file does not support clearing the read + * statistics. + * Errno will also be set to this code on failure. + */ +LIBHDFS_EXTERNAL +int hdfsFileClearReadStatistics(hdfsFile file); + +/** + * Free some HDFS read statistics. + * + * @param stats The HDFS read statistics to free. + */ +LIBHDFS_EXTERNAL +void hdfsFileFreeReadStatistics(struct hdfsReadStatistics* stats); + +struct hdfsHedgedReadMetrics { + uint64_t hedgedReadOps; + uint64_t hedgedReadOpsWin; + uint64_t hedgedReadOpsInCurThread; +}; + +/** + * Get cluster wide hedged read metrics. + * + * @param fs The configured filesystem handle + * @param metrics (out parameter) on a successful return, the hedged read + * metrics. Unchanged otherwise. You must free the returned + * statistics with hdfsFreeHedgedReadMetrics. + * @return 0 if the metrics were successfully returned, -1 otherwise. + * On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support hedged read metrics. + */ +LIBHDFS_EXTERNAL +int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics** metrics); + +/** + * Free HDFS Hedged read metrics. + * + * @param metrics The HDFS Hedged read metrics to free + */ +LIBHDFS_EXTERNAL +void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics* metrics); + +/** + * hdfsConnectAsUser - Connect to a hdfs file system as a specific user + * Connect to the hdfs. + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @param user the user name (this is hadoop domain user). Or NULL is equivelant + * to hhdfsConnect(host, port) + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char* user); + +/** + * hdfsConnect - Connect to a hdfs file system. + * Connect to the hdfs. + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnect(const char* nn, tPort port); + +/** + * hdfsConnect - Connect to an hdfs file system. + * + * Forces a new instance to be created + * + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @param user The user name to use when connecting + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS +hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char* user); + +/** + * hdfsConnect - Connect to an hdfs file system. + * + * Forces a new instance to be created + * + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnectNewInstance(const char* nn, tPort port); + +/** + * Connect to HDFS using the parameters defined by the builder. + * + * The HDFS builder will be freed, whether or not the connection was + * successful. + * + * Every successful call to hdfsBuilderConnect should be matched with a call + * to hdfsDisconnect, when the hdfsFS is no longer needed. + * + * @param bld The HDFS builder + * @return Returns a handle to the filesystem, or NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsBuilderConnect(struct hdfsBuilder* bld); + +/** + * Create an HDFS builder. + * + * @return The HDFS builder, or NULL on error. + */ +LIBHDFS_EXTERNAL +struct hdfsBuilder* hdfsNewBuilder(void); + +/** + * Force the builder to always create a new instance of the FileSystem, + * rather than possibly finding one in the cache. + * + * @param bld The HDFS builder + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetForceNewInstance(struct hdfsBuilder* bld); + +/** + * Set the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param nn The NameNode to use. + * + * If the string given is 'default', the default NameNode + * configuration will be used (from the XML configuration files) + * + * If NULL is given, a LocalFileSystem will be created. + * + * If the string starts with a protocol type such as file:// or + * hdfs://, this protocol type will be used. If not, the + * hdfs:// protocol type will be used. + * + * You may specify a NameNode port in the usual way by + * passing a string of the format hdfs://:. + * Alternately, you may set the port with + * hdfsBuilderSetNameNodePort. However, you must not pass the + * port in two different ways. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetNameNode(struct hdfsBuilder* bld, const char* nn); + +/** + * Set the port of the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param port The port. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetNameNodePort(struct hdfsBuilder* bld, tPort port); + +/** + * Set the username to use when connecting to the HDFS cluster. + * + * @param bld The HDFS builder + * @param userName The user name. The string will be shallow-copied. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetUserName(struct hdfsBuilder* bld, const char* userName); + +/** + * Set the path to the Kerberos ticket cache to use when connecting to + * the HDFS cluster. + * + * @param bld The HDFS builder + * @param kerbTicketCachePath The Kerberos ticket cache path. The string + * will be shallow-copied. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetKerbTicketCachePath( + struct hdfsBuilder* bld, + const char* kerbTicketCachePath); + +/** + * Free an HDFS builder. + * + * It is normally not necessary to call this function since + * hdfsBuilderConnect frees the builder. + * + * @param bld The HDFS builder + */ +LIBHDFS_EXTERNAL +void hdfsFreeBuilder(struct hdfsBuilder* bld); + +/** + * Set a configuration string for an HdfsBuilder. + * + * @param key The key to set. + * @param val The value, or NULL to set no value. + * This will be shallow-copied. You are responsible for + * ensuring that it remains valid until the builder is + * freed. + * + * @return 0 on success; nonzero error code otherwise. + */ +LIBHDFS_EXTERNAL +int hdfsBuilderConfSetStr( + struct hdfsBuilder* bld, + const char* key, + const char* val); + +/** + * Get a configuration string. + * + * @param key The key to find + * @param val (out param) The value. This will be set to NULL if the + * key isn't found. You must free this string with + * hdfsConfStrFree. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsConfGetStr(const char* key, char** val); + +/** + * Get a configuration integer. + * + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsConfGetInt(const char* key, int32_t* val); + +/** + * Free a configuration string found with hdfsConfGetStr. + * + * @param val A configuration string obtained from hdfsConfGetStr + */ +LIBHDFS_EXTERNAL +void hdfsConfStrFree(char* val); + +/** + * hdfsDisconnect - Disconnect from the hdfs file system. + * Disconnect from hdfs. + * @param fs The configured filesystem handle. + * @return Returns 0 on success, -1 on error. + * Even if there is an error, the resources associated with the + * hdfsFS will be freed. + */ +LIBHDFS_EXTERNAL +int hdfsDisconnect(hdfsFS fs); + +/** + * hdfsOpenFile - Open a hdfs file in given mode. + * @deprecated Use the hdfsStreamBuilder functions instead. + * This function does not support setting block sizes bigger than 2 GB. + * + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param flags - an | of bits/fcntl.h file flags - supported flags are + * O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), + * O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || + * (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP. + * @param bufferSize Size of buffer for read/write - pass 0 if you want + * to use the default configured values. + * @param replication Block replication - pass 0 if you want to use + * the default configured values. + * @param blocksize Size of block - pass 0 if you want to use the + * default configured values. Note that if you want a block size bigger + * than 2 GB, you must use the hdfsStreamBuilder API rather than this + * deprecated function. + * @return Returns the handle to the open file or NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFile hdfsOpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); + +/** + * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder. + * + * @param fs The configured filesystem handle. + * @param path The full path to the file. Will be deep-copied. + * @param flags The open flags, as in hdfsOpenFile. + * @return Returns the hdfsStreamBuilder, or NULL on error. + */ +LIBHDFS_EXTERNAL +struct hdfsStreamBuilder* +hdfsStreamBuilderAlloc(hdfsFS fs, const char* path, int flags); + +/** + * hdfsStreamBuilderFree - Free an HDFS file builder. + * + * It is normally not necessary to call this function since + * hdfsStreamBuilderBuild frees the builder. + * + * @param bld The hdfsStreamBuilder to free. + */ +LIBHDFS_EXTERNAL +void hdfsStreamBuilderFree(struct hdfsStreamBuilder* bld); + +/** + * hdfsStreamBuilderSetBufferSize - Set the stream buffer size. + * + * @param bld The hdfs stream builder. + * @param bufferSize The buffer size to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + */ +LIBHDFS_EXTERNAL +int hdfsStreamBuilderSetBufferSize( + struct hdfsStreamBuilder* bld, + int32_t bufferSize); + +/** + * hdfsStreamBuilderSetReplication - Set the replication for the stream. + * This is only relevant for output streams, which will create new blocks. + * + * @param bld The hdfs stream builder. + * @param replication The replication to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + * If you call this on an input stream builder, you will get + * EINVAL, because this configuration is not relevant to input + * streams. + */ +LIBHDFS_EXTERNAL +int hdfsStreamBuilderSetReplication( + struct hdfsStreamBuilder* bld, + int16_t replication); + +/** + * hdfsStreamBuilderSetDefaultBlockSize - Set the default block size for + * the stream. This is only relevant for output streams, which will create + * new blocks. + * + * @param bld The hdfs stream builder. + * @param defaultBlockSize The default block size to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + * If you call this on an input stream builder, you will get + * EINVAL, because this configuration is not relevant to input + * streams. + */ +LIBHDFS_EXTERNAL +int hdfsStreamBuilderSetDefaultBlockSize( + struct hdfsStreamBuilder* bld, + int64_t defaultBlockSize); + +/** + * hdfsStreamBuilderBuild - Build the stream by calling open or create. + * + * @param bld The hdfs stream builder. This pointer will be freed, whether + * or not the open succeeds. + * + * @return the stream pointer on success, or NULL on error. Errno will be + * set on error. + */ +LIBHDFS_EXTERNAL +hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder* bld); + +/** + * hdfsTruncateFile - Truncate a hdfs file to given lenght. + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param newlength The size the file is to be truncated to + * @return 1 if the file has been truncated to the desired newlength + * and is immediately available to be reused for write operations + * such as append. + * 0 if a background process of adjusting the length of the last + * block has been started, and clients should wait for it to + * complete before proceeding with further file updates. + * -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength); + +/** + * hdfsUnbufferFile - Reduce the buffering done on a file. + * + * @param file The file to unbuffer. + * @return 0 on success + * ENOTSUP if the file does not support unbuffering + * Errno will also be set to this value. + */ +LIBHDFS_EXTERNAL +int hdfsUnbufferFile(hdfsFile file); + +/** + * hdfsCloseFile - Close an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + * On error, errno will be set appropriately. + * If the hdfs file was valid, the memory associated with it will + * be freed at the end of this call, even if there was an I/O + * error. + */ +LIBHDFS_EXTERNAL +int hdfsCloseFile(hdfsFS fs, hdfsFile file); + +/** + * hdfsExists - Checks if a given path exsits on the filesystem + * @param fs The configured filesystem handle. + * @param path The path to look for + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsExists(hdfsFS fs, const char* path); + +/** + * hdfsSeek - Seek to given offset in file. + * This works only for files opened in read-only mode. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param desiredPos Offset into the file to seek into. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + +/** + * hdfsTell - Get the current offset in the file, in bytes. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Current offset, -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsTell(hdfsFS fs, hdfsFile file); + +/** + * hdfsRead - Read data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return On success, a positive number indicating how many bytes + * were read. + * On end-of-file, 0. + * On error, -1. Errno will be set to the error code. + * Just like the POSIX read function, hdfsRead will return -1 + * and set errno to EINTR if data is temporarily unavailable, + * but we are not yet at the end of the file. + */ +LIBHDFS_EXTERNAL +tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + +/** + * hdfsPread - Positional read of data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param position Position from which to read + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return See hdfsRead + */ +LIBHDFS_EXTERNAL +tSize hdfsPread( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length); + +/** + * hdfsWrite - Write data into an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The data. + * @param length The no. of bytes to write. + * @return Returns the number of bytes written, -1 on error. + */ +LIBHDFS_EXTERNAL +tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + +/** + * hdfsWrite - Flush the data. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsFlush(hdfsFS fs, hdfsFile file); + +/** + * hdfsHFlush - Flush out the data in client's user buffer. After the + * return of this call, new readers will see the data. + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ +LIBHDFS_EXTERNAL +int hdfsHFlush(hdfsFS fs, hdfsFile file); + +/** + * hdfsHSync - Similar to posix fsync, Flush out the data in client's + * user buffer. all the way to the disk device (but the disk may have + * it in its cache). + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ +LIBHDFS_EXTERNAL +int hdfsHSync(hdfsFS fs, hdfsFile file); + +/** + * hdfsAvailable - Number of bytes that can be read from this + * input stream without blocking. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns available bytes; -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsAvailable(hdfsFS fs, hdfsFile file); + +/** + * hdfsCopy - Copy file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + +/** + * hdfsMove - Move file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + +/** + * hdfsDelete - Delete file. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param recursive if path is a directory and set to + * non-zero, the directory is deleted else throws an exception. In + * case of a file the recursive argument is irrelevant. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsDelete(hdfsFS fs, const char* path, int recursive); + +/** + * hdfsRename - Rename file. + * @param fs The configured filesystem handle. + * @param oldPath The path of the source file. + * @param newPath The path of the destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath); + +/** + * hdfsGetWorkingDirectory - Get the current working directory for + * the given filesystem. + * @param fs The configured filesystem handle. + * @param buffer The user-buffer to copy path of cwd into. + * @param bufferSize The length of user-buffer. + * @return Returns buffer, NULL on error. + */ +LIBHDFS_EXTERNAL +char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize); + +/** + * hdfsSetWorkingDirectory - Set the working directory. All relative + * paths will be resolved relative to it. + * @param fs The configured filesystem handle. + * @param path The path of the new 'cwd'. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSetWorkingDirectory(hdfsFS fs, const char* path); + +/** + * hdfsCreateDirectory - Make the given file and all non-existent + * parents into directories. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsCreateDirectory(hdfsFS fs, const char* path); + +/** + * hdfsSetReplication - Set the replication of the specified + * file to the supplied value + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication); + +/** + * hdfsFileInfo - Information about a file/directory. + */ +typedef struct { + tObjectKind mKind; /* file or directory */ + char* mName; /* the name of the file */ + tTime mLastMod; /* the last modification time for the file in seconds */ + tOffset mSize; /* the size of the file in bytes */ + short mReplication; /* the count of replicas */ + tOffset mBlockSize; /* the block size for the file */ + char* mOwner; /* the owner of the file */ + char* mGroup; /* the group associated with the file */ + short mPermissions; /* the permissions associated with the file */ + tTime mLastAccess; /* the last access time for the file in seconds */ +} hdfsFileInfo; + +/** + * hdfsListDirectory - Get list of files/directories for a given + * directory-path. hdfsFreeFileInfo should be called to deallocate memory. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @param numEntries Set to the number of files/directories in path. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL on error or empty directory. + * errno is set to non-zero on error or zero on success. + */ +LIBHDFS_EXTERNAL +hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries); + +/** + * hdfsGetPathInfo - Get information about a path as a (dynamically + * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be + * called when the pointer is no longer needed. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns a dynamically-allocated hdfsFileInfo object; + * NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path); + +/** + * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields) + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ +LIBHDFS_EXTERNAL +void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries); + +/** + * hdfsFileIsEncrypted: determine if a file is encrypted based on its + * hdfsFileInfo. + * @return -1 if there was an error (errno will be set), 0 if the file is + * not encrypted, 1 if the file is encrypted. + */ +LIBHDFS_EXTERNAL +int hdfsFileIsEncrypted(hdfsFileInfo* hdfsFileInfo); + +/** + * hdfsGetHosts - Get hostnames where a particular block (determined by + * pos & blocksize) of a file is stored. The last element in the array + * is NULL. Due to replication, a single block could be present on + * multiple hosts. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param start The start of the block. + * @param length The length of the block. + * @return Returns a dynamically-allocated 2-d array of blocks-hosts; + * NULL on error. + */ +LIBHDFS_EXTERNAL +char*** +hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length); + +/** + * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ +LIBHDFS_EXTERNAL +void hdfsFreeHosts(char*** blockHosts); + +/** + * hdfsGetDefaultBlockSize - Get the default blocksize. + * + * @param fs The configured filesystem handle. + * @deprecated Use hdfsGetDefaultBlockSizeAtPath instead. + * + * @return Returns the default blocksize, or -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetDefaultBlockSize(hdfsFS fs); + +/** + * hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the + * filesystem indicated by a given path. + * + * @param fs The configured filesystem handle. + * @param path The given path will be used to locate the actual + * filesystem. The full path does not have to exist. + * + * @return Returns the default blocksize, or -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char* path); + +/** + * hdfsGetCapacity - Return the raw capacity of the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the raw-capacity; -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetCapacity(hdfsFS fs); + +/** + * hdfsGetUsed - Return the total raw size of all files in the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the total-size; -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetUsed(hdfsFS fs); + +/** + * Change the user and/or group of a file or directory. + * + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param owner User string. Set to NULL for 'no change' + * @param group Group string. Set to NULL for 'no change' + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsChown( + hdfsFS fs, + const char* path, + const char* owner, + const char* group); + +/** + * hdfsChmod + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mode the bitmask to set it to + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsChmod(hdfsFS fs, const char* path, short mode); + +/** + * hdfsUtime + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mtime new modification time or -1 for no change + * @param atime new access time or -1 for no change + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime); + +/** + * Allocate a zero-copy options structure. + * + * You must free all options structures allocated with this function using + * hadoopRzOptionsFree. + * + * @return A zero-copy options structure, or NULL if one could + * not be allocated. If NULL is returned, errno will + * contain the error number. + */ +LIBHDFS_EXTERNAL +struct hadoopRzOptions* hadoopRzOptionsAlloc(void); + +/** + * Determine whether we should skip checksums in read0. + * + * @param opts The options structure. + * @param skip Nonzero to skip checksums sometimes; zero to always + * check them. + * + * @return 0 on success; -1 plus errno on failure. + */ +LIBHDFS_EXTERNAL +int hadoopRzOptionsSetSkipChecksum(struct hadoopRzOptions* opts, int skip); + +/** + * Set the ByteBufferPool to use with read0. + * + * @param opts The options structure. + * @param className If this is NULL, we will not use any + * ByteBufferPool. If this is non-NULL, it will be + * treated as the name of the pool class to use. + * For example, you can use + * ELASTIC_BYTE_BUFFER_POOL_CLASS. + * + * @return 0 if the ByteBufferPool class was found and + * instantiated; + * -1 plus errno otherwise. + */ +LIBHDFS_EXTERNAL +int hadoopRzOptionsSetByteBufferPool( + struct hadoopRzOptions* opts, + const char* className); + +/** + * Free a hadoopRzOptionsFree structure. + * + * @param opts The options structure to free. + * Any associated ByteBufferPool will also be freed. + */ +LIBHDFS_EXTERNAL +void hadoopRzOptionsFree(struct hadoopRzOptions* opts); + +/** + * Perform a byte buffer read. + * If possible, this will be a zero-copy (mmap) read. + * + * @param file The file to read from. + * @param opts An options structure created by hadoopRzOptionsAlloc. + * @param maxLength The maximum length to read. We may read fewer bytes + * than this length. + * + * @return On success, we will return a new hadoopRzBuffer. + * This buffer will continue to be valid and readable + * until it is released by readZeroBufferFree. Failure to + * release a buffer will lead to a memory leak. + * You can access the data within the hadoopRzBuffer with + * hadoopRzBufferGet. If you have reached EOF, the data + * within the hadoopRzBuffer will be NULL. You must still + * free hadoopRzBuffer instances containing NULL. + * + * On failure, we will return NULL plus an errno code. + * errno = EOPNOTSUPP indicates that we could not do a + * zero-copy read, and there was no ByteBufferPool + * supplied. + */ +LIBHDFS_EXTERNAL +struct hadoopRzBuffer* +hadoopReadZero(hdfsFile file, struct hadoopRzOptions* opts, int32_t maxLength); + +/** + * Determine the length of the buffer returned from readZero. + * + * @param buffer a buffer returned from readZero. + * @return the length of the buffer. + */ +LIBHDFS_EXTERNAL +int32_t hadoopRzBufferLength(const struct hadoopRzBuffer* buffer); + +/** + * Get a pointer to the raw buffer returned from readZero. + * + * To find out how many bytes this buffer contains, call + * hadoopRzBufferLength. + * + * @param buffer a buffer returned from readZero. + * @return a pointer to the start of the buffer. This will be + * NULL when end-of-file has been reached. + */ +LIBHDFS_EXTERNAL +const void* hadoopRzBufferGet(const struct hadoopRzBuffer* buffer); + +/** + * Release a buffer obtained through readZero. + * + * @param file The hdfs stream that created this buffer. This must be + * the same stream you called hadoopReadZero on. + * @param buffer The buffer to release. + */ +LIBHDFS_EXTERNAL +void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer* buffer); + +/** + * Get the last exception root cause that happened in the context of the + * current thread, i.e. the thread that called into libHDFS. + * + * The pointer returned by this function is guaranteed to be valid until + * the next call into libHDFS by the current thread. + * Users of this function should not free the pointer. + * + * A NULL will be returned if no exception information could be retrieved + * for the previous call. + * + * @return The root cause as a C-string. + */ +LIBHDFS_EXTERNAL +char* hdfsGetLastExceptionRootCause(); + +/** + * Get the last exception stack trace that happened in the context of the + * current thread, i.e. the thread that called into libHDFS. + * + * The pointer returned by this function is guaranteed to be valid until + * the next call into libHDFS by the current thread. + * Users of this function should not free the pointer. + * + * A NULL will be returned if no exception information could be retrieved + * for the previous call. + * + * @return The stack trace as a C-string. + */ +LIBHDFS_EXTERNAL +char* hdfsGetLastExceptionStackTrace(); + +#ifdef __cplusplus +} +#endif + +#undef LIBHDFS_EXTERNAL +#endif /*LIBHDFS_HDFS_H*/ + +/** + * vim: ts=4: sw=4: et + */