Skip to content

Commit

Permalink
Support spill in Streamingwindow addInput phase
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Mar 7, 2024
1 parent 09ba220 commit d7c682c
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 12 deletions.
3 changes: 1 addition & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -2134,8 +2134,7 @@ class WindowNode : public PlanNode {
// No partitioning keys means the whole input is one big partition. In this
// case, spilling is not helpful because we need to have a full partition in
// memory to produce results.
return !partitionKeys_.empty() && !inputsSorted_ &&
queryConfig.windowSpillEnabled();
return !partitionKeys_.empty() && queryConfig.windowSpillEnabled();
}

const RowTypePtr& inputType() const {
Expand Down
13 changes: 10 additions & 3 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ Spiller::Spiller(
spillConfig->maxSpillRunRows,
spillConfig->fileCreateConfig) {
VELOX_CHECK(
type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput,
type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput ||
type_ == Type::kStreamingWindowInput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
Expand Down Expand Up @@ -455,7 +456,8 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) {

bool Spiller::needSort() const {
return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild &&
type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput;
type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput &&
type_ != Type::kStreamingWindowInput;
}

void Spiller::spill() {
Expand Down Expand Up @@ -490,7 +492,10 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {

void Spiller::spill(std::vector<char*>& rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
VELOX_CHECK(
type_ == Type::kOrderByOutput || type_ == Type::kStreamingWindowInput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK(!rows.empty());

markAllPartitionsSpilled();
Expand Down Expand Up @@ -658,6 +663,8 @@ std::string Spiller::typeName(Type type) {
return "AGGREGATE_INPUT";
case Type::kAggregateOutput:
return "AGGREGATE_OUTPUT";
case Type::kStreamingWindowInput:
return "STREAMINGWINDOW_INPUT";
default:
VELOX_UNREACHABLE("Unknown type: {}", static_cast<int>(type));
}
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ class Spiller {
kOrderByInput = 4,
// Used for order by output processing stage.
kOrderByOutput = 5,
// Used for streaming window input processing stage.
kStreamingWindowInput = 6,
// Number of spiller types.
kNumTypes = 6,
kNumTypes = 7,
};

static std::string typeName(Type);
Expand Down
150 changes: 148 additions & 2 deletions velox/exec/StreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/StreamingWindowBuild.h"
#include "velox/exec/MemoryReclaimer.h"

namespace facebook::velox::exec {

Expand All @@ -23,9 +24,108 @@ StreamingWindowBuild::StreamingWindowBuild(
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
pool_(pool) {
allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size());
allKeyInfo_.insert(
allKeyInfo_.cend(), partitionKeyInfo_.begin(), partitionKeyInfo_.end());
allKeyInfo_.insert(
allKeyInfo_.cend(), sortKeyInfo_.begin(), sortKeyInfo_.end());
}

// Tries to make sure there is enough memory to store 'input' in 'data_'
// before the rows are appended. Does nothing when spilling is disabled or
// 'data_' is empty. If the current reservation is not sufficient, attempts to
// grow it; a failed attempt is only logged.
void StreamingWindowBuild::ensureInputFits(const RowVectorPtr& input) {
  // Spilling is disabled without a spill config, and an empty row container
  // has nothing to spill.
  if (spillConfig_ == nullptr || data_->numRows() == 0) {
    return;
  }

  // Test-only spill path. Only taken when the current partition has buffered
  // rows to hand to the spiller.
  if (!inputRows_.empty() && testingTriggerSpill()) {
    spill();
    return;
  }

  auto* const pool = data_->pool();
  const auto numInputRows = input->size();

  // Estimate the variable-length bytes needed by 'input' from the average
  // out-of-line bytes per row currently held in 'data_'.
  auto [freeRows, outOfLineFreeBytes] = data_->freeSpace();
  const auto usedOutOfLineBytes =
      data_->stringAllocator().retainedSize() - outOfLineFreeBytes;
  const auto inputOutOfLineBytes =
      (usedOutOfLineBytes / data_->numRows()) * numInputRows;

  const auto usedBytes = pool->currentBytes();
  const auto requiredMinBytes =
      usedBytes * spillConfig_->minSpillableReservationPct / 100;
  const auto availableBytes = pool->availableReservation();
  const auto growthBytes =
      data_->sizeIncrement(numInputRows, inputOutOfLineBytes);

  // With the minimal reservation in place, the input fits if there are enough
  // free rows and enough free variable-length space.
  const bool reservationOk = availableBytes >= requiredMinBytes;
  const bool rowsFit = freeRows > numInputRows;
  const bool varLenFits =
      usedOutOfLineBytes == 0 || outOfLineFreeBytes >= inputOutOfLineBytes;
  if (reservationOk && rowsFit && varLenFits) {
    return;
  }

  // Otherwise try to grow the reservation by the larger of twice the estimated
  // increment for this input and 'spillableReservationGrowthPct' of the
  // current memory usage.
  const auto targetIncrementBytes = std::max<int64_t>(
      growthBytes * 2,
      usedBytes * spillConfig_->spillableReservationGrowthPct / 100);
  {
    memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
    if (pool->maybeReserve(targetIncrementBytes)) {
      return;
    }
  }

  LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
               << " for memory pool " << pool->name()
               << ", usage: " << succinctBytes(pool->currentBytes())
               << ", reservation: " << succinctBytes(pool->reservedBytes());
}

// Appends a new streaming-window-input spiller over 'data_' to 'spillers_'.
void StreamingWindowBuild::setupSpiller() {
  spillers_.emplace_back(std::make_unique<Spiller>(
      Spiller::Type::kStreamingWindowInput,
      data_.get(),
      inputType_,
      spillConfig_));
}

// Spills the rows buffered for the current partition ('inputRows_') through
// the spiller at index 'currentSpilledPartition_', creating that spiller on
// first use, and then forgets the buffered row pointers.
void StreamingWindowBuild::spill() {
  // Spiller::spill(std::vector<char*>&) checks that the row list is not empty,
  // so calling it with nothing buffered would fail. That happens when all rows
  // of the partition were already spilled and no new row has been added since,
  // e.g. buildNextPartition() flushing the "remaining" rows right after
  // ensureInputFits() spilled them.
  if (inputRows_.empty()) {
    return;
  }

  // One spiller per partition: create it lazily the first time the current
  // partition spills.
  if (spillers_.size() < currentSpilledPartition_ + 1) {
    setupSpiller();
  }

  spillers_[currentSpilledPartition_]->spill(inputRows_);
  inputRows_.clear();
}

void StreamingWindowBuild::buildNextPartition() {
if (currentSpilledPartition_ < spillers_.size() &&
spillers_[currentSpilledPartition_] != nullptr) {
partitionStartRows_.push_back(sortedRows_.size());

// Spill remaining data to avoid running out of memory while
// sort-merging
// spilled data.
spill();

auto spillPartition = spillers_[currentSpilledPartition_]->finishSpill();
merges_.push_back(spillPartition.createOrderedReader(pool_));
return;
}

merges_.push_back(nullptr);
spillers_.push_back(nullptr);
partitionStartRows_.push_back(sortedRows_.size());
sortedRows_.insert(sortedRows_.end(), inputRows_.begin(), inputRows_.end());
inputRows_.clear();
Expand All @@ -36,6 +136,8 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) {
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
}

ensureInputFits(input);

for (auto row = 0; row < input->size(); ++row) {
char* newRow = data_->newRow();

Expand All @@ -46,9 +148,11 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) {
if (previousRow_ != nullptr &&
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
buildNextPartition();
currentSpilledPartition_++;
}

inputRows_.push_back(newRow);

previousRow_ = newRow;
}
}
Expand All @@ -61,10 +165,34 @@ void StreamingWindowBuild::noMoreInput() {
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
currentPartition_++;

if (currentPartition_ < merges_.size() &&
merges_[currentPartition_] != nullptr) {
loadNextPartitionFromSpill();
VELOX_CHECK(!spilledSortedRows_.empty(), "No window partitions available")

// Currently, when the data of the same window partition is spilled multiple
// times, the order of the data loaded here is incorrect. For example, the
// data spilled for the first time is 1, 2, 3. The second time it is spilled
// is 4, 5. The data obtained here is 4, 5, 1, 2, 3. Therefore, a sort is
// added here to sort the data.
std::sort(
spilledSortedRows_.begin(),
spilledSortedRows_.end(),
[this](const char* leftRow, const char* rightRow) {
return compareRowsWithKeys(leftRow, rightRow, allKeyInfo_);
});

auto partition =
folly::Range(spilledSortedRows_.data(), spilledSortedRows_.size());
return std::make_unique<WindowPartition>(
data_.get(), partition, inputColumns_, sortKeyInfo_);
}

VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

currentPartition_++;
VELOX_CHECK_LE(
currentPartition_,
partitionStartRows_.size() - 2,
Expand Down Expand Up @@ -93,6 +221,24 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
data_.get(), partition, inputColumns_, sortKeyInfo_);
}

void StreamingWindowBuild::loadNextPartitionFromSpill() {
spilledSortedRows_.clear();
data_->clear();

while (auto next = merges_[currentPartition_]->next()) {
if (next == nullptr) {
break;
}

auto* newRow = data_->newRow();
for (auto i = 0; i < inputChannels_.size(); ++i) {
data_->store(next->decoded(i), next->currentIndex(), newRow, i);
}
spilledSortedRows_.push_back(newRow);
next->pop();
}
}

bool StreamingWindowBuild::hasNextPartition() {
return partitionStartRows_.size() > 0 &&
currentPartition_ < int(partitionStartRows_.size() - 2);
Expand Down
46 changes: 42 additions & 4 deletions velox/exec/StreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "velox/exec/Spiller.h"
#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {
Expand All @@ -34,12 +35,18 @@ class StreamingWindowBuild : public WindowBuild {

void addInput(RowVectorPtr input) override;

void spill() override {
VELOX_UNREACHABLE();
}
void spill() override;

std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
if (spillers_.size() == 0) {
return std::nullopt;
}

common::SpillStats stats;
for (vector_size_t i = 0; i < spillers_.size(); i++) {
stats += spillers_[i]->stats();
}
return stats;
}

void noMoreInput() override;
Expand All @@ -56,13 +63,24 @@ class StreamingWindowBuild : public WindowBuild {
}

private:
void ensureInputFits(const RowVectorPtr& input);

void setupSpiller();

void buildNextPartition();

// Reads next partition from spilled data into 'data_' and
// 'spilledSortedRows_'.
void loadNextPartitionFromSpill();

// Vector of pointers to each input row in the data_ RowContainer.
// Rows are erased from data_ when they are output from the
// Window operator.
std::vector<char*> sortedRows_;

// Store the spilled sorted rows.
std::vector<char*> spilledSortedRows_;

// Holds input rows within the current partition.
std::vector<char*> inputRows_;

Expand All @@ -71,12 +89,32 @@ class StreamingWindowBuild : public WindowBuild {
// partitions.
std::vector<vector_size_t> partitionStartRows_;

// Record the rows in each window partition.
vector_size_t partitionRows_ = -1;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Current partition being output. Used to construct WindowPartitions
// during resetPartition.
vector_size_t currentPartition_ = -1;

// Current spilled partition index. Used to construct spillers_ and merges_.
vector_size_t currentSpilledPartition_ = 0;

// Spiller for contents of the 'data_'.
std::vector<std::unique_ptr<Spiller>> spillers_;

// Used to sort-merge spilled data.
std::vector<std::unique_ptr<TreeOfLosers<SpillMergeStream>>> merges_;

// allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_).
// It is used to perform a full sorting of the input rows to be able to
// separate partitions and sort the rows in it. The rows are output in
// this order by the operator.
std::vector<std::pair<column_index_t, core::SortOrder>> allKeyInfo_;

memory::MemoryPool* const pool_;
};

} // namespace facebook::velox::exec
Loading

0 comments on commit d7c682c

Please sign in to comment.