diff --git a/velox/exec/AggregateWindow.cpp b/velox/exec/AggregateWindow.cpp index b491ba74ec785..3566432c4993d 100644 --- a/velox/exec/AggregateWindow.cpp +++ b/velox/exec/AggregateWindow.cpp @@ -424,7 +424,8 @@ void registerAggregateWindowFunction(const std::string& name) { pool, stringAllocator, config); - }); + }, + exec::ProcessingUnit::kRow); } } } // namespace facebook::velox::exec diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index dc89f71dbdc5f..6132c4e61d61a 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -62,6 +62,7 @@ add_library( PlanNodeStats.cpp PrefixSort.cpp ProbeOperatorState.cpp + RowLevelStreamingWindowBuild.cpp RowContainer.cpp RowNumber.cpp SortBuffer.cpp diff --git a/velox/exec/RowLevelStreamingWindowBuild.cpp b/velox/exec/RowLevelStreamingWindowBuild.cpp new file mode 100644 index 0000000000000..92f81e7d6b7ae --- /dev/null +++ b/velox/exec/RowLevelStreamingWindowBuild.cpp @@ -0,0 +1,99 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/RowLevelStreamingWindowBuild.h" + +namespace facebook::velox::exec { + +RowLevelStreamingWindowBuild::RowLevelStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection) + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} + +void RowLevelStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) { + sortedRows_.push_back(inputRows_); + if (windowPartitions_.size() <= inputCurrentPartition_) { + auto partition = + folly::Range(sortedRows_.back().data(), sortedRows_.back().size()); + + windowPartitions_.push_back(std::make_shared( + data_.get(), + partition, + inputColumns_, + sortKeyInfo_, + ProcessingUnit::kRow)); + } + + windowPartitions_[inputCurrentPartition_]->insertNewBatch(sortedRows_.back()); + + if (isFinished) { + windowPartitions_[inputCurrentPartition_]->setTotalNum( + currentPartitionNum_ - 1); + + inputCurrentPartition_++; + currentPartitionNum_ = 1; + } + + inputRows_.clear(); +} + +void RowLevelStreamingWindowBuild::addInput(RowVectorPtr input) { + for (auto i = 0; i < inputChannels_.size(); ++i) { + decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); + } + + for (auto row = 0; row < input->size(); ++row) { + currentPartitionNum_++; + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + + if (previousRow_ != nullptr && + compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + buildNextInputOrPartition(true); + } + + // Wait for the peers to be ready in single partition; these peers are the + // rows that have identical values in the ORDER BY clause. + if (previousRow_ != nullptr && inputRows_.size() > 0 && + compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) { + buildNextInputOrPartition(false); + } + + inputRows_.push_back(newRow); + previousRow_ = newRow; + } +} + +void RowLevelStreamingWindowBuild::noMoreInput() { + isFinished_ = true; + buildNextInputOrPartition(true); +} + +std::shared_ptr RowLevelStreamingWindowBuild::nextPartition() { + return windowPartitions_[++outputCurrentPartition_]; +} + +bool RowLevelStreamingWindowBuild::hasNextPartition() { + return windowPartitions_.size() > 0 && + outputCurrentPartition_ <= int(windowPartitions_.size() - 1); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowLevelStreamingWindowBuild.h b/velox/exec/RowLevelStreamingWindowBuild.h new file mode 100644 index 0000000000000..b3c73481d7058 --- /dev/null +++ b/velox/exec/RowLevelStreamingWindowBuild.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +/// Unlike StreamingWindowBuild, RowLevelStreamingWindowBuild is capable of +/// processing window functions as rows arrive within a single partition, +/// without the need to wait for the entire partition to be ready. This approach +/// can significantly reduce memory usage, especially when a single partition +/// contains a large amount of data. It is particularly suited for optimizing +/// rank and row_number functions, as well as aggregate window functions with a +/// default frame. +class RowLevelStreamingWindowBuild : public WindowBuild { + public: + RowLevelStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection); + + void addInput(RowVectorPtr input) override; + + void spill() override { + VELOX_UNREACHABLE(); + } + + std::optional spilledStats() const override { + return std::nullopt; + } + + void noMoreInput() override; + + bool hasNextPartition() override; + + std::shared_ptr nextPartition() override; + + bool needsInput() override { + // No partitions are available or the currentPartition is the last available + // one, so can consume input rows. + return windowPartitions_.size() == 0 || + outputCurrentPartition_ == windowPartitions_.size() - 1; + } + + private: + void buildNextInputOrPartition(bool isFinished); + + /// Vector of pointers to each input row in the data_ RowContainer. + /// Rows are erased from data_ when they are processed in WindowPartition. + std::vector> sortedRows_; + + // Holds input rows within the current partition. + std::vector inputRows_; + + // Used to compare rows based on partitionKeys. + char* previousRow_ = nullptr; + + // Current partition being output. Used to return the WidnowPartitions. + vector_size_t outputCurrentPartition_ = -1; + + bool isFinished_ = false; + + // Current partition when adding input. Used to construct WindowPartitions. + vector_size_t inputCurrentPartition_ = 0; + + std::vector> windowPartitions_; + + // Records the total rows number in each partition. + vector_size_t currentPartitionNum_ = 0; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 86f3ee458edba..a242a0c433936 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -291,7 +291,7 @@ void SortWindowBuild::loadNextPartitionFromSpill() { } } -std::unique_ptr SortWindowBuild::nextPartition() { +std::shared_ptr SortWindowBuild::nextPartition() { if (merge_ != nullptr) { VELOX_CHECK(!sortedRows_.empty(), "No window partitions available") auto partition = folly::Range(sortedRows_.data(), sortedRows_.size()); @@ -313,7 +313,7 @@ std::unique_ptr SortWindowBuild::nextPartition() { auto partition = folly::Range( sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inputColumns_, sortKeyInfo_); } diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 645949ddb7e02..0caecfe6a5c37 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; private: void ensureInputFits(const RowVectorPtr& input); diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp index cab3e17030189..99073ea762fa0 100644 --- a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/StreamingWindowBuild.cpp @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() { partitionStartRows_.push_back(sortedRows_.size()); } -std::unique_ptr StreamingWindowBuild::nextPartition() { +std::shared_ptr StreamingWindowBuild::nextPartition() { VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") @@ -89,7 +89,7 @@ std::unique_ptr StreamingWindowBuild::nextPartition() { sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inputColumns_, sortKeyInfo_); } diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h index 2573f1f7e8d56..1de94ad6a56eb 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/StreamingWindowBuild.h @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { // No partitions are available or the currentPartition is the last available diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 6e09973edf605..625b817c49a06 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/Window.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/RowLevelStreamingWindowBuild.h" #include "velox/exec/SortWindowBuild.h" #include "velox/exec/StreamingWindowBuild.h" #include "velox/exec/Task.h" @@ -41,8 +42,13 @@ Window::Window( auto* spillConfig = spillConfig_.has_value() ? &spillConfig_.value() : nullptr; if (windowNode->inputsSorted()) { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + if (supportRowLevelStreaming()) { + windowBuild_ = std::make_unique( + windowNode_, pool(), spillConfig, &nonReclaimableSection_); + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } } else { windowBuild_ = std::make_unique( windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); @@ -187,6 +193,31 @@ void Window::createWindowFunctions() { } } +// The supportRowLevelStreaming is designed to support 'rank' and +// 'row_number' functions and the agg window function with default frame. +bool Window::supportRowLevelStreaming() { + for (const auto& windowNodeFunction : windowNode_->windowFunctions()) { + const auto& functionName = windowNodeFunction.functionCall->name(); + auto windowFunctionEntry = + exec::getWindowFunctionEntry(functionName).value(); + if (windowFunctionEntry->processingUnit == ProcessingUnit::kPartition) { + return false; + } + + const auto& frame = windowNodeFunction.frame; + bool isDefaultFrame = + (frame.startType == core::WindowNode::BoundType::kUnboundedPreceding && + frame.endType == core::WindowNode::BoundType::kCurrentRow); + // Only support the agg window function with default frame. + if (!(functionName == "rank" || functionName == "row_number") && + !isDefaultFrame) { + return false; + } + } + + return true; +} + void Window::addInput(RowVectorPtr input) { windowBuild_->addInput(input); numRows_ += input->size(); @@ -474,7 +505,8 @@ void Window::computePeerAndFrameBuffers( // Ranking functions do not care about frames. So the function decides // further what to do with empty frames. computeValidFrames( - currentPartition_->numRows() - 1, + currentPartition_->numRows() - + currentPartition_->offsetInPartition() - 1, numRows, rawFrameStarts[i], rawFrameEnds[i], @@ -543,7 +575,18 @@ vector_size_t Window::callApplyLoop( result); resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; - callResetPartition(); + if (currentPartition_->supportRowLevelStreaming()) { + if (currentPartition_->isFinished()) { + callResetPartition(); + } else { + // Break until the next getOutput call to handle the remaining data in + // currentPartition_. + break; + } + } else { + callResetPartition(); + } + if (!currentPartition_) { // The WindowBuild doesn't have any more partitions to process right // now. So break until the next getOutput call. @@ -585,6 +628,14 @@ RowVectorPtr Window::getOutput() { } } + // BuildNextBatch until all the rows in currentPartition finished. + if (currentPartition_->supportRowLevelStreaming() && + partitionOffset_ == currentPartition_->numRows()) { + if (!currentPartition_->buildNextBatch()) { + return nullptr; + } + } + auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = BaseVector::create( outputType_, numOutputRows, operatorCtx_->pool()); diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 9be9a011baae6..83e5e7941a432 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -88,6 +88,8 @@ class Window : public Operator { const std::optional end; }; + bool supportRowLevelStreaming(); + // Creates WindowFunction and frame objects for this operator. void createWindowFunctions(); @@ -165,7 +167,7 @@ class Window : public Operator { // Used to access window partition rows and columns by the window // operator and functions. This structure is owned by the WindowBuild. - std::unique_ptr currentPartition_; + std::shared_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index 1ac8d3630ea61..11832c4cf5929 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -68,7 +68,7 @@ class WindowBuild { // the underlying columns of Window partition data. // Check hasNextPartition() before invoking this function. This function fails // if called when no partition is available. - virtual std::unique_ptr nextPartition() = 0; + virtual std::shared_ptr nextPartition() = 0; // Returns the average size of input rows in bytes stored in the // data container of the WindowBuild. diff --git a/velox/exec/WindowFunction.cpp b/velox/exec/WindowFunction.cpp index b093024a1dbec..0f909fd55641a 100644 --- a/velox/exec/WindowFunction.cpp +++ b/velox/exec/WindowFunction.cpp @@ -25,7 +25,6 @@ WindowFunctionMap& windowFunctions() { return functions; } -namespace { std::optional getWindowFunctionEntry( const std::string& name) { auto& functionsMap = windowFunctions(); @@ -36,15 +35,15 @@ std::optional getWindowFunctionEntry( return std::nullopt; } -} // namespace bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory) { + WindowFunctionFactory factory, + ProcessingUnit processingUnit) { auto sanitizedName = sanitizeName(name); windowFunctions()[sanitizedName] = { - std::move(signatures), std::move(factory)}; + std::move(signatures), std::move(factory), processingUnit}; return true; } diff --git a/velox/exec/WindowFunction.h b/velox/exec/WindowFunction.h index ee0ef26869c1b..045894f2d5c69 100644 --- a/velox/exec/WindowFunction.h +++ b/velox/exec/WindowFunction.h @@ -149,7 +149,8 @@ using WindowFunctionFactory = std::function( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory); + WindowFunctionFactory factory, + ProcessingUnit processingUnit = ProcessingUnit::kPartition); /// Returns signatures of the window function with the specified name. /// Returns empty std::optional if function with that name is not found. @@ -159,8 +160,12 @@ std::optional> getWindowFunctionSignatures( struct WindowFunctionEntry { std::vector signatures; WindowFunctionFactory factory; + ProcessingUnit processingUnit; }; +std::optional getWindowFunctionEntry( + const std::string& name); + using WindowFunctionMap = std::unordered_map; /// Returns a map of all window function names to their registrations. diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 7e8f8747f0fc7..022e3bd119b0c 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -21,11 +21,35 @@ WindowPartition::WindowPartition( RowContainer* data, const folly::Range& rows, const std::vector& columns, - const std::vector>& sortKeyInfo) + const std::vector>& sortKeyInfo, + ProcessingUnit processingUnit) : data_(data), partition_(rows), columns_(columns), - sortKeyInfo_(sortKeyInfo) {} + sortKeyInfo_(sortKeyInfo) { + if (processingUnit == ProcessingUnit::kRow) { + processingUnit_ = processingUnit; + processedNum_ = partition_.size(); + } +} + +bool WindowPartition::buildNextBatch() { + if (rows_.size() == 0 || (currentBatchIndex_ == (rows_.size() - 1))) + return false; + + offsetInPartition_ += partition_.size(); + // Erase partition_ in data_ and itself. + data_->eraseRows(folly::Range( + rows_[currentBatchIndex_].data(), rows_[currentBatchIndex_].size())); + rows_[currentBatchIndex_].clear(); + + currentBatchIndex_++; + // Set new partition_. + partition_ = folly::Range( + rows_[currentBatchIndex_].data(), rows_[currentBatchIndex_].size()); + processedNum_ += partition_.size(); + return true; +} void WindowPartition::extractColumn( int32_t columnIndex, @@ -47,7 +71,7 @@ void WindowPartition::extractColumn( vector_size_t resultOffset, const VectorPtr& result) const { RowContainer::extractColumn( - partition_.data() + partitionOffset, + partition_.data() + partitionOffset - offsetInPartition_, numRows, columns_[columnIndex], resultOffset, @@ -158,7 +182,9 @@ std::pair WindowPartition::computePeerBuffers( peerStart = i; peerEnd = i; while (peerEnd <= lastPartitionRow) { - if (peerCompare(partition_[peerStart], partition_[peerEnd])) { + if (peerCompare( + partition_[peerStart - offsetInPartition_], + partition_[peerEnd - offsetInPartition_])) { break; } peerEnd++; diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7ce97559904c5..b2e0ebddc4de6 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -23,6 +23,17 @@ /// TODO: This implementation will be revised for Spill to disk semantics. namespace facebook::velox::exec { + +/// The processing unit for calculating the window function in a streaming +/// manner. kRow indicates that the calculation begins as soon as rows are +/// available within a single partition, without waiting for all data in the +/// partition to be ready. kPartition indicates that the calculation begins only +/// when all rows in a partition are ready. +enum class ProcessingUnit { + kPartition, + kRow, +}; + class WindowPartition { public: /// The WindowPartition is used by the Window operator and WindowFunction @@ -33,16 +44,41 @@ class WindowPartition { /// 'columns' : Input rows of 'data' used for accessing column data from it. /// 'sortKeyInfo' : Order by columns used by the the Window operator. Used to /// get peer rows from the input partition. + /// 'processingUnit' : The processing unit: kPartition or kRow. WindowPartition( RowContainer* data, const folly::Range& rows, const std::vector& columns, const std::vector>& - sortKeyInfo); + sortKeyInfo, + ProcessingUnit processingUnit = ProcessingUnit::kPartition); /// Returns the number of rows in the current WindowPartition. vector_size_t numRows() const { - return partition_.size(); + return partition_.size() + offsetInPartition_; + } + + vector_size_t offsetInPartition() const { + return offsetInPartition_; + } + + bool supportRowLevelStreaming() { + return processingUnit_ == ProcessingUnit::kRow; + } + + void insertNewBatch(const std::vector& inputRows) { + rows_.push_back(inputRows); + } + + void setTotalNum(const vector_size_t& num) { + totalNum_ = num; + } + + bool buildNextBatch(); + + bool isFinished() { + return (processingUnit_ == ProcessingUnit::kPartition) || + (totalNum_ == processedNum_); } /// Copies the values at 'columnIndex' into 'result' (starting at @@ -181,5 +217,22 @@ class WindowPartition { // ORDER BY column info for this partition. const std::vector> sortKeyInfo_; + + // The offset of every batch in partition. + vector_size_t offsetInPartition_ = 0; + + // The processed num in current partition. + vector_size_t processedNum_ = 0; + + ProcessingUnit processingUnit_ = ProcessingUnit::kPartition; + + // Add new batch in WindowPartition. + std::vector> rows_; + + // The batch index in WindowPartition. + vector_size_t currentBatchIndex_ = 0; + + // The total num in WindowPartition. + vector_size_t totalNum_ = 0; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index cfc45ee5afd65..41169a3a04549 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -79,6 +79,74 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, rankLikeWithEqualValue) { + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "sum(c1) over (order by c1 rows unbounded preceding)"}; + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults( + "SELECT *, sum(c1) over (order by c1 rows unbounded preceding) FROM tmp"); +} + +TEST_F(WindowTest, rankLikeOptimization) { + const vector_size_t size = 1'0; + auto data = makeRowVector( + {"d", "p", "s"}, + { + // Payload. + makeFlatVector(size, [](auto row) { return row; }), + // Partition key. + makeFlatVector(size, [](auto row) { return row % 2; }), + // Sorting key. + makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (partition by p order by s)", + "row_number() over (partition by p order by s)", + "sum(d) over (partition by p order by s)"}; + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"p", "s"}, false) + .streamingWindow(kClauses) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults( + "SELECT *, rank() over (partition by p order by s), row_number() over (partition by p order by s), sum(d) over (partition by p order by s) FROM tmp"); +} + TEST_F(WindowTest, missingFunctionSignature) { auto input = {makeRowVector({ makeFlatVector({1, 2, 3}), diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp index 646557a2e30ce..953707c99cd34 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -97,9 +97,7 @@ void registerRankInternal( exec::FunctionSignatureBuilder().returnType(returnType).build(), }; - exec::registerWindowFunction( - name, - std::move(signatures), + auto windowFunctionFactory = [name]( const std::vector& /*args*/, const TypePtr& resultType, @@ -107,9 +105,20 @@ void registerRankInternal( velox::memory::MemoryPool* /*pool*/, HashStringAllocator* /*stringAllocator*/, const core::QueryConfig& /*queryConfig*/) - -> std::unique_ptr { - return std::make_unique>(resultType); - }); + -> std::unique_ptr { + return std::make_unique>(resultType); + }; + + if constexpr (TRank == RankType::kRank) { + exec::registerWindowFunction( + name, + std::move(signatures), + std::move(windowFunctionFactory), + exec::ProcessingUnit::kRow); + } else { + exec::registerWindowFunction( + name, std::move(signatures), std::move(windowFunctionFactory)); + } } void registerRankBigint(const std::string& name) { diff --git a/velox/functions/lib/window/RowNumber.cpp b/velox/functions/lib/window/RowNumber.cpp index 16b7feb0a5431..5d0ad0a8deef3 100644 --- a/velox/functions/lib/window/RowNumber.cpp +++ b/velox/functions/lib/window/RowNumber.cpp @@ -84,7 +84,8 @@ void registerRowNumber(const std::string& name, TypeKind resultTypeKind) { const core::QueryConfig& /*queryConfig*/) -> std::unique_ptr { return std::make_unique(resultType); - }); + }, + exec::ProcessingUnit::kRow); } void registerRowNumberInteger(const std::string& name) {