Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Mar 27, 2024
1 parent 41d46d8 commit a100bdb
Show file tree
Hide file tree
Showing 18 changed files with 434 additions and 31 deletions.
3 changes: 2 additions & 1 deletion velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ void registerAggregateWindowFunction(const std::string& name) {
pool,
stringAllocator,
config);
});
},
exec::ProcessingUnit::kRow);
}
}
} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ add_library(
PlanNodeStats.cpp
PrefixSort.cpp
ProbeOperatorState.cpp
RowLevelStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand Down
99 changes: 99 additions & 0 deletions velox/exec/RowLevelStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RowLevelStreamingWindowBuild.h"

namespace facebook::velox::exec {

/// Constructs a row-level streaming window build.
///
/// Every argument is forwarded unchanged to the WindowBuild base class; this
/// class adds no construction-time work of its own.
RowLevelStreamingWindowBuild::RowLevelStreamingWindowBuild(
    const std::shared_ptr<const core::WindowNode>& windowNode,
    velox::memory::MemoryPool* pool,
    const common::SpillConfig* spillConfig,
    tsan_atomic<bool>* nonReclaimableSection)
    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
}

void RowLevelStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
sortedRows_.push_back(inputRows_);
if (windowPartitions_.size() <= inputCurrentPartition_) {
auto partition =
folly::Range(sortedRows_.back().data(), sortedRows_.back().size());

windowPartitions_.push_back(std::make_shared<WindowPartition>(
data_.get(),
partition,
inputColumns_,
sortKeyInfo_,
ProcessingUnit::kRow));
}

windowPartitions_[inputCurrentPartition_]->insertNewBatch(sortedRows_.back());

if (isFinished) {
windowPartitions_[inputCurrentPartition_]->setTotalNum(
currentPartitionNum_ - 1);

inputCurrentPartition_++;
currentPartitionNum_ = 1;
}

inputRows_.clear();
}

/// Copies every row of 'input' into the row container and forwards buffered
/// rows to the WindowPartition being built whenever the partition keys or the
/// sort keys change between two consecutive rows.
///
/// @param input Next batch of input rows.
void RowLevelStreamingWindowBuild::addInput(RowVectorPtr input) {
  for (auto channel = 0; channel < inputChannels_.size(); ++channel) {
    decodedInputVectors_[channel].decode(
        *input->childAt(inputChannels_[channel]));
  }

  for (auto rowIndex = 0; rowIndex < input->size(); ++rowIndex) {
    // Counted before the boundary checks below; buildNextInputOrPartition()
    // compensates for that when it closes a partition.
    ++currentPartitionNum_;
    char* storedRow = data_->newRow();

    for (auto column = 0; column < input->childrenSize(); ++column) {
      data_->store(decodedInputVectors_[column], rowIndex, storedRow, column);
    }

    if (previousRow_ != nullptr) {
      // Partition keys changed: flush what is buffered and close the
      // partition.
      if (compareRowsWithKeys(previousRow_, storedRow, partitionKeyInfo_)) {
        buildNextInputOrPartition(true);
      }

      // Wait for the peers to be ready in single partition; these peers are
      // the rows that have identical values in the ORDER BY clause. The
      // buffer is empty right after a partition was closed above, so this
      // second flush is skipped in that case.
      if (!inputRows_.empty() &&
          compareRowsWithKeys(previousRow_, storedRow, sortKeyInfo_)) {
        buildNextInputOrPartition(false);
      }
    }

    inputRows_.push_back(storedRow);
    previousRow_ = storedRow;
  }
}

/// Records that the input is exhausted and flushes the rows still buffered in
/// inputRows_, closing the partition that was being built.
void RowLevelStreamingWindowBuild::noMoreInput() {
  isFinished_ = true;

  // NOTE(review): buildNextInputOrPartition(true) reports
  // currentPartitionNum_ - 1 as the partition total. That subtraction matches
  // the addInput() path, where the first row of the next partition has
  // already been counted; no such row exists here. Confirm the final
  // partition's total is correct.
  buildNextInputOrPartition(/*isFinished=*/true);
}

std::shared_ptr<WindowPartition> RowLevelStreamingWindowBuild::nextPartition() {
return windowPartitions_[++outputCurrentPartition_];
}

/// Returns true if nextPartition() can be called, i.e. a partition exists
/// after the one currently being output.
///
/// nextPartition() advances outputCurrentPartition_ before indexing, so the
/// index that matters is outputCurrentPartition_ + 1. The earlier form
/// compared the current index against size() - 1, which stayed true after
/// the last partition had been returned and let the next call read one
/// element past the end of windowPartitions_.
bool RowLevelStreamingWindowBuild::hasNextPartition() {
  // outputCurrentPartition_ starts at -1, so the sum is never negative and
  // the cast is safe. An empty windowPartitions_ yields 0 < 0, i.e. false.
  const auto nextIndex = static_cast<size_t>(outputCurrentPartition_ + 1);
  return nextIndex < windowPartitions_.size();
}

} // namespace facebook::velox::exec
88 changes: 88 additions & 0 deletions velox/exec/RowLevelStreamingWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Unlike StreamingWindowBuild, RowLevelStreamingWindowBuild is capable of
/// processing window functions as rows arrive within a single partition,
/// without the need to wait for the entire partition to be ready. This approach
/// can significantly reduce memory usage, especially when a single partition
/// contains a large amount of data. It is particularly suited for optimizing
/// rank and row_number functions, as well as aggregate window functions with a
/// default frame.
class RowLevelStreamingWindowBuild : public WindowBuild {
public:
RowLevelStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection);

void addInput(RowVectorPtr input) override;

void spill() override {
VELOX_UNREACHABLE();
}

std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
}

void noMoreInput() override;

bool hasNextPartition() override;

std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.size() == 0 ||
outputCurrentPartition_ == windowPartitions_.size() - 1;
}

private:
void buildNextInputOrPartition(bool isFinished);

/// Vector of pointers to each input row in the data_ RowContainer.
/// Rows are erased from data_ when they are processed in WindowPartition.
std::vector<std::vector<char*>> sortedRows_;

// Holds input rows within the current partition.
std::vector<char*> inputRows_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Current partition being output. Used to return the WidnowPartitions.
vector_size_t outputCurrentPartition_ = -1;

bool isFinished_ = false;

// Current partition when adding input. Used to construct WindowPartitions.
vector_size_t inputCurrentPartition_ = 0;

std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;

// Records the total rows number in each partition.
vector_size_t currentPartitionNum_ = 0;
};

} // namespace facebook::velox::exec
4 changes: 2 additions & 2 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
Expand All @@ -313,7 +313,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inputColumns_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/StreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() {
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -89,7 +89,7 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inputColumns_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/StreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
Expand Down
59 changes: 55 additions & 4 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "velox/exec/Window.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/RowLevelStreamingWindowBuild.h"
#include "velox/exec/SortWindowBuild.h"
#include "velox/exec/StreamingWindowBuild.h"
#include "velox/exec/Task.h"
Expand All @@ -41,8 +42,13 @@ Window::Window(
auto* spillConfig =
spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
if (windowNode->inputsSorted()) {
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
if (supportRowLevelStreaming()) {
windowBuild_ = std::make_unique<RowLevelStreamingWindowBuild>(
windowNode_, pool(), spillConfig, &nonReclaimableSection_);
} else {
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
}
} else {
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_);
Expand Down Expand Up @@ -187,6 +193,31 @@ void Window::createWindowFunctions() {
}
}

// The supportRowLevelStreaming is designed to support 'rank' and
// 'row_number' functions and the agg window function with default frame.
bool Window::supportRowLevelStreaming() {
for (const auto& windowNodeFunction : windowNode_->windowFunctions()) {
const auto& functionName = windowNodeFunction.functionCall->name();
auto windowFunctionEntry =
exec::getWindowFunctionEntry(functionName).value();
if (windowFunctionEntry->processingUnit == ProcessingUnit::kPartition) {
return false;
}

const auto& frame = windowNodeFunction.frame;
bool isDefaultFrame =
(frame.startType == core::WindowNode::BoundType::kUnboundedPreceding &&
frame.endType == core::WindowNode::BoundType::kCurrentRow);
// Only support the agg window function with default frame.
if (!(functionName == "rank" || functionName == "row_number") &&
!isDefaultFrame) {
return false;
}
}

return true;
}

void Window::addInput(RowVectorPtr input) {
windowBuild_->addInput(input);
numRows_ += input->size();
Expand Down Expand Up @@ -474,7 +505,8 @@ void Window::computePeerAndFrameBuffers(
// Ranking functions do not care about frames. So the function decides
// further what to do with empty frames.
computeValidFrames(
currentPartition_->numRows() - 1,
currentPartition_->numRows() -
currentPartition_->offsetInPartition() - 1,
numRows,
rawFrameStarts[i],
rawFrameEnds[i],
Expand Down Expand Up @@ -543,7 +575,18 @@ vector_size_t Window::callApplyLoop(
result);
resultIndex += rowsForCurrentPartition;
numOutputRowsLeft -= rowsForCurrentPartition;
callResetPartition();
if (currentPartition_->supportRowLevelStreaming()) {
if (currentPartition_->isFinished()) {
callResetPartition();
} else {
// Break until the next getOutput call to handle the remaining data in
// currentPartition_.
break;
}
} else {
callResetPartition();
}

if (!currentPartition_) {
// The WindowBuild doesn't have any more partitions to process right
// now. So break until the next getOutput call.
Expand Down Expand Up @@ -585,6 +628,14 @@ RowVectorPtr Window::getOutput() {
}
}

// Once every row currently in currentPartition_ has been processed, ask the
// partition to build its next batch; return no output if it reports false.
if (currentPartition_->supportRowLevelStreaming() &&
partitionOffset_ == currentPartition_->numRows()) {
if (!currentPartition_->buildNextBatch()) {
return nullptr;
}
}

auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft);
auto result = BaseVector::create<RowVector>(
outputType_, numOutputRows, operatorCtx_->pool());
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/Window.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class Window : public Operator {
const std::optional<FrameChannelArg> end;
};

bool supportRowLevelStreaming();

// Creates WindowFunction and frame objects for this operator.
void createWindowFunctions();

Expand Down Expand Up @@ -165,7 +167,7 @@ class Window : public Operator {

// Used to access window partition rows and columns by the window
// operator and functions. This structure is owned by the WindowBuild.
std::unique_ptr<WindowPartition> currentPartition_;
std::shared_ptr<WindowPartition> currentPartition_;

// HashStringAllocator required by functions that allocate out of line
// buffers.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/WindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class WindowBuild {
// the underlying columns of Window partition data.
// Check hasNextPartition() before invoking this function. This function fails
// if called when no partition is available.
virtual std::unique_ptr<WindowPartition> nextPartition() = 0;
virtual std::shared_ptr<WindowPartition> nextPartition() = 0;

// Returns the average size of input rows in bytes stored in the
// data container of the WindowBuild.
Expand Down
Loading

0 comments on commit a100bdb

Please sign in to comment.