diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 0f675a6dec37..37e415e034fd 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -19,6 +19,7 @@ #include #include "velox/common/base/Exceptions.h" +#include "velox/common/base/Portability.h" #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/future/VeloxPromise.h" #include "velox/common/time/Timer.h" @@ -356,6 +357,45 @@ class MemoryReclaimer { MemoryReclaimer() = default; }; +/// The object is used to set/clear non-reclaimable section of an operation in +/// the middle of its execution. It allows the memory arbitrator to reclaim +/// memory from a running operator which is waiting for memory arbitration. +/// 'nonReclaimableSection' points to the corresponding flag of the associated +/// operator. +class ReclaimableSectionGuard { + public: + explicit ReclaimableSectionGuard(tsan_atomic* nonReclaimableSection) + : nonReclaimableSection_(nonReclaimableSection), + oldNonReclaimableSectionValue_(*nonReclaimableSection_) { + *nonReclaimableSection_ = false; + } + + ~ReclaimableSectionGuard() { + *nonReclaimableSection_ = oldNonReclaimableSectionValue_; + } + + private: + tsan_atomic* const nonReclaimableSection_; + const bool oldNonReclaimableSectionValue_; +}; + +class NonReclaimableSectionGuard { + public: + explicit NonReclaimableSectionGuard(tsan_atomic* nonReclaimableSection) + : nonReclaimableSection_(nonReclaimableSection), + oldNonReclaimableSectionValue_(*nonReclaimableSection_) { + *nonReclaimableSection_ = true; + } + + ~NonReclaimableSectionGuard() { + *nonReclaimableSection_ = oldNonReclaimableSectionValue_; + } + + private: + tsan_atomic* const nonReclaimableSection_; + const bool oldNonReclaimableSectionValue_; +}; + /// The memory arbitration context which is set on per-thread local variable by /// memory arbitrator. It is used to indicate a running thread is under memory /// arbitration processing or not. This helps to enable sanity check such as all diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index d627119b9862..3e770d973ed1 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -180,8 +180,8 @@ std::shared_ptr createSortPool( return writerPool->addLeafChild(fmt::format("{}.sort", writerPool->name())); } -#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \ - exec::NonReclaimableSectionGuard nonReclaimableGuard( \ +#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \ + memory::NonReclaimableSectionGuard nonReclaimableGuard( \ writerInfo_[(index)]->nonReclaimableSectionHolder.get()) } // namespace diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index f23a1d77c2d8..4aa4bc86d317 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -1654,7 +1654,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) { // Expect a throw if we don't set the non-reclaimable section. VELOX_ASSERT_THROW(writer->write(vectors[0]), ""); { - exec::NonReclaimableSectionGuard nonReclaimableGuard( + memory::NonReclaimableSectionGuard nonReclaimableGuard( &nonReclaimableSection); for (size_t i = 0; i < vectors.size(); ++i) { writer->write(vectors[i]); @@ -1753,7 +1753,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnFlush) { })); { - exec::NonReclaimableSectionGuard nonReclaimableGuard( + memory::NonReclaimableSectionGuard nonReclaimableGuard( &nonReclaimableSection); for (size_t i = 0; i < vectors.size(); ++i) { writer->write(vectors[i]); @@ -1833,7 +1833,7 @@ TEST_F(E2EWriterTest, memoryReclaimAfterClose) { writer->flush(); { - exec::NonReclaimableSectionGuard nonReclaimableGuard( + memory::NonReclaimableSectionGuard nonReclaimableGuard( &nonReclaimableSection); for (size_t i = 0; i < vectors.size(); ++i) { writer->write(vectors[i]); @@ -1926,7 +1926,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimDuringInit) { std::unique_ptr writer; { - exec::NonReclaimableSectionGuard nonReclaimableGuard( + memory::NonReclaimableSectionGuard nonReclaimableGuard( &nonReclaimableSection); std::thread writerThread([&]() { writer = @@ -1989,7 +1989,7 @@ TEST_F(E2EWriterTest, memoryReclaimThreshold) { std::make_unique(std::move(sink), options, dwrfPool); { - exec::NonReclaimableSectionGuard nonReclaimableGuard( + memory::NonReclaimableSectionGuard nonReclaimableGuard( &nonReclaimableSection); for (size_t i = 0; i < vectors.size(); ++i) { writer->write(vectors[i]); diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 4fe4dc364dd2..d8344af5263f 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -222,7 +222,7 @@ void Writer::ensureWriteFits(size_t appendBytes, size_t appendRows) { // Allows the memory arbitrator to reclaim memory from this file writer if the // memory reservation below has triggered memory arbitration. - exec::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_); + memory::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_); const size_t estimatedAppendMemoryBytes = std::max(appendBytes, context.estimateNextWriteSize(appendRows)); @@ -254,7 +254,7 @@ void Writer::ensureStripeFlushFits() { // Allows the memory arbitrator to reclaim memory from this file writer if the // memory reservation below has triggered memory arbitration. - exec::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_); + memory::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_); auto& context = getContext(); const size_t estimateFlushMemoryBytes = diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 0af733800e62..15efe8cbaf0e 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -872,7 +872,7 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) { incrementBytes * 2, currentUsage * spillConfig_->spillableReservationGrowthPct / 100); { - ReclaimableSectionGuard guard(nonReclaimableSection_); + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); if (pool_.maybeReserve(targetIncrementBytes)) { return; } @@ -901,7 +901,7 @@ void GroupingSet::ensureOutputFits() { const uint64_t outputBufferSizeToReserve = queryConfig_.preferredOutputBatchBytes() * 1.2; { - ReclaimableSectionGuard guard(nonReclaimableSection_); + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); if (pool_.maybeReserve(outputBufferSizeToReserve)) { return; } diff --git a/velox/exec/MemoryReclaimer.h b/velox/exec/MemoryReclaimer.h index c04dde73929a..4517a0b42849 100644 --- a/velox/exec/MemoryReclaimer.h +++ b/velox/exec/MemoryReclaimer.h @@ -16,8 +16,6 @@ #pragma once -#include "velox/common/base/Exceptions.h" -#include "velox/common/base/Portability.h" #include "velox/common/memory/MemoryArbitrator.h" namespace facebook::velox::exec { @@ -47,43 +45,4 @@ class MemoryReclaimer : public memory::MemoryReclaimer { /// drivers to go off thread. A suspended driver thread is not counted as /// running. void memoryArbitrationStateCheck(memory::MemoryPool& pool); - -/// The object is used to set/clear non-reclaimable section of an operation in -/// the middle of its execution. It allows the memory arbitrator to reclaim -/// memory from a running operator which is waiting for memory arbitration. -/// 'nonReclaimableSection' points to the corresponding flag of the associated -/// operator. -class ReclaimableSectionGuard { - public: - explicit ReclaimableSectionGuard(tsan_atomic* nonReclaimableSection) - : nonReclaimableSection_(nonReclaimableSection), - oldNonReclaimableSectionValue_(*nonReclaimableSection_) { - *nonReclaimableSection_ = false; - } - - ~ReclaimableSectionGuard() { - *nonReclaimableSection_ = oldNonReclaimableSectionValue_; - } - - private: - tsan_atomic* const nonReclaimableSection_; - const bool oldNonReclaimableSectionValue_; -}; - -class NonReclaimableSectionGuard { - public: - explicit NonReclaimableSectionGuard(tsan_atomic* nonReclaimableSection) - : nonReclaimableSection_(nonReclaimableSection), - oldNonReclaimableSectionValue_(*nonReclaimableSection_) { - *nonReclaimableSection_ = true; - } - - ~NonReclaimableSectionGuard() { - *nonReclaimableSection_ = oldNonReclaimableSectionValue_; - } - - private: - tsan_atomic* const nonReclaimableSection_; - const bool oldNonReclaimableSectionValue_; -}; } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index cc9784112522..801aef97cc31 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -233,7 +233,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) { estimatedIncrementalBytes * 2, currentMemoryUsage * spillConfig_->spillableReservationGrowthPct / 100); { - exec::ReclaimableSectionGuard guard(nonReclaimableSection_); + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); if (pool_->maybeReserve(targetIncrementBytes)) { return; } diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index ce70f011ac57..2c86ae062575 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -122,7 +122,7 @@ void SortWindowBuild::ensureInputFits(const RowVectorPtr& input) { incrementBytes * 2, currentUsage * spillConfig_->spillableReservationGrowthPct / 100); { - ReclaimableSectionGuard guard(nonReclaimableSection_); + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); if (data_->pool()->maybeReserve(targetIncrementBytes)) { return; } diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index cf2442fb2b7a..28927de3ee85 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/exec/MemoryReclaimer.h" +#include "velox/common/memory/MemoryArbitrator.h" #include "velox/common/memory/MemoryPool.h" #include "velox/exec/tests/utils/OperatorTestBase.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -116,16 +116,16 @@ TEST_F(MemoryReclaimerTest, abortTest) { TEST(ReclaimableSectionGuard, basic) { tsan_atomic nonReclaimableSection{false}; { - NonReclaimableSectionGuard guard(&nonReclaimableSection); + memory::NonReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_TRUE(nonReclaimableSection); { - ReclaimableSectionGuard guard(&nonReclaimableSection); + memory::ReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_FALSE(nonReclaimableSection); { - ReclaimableSectionGuard guard(&nonReclaimableSection); + memory::ReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_FALSE(nonReclaimableSection); { - NonReclaimableSectionGuard guard(&nonReclaimableSection); + memory::NonReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_TRUE(nonReclaimableSection); } ASSERT_FALSE(nonReclaimableSection); @@ -137,21 +137,21 @@ TEST(ReclaimableSectionGuard, basic) { ASSERT_FALSE(nonReclaimableSection); nonReclaimableSection = true; { - ReclaimableSectionGuard guard(&nonReclaimableSection); + memory::ReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_FALSE(nonReclaimableSection); { - NonReclaimableSectionGuard guard(&nonReclaimableSection); + memory::NonReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_TRUE(nonReclaimableSection); { - ReclaimableSectionGuard guard(&nonReclaimableSection); + memory::ReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_FALSE(nonReclaimableSection); { - ReclaimableSectionGuard guard(&nonReclaimableSection); + memory::ReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_FALSE(nonReclaimableSection); } ASSERT_FALSE(nonReclaimableSection); { - NonReclaimableSectionGuard guard(&nonReclaimableSection); + memory::NonReclaimableSectionGuard guard(&nonReclaimableSection); ASSERT_TRUE(nonReclaimableSection); } ASSERT_FALSE(nonReclaimableSection);