Skip to content

Commit

Permalink
Avoid duplicate reading for small parquet files (#333)
Browse files: browse the repository at this point in the history
  • Loading branch information
zhli1142015 authored and zhejiangxiaomai committed Jul 4, 2023
1 parent 482e4b3 commit 43697df
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
8 changes: 8 additions & 0 deletions velox/dwio/common/BufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ class BufferedInput {
return std::make_unique<BufferedInput>(input_, pool_);
}

/// Loads the byte range [0, length) in a single pass and returns a stream
/// over it.
///
/// The range is enqueued, load() is invoked with `logType`, and the value
/// produced by readBuffer() for the same range is handed back to the caller.
///
/// @param length Number of bytes to request, counted from offset 0.
/// @param logType Log category forwarded unchanged to load().
/// @return Whatever readBuffer(0, length) returns.
// NOTE(review): presumably readBuffer() is served from the region that was
// just loaded and does not trigger further IO — confirm in its definition.
std::unique_ptr<SeekableInputStream> readFile(
    uint64_t length,
    LogType logType) {
  // Name the starting offset once so that the enqueued region and the
  // buffer lookup below are guaranteed to describe the same range.
  constexpr uint64_t kFileStart = 0;
  enqueue({kFileStart, length});
  load(logType);
  return readBuffer(kFileStart, length);
}

/// Accessor that forwards to input_->getReadFile() and returns its result
/// by const reference.
// NOTE(review): assumes input_ is non-null whenever this is called — confirm
// against the constructors.
const std::shared_ptr<ReadFile>& getReadFile() const {
  const auto& file = input_->getReadFile();
  return file;
}
Expand Down
41 changes: 29 additions & 12 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@ ReaderBase::ReaderBase(
}

void ReaderBase::loadFileMetaData() {
bool preloadFile_ = fileLength_ <= filePreloadThreshold_;
preloadFile_ = fileLength_ <= filePreloadThreshold_ ||
fileLength_ <= directorySizeGuess_;
uint64_t readSize =
preloadFile_ ? fileLength_ : std::min(fileLength_, directorySizeGuess_);

auto stream = input_->read(
fileLength_ - readSize, readSize, dwio::common::LogType::FOOTER);
std::unique_ptr<dwio::common::SeekableInputStream> stream = nullptr;
if (preloadFile_) {
stream = input_->readFile(fileLength_, dwio::common::LogType::FOOTER);
} else {
stream = input_->read(
fileLength_ - readSize, readSize, dwio::common::LogType::FOOTER);
}

std::vector<char> copy(readSize);
const char* bufferStart = nullptr;
Expand Down Expand Up @@ -465,19 +471,30 @@ void ReaderBase::scheduleRowGroups(
currentGroup + 1 < rowGroupIds.size() ? rowGroupIds[currentGroup + 1] : 0;
auto input = inputs_[thisGroup].get();
if (!input) {
auto newInput = input_->clone();
reader.enqueueRowGroup(thisGroup, *newInput);
newInput->load(dwio::common::LogType::STRIPE);
inputs_[thisGroup] = std::move(newInput);
if (preloadFile_) {
// Read data from buffer directly.
reader.enqueueRowGroup(thisGroup, *input_);
inputs_[thisGroup] = input_;
} else {
auto newInput = input_->clone();
reader.enqueueRowGroup(thisGroup, *newInput);
newInput->load(dwio::common::LogType::STRIPE);
inputs_[thisGroup] = std::move(newInput);
}
}
for (auto counter = 0; counter < FLAGS_parquet_prefetch_rowgroups;
++counter) {
if (nextGroup) {
if (inputs_.count(nextGroup) != 0) {
auto newInput = input_->clone();
reader.enqueueRowGroup(nextGroup, *newInput);
newInput->load(dwio::common::LogType::STRIPE);
inputs_[nextGroup] = std::move(newInput);
if (inputs_.count(nextGroup) == 0) {
if (preloadFile_) {
reader.enqueueRowGroup(nextGroup, *input_);
inputs_[nextGroup] = input_;
} else {
auto newInput = input_->clone();
reader.enqueueRowGroup(nextGroup, *newInput);
newInput->load(dwio::common::LogType::STRIPE);
inputs_[nextGroup] = std::move(newInput);
}
}
} else {
break;
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/parquet/reader/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,18 @@ class ReaderBase {
const uint64_t directorySizeGuess_;
const uint64_t filePreloadThreshold_;
const dwio::common::ReaderOptions& options_;
std::unique_ptr<velox::dwio::common::BufferedInput> input_;
std::shared_ptr<velox::dwio::common::BufferedInput> input_;
uint64_t fileLength_;
std::unique_ptr<thrift::FileMetaData> fileMetaData_;
RowTypePtr schema_;
std::shared_ptr<const dwio::common::TypeWithId> schemaWithId_;

const bool binaryAsString = false;

bool preloadFile_ = false;

// Map from row group index to pre-created loading BufferedInput.
std::unordered_map<uint32_t, std::unique_ptr<dwio::common::BufferedInput>>
std::unordered_map<uint32_t, std::shared_ptr<dwio::common::BufferedInput>>
inputs_;
};

Expand Down

0 comments on commit 43697df

Please sign in to comment.