Skip to content

Commit

Permalink
Support row level streaming for the agg window functions with default…
Browse files Browse the repository at this point in the history
… frame
  • Loading branch information
JkSelf committed Mar 25, 2024
1 parent deb5ed1 commit 763c90f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 61 deletions.
3 changes: 2 additions & 1 deletion velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ void registerAggregateWindowFunction(const std::string& name) {
pool,
stringAllocator,
config);
});
},
{exec::ProcessingUnit::kRow});
}
}
} // namespace facebook::velox::exec
18 changes: 10 additions & 8 deletions velox/exec/RowLevelStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ void RowLevelStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
windowPartitions_[inputCurrentPartition_]->setTotalNum(
currentPartitionNum_ - 1);

inputRows_.clear();
inputCurrentPartition_++;
currentPartitionNum_ = 1;
}

inputRows_.clear();
}

void RowLevelStreamingWindowBuild::addInput(RowVectorPtr input) {
Expand All @@ -70,20 +71,21 @@ void RowLevelStreamingWindowBuild::addInput(RowVectorPtr input) {
buildNextInputOrPartition(true);
}

// Wait for the peers to be ready; these peers are the rows that have
// identical values in the ORDER BY clause.
if (previousRow_ != nullptr &&
compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) {
buildNextInputOrPartition(false);
}

inputRows_.push_back(newRow);
previousRow_ = newRow;
}

buildNextInputOrPartition(false);

inputRows_.clear();
}

void RowLevelStreamingWindowBuild::noMoreInput() {
isFinished_ = true;
windowPartitions_[outputCurrentPartition_]->setTotalNum(
currentPartitionNum_ - 1);
inputRows_.clear();
buildNextInputOrPartition(true);
}

std::shared_ptr<WindowPartition> RowLevelStreamingWindowBuild::nextPartition() {
Expand Down
35 changes: 26 additions & 9 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ Window::Window(
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
}

} else {
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_);
Expand Down Expand Up @@ -195,12 +194,23 @@ void Window::createWindowFunctions() {
}

// The supportRowLevelStreaming is designed to support 'rank' and
// 'row_number' functions.
// 'row_number' functions and the agg window function with default frame.
bool Window::supportRowLevelStreaming() {
for (const auto& windowNodeFunction : windowNode_->windowFunctions()) {
if (exec::getWindowFunctionEntry(windowNodeFunction.functionCall->name())
.value()
->processingUnit == ProcessingUnit::kPartition) {
const auto& functionName = windowNodeFunction.functionCall->name();
auto windowFunctionEntry =
exec::getWindowFunctionEntry(functionName).value();
if (windowFunctionEntry->processingUnit == ProcessingUnit::kPartition) {
return false;
}

const auto& frame = windowNodeFunction.frame;
bool isDefaultFrame =
(frame.startType == core::WindowNode::BoundType::kUnboundedPreceding &&
frame.endType == core::WindowNode::BoundType::kCurrentRow);
// Only support the agg window function with default frame.
if (!(functionName == "rank" || functionName == "row_number") &&
!isDefaultFrame) {
return false;
}
}
Expand Down Expand Up @@ -360,7 +370,10 @@ void Window::updateFrameBounds(
// Fills the frameBound buffer with increasing value of row indices
// (corresponding to CURRENT ROW) from the startRow of the current
// output buffer.
std::iota(rawFrameBounds, rawFrameBounds + numRows, startRow);
std::iota(
rawFrameBounds,
rawFrameBounds + numRows,
(startRow - currentPartition_->offsetInPartition()));
}
break;
}
Expand Down Expand Up @@ -495,7 +508,8 @@ void Window::computePeerAndFrameBuffers(
// Ranking functions do not care about frames. So the function decides
// further what to do with empty frames.
computeValidFrames(
currentPartition_->numRows() - 1,
currentPartition_->numRows() -
currentPartition_->offsetInPartition() - 1,
numRows,
rawFrameStarts[i],
rawFrameEnds[i],
Expand Down Expand Up @@ -613,8 +627,11 @@ RowVectorPtr Window::getOutput() {
}
}

if (!currentPartition_->isFinished()) {
currentPartition_->buildNextBatch();
// BuildNextBatch until all the rows in currentPartition finished.
if (partitionOffset_ >= currentPartition_->numRows()) {
if (!currentPartition_->buildNextBatch()) {
return nullptr;
}
}

auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft);
Expand Down
39 changes: 5 additions & 34 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,12 @@ WindowPartition::WindowPartition(
}
}

void WindowPartition::buildNextBatch() {
bool WindowPartition::buildNextBatch() {
if (rows_.size() == 0 ||
(currentBatchIndex_ >= 0 && currentBatchIndex_ == (rows_.size() - 1)))
return;
peerGroup_ = false;
currentBatchIndex_++;
// Compute whehter the last row in current batch is same with the first row
// in next batch.
auto peerCompare = [&](const char* lhs, const char* rhs) -> bool {
return compareRowsWithSortKeys(lhs, rhs);
};
if (!peerCompare(
partition_[partition_.size() - 1], rows_[currentBatchIndex_][0])) {
peerGroup_ = true;
}
return false;

currentBatchIndex_++;
// Erase partition_ in data_ and itself.
data_->eraseRows(partition_);
offsetInPartition_ += partition_.size();
Expand All @@ -58,6 +48,7 @@ void WindowPartition::buildNextBatch() {
partition_ = folly::Range(
rows_[currentBatchIndex_].data(), rows_[currentBatchIndex_].size());
processedNum_ += partition_.size();
return true;
}

void WindowPartition::extractColumn(
Expand Down Expand Up @@ -176,27 +167,7 @@ std::pair<vector_size_t, vector_size_t> WindowPartition::computePeerBuffers(
auto peerStart = prevPeerStart;
auto peerEnd = prevPeerEnd;

auto nextStart = start;
if (peerGroup_) {
peerEnd++;
nextStart = start + 1;
while (nextStart <= lastPartitionRow) {
if (peerCompare(
partition_[start - offsetInPartition_],
partition_[nextStart - offsetInPartition_])) {
break;
}
peerEnd++;
nextStart++;
}

for (auto j = start; j < nextStart; j++) {
rawPeerStarts[j - offsetInPartition_] = peerStart;
rawPeerEnds[j - offsetInPartition_] = peerEnd;
}
}

for (auto i = nextStart, j = (nextStart - start); i < end; i++, j++) {
for (auto i = start, j = 0; i < end; i++, j++) {
// When traversing input partition rows, the peers are the rows
// with the same values for the ORDER BY clause. These rows
// are equal in some ways and affect the results of ranking functions.
Expand Down
10 changes: 5 additions & 5 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class WindowPartition {
return partition_.size() + offsetInPartition_;
}

vector_size_t offsetInPartition() const {
return offsetInPartition_;
}

void insertNewBatch(const std::vector<char*>& inputRows) {
rows_.push_back(inputRows);
}
Expand All @@ -66,7 +70,7 @@ class WindowPartition {
totalNum_ = num;
}

void buildNextBatch();
bool buildNextBatch();

bool isFinished() {
return (!rowLevelStreaming_) || (totalNum_ == processedNum_);
Expand Down Expand Up @@ -215,10 +219,6 @@ class WindowPartition {
// The processed num in current partition.
vector_size_t processedNum_ = 0;

/// Whether the last row of current batch is same with the first row of next
/// batch.
bool peerGroup_ = false;

// Whether support streaming in WindowPartition.
bool rowLevelStreaming_ = false;

Expand Down
10 changes: 6 additions & 4 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ TEST_F(WindowTest, rankLikeWithEqualValue) {

createDuckDbTable({data});

const std::vector<std::string> kClauses = {"rank() over (order by c1)"};
const std::vector<std::string> kClauses = {
"rank() over (order by c1)", "sum(c1) over (order by c1)"};
core::PlanNodeId windowId;
auto plan = PlanBuilder()
.values({data})
Expand All @@ -104,7 +105,8 @@ TEST_F(WindowTest, rankLikeWithEqualValue) {
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kWindowSpillEnabled, "true")
.spillDirectory(spillDirectory->path)
.assertResults("SELECT *, rank() over (order by c1) FROM tmp");
.assertResults(
"SELECT *, rank() over (order by c1), sum(c1) over (order by c1) FROM tmp");
}

TEST_F(WindowTest, rankLikeOptimization) {
Expand All @@ -125,7 +127,7 @@ TEST_F(WindowTest, rankLikeOptimization) {
const std::vector<std::string> kClauses = {
"rank() over (partition by p order by s)",
"row_number() over (partition by p order by s)",
};
"sum(d) over (partition by p order by s)"};
core::PlanNodeId windowId;
auto plan = PlanBuilder()
.values({split(data, 10)})
Expand All @@ -142,7 +144,7 @@ TEST_F(WindowTest, rankLikeOptimization) {
.config(core::QueryConfig::kWindowSpillEnabled, "true")
.spillDirectory(spillDirectory->path)
.assertResults(
"SELECT *, rank() over (partition by p order by s), row_number() over (partition by p order by s) FROM tmp");
"SELECT *, rank() over (partition by p order by s), row_number() over (partition by p order by s), sum(d) over (partition by p order by s) FROM tmp");
}

TEST_F(WindowTest, missingFunctionSignature) {
Expand Down

0 comments on commit 763c90f

Please sign in to comment.