Skip to content

Commit

Permalink
Hash-map based GROUP BY can now process lazy inputs (#1651)
Browse files Browse the repository at this point in the history
Since #1229, QLever supports a hash-map based GROUP BY, which can be activated via the runtime parameter `group-by-hash-map-enabled` (which is `false` by default). So far, this required a fully materialized input. With this change, the input can be processed lazily. The output is still fully materialized and never lazy (because, when the input is not sorted, we cannot output any group until we have seen the last input row).
  • Loading branch information
joka921 authored Jan 4, 2025
1 parent 97c195a commit 2953f16
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 77 deletions.
170 changes: 101 additions & 69 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
if (useHashMapOptimization) {
const auto* child = _subtree->getRootOperation()->getChildren().at(0);
// Skip sorting
subresult = child->getResult();
subresult = child->getResult(true);
// Update runtime information
auto runTimeInfoChildren =
child->getRootOperation()->getRuntimeInfoPointer();
Expand Down Expand Up @@ -366,13 +366,28 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
}

if (useHashMapOptimization) {
auto localVocab = subresult->getCopyOfLocalVocab();
IdTable idTable = CALL_FIXED_SIZE(
groupByCols.size(), &GroupBy::computeGroupByForHashMapOptimization,
this, metadataForUnsequentialData->aggregateAliases_,
subresult->idTable(), groupByCols, &localVocab);
// Helper lambda that calls `computeGroupByForHashMapOptimization` for the
// given `subresults`.
auto computeWithHashMap = [this, &metadataForUnsequentialData,
&groupByCols](auto&& subresults) {
auto doCompute = [&]<int NumCols> {
return computeGroupByForHashMapOptimization<NumCols>(
metadataForUnsequentialData->aggregateAliases_, AD_FWD(subresults),
groupByCols);
};
return ad_utility::callFixedSize(groupByCols.size(), doCompute);
};

return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
// Now call `computeWithHashMap` and return the result. It expects a range
// of results, so if the result is fully materialized, we create an array
// with a single element.
if (subresult->isFullyMaterialized()) {
return computeWithHashMap(
std::array{std::pair{std::cref(subresult->idTable()),
std::cref(subresult->localVocab())}});
} else {
return computeWithHashMap(std::move(subresult->idTables()));
}
}

size_t inWidth = _subtree->getResultWidth();
Expand Down Expand Up @@ -846,7 +861,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
const auto& index = getExecutionContext()->getIndex();

// TODO<joka921, C++23> Simplify the following pattern by using
// `ql::views::chunkd_by` and implement a lazy version of this view for
// `ql::views::chunk_by` and implement a lazy version of this view for
// input iterators.

// Take care of duplicate values in the input.
Expand Down Expand Up @@ -1487,78 +1502,95 @@ static constexpr auto makeProcessGroupsVisitor =

// _____________________________________________________________________________
template <size_t NUM_GROUP_COLUMNS>
IdTable GroupBy::computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases,
const IdTable& subresult, const std::vector<size_t>& columnIndices,
LocalVocab* localVocab) const {
AD_CONTRACT_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS ||
NUM_GROUP_COLUMNS == 0);

// Initialize aggregation data
Result GroupBy::computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases, auto subresults,
const std::vector<size_t>& columnIndices) const {
AD_CORRECTNESS_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS ||
NUM_GROUP_COLUMNS == 0);
LocalVocab localVocab;

// Initialize the data for the aggregates of the GROUP BY operation.
HashMapAggregationData<NUM_GROUP_COLUMNS> aggregationData(
getExecutionContext()->getAllocator(), aggregateAliases,
columnIndices.size());

// Initialize evaluation context
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), subresult,
getExecutionContext()->getAllocator(), *localVocab, cancellationHandle_,
deadline_);

evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
_groupByVariables.begin(), _groupByVariables.end()};
evaluationContext._isPartOfGroupBy = true;

// Process the input blocks (pairs of `IdTable` and `LocalVocab`) one after
// the other.
ad_utility::Timer lookupTimer{ad_utility::Timer::Stopped};
ad_utility::Timer aggregationTimer{ad_utility::Timer::Stopped};
for (size_t i = 0; i < subresult.size(); i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
checkCancellation();

evaluationContext._beginIndex = i;
evaluationContext._endIndex =
std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, subresult.size());

auto currentBlockSize = evaluationContext.size();

// Perform HashMap lookup once for all groups in current block
using U = HashMapAggregationData<NUM_GROUP_COLUMNS>::template ArrayOrVector<
std::span<const Id>>;
U groupValues;
resizeIfVector(groupValues, columnIndices.size());

// TODO<C++23> use views::enumerate
size_t j = 0;
for (auto& idx : columnIndices) {
groupValues[j] = subresult.getColumn(idx).subspan(
evaluationContext._beginIndex, currentBlockSize);
++j;
}
lookupTimer.cont();
auto hashEntries = aggregationData.getHashEntries(groupValues);
lookupTimer.stop();

aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
sparqlExpression::ExpressionResult expressionResult =
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
aggregate.aggregateDataIndex_);

std::visit(makeProcessGroupsVisitor(currentBlockSize,
&evaluationContext, hashEntries),
std::move(expressionResult), aggregationDataVariant);
for (const auto& [inputTableRef, inputLocalVocabRef] : subresults) {
const IdTable& inputTable = inputTableRef;
const LocalVocab& inputLocalVocab = inputLocalVocabRef;

// Merge the local vocab of each input block.
//
// NOTE: If the input blocks have very similar or even identical non-empty
// local vocabs, no deduplication is performed.
localVocab.mergeWith(std::span{&inputLocalVocab, 1});

// Setup the `EvaluationContext` for this input block.
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), inputTable,
getExecutionContext()->getAllocator(), localVocab, cancellationHandle_,
deadline_);
evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
_groupByVariables.begin(), _groupByVariables.end()};
evaluationContext._isPartOfGroupBy = true;

// Iterate over the rows of this input block. Process (up to)
// `GROUP_BY_HASH_MAP_BLOCK_SIZE` rows at a time.
for (size_t i = 0; i < inputTable.size();
i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
checkCancellation();

evaluationContext._beginIndex = i;
evaluationContext._endIndex =
std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, inputTable.size());

auto currentBlockSize = evaluationContext.size();

// Perform HashMap lookup once for all groups in current block
using U = HashMapAggregationData<
NUM_GROUP_COLUMNS>::template ArrayOrVector<std::span<const Id>>;
U groupValues;
resizeIfVector(groupValues, columnIndices.size());

// TODO<C++23> use views::enumerate
size_t j = 0;
for (auto& idx : columnIndices) {
groupValues[j] = inputTable.getColumn(idx).subspan(
evaluationContext._beginIndex, currentBlockSize);
++j;
}
lookupTimer.cont();
auto hashEntries = aggregationData.getHashEntries(groupValues);
lookupTimer.stop();

aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
sparqlExpression::ExpressionResult expressionResult =
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
aggregate.aggregateDataIndex_);

std::visit(makeProcessGroupsVisitor(currentBlockSize,
&evaluationContext, hashEntries),
std::move(expressionResult), aggregationDataVariant);
}
}
aggregationTimer.stop();
}
aggregationTimer.stop();
}

runtimeInfo().addDetail("timeMapLookup", lookupTimer.msecs());
runtimeInfo().addDetail("timeAggregation", aggregationTimer.msecs());

return createResultFromHashMap(aggregationData, aggregateAliases, localVocab);
IdTable resultTable =
createResultFromHashMap(aggregationData, aggregateAliases, &localVocab);
return {std::move(resultTable), resultSortedOn(), std::move(localVocab)};
}

// _____________________________________________________________________________
Expand Down
14 changes: 6 additions & 8 deletions src/engine/GroupBy.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright 2018, University of Freiburg,
// Copyright 2018 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures.
// Author:
// 2018 Florian Kramer ([email protected])
// 2020- Johannes Kalmbach ([email protected])
// Authors: Florian Kramer [2018]
// Johannes Kalmbach <[email protected]>

#pragma once

Expand Down Expand Up @@ -316,10 +315,9 @@ class GroupBy : public Operation {
// Create result IdTable by using a HashMap mapping groups to aggregation data
// and subsequently calling `createResultFromHashMap`.
template <size_t NUM_GROUP_COLUMNS>
IdTable computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases,
const IdTable& subresult, const std::vector<size_t>& columnIndices,
LocalVocab* localVocab) const;
Result computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases, auto subresults,
const std::vector<size_t>& columnIndices) const;

using AggregationData =
std::variant<AvgAggregationData, CountAggregationData, MinAggregationData,
Expand Down
46 changes: 46 additions & 0 deletions test/GroupByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Authors: Florian Kramer ([email protected])
// Johannes Kalmbach ([email protected])

#include <engine/SpatialJoinAlgorithms.h>
#include <gmock/gmock.h>

#include <cstdio>
Expand Down Expand Up @@ -36,6 +37,7 @@ using ::testing::Optional;

namespace {
auto I = IntId;
auto D = DoubleId;

// Return a matcher that checks whether a given `std::optional<IdTable>` has a
// value and that value is equal to `makeIdTableFromVector(table)`.
Expand Down Expand Up @@ -747,6 +749,50 @@ TEST_F(GroupByOptimizations, correctResultForHashMapOptimization) {
resultWithoutOptimization->asDebugString());
}

// _____________________________________________________________________________
TEST_F(GroupByOptimizations, hashMapOptimizationLazyAndMaterializedInputs) {
  // The query under test is
  //   SELECT ?x (AVG(?y) as ?avg) WHERE {
  //     # explicitly defined subresult.
  //   } GROUP BY ?x
  // It is evaluated twice with the hash-map based GROUP BY enabled: once with
  // `inputIsLazy == true` and once with `inputIsLazy == false`. In both cases
  // the result is required to be fully materialized.
  auto checkHashMapGroupBy = [this](bool inputIsLazy) {
    // Three unsorted input blocks. Column 0 holds the grouped variable `?x`,
    // column 1 holds the variable `?y` whose average is computed per group.
    // Expected averages: (3 -> 5.0), (5 -> 6.0), (8 -> 27.0).
    std::vector<IdTable> inputBlocks;
    inputBlocks.push_back(
        makeIdTableFromVector({{3, 6}, {8, 27}, {5, 7}}, I));
    inputBlocks.push_back(makeIdTableFromVector({{8, 27}, {5, 9}}, I));
    inputBlocks.push_back(makeIdTableFromVector({{5, 2}, {3, 4}}, I));

    std::vector<std::optional<Variable>> inputVariables{Variable{"?x"},
                                                        Variable{"?y"}};
    auto inputTree = ad_utility::makeExecutionTree<ValuesForTesting>(
        qec, std::move(inputBlocks), std::move(inputVariables));
    // The flag of the `ValuesForTesting` operation is set to the negation of
    // `inputIsLazy`.
    dynamic_cast<ValuesForTesting&>(*inputTree->getRootOperation())
        .forceFullyMaterialized() = !inputIsLazy;

    // The single alias `(AVG(?y) as ?avg)`.
    SparqlExpressionPimpl avgOfY = makeAvgPimpl(varY);
    std::vector<Alias> aliases{Alias{avgOfY, Variable{"?avg"}}};

    // Compute the result with the hash-map optimization switched on. The
    // cache is cleared first.
    qec->getQueryTreeCache().clearAll();
    RuntimeParameters().set<"group-by-hash-map-enabled">(true);
    GroupBy groupByOperation{qec, variablesOnlyX, aliases,
                             std::move(inputTree)};
    auto computed = groupByOperation.computeResultOnlyForTesting();
    ASSERT_TRUE(computed.isFullyMaterialized());
    EXPECT_THAT(computed.idTable(),
                matchesIdTableFromVector(
                    {{I(3), D(5)}, {I(5), D(6)}, {I(8), D(27)}}));
  };

  // First the lazy input, then the fully materialized one. A failing
  // `ASSERT_TRUE` inside the lambda only leaves the lambda, so the remaining
  // run and the cleanup below are still executed.
  for (bool inputIsLazy : {true, false}) {
    checkHashMapGroupBy(inputIsLazy);
  }

  // Switch the optimization off again for the tests that follow.
  RuntimeParameters().set<"group-by-hash-map-enabled">(false);
}

// _____________________________________________________________________________
TEST_F(GroupByOptimizations, correctResultForHashMapOptimizationForCountStar) {
/* Setup query:
Expand Down
2 changes: 2 additions & 0 deletions test/engine/ValuesForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ValuesForTesting : public Operation {
}
bool supportsLimit() const override { return supportsLimit_; }

// Return a mutable reference to the `forceFullyMaterialized_` member, so that
// callers can both read and overwrite the flag.
bool& forceFullyMaterialized() { return forceFullyMaterialized_; }

private:
// ___________________________________________________________________________
string getCacheKeyImpl() const override {
Expand Down

0 comments on commit 2953f16

Please sign in to comment.