diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h
index 76e53e01664d..a7ebe02b5b78 100644
--- a/velox/core/PlanNode.h
+++ b/velox/core/PlanNode.h
@@ -681,8 +681,25 @@ class AggregationNode : public PlanNode {
     // (https://github.com/facebookincubator/velox/issues/3263) and pre-grouped
     // aggregation (https://github.com/facebookincubator/velox/issues/3264). We
     // will add support later to re-enable.
-    return (isFinal() || isSingle()) && !(aggregates().empty()) &&
-        preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
+    if (!queryConfig.aggregationSpillEnabled()) {
+      return false;
+    }
+
+    if (!isFinal() && !isSingle()) {
+      return false;
+    }
+
+    if (!preGroupedKeys().empty()) {
+      return false;
+    }
+
+    // No aggregates means distinct aggregation, which has its own spill flag.
+    if (aggregates().empty() &&
+        !queryConfig.distinctAggregationSpillEnabled()) {
+      return false;
+    }
+
+    return true;
   }
 
   bool isFinal() const {
diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 28d09d29efab..51bda9132146 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -144,6 +144,11 @@ class QueryConfig {
   static constexpr const char* kAggregationSpillEnabled =
       "aggregation_spill_enabled";
 
+  /// Distinct aggregation spilling flag, only applies if
+  /// "aggregation_spill_enabled" flag is set.
+  static constexpr const char* kDistinctAggregationSpillEnabled =
+      "distinct_aggregation_spill_enabled";
+
   /// Join spilling flag, only applies if "spill_enabled" flag is set.
   static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";
 
@@ -329,6 +334,12 @@ class QueryConfig {
     return get<bool>(kAggregationSpillEnabled, true);
   }
 
+  /// Returns 'is distinct aggregation spilling enabled' flag. Must also check
+  /// the aggregationSpillEnabled()!
+  bool distinctAggregationSpillEnabled() const {
+    return get<bool>(kDistinctAggregationSpillEnabled, true);
+  }
+
   /// Returns 'is join spilling enabled' flag. Must also check the
   /// spillEnabled()!
   bool joinSpillEnabled() const {
diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp
index 89a04d35983f..406c4315e680 100644
--- a/velox/exec/HashAggregation.cpp
+++ b/velox/exec/HashAggregation.cpp
@@ -159,6 +159,12 @@ HashAggregation::HashAggregation(
       spillConfig_.has_value() ? &spillConfig_.value() : nullptr,
       &nonReclaimableSection_,
       operatorCtx_.get());
+
+  // When set, addInput() and getOutput() skip the streaming distinct path.
+  // Only honor the query config flag if this operator actually has a spill
+  // config; otherwise distinct aggregation keeps its streaming behavior.
+  distinctAggregationSpillEnabled_ = spillConfig_.has_value() &&
+      driverCtx->queryConfig().distinctAggregationSpillEnabled();
 }
 
 bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
@@ -198,7 +204,7 @@ void HashAggregation::addInput(RowVectorPtr input) {
 
   const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
       abandonPartialAggregationEarly(groupingSet_->numDistinct());
-  if (isDistinct_) {
+  if (isDistinct_ && !distinctAggregationSpillEnabled_) {
     newDistincts_ = !groupingSet_->hashLookup().newGroups.empty();
 
     if (newDistincts_) {
@@ -355,7 +361,7 @@ RowVectorPtr HashAggregation::getOutput() {
     return nullptr;
   }
 
-  if (isDistinct_) {
+  if (isDistinct_ && !distinctAggregationSpillEnabled_) {
     if (!newDistincts_) {
       if (noMoreInput_) {
         finished_ = true;
diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h
index 5300d733562e..a1c3d437d651 100644
--- a/velox/exec/HashAggregation.h
+++ b/velox/exec/HashAggregation.h
@@ -80,6 +80,10 @@ class HashAggregation : public Operator {
   int64_t maxPartialAggregationMemoryUsage_;
   std::unique_ptr<GroupingSet> groupingSet_;
 
+  // True if distinct aggregation should skip the streaming 'newDistincts_'
+  // output path. Set once in the constructor.
+  bool distinctAggregationSpillEnabled_ = false;
+
   bool partialFull_ = false;
   bool newDistincts_ = false;
   bool finished_ = false;