Skip to content

Commit

Permalink
Delete the VELOX_ENABLE_HDFS / VELOX_ENABLE_HDFS3 #ifdef checks
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Nov 15, 2024
1 parent d909c2a commit 194a8ae
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 128 deletions.
42 changes: 0 additions & 42 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,10 @@

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#endif

namespace facebook::velox {

#ifdef VELOX_ENABLE_HDFS
/// Creates a reader for `path` on the HDFS connection `hdfs`, routing HDFS
/// calls through the libhdfs shim `driver`.
///
/// The path info is looked up eagerly. When the lookup yields no info the
/// constructor reports a file-not-found error instead of returning a
/// half-initialized object.
///
/// @param driver Shim used to issue HDFS calls.
/// @param hdfs Handle of the HDFS connection to use.
/// @param path Path of the file to read.
HdfsReadFile::HdfsReadFile(
    filesystems::arrow::io::internal::LibHdfsShim* driver,
    hdfsFS hdfs,
    const std::string_view path)
    : driver_(driver), hdfsClient_(hdfs), filePath_(path) {
  fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data());
  if (fileInfo_ != nullptr) {
    return;
  }

  // The only failure reported here is "file not found": the cause text is
  // synthesized locally, so there is nothing to inspect in order to pick a
  // different error category. The previous substring test on this locally
  // built message was always true, which made the generic failure path dead
  // code.
  const auto error = fmt::format(
      "FileNotFoundException: Path {} does not exist.", filePath_);
  const auto errMsg = fmt::format(
      "Unable to get file path info for file: {}. got error: {}",
      filePath_,
      error);
  VELOX_FILE_NOT_FOUND_ERROR(errMsg);
}
#endif

#ifdef VELOX_ENABLE_HDFS3
HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
: hdfsClient_(hdfs), filePath_(path) {
fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data());
Expand All @@ -61,32 +35,16 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
VELOX_FAIL(errMsg);
}
}
#endif

/// Releases the path info that was fetched when the reader was constructed.
/// Skipping this release would leak the info structure.
HdfsReadFile::~HdfsReadFile() {
#if defined(VELOX_ENABLE_HDFS)
  // Shim-based build: only hand the info back when there is one to free.
  if (fileInfo_ != nullptr) {
    driver_->FreeFileInfo(fileInfo_, 1);
  }
#endif

#if defined(VELOX_ENABLE_HDFS3)
  // libhdfs3 build: free the single info entry directly.
  hdfsFreeFileInfo(fileInfo_, 1);
#endif
}

void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
if (!file_->handle_) {
#ifdef VELOX_ENABLE_HDFS
file_->open(driver_, hdfsClient_, filePath_);
#endif

#ifdef VELOX_ENABLE_HDFS3
file_->open(hdfsClient_, filePath_);
#endif
}
file_->seek(offset);
uint64_t totalBytesRead = 0;
Expand Down
77 changes: 0 additions & 77 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,9 @@
*/

#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#ifdef VELOX_ENABLE_HDFS
#include "velox/external/hdfs/ArrowHdfsInternal.h"
#endif

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#endif

namespace facebook::velox {
#ifdef VELOX_ENABLE_HDFS
/// Opens `path` for writing on the HDFS connection `hdfsClient`, routing all
/// HDFS calls through the libhdfs shim `driver`.
///
/// The parent directory (everything up to and including the last '/') is
/// created first when the existence probe reports -1. Fails if the file
/// cannot be opened.
///
/// @param driver Shim used to issue HDFS calls.
/// @param hdfsClient Handle of the HDFS connection to use.
/// @param path Path of the file to write.
/// @param bufferSize Forwarded unchanged to OpenFile.
/// @param replication Forwarded unchanged to OpenFile.
/// @param blockSize Forwarded unchanged to OpenFile.
HdfsWriteFile::HdfsWriteFile(
    filesystems::arrow::io::internal::LibHdfsShim* driver,
    hdfsFS hdfsClient,
    std::string_view path,
    int bufferSize,
    short replication,
    int blockSize)
    : driver_(driver), hdfsClient_(hdfsClient), filePath_(path) {
  // With no '/' in the path, rfind yields npos and npos + 1 wraps to 0, so the
  // parent directory becomes the empty string.
  const auto lastSlash = filePath_.rfind('/');
  const auto parentPath = filePath_.substr(0, lastSlash + 1);

  const bool parentMissing =
      driver_->Exists(hdfsClient_, parentPath.c_str()) == -1;
  if (parentMissing) {
    driver_->MakeDirectory(hdfsClient_, parentPath.c_str());
  }

  hdfsFile_ = driver_->OpenFile(
      hdfsClient_,
      filePath_.c_str(),
      O_WRONLY,
      bufferSize,
      replication,
      blockSize);
  VELOX_CHECK_NOT_NULL(
      hdfsFile_,
      "Failed to open hdfs file: {}, with error: {}",
      filePath_,
      driver_->GetLastExceptionRootCause());
}
#endif

#ifdef VELOX_ENABLE_HDFS3
HdfsWriteFile::HdfsWriteFile(
hdfsFS hdfsClient,
std::string_view path,
Expand All @@ -81,7 +44,6 @@ HdfsWriteFile::HdfsWriteFile(
filePath_,
std::string(hdfsGetLastError()));
}
#endif

HdfsWriteFile::~HdfsWriteFile() {
if (hdfsFile_) {
Expand All @@ -90,23 +52,12 @@ HdfsWriteFile::~HdfsWriteFile() {
}

/// Closes the underlying HDFS file handle and clears it.
///
/// Fails when the close call returns a non-zero status; in that case the
/// handle is left untouched because the check fires before the reset.
void HdfsWriteFile::close() {
#if defined(VELOX_ENABLE_HDFS)
  // Shim-based build: the failure reason comes from the driver.
  const int success = driver_->CloseFile(hdfsClient_, hdfsFile_);
  VELOX_CHECK_EQ(
      success,
      0,
      "Failed to close hdfs file: {}",
      driver_->GetLastExceptionRootCause());
#endif

#if defined(VELOX_ENABLE_HDFS3)
  // libhdfs3 build: the failure reason comes from hdfsGetLastError().
  const int success = hdfsCloseFile(hdfsClient_, hdfsFile_);
  VELOX_CHECK_EQ(
      success,
      0,
      "Failed to close hdfs file: {}",
      std::string(hdfsGetLastError()));
#endif

  // Drop the handle so it is not used after being closed.
  hdfsFile_ = nullptr;
}
Expand All @@ -116,17 +67,9 @@ void HdfsWriteFile::flush() {
hdfsFile_,
"Cannot flush HDFS file because file handle is null, file path: {}",
filePath_);
#ifdef VELOX_ENABLE_HDFS
int success = driver_->Flush(hdfsClient_, hdfsFile_);
VELOX_CHECK_EQ(
success, 0, "Hdfs flush error: {}", driver_->GetLastExceptionRootCause());
#endif

#ifdef VELOX_ENABLE_HDFS3
int success = hdfsFlush(hdfsClient_, hdfsFile_);
VELOX_CHECK_EQ(
success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError()));
#endif
}

void HdfsWriteFile::append(std::string_view data) {
Expand All @@ -137,41 +80,21 @@ void HdfsWriteFile::append(std::string_view data) {
hdfsFile_,
"Cannot append to HDFS file because file handle is null, file path: {}",
filePath_);
#ifdef VELOX_ENABLE_HDFS
int64_t totalWrittenBytes = driver_->Write(
hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
VELOX_CHECK_EQ(
totalWrittenBytes,
data.size(),
"Write failure in HDFSWriteFile::append {}",
driver_->GetLastExceptionRootCause());
#endif

#ifdef VELOX_ENABLE_HDFS3
int64_t totalWrittenBytes =
hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
VELOX_CHECK_EQ(
totalWrittenBytes,
data.size(),
"Write failure in HDFSWriteFile::append {}",
std::string(hdfsGetLastError()));
#endif
}

/// Returns the size of the file at this writer's path, as reported by the
/// `mSize` field of a fresh path-info lookup.
///
/// Fails with a descriptive error when the lookup returns no info, rather
/// than dereferencing a null pointer.
///
/// @return The value of `mSize` for the file.
uint64_t HdfsWriteFile::size() const {
#ifdef VELOX_ENABLE_HDFS
  auto fileInfo = driver_->GetPathInfo(hdfsClient_, filePath_.c_str());
  // The lookup yields nullptr on failure, so guard before reading mSize.
  VELOX_CHECK_NOT_NULL(
      fileInfo,
      "Failed to get path info for hdfs file: {}, with error: {}",
      filePath_,
      driver_->GetLastExceptionRootCause());
  uint64_t size = fileInfo->mSize;
  // should call hdfsFreeFileInfo to avoid memory leak
  driver_->FreeFileInfo(fileInfo, 1);
#endif

#ifdef VELOX_ENABLE_HDFS3
  auto fileInfo = hdfsGetPathInfo(hdfsClient_, filePath_.c_str());
  // The lookup yields nullptr on failure, so guard before reading mSize.
  VELOX_CHECK_NOT_NULL(
      fileInfo,
      "Failed to get path info for hdfs file: {}, with error: {}",
      filePath_,
      std::string(hdfsGetLastError()));
  uint64_t size = fileInfo->mSize;
  // should call hdfsFreeFileInfo to avoid memory leak
  hdfsFreeFileInfo(fileInfo, 1);
#endif

  return size;
}
Expand Down
6 changes: 0 additions & 6 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@
#pragma once

#include "velox/common/file/File.h"
#ifdef VELOX_ENABLE_HDFS
#include "velox/external/hdfs/HdfsInternal.h"
#endif

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#endif

namespace facebook::velox {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#if defined(VELOX_ENABLE_HDFS3) || defined(VELOX_ENABLE_HDFS)
#ifdef VELOX_ENABLE_HDFS3
#include "folly/concurrency/ConcurrentHashMap.h"

#include "velox/common/config/Config.h"
Expand All @@ -25,7 +25,7 @@

namespace facebook::velox::filesystems {

#if defined(VELOX_ENABLE_HDFS3) || defined(VELOX_ENABLE_HDFS)
#ifdef VELOX_ENABLE_HDFS3
std::mutex mtx;

std::function<std::shared_ptr<
Expand Down Expand Up @@ -96,7 +96,7 @@ hdfsWriteFileSinkGenerator() {
#endif

void registerHdfsFileSystem() {
#if defined(VELOX_ENABLE_HDFS3) || defined(VELOX_ENABLE_HDFS)
#ifdef VELOX_ENABLE_HDFS3
registerFileSystem(HdfsFileSystem::isHdfsFile, hdfsFileSystemGenerator());
dwio::common::FileSink::registerFactory(hdfsWriteFileSinkGenerator());
#endif
Expand Down

0 comments on commit 194a8ae

Please sign in to comment.