Skip to content

Commit

Permalink
Support row level streaming for the agg window functions with default…
Browse files Browse the repository at this point in the history
… frame
  • Loading branch information
JkSelf committed Mar 27, 2024
1 parent ac67d68 commit 26a7b32
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 94 deletions.
3 changes: 2 additions & 1 deletion velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ void registerAggregateWindowFunction(const std::string& name) {
pool,
stringAllocator,
config);
});
},
exec::ProcessingUnit::kRow);
}
}
} // namespace facebook::velox::exec
27 changes: 14 additions & 13 deletions velox/exec/RowLevelStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ void RowLevelStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
inputColumns_,
sortKeyInfo_,
ProcessingUnit::kRow));
} else {
windowPartitions_[inputCurrentPartition_]->insertNewBatch(
sortedRows_.back());
}

windowPartitions_[inputCurrentPartition_]->insertNewBatch(sortedRows_.back());

if (isFinished) {
windowPartitions_[inputCurrentPartition_]->setTotalNum(
currentPartitionNum_ - 1);

inputRows_.clear();
inputCurrentPartition_++;
currentPartitionNum_ = 1;
}

inputRows_.clear();
}

void RowLevelStreamingWindowBuild::addInput(RowVectorPtr input) {
Expand All @@ -70,29 +70,30 @@ void RowLevelStreamingWindowBuild::addInput(RowVectorPtr input) {
buildNextInputOrPartition(true);
}

// Wait for the peers to be ready in single partition; these peers are the
// rows that have identical values in the ORDER BY clause.
if (previousRow_ != nullptr && inputRows_.size() > 0 &&
compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) {
buildNextInputOrPartition(false);
}

inputRows_.push_back(newRow);
previousRow_ = newRow;
}

buildNextInputOrPartition(false);

inputRows_.clear();
}

void RowLevelStreamingWindowBuild::noMoreInput() {
isFinished_ = true;
windowPartitions_[outputCurrentPartition_]->setTotalNum(
currentPartitionNum_ - 1);
inputRows_.clear();
buildNextInputOrPartition(true);
}

std::shared_ptr<WindowPartition> RowLevelStreamingWindowBuild::nextPartition() {
return windowPartitions_[++outputCurrentPartition_];
return windowPartitions_[outputCurrentPartition_++];
}

bool RowLevelStreamingWindowBuild::hasNextPartition() {
return windowPartitions_.size() > 0 &&
outputCurrentPartition_ < int(windowPartitions_.size() - 1);
outputCurrentPartition_ <= int(windowPartitions_.size() - 1);
}

} // namespace facebook::velox::exec
2 changes: 1 addition & 1 deletion velox/exec/RowLevelStreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class RowLevelStreamingWindowBuild : public WindowBuild {
char* previousRow_ = nullptr;

// Current partition being output. Used to return the WidnowPartitions.
vector_size_t outputCurrentPartition_ = -1;
vector_size_t outputCurrentPartition_ = 0;

bool isFinished_ = false;

Expand Down
45 changes: 32 additions & 13 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ Window::Window(
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
}

} else {
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_);
Expand Down Expand Up @@ -195,12 +194,23 @@ void Window::createWindowFunctions() {
}

// The supportRowLevelStreaming is designed to support 'rank' and
// 'row_number' functions.
// 'row_number' functions and the agg window function with default frame.
bool Window::supportRowLevelStreaming() {
for (const auto& windowNodeFunction : windowNode_->windowFunctions()) {
if (exec::getWindowFunctionEntry(windowNodeFunction.functionCall->name())
.value()
->processingUnit == ProcessingUnit::kPartition) {
const auto& functionName = windowNodeFunction.functionCall->name();
auto windowFunctionEntry =
exec::getWindowFunctionEntry(functionName).value();
if (windowFunctionEntry->processingUnit == ProcessingUnit::kPartition) {
return false;
}

const auto& frame = windowNodeFunction.frame;
bool isDefaultFrame =
(frame.startType == core::WindowNode::BoundType::kUnboundedPreceding &&
frame.endType == core::WindowNode::BoundType::kCurrentRow);
// Only support the agg window function with default frame.
if (!(functionName == "rank" || functionName == "row_number") &&
!isDefaultFrame) {
return false;
}
}
Expand Down Expand Up @@ -532,7 +542,8 @@ void Window::computePeerAndFrameBuffers(
// Ranking functions do not care about frames. So the function decides
// further what to do with empty frames.
computeValidFrames(
currentPartition_->numRows() - 1,
currentPartition_->numRows() -
currentPartition_->offsetInPartition() - 1,
numRows,
rawFrameStarts[i],
rawFrameEnds[i],
Expand Down Expand Up @@ -601,12 +612,16 @@ vector_size_t Window::callApplyLoop(
result);
resultIndex += rowsForCurrentPartition;
numOutputRowsLeft -= rowsForCurrentPartition;
if (currentPartition_->isFinished()) {
callResetPartition();
if (currentPartition_->supportRowLevelStreaming()) {
if (currentPartition_->isFinished()) {
callResetPartition();
} else {
// Break until the next getOutput call to handle the remaining data in
// currentPartition_.
break;
}
} else {
// Break until the next getOutput call to handle the remaining data in
// currentPartition_.
break;
callResetPartition();
}

if (!currentPartition_) {
Expand Down Expand Up @@ -650,8 +665,12 @@ RowVectorPtr Window::getOutput() {
}
}

if (!currentPartition_->isFinished()) {
currentPartition_->buildNextBatch();
// BuildNextBatch until all the rows in currentPartition finished.
if (currentPartition_->supportRowLevelStreaming() &&
partitionOffset_ == currentPartition_->numRows()) {
if (!currentPartition_->buildNextBatch()) {
return nullptr;
}
}

auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft);
Expand Down
55 changes: 13 additions & 42 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,29 @@ WindowPartition::WindowPartition(
partition_(rows),
columns_(columns),
sortKeyInfo_(sortKeyInfo) {
processedNum_ = partition_.size();

if (processingUnit == ProcessingUnit::kRow) {
rowLevelStreaming_ = true;
processingUnit_ = processingUnit;
processedNum_ = partition_.size();
}
}

void WindowPartition::buildNextBatch() {
bool WindowPartition::buildNextBatch() {
if (rows_.size() == 0 ||
(currentBatchIndex_ >= 0 && currentBatchIndex_ == (rows_.size() - 1)))
return;
peerGroup_ = false;
currentBatchIndex_++;
// Compute whehter the last row in current batch is same with the first row
// in next batch.
auto peerCompare = [&](const char* lhs, const char* rhs) -> bool {
return compareRowsWithSortKeys(lhs, rhs);
};
if (!peerCompare(
partition_[partition_.size() - 1], rows_[currentBatchIndex_][0])) {
peerGroup_ = true;
}
(currentBatchIndex_ == (rows_.size() - 1)))
return false;

// Erase partition_ in data_ and itself.
data_->eraseRows(partition_);
offsetInPartition_ += partition_.size();
partition_.clear();
// Erase partition_ in data_ and itself.
data_->eraseRows(folly::Range(
rows_[currentBatchIndex_].data(), rows_[currentBatchIndex_].size()));
rows_[currentBatchIndex_].clear();

currentBatchIndex_++;
// Set new partition_.
partition_ = folly::Range(
rows_[currentBatchIndex_].data(), rows_[currentBatchIndex_].size());
processedNum_ += partition_.size();
return true;
}

void WindowPartition::extractColumn(
Expand Down Expand Up @@ -175,28 +167,7 @@ std::pair<vector_size_t, vector_size_t> WindowPartition::computePeerBuffers(
auto lastPartitionRow = numRows() - 1;
auto peerStart = prevPeerStart;
auto peerEnd = prevPeerEnd;

auto nextStart = start;
if (peerGroup_) {
peerEnd++;
nextStart = start + 1;
while (nextStart <= lastPartitionRow) {
if (peerCompare(
partition_[start - offsetInPartition_],
partition_[nextStart - offsetInPartition_])) {
break;
}
peerEnd++;
nextStart++;
}

for (auto j = start; j < nextStart; j++) {
rawPeerStarts[j - offsetInPartition_] = peerStart;
rawPeerEnds[j - offsetInPartition_] = peerEnd;
}
}

for (auto i = nextStart, j = (nextStart - start); i < end; i++, j++) {
for (auto i = start, j = 0; i < end; i++, j++) {
// When traversing input partition rows, the peers are the rows
// with the same values for the ORDER BY clause. These rows
// are equal in some ways and affect the results of ranking functions.
Expand Down
24 changes: 14 additions & 10 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class WindowPartition {
/// 'columns' : Input rows of 'data' used for accessing column data from it.
/// 'sortKeyInfo' : Order by columns used by the the Window operator. Used to
/// get peer rows from the input partition.
/// 'streaming' : Whether support streaming in WindowPartition.
/// 'processingUnit' : The processing unit: kPartition or kRow.
WindowPartition(
RowContainer* data,
const folly::Range<char**>& rows,
Expand All @@ -58,6 +58,14 @@ class WindowPartition {
return partition_.size() + offsetInPartition_;
}

vector_size_t offsetInPartition() const {
return offsetInPartition_;
}

bool supportRowLevelStreaming() {
return processingUnit_ == ProcessingUnit::kRow;
}

void insertNewBatch(const std::vector<char*>& inputRows) {
rows_.push_back(inputRows);
}
Expand All @@ -66,10 +74,11 @@ class WindowPartition {
totalNum_ = num;
}

void buildNextBatch();
bool buildNextBatch();

bool isFinished() {
return (!rowLevelStreaming_) || (totalNum_ == processedNum_);
return (processingUnit_ == ProcessingUnit::kPartition) ||
(totalNum_ == processedNum_);
}

/// Copies the values at 'columnIndex' into 'result' (starting at
Expand Down Expand Up @@ -215,18 +224,13 @@ class WindowPartition {
// The processed num in current partition.
vector_size_t processedNum_ = 0;

/// Whether the last row of current batch is same with the first row of next
/// batch.
bool peerGroup_ = false;

// Whether support streaming in WindowPartition.
bool rowLevelStreaming_ = false;
ProcessingUnit processingUnit_ = ProcessingUnit::kPartition;

// Add new batch in WindowPartition.
std::vector<std::vector<char*>> rows_;

// The batch index in WindowPartition.
vector_size_t currentBatchIndex_ = -1;
vector_size_t currentBatchIndex_ = 0;

// The total num in WindowPartition.
vector_size_t totalNum_ = 0;
Expand Down
14 changes: 8 additions & 6 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ TEST_F(WindowTest, rankLikeWithEqualValue) {

createDuckDbTable({data});

const std::vector<std::string> kClauses = {"rank() over (order by c1)"};
const std::vector<std::string> kClauses = {
"sum(c1) over (order by c1 rows unbounded preceding)"};
core::PlanNodeId windowId;
auto plan = PlanBuilder()
.values({data})
Expand All @@ -104,18 +105,19 @@ TEST_F(WindowTest, rankLikeWithEqualValue) {
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kWindowSpillEnabled, "true")
.spillDirectory(spillDirectory->path)
.assertResults("SELECT *, rank() over (order by c1) FROM tmp");
.assertResults(
"SELECT *, sum(c1) over (order by c1 rows unbounded preceding) FROM tmp");
}

TEST_F(WindowTest, rankLikeOptimization) {
const vector_size_t size = 1'000;
const vector_size_t size = 1'0;
auto data = makeRowVector(
{"d", "p", "s"},
{
// Payload.
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
// Partition key.
makeFlatVector<int16_t>(size, [](auto row) { return row % 11; }),
makeFlatVector<int16_t>(size, [](auto row) { return row % 2; }),
// Sorting key.
makeFlatVector<int32_t>(size, [](auto row) { return row; }),
});
Expand All @@ -125,7 +127,7 @@ TEST_F(WindowTest, rankLikeOptimization) {
const std::vector<std::string> kClauses = {
"rank() over (partition by p order by s)",
"row_number() over (partition by p order by s)",
};
"sum(d) over (partition by p order by s)"};
core::PlanNodeId windowId;
auto plan = PlanBuilder()
.values({split(data, 10)})
Expand All @@ -142,7 +144,7 @@ TEST_F(WindowTest, rankLikeOptimization) {
.config(core::QueryConfig::kWindowSpillEnabled, "true")
.spillDirectory(spillDirectory->path)
.assertResults(
"SELECT *, rank() over (partition by p order by s), row_number() over (partition by p order by s) FROM tmp");
"SELECT *, rank() over (partition by p order by s), row_number() over (partition by p order by s), sum(d) over (partition by p order by s) FROM tmp");
}

TEST_F(WindowTest, missingFunctionSignature) {
Expand Down
22 changes: 15 additions & 7 deletions velox/functions/lib/window/Rank.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,28 @@ void registerRankInternal(
exec::FunctionSignatureBuilder().returnType(returnType).build(),
};

exec::registerWindowFunction(
name,
std::move(signatures),
auto windowFunctionFactory =
[name](
const std::vector<exec::WindowFunctionArg>& /*args*/,
const TypePtr& resultType,
bool /*ignoreNulls*/,
velox::memory::MemoryPool* /*pool*/,
HashStringAllocator* /*stringAllocator*/,
const core::QueryConfig& /*queryConfig*/)
-> std::unique_ptr<exec::WindowFunction> {
return std::make_unique<RankFunction<TRank, TResult>>(resultType);
},
{exec::ProcessingUnit::kRow});
-> std::unique_ptr<exec::WindowFunction> {
return std::make_unique<RankFunction<TRank, TResult>>(resultType);
};

if constexpr (TRank == RankType::kRank) {
exec::registerWindowFunction(
name,
std::move(signatures),
std::move(windowFunctionFactory),
exec::ProcessingUnit::kRow);
} else {
exec::registerWindowFunction(
name, std::move(signatures), std::move(windowFunctionFactory));
}
}

void registerRankBigint(const std::string& name) {
Expand Down
Loading

0 comments on commit 26a7b32

Please sign in to comment.