diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 38d4f1f26fe9e..21958b02abe5e 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -2134,8 +2134,7 @@ class WindowNode : public PlanNode { // No partitioning keys means the whole input is one big partition. In this // case, spilling is not helpful because we need to have a full partition in // memory to produce results. - return !partitionKeys_.empty() && !inputsSorted_ && - queryConfig.windowSpillEnabled(); + return !partitionKeys_.empty() && queryConfig.windowSpillEnabled(); } const RowTypePtr& inputType() const { diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 87b7e3bce07e5..b29af36bb330a 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -85,7 +85,8 @@ Spiller::Spiller( spillConfig->maxSpillRunRows, spillConfig->fileCreateConfig) { VELOX_CHECK( - type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput, + type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput || + type_ == Type::kStreamingWindowInput, "Unexpected spiller type: {}", typeName(type_)); VELOX_CHECK_EQ(state_.maxPartitions(), 1); @@ -455,7 +456,8 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) { bool Spiller::needSort() const { return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild && - type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput; + type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput && + type_ != Type::kStreamingWindowInput; } void Spiller::spill() { @@ -490,7 +492,10 @@ void Spiller::spill(const RowContainerIterator* startRowIter) { void Spiller::spill(std::vector& rows) { CHECK_NOT_FINALIZED(); - VELOX_CHECK_EQ(type_, Type::kOrderByOutput); + VELOX_CHECK( + type_ == Type::kOrderByOutput || type_ == Type::kStreamingWindowInput, + "Unexpected spiller type: {}", + typeName(type_)); VELOX_CHECK(!rows.empty()); markAllPartitionsSpilled(); @@ -658,6 +663,8 @@ std::string Spiller::typeName(Type type) { return "AGGREGATE_INPUT"; case Type::kAggregateOutput: return "AGGREGATE_OUTPUT"; + case Type::kStreamingWindowInput: + return "STREAMINGWINDOW_INPUT"; default: VELOX_UNREACHABLE("Unknown type: {}", static_cast(type)); } diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 3c4c855d0a781..88c259b662dd2 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -39,8 +39,10 @@ class Spiller { kOrderByInput = 4, // Used for order by output processing stage. kOrderByOutput = 5, + // Used for streaming window input processing stage. + kStreamingWindowInput = 6, // Number of spiller types. - kNumTypes = 6, + kNumTypes = 7, }; static std::string typeName(Type); diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp index cab3e17030189..8c450cc083a59 100644 --- a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/StreamingWindowBuild.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/StreamingWindowBuild.h" +#include "velox/exec/MemoryReclaimer.h" namespace facebook::velox::exec { @@ -23,9 +24,108 @@ StreamingWindowBuild::StreamingWindowBuild( velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection) - : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection), + pool_(pool) { + allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size()); + allKeyInfo_.insert( + allKeyInfo_.cend(), partitionKeyInfo_.begin(), partitionKeyInfo_.end()); + allKeyInfo_.insert( + allKeyInfo_.cend(), sortKeyInfo_.begin(), sortKeyInfo_.end()); +} + +void StreamingWindowBuild::ensureInputFits(const RowVectorPtr& input) { + if (spillConfig_ == nullptr) { + // Spilling is disabled. + return; + } + + if (data_->numRows() == 0) { + // Nothing to spill. + return; + } + + // Test-only spill path. + if (inputRows_.size() > 0 && testingTriggerSpill()) { + spill(); + return; + } + + auto [freeRows, outOfLineFreeBytes] = data_->freeSpace(); + const auto outOfLineBytes = + data_->stringAllocator().retainedSize() - outOfLineFreeBytes; + const auto outOfLineBytesPerRow = outOfLineBytes / data_->numRows(); + + const auto currentUsage = data_->pool()->currentBytes(); + const auto minReservationBytes = + currentUsage * spillConfig_->minSpillableReservationPct / 100; + const auto availableReservationBytes = data_->pool()->availableReservation(); + const auto incrementBytes = + data_->sizeIncrement(input->size(), outOfLineBytesPerRow * input->size()); + + // First to check if we have sufficient minimal memory reservation. + if (availableReservationBytes >= minReservationBytes) { + if ((freeRows > input->size()) && + (outOfLineBytes == 0 || + outOfLineFreeBytes >= outOfLineBytesPerRow * input->size())) { + // Enough free rows for input rows and enough variable length free space. + return; + } + } + + // Check if we can increase reservation. The increment is the largest of twice + // the maximum increment from this input and 'spillableReservationGrowthPct_' + // of the current memory usage. + const auto targetIncrementBytes = std::max( + incrementBytes * 2, + currentUsage * spillConfig_->spillableReservationGrowthPct / 100); + { + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); + if (data_->pool()->maybeReserve(targetIncrementBytes)) { + return; + } + } + + LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes) + << " for memory pool " << data_->pool()->name() + << ", usage: " << succinctBytes(data_->pool()->currentBytes()) + << ", reservation: " + << succinctBytes(data_->pool()->reservedBytes()); +} + +void StreamingWindowBuild::setupSpiller() { + spillers_.push_back(std::make_unique( + Spiller::Type::kStreamingWindowInput, + data_.get(), + inputType_, + spillConfig_)); +} + +void StreamingWindowBuild::spill() { + if (spillers_.size() < currentSpilledPartition_ + 1) { + setupSpiller(); + } + + spillers_[currentSpilledPartition_]->spill(inputRows_); + inputRows_.clear(); +} void StreamingWindowBuild::buildNextPartition() { + if (currentSpilledPartition_ < spillers_.size() && + spillers_[currentSpilledPartition_] != nullptr) { + partitionStartRows_.push_back(sortedRows_.size()); + + // Spill remaining data to avoid running out of memory while + // sort-merging + // spilled data. + spill(); + + auto spillPartition = spillers_[currentSpilledPartition_]->finishSpill(); + merges_.push_back(spillPartition.createOrderedReader(pool_)); + return; + } + + merges_.push_back(nullptr); + spillers_.push_back(nullptr); partitionStartRows_.push_back(sortedRows_.size()); sortedRows_.insert(sortedRows_.end(), inputRows_.begin(), inputRows_.end()); inputRows_.clear(); @@ -36,6 +136,8 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) { decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); } + ensureInputFits(input); + for (auto row = 0; row < input->size(); ++row) { char* newRow = data_->newRow(); @@ -46,9 +148,11 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) { if (previousRow_ != nullptr && compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { buildNextPartition(); + currentSpilledPartition_++; } inputRows_.push_back(newRow); + previousRow_ = newRow; } } @@ -61,10 +165,34 @@ void StreamingWindowBuild::noMoreInput() { } std::unique_ptr StreamingWindowBuild::nextPartition() { + currentPartition_++; + + if (currentPartition_ < merges_.size() && + merges_[currentPartition_] != nullptr) { + loadNextPartitionFromSpill(); + VELOX_CHECK(!spilledSortedRows_.empty(), "No window partitions available") + + // Currently, when the data of the same window partition is spilled multiple + // times, the order of the data loaded here is incorrect. For example, the + // data spilled for the first time is 1, 2, 3. The second time it is spilled + // is 4, 5. The data obtained here is 4, 5, 1, 2, 3. Therefore, a sort is + // added here to sort the data. + std::sort( + spilledSortedRows_.begin(), + spilledSortedRows_.end(), + [this](const char* leftRow, const char* rightRow) { + return compareRowsWithKeys(leftRow, rightRow, allKeyInfo_); + }); + + auto partition = + folly::Range(spilledSortedRows_.data(), spilledSortedRows_.size()); + return std::make_unique( + data_.get(), partition, inputColumns_, sortKeyInfo_); + } + VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") - currentPartition_++; VELOX_CHECK_LE( currentPartition_, partitionStartRows_.size() - 2, @@ -93,6 +221,24 @@ std::unique_ptr StreamingWindowBuild::nextPartition() { data_.get(), partition, inputColumns_, sortKeyInfo_); } +void StreamingWindowBuild::loadNextPartitionFromSpill() { + spilledSortedRows_.clear(); + data_->clear(); + + while (auto next = merges_[currentPartition_]->next()) { + if (next == nullptr) { + break; + } + + auto* newRow = data_->newRow(); + for (auto i = 0; i < inputChannels_.size(); ++i) { + data_->store(next->decoded(i), next->currentIndex(), newRow, i); + } + spilledSortedRows_.push_back(newRow); + next->pop(); + } +} + bool StreamingWindowBuild::hasNextPartition() { return partitionStartRows_.size() > 0 && currentPartition_ < int(partitionStartRows_.size() - 2); diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h index 2573f1f7e8d56..7d36a9ce39a72 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/StreamingWindowBuild.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/exec/Spiller.h" #include "velox/exec/WindowBuild.h" namespace facebook::velox::exec { @@ -34,12 +35,18 @@ class StreamingWindowBuild : public WindowBuild { void addInput(RowVectorPtr input) override; - void spill() override { - VELOX_UNREACHABLE(); - } + void spill() override; std::optional spilledStats() const override { - return std::nullopt; + if (spillers_.size() == 0) { + return std::nullopt; + } + + common::SpillStats stats; + for (vector_size_t i = 0; i < spillers_.size(); i++) { + stats += spillers_[i]->stats(); + } + return stats; } void noMoreInput() override; @@ -56,13 +63,23 @@ class StreamingWindowBuild : public WindowBuild { } private: + void ensureInputFits(const RowVectorPtr& input); + + void setupSpiller(); + void buildNextPartition(); + // Reads next partition from spilled data into 'data_' and 'sortedRows_'. + void loadNextPartitionFromSpill(); + // Vector of pointers to each input row in the data_ RowContainer. // Rows are erased from data_ when they are output from the // Window operator. std::vector sortedRows_; + // Store the spilled sorted rows. + std::vector spilledSortedRows_; + // Holds input rows within the current partition. std::vector inputRows_; @@ -71,12 +88,32 @@ class StreamingWindowBuild : public WindowBuild { // partitions. std::vector partitionStartRows_; + // Record the rows in each window partition. + vector_size_t partitionRows_ = -1; + // Used to compare rows based on partitionKeys. char* previousRow_ = nullptr; // Current partition being output. Used to construct WindowPartitions // during resetPartition. vector_size_t currentPartition_ = -1; + + // Current spilled partition index. Used to construct spillers_ and merges_. + vector_size_t currentSpilledPartition_ = 0; + + // Spiller for contents of the 'data_'. + std::vector> spillers_; + + // Used to sort-merge spilled data. + std::vector>> merges_; + + // allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_). + // It is used to perform a full sorting of the input rows to be able to + // separate partitions and sort the rows in it. The rows are output in + // this order by the operator. + std::vector> allKeyInfo_; + + memory::MemoryPool* const pool_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index cfc45ee5afd65..6b9092b89732e 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -79,6 +79,50 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, streamingWindowSpill) { + const vector_size_t size = 1'000; + auto data = makeRowVector( + {"d", "p", "s"}, + { + // Payload. + makeFlatVector(size, [](auto row) { return row; }), + // Partition key. + makeFlatVector(size, [](auto row) { return row % 11; }), + // Sorting key. + makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + core::PlanNodeId windowId; + auto plan = + PlanBuilder() + .values(split(data, 10)) + .orderBy({"p", "s"}, false) + .streamingWindow({"row_number() over (partition by p order by s)"}) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults( + "SELECT *, row_number() over (partition by p order by s) FROM tmp"); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(windowId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); +} + TEST_F(WindowTest, missingFunctionSignature) { auto input = {makeRowVector({ makeFlatVector({1, 2, 3}),