Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamingWindow support #404

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1160,12 +1160,14 @@ WindowNode::WindowNode(
std::vector<SortOrder> sortingOrders,
std::vector<std::string> windowColumnNames,
std::vector<Function> windowFunctions,
bool inputsSorted,
PlanNodePtr source)
: PlanNode(std::move(id)),
partitionKeys_(std::move(partitionKeys)),
sortingKeys_(std::move(sortingKeys)),
sortingOrders_(std::move(sortingOrders)),
windowFunctions_(std::move(windowFunctions)),
inputsSorted_(inputsSorted),
sources_{std::move(source)},
outputType_(getWindowOutputType(
sources_[0]->outputType(),
Expand Down Expand Up @@ -1201,6 +1203,8 @@ void WindowNode::addDetails(std::stringstream& stream) const {
stream << outputType_->names()[i] << " := ";
addWindowFunction(stream, windowFunctions_[i - numInputCols]);
}

stream << " inputsSorted [" << inputsSorted_ << "]";
}

namespace {
Expand Down Expand Up @@ -1317,6 +1321,7 @@ folly::dynamic WindowNode::serialize() const {
windowNames.push_back(outputType_->nameOf(i));
}
obj["names"] = ISerializable::serialize(windowNames);
obj["inputsSorted"] = inputsSorted_;

return obj;
}
Expand All @@ -1336,13 +1341,16 @@ PlanNodePtr WindowNode::create(const folly::dynamic& obj, void* context) {

auto windowNames = deserializeStrings(obj["names"]);

auto inputsSorted = obj["inputsSorted"].asBool();

return std::make_shared<WindowNode>(
deserializePlanNodeId(obj),
partitionKeys,
sortingKeys,
sortingOrders,
windowNames,
functions,
inputsSorted,
source);
}

Expand Down
7 changes: 7 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,7 @@ class WindowNode : public PlanNode {
std::vector<SortOrder> sortingOrders,
std::vector<std::string> windowColumnNames,
std::vector<Function> windowFunctions,
bool inputsSorted,
PlanNodePtr source);

const std::vector<PlanNodePtr>& sources() const override {
Expand Down Expand Up @@ -2030,6 +2031,10 @@ class WindowNode : public PlanNode {
return windowFunctions_;
}

/// Returns the 'inputsSorted' flag this node was constructed with.
bool inputsSorted() const {
return inputsSorted_;
}

/// Returns the fixed name of this plan node: "Window".
std::string_view name() const override {
return "Window";
}
Expand All @@ -2048,6 +2053,8 @@ class WindowNode : public PlanNode {

const std::vector<Function> windowFunctions_;

const bool inputsSorted_;

const std::vector<PlanNodePtr> sources_;

const RowTypePtr outputType_;
Expand Down
4 changes: 3 additions & 1 deletion velox/docs/develop/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ LocalMergeNode LocalMerge
LocalPartitionNode LocalPartition and LocalExchange
EnforceSingleRowNode EnforceSingleRow
AssignUniqueIdNode AssignUniqueId
WindowNode Window
WindowNode Window or StreamingWindow
RowNumberNode RowNumber
TopNRowNumberNode TopNRowNumber
========================== ============================================== ===========================
Expand Down Expand Up @@ -550,6 +550,8 @@ If no sorting columns are specified then the order of the results is unspecified
- Output column names for each window function invocation in windowFunctions list below.
* - windowFunctions
- Window function calls with the frame clause, e.g. row_number(), first_value(name) between range 10 preceding and current row. The default frame is between range unbounded preceding and current row.
* - inputsSorted
- Boolean flag. If true, the input data is already sorted and the StreamingWindow operator is used instead of Window to reduce the memory footprint.

RowNumberNode
~~~~~~~~~~~~~
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ add_library(
RowContainer.cpp
RowNumber.cpp
SortedAggregations.cpp
SortWindowBuild.cpp
Spill.cpp
SpillOperatorGroup.cpp
Spiller.cpp
StreamingAggregation.cpp
StreamingWindowBuild.cpp
TableScan.cpp
TableWriter.cpp
Task.cpp
Expand All @@ -67,6 +69,7 @@ add_library(
Values.cpp
VectorHasher.cpp
Window.cpp
WindowBuild.cpp
WindowFunction.cpp
WindowPartition.cpp
AssignUniqueId.cpp
Expand Down
40 changes: 40 additions & 0 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,14 @@ class RowContainer {
vector_size_t resultOffset,
const VectorPtr& result);

/// Writes the null flags of column 'col' for the first 'numRows' entries of
/// 'rows' into 'result', which is treated as a bitmap: bits [0, numRows) are
/// first cleared and bit i is then set if the value of 'col' in rows[i] is
/// null. 'result' must hold at least bits::nbytes(numRows) bytes; this is
/// verified with VELOX_DCHECK only.
static void extractNulls(
const char* FOLLY_NONNULL const* FOLLY_NONNULL rows,
int32_t numRows,
RowColumn col,
const BufferPtr& result);

// Copies the values at 'columnIndex' into 'result' for the
// 'numRows' rows pointed to by 'rows'. If an entry in 'rows' is null, sets
// corresponding row in 'result' to null.
Expand Down Expand Up @@ -350,6 +358,15 @@ class RowContainer {
rows, rowNumbers, columnAt(columnIndex), resultOffset, result);
}

/// Sets in result all locations with null values in columnIndex for rows.
void extractNulls(
const char* FOLLY_NONNULL const* FOLLY_NONNULL rows,
int32_t numRows,
int32_t columnIndex,
const BufferPtr& result) {
extractNulls(rows, numRows, columnAt(columnIndex), result);
}

/// Copies the 'probed' flags for the specified rows into 'result'.
/// The 'result' is expected to be flat vector of type boolean.
/// For rows with null keys, sets null in 'result' if 'setNullForNullKeysRow'
Expand Down Expand Up @@ -1279,6 +1296,29 @@ inline void RowContainer::extractColumn(
result);
}

inline void RowContainer::extractNulls(
const char* FOLLY_NONNULL const* FOLLY_NONNULL rows,
int32_t numRows,
RowColumn column,
const BufferPtr& result) {
VELOX_DCHECK(result->size() >= bits::nbytes(numRows));
auto* rawResult = result->asMutable<uint64_t>();
bits::fillBits(rawResult, 0, numRows, false);

auto nullMask = column.nullMask();
if (!nullMask) {
return;
}

auto nullByte = column.nullByte();
for (int32_t i = 0; i < numRows; ++i) {
const char* row = rows[i];
if (row == nullptr || isNullAt(row, nullByte, nullMask)) {
bits::setBit(rawResult, i, true);
}
}
}

template <bool mayHaveNulls>
inline bool RowContainer::equals(
const char* FOLLY_NONNULL row,
Expand Down
131 changes: 131 additions & 0 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/SortWindowBuild.h"

namespace facebook::velox::exec {

// Sets up the combined key list used to fully order the input rows:
// partition keys first, followed by the sort keys.
SortWindowBuild::SortWindowBuild(
    const std::shared_ptr<const core::WindowNode>& windowNode,
    velox::memory::MemoryPool* pool)
    : WindowBuild(windowNode, pool) {
  const auto numKeys = partitionKeyInfo_.size() + sortKeyInfo_.size();
  allKeyInfo_.reserve(numKeys);

  // Append the partition keys, then the sort keys.
  allKeyInfo_.insert(
      allKeyInfo_.end(), partitionKeyInfo_.begin(), partitionKeyInfo_.end());
  allKeyInfo_.insert(
      allKeyInfo_.end(), sortKeyInfo_.begin(), sortKeyInfo_.end());

  // No partition boundaries are known until the input has been sorted.
  partitionStartRows_.clear();
}

// Copies every row of 'input' into the RowContainer (data_) and adds the
// number of copied rows to numRows_.
void SortWindowBuild::addInput(RowVectorPtr input) {
  const auto numColumns = input->childrenSize();
  const auto numInputRows = input->size();

  // Decode each input column once so that values can be read row by row.
  for (auto column = 0; column < numColumns; ++column) {
    decodedInputVectors_[column].decode(*input->childAt(column));
  }

  // Allocate one container row per input row and store all of its columns.
  for (auto inputRow = 0; inputRow < numInputRows; ++inputRow) {
    char* const containerRow = data_->newRow();
    for (auto column = 0; column < numColumns; ++column) {
      data_->store(
          decodedInputVectors_[column], inputRow, containerRow, column);
    }
  }

  numRows_ += numInputRows;
}

void SortWindowBuild::computePartitionStartRows() {
partitionStartRows_.reserve(numRows_);
auto partitionCompare = [&](const char* lhs, const char* rhs) -> bool {
return compareRowsWithKeys(lhs, rhs, partitionKeyInfo_);
};

// Using a sequential traversal to find changing partitions.
// This algorithm is inefficient and can be changed
// i) Use a binary search kind of strategy.
// ii) If we use a Hashtable instead of a full sort then the count
// of rows in the partition can be directly used.
partitionStartRows_.push_back(0);

VELOX_CHECK_GT(sortedRows_.size(), 0);
for (auto i = 1; i < sortedRows_.size(); i++) {
if (partitionCompare(sortedRows_[i - 1], sortedRows_[i])) {
partitionStartRows_.push_back(i);
}
}

// Setting the startRow of the (last + 1) partition to be returningRows.size()
// to help for last partition related calculations.
partitionStartRows_.push_back(sortedRows_.size());
}

void SortWindowBuild::sortPartitions() {
// This is a very inefficient but easy implementation to order the input rows
// by partition keys + sort keys.
// Sort the pointers to the rows in RowContainer (data_) instead of sorting
// the rows.
sortedRows_.resize(numRows_);
RowContainerIterator iter;
data_->listRows(&iter, numRows_, sortedRows_.data());

std::sort(
sortedRows_.begin(),
sortedRows_.end(),
[this](const char* leftRow, const char* rightRow) {
return compareRowsWithKeys(leftRow, rightRow, allKeyInfo_);
});

computePartitionStartRows();
}

// Called once all input rows have been received. Prepares the buffered rows
// for output by separating them into partitions and ordering them by the
// ORDER BY keys within each partition.
void SortWindowBuild::noMoreInput() {
  // Without any buffered rows there is nothing to sort.
  if (numRows_ != 0) {
    sortPartitions();
  }
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
VELOX_CHECK(partitionStartRows_.size() > 0, "No window partitions available")

currentPartition_++;
VELOX_CHECK(
currentPartition_ <= partitionStartRows_.size() - 2,
"All window partitions consumed");

auto windowPartition = std::make_unique<WindowPartition>(
data_.get(), inputColumns_, sortKeyInfo_);
// There is partition data available now.
auto partitionSize = partitionStartRows_[currentPartition_ + 1] -
partitionStartRows_[currentPartition_];
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
windowPartition->resetPartition(partition);

return windowPartition;
}

// Returns true if partitions have been computed and at least one of them has
// not been returned by nextPartition() yet.
bool SortWindowBuild::hasNextPartition() {
  if (partitionStartRows_.size() == 0) {
    return false;
  }
  // The final entry of partitionStartRows_ is a sentinel, so the index of the
  // last partition is size() - 2.
  const auto lastPartition = static_cast<int>(partitionStartRows_.size() - 2);
  return currentPartition_ < lastPartition;
}

} // namespace facebook::velox::exec
78 changes: 78 additions & 0 deletions velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

// Sorts input data of the Window by {partition keys, sort keys}
// to identify window partitions. This sort fully orders
// rows as needed for window function computation.
class SortWindowBuild : public WindowBuild {
public:
SortWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool);

bool needsInput() override {
// No partitions are available yet, so can consume input rows.
return partitionStartRows_.size() == 0;
}

void addInput(RowVectorPtr input) override;

void noMoreInput() override;

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;

private:
// Main sorting function loop done after all input rows are received
// by WindowBuild.
void sortPartitions();

// Function to compute the partitionStartRows_ structure.
// partitionStartRows_ is vector of the starting rows index
// of each partition in the data. This is an auxiliary
// structure that helps simplify the window function computations.
void computePartitionStartRows();

// allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_).
// It is used to perform a full sorting of the input rows to be able to
// separate partitions and sort the rows in it. The rows are output in
// this order by the operator.
std::vector<std::pair<column_index_t, core::SortOrder>> allKeyInfo_;

// Vector of pointers to each input row in the data_ RowContainer.
// The rows are sorted by partitionKeys + sortKeys. This total
// ordering can be used to split partitions (with the correct
// order by) for the processing.
std::vector<char*> sortedRows_;

// This is a vector that gives the index of the start row
// (in sortedRows_) of each partition in the RowContainer data_.
// This auxiliary structure helps demarcate partitions.
std::vector<vector_size_t> partitionStartRows_;

// Current partition being output. Used to construct WindowPartitions
// during resetPartition.
vector_size_t currentPartition_ = -1;
};

} // namespace facebook::velox::exec
Loading
Loading