From 8657ebf217cc5c0be82808844cc7b3dd27374c65 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Mon, 8 Apr 2024 18:49:45 +0800 Subject: [PATCH] Add RowsStreamingWindowBuild to avoid OOM in Window operator --- velox/exec/AggregateWindow.cpp | 3 +- velox/exec/CMakeLists.txt | 2 + velox/exec/RowsStreamingWindowBuild.cpp | 94 +++++++++++++++++++++ velox/exec/RowsStreamingWindowBuild.h | 80 ++++++++++++++++++ velox/exec/RowsStreamingWindowPartition.cpp | 58 +++++++++++++ velox/exec/RowsStreamingWindowPartition.h | 89 +++++++++++++++++++ velox/exec/SortWindowBuild.cpp | 6 +- velox/exec/SortWindowBuild.h | 2 +- velox/exec/StreamingWindowBuild.cpp | 4 +- velox/exec/StreamingWindowBuild.h | 2 +- velox/exec/Window.cpp | 62 +++++++++++++- velox/exec/Window.h | 4 +- velox/exec/WindowBuild.h | 9 +- velox/exec/WindowFunction.cpp | 14 ++- velox/exec/WindowFunction.h | 23 ++++- velox/exec/WindowPartition.cpp | 6 +- velox/exec/WindowPartition.h | 30 ++++++- velox/exec/tests/WindowTest.cpp | 68 +++++++++++++++ velox/functions/lib/window/Rank.cpp | 21 +++-- velox/functions/lib/window/RowNumber.cpp | 3 +- 20 files changed, 554 insertions(+), 26 deletions(-) create mode 100644 velox/exec/RowsStreamingWindowBuild.cpp create mode 100644 velox/exec/RowsStreamingWindowBuild.h create mode 100644 velox/exec/RowsStreamingWindowPartition.cpp create mode 100644 velox/exec/RowsStreamingWindowPartition.h diff --git a/velox/exec/AggregateWindow.cpp b/velox/exec/AggregateWindow.cpp index cb32bd0779c3..8a9b3e0f2429 100644 --- a/velox/exec/AggregateWindow.cpp +++ b/velox/exec/AggregateWindow.cpp @@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) { pool, stringAllocator, config); - }); + }, + {exec::ProcessingUnit::kRows, false}); } } } // namespace facebook::velox::exec diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index dc89f71dbdc5..b4540e6c80c3 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -62,6 +62,8 @@ add_library( PlanNodeStats.cpp PrefixSort.cpp ProbeOperatorState.cpp + RowsStreamingWindowBuild.cpp + RowsStreamingWindowPartition.cpp RowContainer.cpp RowNumber.cpp SortBuffer.cpp diff --git a/velox/exec/RowsStreamingWindowBuild.cpp b/velox/exec/RowsStreamingWindowBuild.cpp new file mode 100644 index 000000000000..64905c51abc4 --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.cpp @@ -0,0 +1,94 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/RowsStreamingWindowBuild.h" +#include "velox/exec/RowsStreamingWindowPartition.h" + +namespace facebook::velox::exec { + +RowsStreamingWindowBuild::RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection) + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} + +void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) { + if (windowPartitions_.size() <= inputCurrentPartition_) { + windowPartitions_.push_back(std::make_shared( + data_.get(), + folly::Range(nullptr, nullptr), + inversedInputChannels_, + sortKeyInfo_)); + } + + windowPartitions_[inputCurrentPartition_]->addNewRows(inputRows_); + + if (isFinished) { + windowPartitions_[inputCurrentPartition_]->setInputRowsFinished(); + inputCurrentPartition_++; + } + + inputRows_.clear(); +} + +void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { + for (auto i = 0; i < inputChannels_.size(); ++i) { + decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); + } + + for (auto row = 0; row < input->size(); ++row) { + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + + if (previousRow_ != nullptr && + compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + buildNextInputOrPartition(true); + } + + // Wait for the peers to be ready in single partition; these peers are the + // rows that have identical values in the ORDER BY clause. + if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_ && + compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) { + buildNextInputOrPartition(false); + } + + inputRows_.push_back(newRow); + previousRow_ = newRow; + } +} + +void RowsStreamingWindowBuild::noMoreInput() { + buildNextInputOrPartition(true); +} + +std::shared_ptr RowsStreamingWindowBuild::nextPartition() { + if (outputCurrentPartition_ > 0) { + windowPartitions_[outputCurrentPartition_].reset(); + } + + return windowPartitions_[++outputCurrentPartition_]; +} + +bool RowsStreamingWindowBuild::hasNextPartition() { + return windowPartitions_.size() > 0 && + outputCurrentPartition_ <= int(windowPartitions_.size() - 2); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowsStreamingWindowBuild.h b/velox/exec/RowsStreamingWindowBuild.h new file mode 100644 index 000000000000..2eb5d4e78a1d --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of +/// processing window functions as rows arrive within a single partition, +/// without the need to wait for the entire partition to be ready. This approach +/// can significantly reduce memory usage, especially when a single partition +/// contains a large amount of data. It is particularly suited for optimizing +/// rank and row_number functions, as well as aggregate window functions with a +/// default frame. +class RowsStreamingWindowBuild : public WindowBuild { + public: + RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection); + + void addInput(RowVectorPtr input) override; + + void spill() override { + VELOX_UNREACHABLE(); + } + + std::optional spilledStats() const override { + return std::nullopt; + } + + void noMoreInput() override; + + bool hasNextPartition() override; + + std::shared_ptr nextPartition() override; + + bool needsInput() override { + // No partitions are available or the currentPartition is the last available + // one, so can consume input rows. + return windowPartitions_.size() == 0 || + outputCurrentPartition_ == windowPartitions_.size() - 1; + } + + private: + void buildNextInputOrPartition(bool isFinished); + + // Holds input rows within the current partition. + std::vector inputRows_; + + // Used to compare rows based on partitionKeys. + char* previousRow_ = nullptr; + + // Current partition being output. Used to return the WidnowPartitions. + vector_size_t outputCurrentPartition_ = -1; + + // Current partition when adding input. Used to construct WindowPartitions. + vector_size_t inputCurrentPartition_ = 0; + + // Holds all the WindowPartitions. + std::vector> windowPartitions_; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowsStreamingWindowPartition.cpp b/velox/exec/RowsStreamingWindowPartition.cpp new file mode 100644 index 000000000000..878007d77fec --- /dev/null +++ b/velox/exec/RowsStreamingWindowPartition.cpp @@ -0,0 +1,58 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/RowsStreamingWindowPartition.h" + +namespace facebook::velox::exec { + +RowsStreamingWindowPartition::RowsStreamingWindowPartition( + RowContainer* data, + const folly::Range& rows, + const std::vector& inputMapping, + const std::vector>& sortKeyInfo) + : WindowPartition(data, rows, inputMapping, sortKeyInfo) { + partitionStartRows_.push_back(0); +} + +void RowsStreamingWindowPartition::addNewRows(std::vector rows) { + partitionStartRows_.push_back(partitionStartRows_.back() + rows.size()); + + sortedRows_.insert(sortedRows_.end(), rows.begin(), rows.end()); +} + +bool RowsStreamingWindowPartition::buildNextRows() { + if (currentPartition_ >= int(partitionStartRows_.size() - 2)) + return false; + + currentPartition_++; + + // Erase previous rows in current partition. + if (currentPartition_ > 0) { + auto numPreviousPartitionRows = partitionStartRows_[currentPartition_] - + partitionStartRows_[currentPartition_ - 1]; + data_->eraseRows( + folly::Range(sortedRows_.data(), numPreviousPartitionRows)); + sortedRows_.erase( + sortedRows_.begin(), sortedRows_.begin() + numPreviousPartitionRows); + } + + auto partitionSize = partitionStartRows_[currentPartition_ + 1] - + partitionStartRows_[currentPartition_]; + + partition_ = folly::Range(sortedRows_.data(), partitionSize); + return true; +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowsStreamingWindowPartition.h b/velox/exec/RowsStreamingWindowPartition.h new file mode 100644 index 000000000000..8d2abcaa53cb --- /dev/null +++ b/velox/exec/RowsStreamingWindowPartition.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/RowContainer.h" +#include "velox/exec/WindowPartition.h" + +namespace facebook::velox::exec { + +/// RowsStreamingWindowPartition is to facilitate RowsStreamingWindowBuild by +/// processing rows within WindowPartition in a streaming manner. +class RowsStreamingWindowPartition : public WindowPartition { + public: + RowsStreamingWindowPartition( + RowContainer* data, + const folly::Range& rows, + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo); + + // Returns the number of rows in the current partial window partition, + // including the offset within the full partition. + vector_size_t numRows() const override { + if (currentPartition_ == -1) { + return 0; + } else { + return partition_.size() + partitionStartRows_[currentPartition_]; + } + } + + // Returns the starting offset of the current partial window partition within + // the full partition. + vector_size_t offsetInPartition() const override { + return partitionStartRows_[currentPartition_]; + } + + // Indicates support for row-level streaming processing. + bool supportRowLevelStreaming() const override { + return true; + } + + // Sets the flag indicating that all input rows have been processed on the + // producer side. + void setInputRowsFinished() override { + inputRowsFinished_ = true; + } + + // Adds new rows to the partition using a streaming approach on the producer + // side. + void addNewRows(std::vector rows) override; + + // Builds the next set of available rows on the consumer side. + bool buildNextRows() override; + + // Determines if the current partition is complete and then proceed to the + // next partition. + bool processFinished() const override { + return ( + inputRowsFinished_ && + currentPartition_ == partitionStartRows_.size() - 2); + } + + private: + // Indicates whether all input rows have been added to sortedRows_ + bool inputRowsFinished_ = false; + + // Stores new rows added to the WindowPartition. + std::vector sortedRows_; + + // Indices of the start row (in sortedRows_) of each partitial partition. + std::vector partitionStartRows_; + + // Current partial partition being output. + vector_size_t currentPartition_ = -1; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index e53b0d762e92..c8a2fe0a2dc4 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -291,11 +291,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() { } } -std::unique_ptr SortWindowBuild::nextPartition() { +std::shared_ptr SortWindowBuild::nextPartition() { if (merge_ != nullptr) { VELOX_CHECK(!sortedRows_.empty(), "No window partitions available") auto partition = folly::Range(sortedRows_.data(), sortedRows_.size()); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } @@ -313,7 +313,7 @@ std::unique_ptr SortWindowBuild::nextPartition() { auto partition = folly::Range( sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 645949ddb7e0..0caecfe6a5c3 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; private: void ensureInputFits(const RowVectorPtr& input); diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp index 791d6b886ca7..db1fb5846fbb 100644 --- a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/StreamingWindowBuild.cpp @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() { partitionStartRows_.push_back(sortedRows_.size()); } -std::unique_ptr StreamingWindowBuild::nextPartition() { +std::shared_ptr StreamingWindowBuild::nextPartition() { VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") @@ -89,7 +89,7 @@ std::unique_ptr StreamingWindowBuild::nextPartition() { sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h index 2573f1f7e8d5..1de94ad6a56e 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/StreamingWindowBuild.h @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { // No partitions are available or the currentPartition is the last available diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index b93b7bdf79b6..80993b2fca23 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/Window.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/SortWindowBuild.h" #include "velox/exec/StreamingWindowBuild.h" #include "velox/exec/Task.h" @@ -41,8 +42,13 @@ Window::Window( auto* spillConfig = spillConfig_.has_value() ? &spillConfig_.value() : nullptr; if (windowNode->inputsSorted()) { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + if (supportRowLevelStreaming()) { + windowBuild_ = std::make_unique( + windowNode_, pool(), spillConfig, &nonReclaimableSection_); + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } } else { windowBuild_ = std::make_unique( windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); @@ -54,6 +60,7 @@ void Window::initialize() { VELOX_CHECK_NOT_NULL(windowNode_); createWindowFunctions(); createPeerAndFrameBuffers(); + windowBuild_->setNumRowsPerOutput(numRowsPerOutput_); windowNode_.reset(); } @@ -187,6 +194,30 @@ void Window::createWindowFunctions() { } } +// Support 'rank' and +// 'row_number' functions and the agg window function with default frame. +bool Window::supportRowLevelStreaming() { + for (const auto& windowNodeFunction : windowNode_->windowFunctions()) { + const auto& functionName = windowNodeFunction.functionCall->name(); + auto windowFunctionMetadata = + exec::getWindowFunctionMetadata(functionName).value(); + if (windowFunctionMetadata.processingUnit == ProcessingUnit::kPartition) { + return false; + } + + const auto& frame = windowNodeFunction.frame; + bool isDefaultFrame = + (frame.startType == core::WindowNode::BoundType::kUnboundedPreceding && + frame.endType == core::WindowNode::BoundType::kCurrentRow); + // Only support the agg window function with default frame. + if (!windowFunctionMetadata.ignoreFrame && !isDefaultFrame) { + return false; + } + } + + return true; +} + void Window::addInput(RowVectorPtr input) { windowBuild_->addInput(input); numRows_ += input->size(); @@ -580,7 +611,25 @@ vector_size_t Window::callApplyLoop( result); resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; - callResetPartition(); + if (currentPartition_->supportRowLevelStreaming()) { + if (currentPartition_->processFinished()) { + callResetPartition(); + if (currentPartition_ && + partitionOffset_ == currentPartition_->numRows()) { + if (!currentPartition_->buildNextRows()) { + break; + } + } + + } else { + // Break until the next getOutput call to handle the remaining data in + // currentPartition_. + break; + } + } else { + callResetPartition(); + } + if (!currentPartition_) { // The WindowBuild doesn't have any more partitions to process right // now. So break until the next getOutput call. @@ -622,6 +671,13 @@ RowVectorPtr Window::getOutput() { } } + if (currentPartition_->supportRowLevelStreaming() && + partitionOffset_ == currentPartition_->numRows()) { + if (!currentPartition_->buildNextRows()) { + return nullptr; + } + } + auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = BaseVector::create( outputType_, numOutputRows, operatorCtx_->pool()); diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 9be9a011baae..83e5e7941a43 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -88,6 +88,8 @@ class Window : public Operator { const std::optional end; }; + bool supportRowLevelStreaming(); + // Creates WindowFunction and frame objects for this operator. void createWindowFunctions(); @@ -165,7 +167,7 @@ class Window : public Operator { // Used to access window partition rows and columns by the window // operator and functions. This structure is owned by the WindowBuild. - std::unique_ptr currentPartition_; + std::shared_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index 9a1c0a6bfd7a..b247e67074d4 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -68,7 +68,7 @@ class WindowBuild { // the underlying columns of Window partition data. // Check hasNextPartition() before invoking this function. This function fails // if called when no partition is available. - virtual std::unique_ptr nextPartition() = 0; + virtual std::shared_ptr nextPartition() = 0; // Returns the average size of input rows in bytes stored in the // data container of the WindowBuild. @@ -76,6 +76,10 @@ class WindowBuild { return data_->estimateRowSize(); } + void setNumRowsPerOutput(vector_size_t numRowsPerOutput) { + numRowsPerOutput_ = numRowsPerOutput; + } + protected: bool compareRowsWithKeys( const char* lhs, @@ -113,6 +117,9 @@ class WindowBuild { // Number of input rows. vector_size_t numRows_ = 0; + + // Number of rows that be fit into an output block. + vector_size_t numRowsPerOutput_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/WindowFunction.cpp b/velox/exec/WindowFunction.cpp index b093024a1dbe..07a10c8549a8 100644 --- a/velox/exec/WindowFunction.cpp +++ b/velox/exec/WindowFunction.cpp @@ -41,13 +41,23 @@ std::optional getWindowFunctionEntry( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory) { + WindowFunctionFactory factory, + StreamingProcessMetadata metadata) { auto sanitizedName = sanitizeName(name); windowFunctions()[sanitizedName] = { - std::move(signatures), std::move(factory)}; + std::move(signatures), std::move(factory), metadata}; return true; } +std::optional getWindowFunctionMetadata( + const std::string& name) { + auto sanitizedName = sanitizeName(name); + if (auto func = getWindowFunctionEntry(sanitizedName)) { + return func.value()->metadata; + } + return std::nullopt; +} + std::optional> getWindowFunctionSignatures( const std::string& name) { auto sanitizedName = sanitizeName(name); diff --git a/velox/exec/WindowFunction.h b/velox/exec/WindowFunction.h index ee0ef26869c1..08eb2d7976d9 100644 --- a/velox/exec/WindowFunction.h +++ b/velox/exec/WindowFunction.h @@ -31,6 +31,22 @@ struct WindowFunctionArg { std::optional index; }; +/// The processing unit for calculating the window function in a streaming +/// manner. kRows indicates that the calculation begins as soon as rows are +/// available within a single partition, without waiting for all data in the +/// partition to be ready. kPartition indicates that the calculation begins only +/// when all rows in a partition are ready. +enum class ProcessingUnit { + kPartition, + kRows, +}; + +/// Store the metadata for WindowFunction. +struct StreamingProcessMetadata { + ProcessingUnit processingUnit; + bool ignoreFrame; +}; + class WindowFunction { public: explicit WindowFunction( @@ -149,7 +165,8 @@ using WindowFunctionFactory = std::function( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory); + WindowFunctionFactory factory, + StreamingProcessMetadata metadata = {ProcessingUnit::kPartition, false}); /// Returns signatures of the window function with the specified name. /// Returns empty std::optional if function with that name is not found. @@ -159,8 +176,12 @@ std::optional> getWindowFunctionSignatures( struct WindowFunctionEntry { std::vector signatures; WindowFunctionFactory factory; + StreamingProcessMetadata metadata; }; +std::optional getWindowFunctionMetadata( + const std::string& name); + using WindowFunctionMap = std::unordered_map; /// Returns a map of all window function names to their registrations. diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 51729a308207..f3d4314a3b4b 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -51,7 +51,7 @@ void WindowPartition::extractColumn( vector_size_t resultOffset, const VectorPtr& result) const { RowContainer::extractColumn( - partition_.data() + partitionOffset, + partition_.data() + partitionOffset - offsetInPartition(), numRows, columns_[columnIndex], resultOffset, @@ -162,7 +162,9 @@ std::pair WindowPartition::computePeerBuffers( peerStart = i; peerEnd = i; while (peerEnd <= lastPartitionRow) { - if (peerCompare(partition_[peerStart], partition_[peerEnd])) { + if (peerCompare( + partition_[peerStart - offsetInPartition()], + partition_[peerEnd - offsetInPartition()])) { break; } peerEnd++; diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7073af3a4238..df28b1480487 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -23,6 +23,7 @@ /// TODO: This implementation will be revised for Spill to disk semantics. namespace facebook::velox::exec { + class WindowPartition { public: /// The WindowPartition is used by the Window operator and WindowFunction @@ -42,11 +43,37 @@ class WindowPartition { const std::vector>& sortKeyInfo); + virtual ~WindowPartition() = default; + /// Returns the number of rows in the current WindowPartition. - vector_size_t numRows() const { + virtual vector_size_t numRows() const { return partition_.size(); } + virtual vector_size_t offsetInPartition() const { + return 0; + } + + virtual bool supportRowLevelStreaming() const { + return false; + }; + + virtual void setInputRowsFinished() { + return; + } + + virtual void addNewRows(std::vector rows) { + return; + } + + virtual bool buildNextRows() { + return false; + } + + virtual bool processFinished() const { + return true; + } + /// Copies the values at 'columnIndex' into 'result' (starting at /// 'resultOffset') for the rows at positions in the 'rowNumbers' /// array from the partition input data. @@ -162,6 +189,7 @@ class WindowPartition { const vector_size_t* rawPeerBounds, vector_size_t* rawFrameBounds) const; + protected: // The RowContainer associated with the partition. // It is owned by the WindowBuild that creates the partition. RowContainer* data_; diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index febdcd743d30..9843942ee554 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -79,6 +79,74 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, rankLikeWithEqualValue) { + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "sum(c1) over (order by c1 rows unbounded preceding)"}; + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults( + "SELECT *, sum(c1) over (order by c1 rows unbounded preceding) FROM tmp"); +} + +TEST_F(WindowTest, rankLikeOptimization) { + const vector_size_t size = 1'0; + auto data = makeRowVector( + {"d", "p", "s"}, + { + // Payload. + makeFlatVector(size, [](auto row) { return row; }), + // Partition key. + makeFlatVector(size, [](auto row) { return row % 2; }), + // Sorting key. + makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (partition by p order by s)", + "row_number() over (partition by p order by s)", + "sum(d) over (partition by p order by s)"}; + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"p", "s"}, false) + .streamingWindow(kClauses) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults( + "SELECT *, rank() over (partition by p order by s), row_number() over (partition by p order by s), sum(d) over (partition by p order by s) FROM tmp"); +} + TEST_F(WindowTest, missingFunctionSignature) { auto input = {makeRowVector({ makeFlatVector({1, 2, 3}), diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp index 646557a2e30c..bcbcd24c59eb 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -97,9 +97,7 @@ void registerRankInternal( exec::FunctionSignatureBuilder().returnType(returnType).build(), }; - exec::registerWindowFunction( - name, - std::move(signatures), + auto windowFunctionFactory = [name]( const std::vector& /*args*/, const TypePtr& resultType, @@ -107,9 +105,20 @@ void registerRankInternal( velox::memory::MemoryPool* /*pool*/, HashStringAllocator* /*stringAllocator*/, const core::QueryConfig& /*queryConfig*/) - -> std::unique_ptr { - return std::make_unique>(resultType); - }); + -> std::unique_ptr { + return std::make_unique>(resultType); + }; + + if constexpr (TRank == RankType::kRank) { + exec::registerWindowFunction( + name, + std::move(signatures), + std::move(windowFunctionFactory), + {exec::ProcessingUnit::kRows, true}); + } else { + exec::registerWindowFunction( + name, std::move(signatures), std::move(windowFunctionFactory)); + } } void registerRankBigint(const std::string& name) { diff --git a/velox/functions/lib/window/RowNumber.cpp b/velox/functions/lib/window/RowNumber.cpp index 16b7feb0a543..86e94f74b05a 100644 --- a/velox/functions/lib/window/RowNumber.cpp +++ b/velox/functions/lib/window/RowNumber.cpp @@ -84,7 +84,8 @@ void registerRowNumber(const std::string& name, TypeKind resultTypeKind) { const core::QueryConfig& /*queryConfig*/) -> std::unique_ptr { return std::make_unique(resultType); - }); + }, + {exec::ProcessingUnit::kRows, true}); } void registerRowNumberInteger(const std::string& name) {