diff --git a/velox/common/memory/ArbitrationParticipant.cpp b/velox/common/memory/ArbitrationParticipant.cpp index 0cd3be08f465..cdb1576bf903 100644 --- a/velox/common/memory/ArbitrationParticipant.cpp +++ b/velox/common/memory/ArbitrationParticipant.cpp @@ -267,7 +267,7 @@ uint64_t ArbitrationParticipant::reclaim( if (targetBytes == 0) { return 0; } - ArbitrationOperationTimedLock l(reclaimMutex_); + ArbitrationTimedLock l(reclaimMutex_, maxWaitTimeNs); TestValue::adjust( "facebook::velox::memory::ArbitrationParticipant::reclaim", this); uint64_t reclaimedBytes{0}; @@ -320,7 +320,7 @@ uint64_t ArbitrationParticipant::shrinkLocked(bool reclaimAll) { uint64_t ArbitrationParticipant::abort( const std::exception_ptr& error) noexcept { - ArbitrationOperationTimedLock l(reclaimMutex_); + std::lock_guard l(reclaimMutex_); return abortLocked(error); } @@ -403,52 +403,29 @@ std::string ArbitrationCandidate::toString() const { } #ifdef TSAN_BUILD -ArbitrationOperationTimedLock::ArbitrationOperationTimedLock( - std::timed_mutex& mutex) +ArbitrationTimedLock::ArbitrationTimedLock( + std::timed_mutex& mutex, + uint64_t /* unused */) : mutex_(mutex) { mutex_.lock(); } -ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() { +ArbitrationTimedLock::~ArbitrationTimedLock() { mutex_.unlock(); } #else -ArbitrationOperationTimedLock::ArbitrationOperationTimedLock( - std::timed_mutex& mutex) { - auto arbitrationContext = memoryArbitrationContext(); - if (arbitrationContext == nullptr) { - std::unique_lock l(mutex); - timedLock_ = std::move(l); - return; - } - auto* operation = arbitrationContext->op; - if (operation == nullptr) { - VELOX_CHECK_EQ( - MemoryArbitrationContext::typeName(arbitrationContext->type), - MemoryArbitrationContext::typeName( - MemoryArbitrationContext::Type::kGlobal)); - std::unique_lock l(mutex); - timedLock_ = std::move(l); - return; - } - VELOX_CHECK_EQ( - MemoryArbitrationContext::typeName(arbitrationContext->type), - MemoryArbitrationContext::typeName( - MemoryArbitrationContext::Type::kLocal)); - std::unique_lock l( - mutex, std::chrono::nanoseconds(operation->timeoutNs())); - timedLock_ = std::move(l); - if (!timedLock_.owns_lock()) { +ArbitrationTimedLock::ArbitrationTimedLock( + std::timed_mutex& mutex, + uint64_t timeoutNs) + : mutex_(mutex) { + if (!mutex_.try_lock_for(std::chrono::nanoseconds(timeoutNs))) { VELOX_MEM_ARBITRATION_TIMEOUT(fmt::format( - "Memory arbitration lock timed out on memory pool: {} after running {}", - operation->participant()->name(), - succinctNanos(operation->executionTimeNs()))); + "Memory arbitration lock timed out when reclaiming from arbitration participant.")); } } -ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() { - VELOX_CHECK(timedLock_.owns_lock()); - timedLock_.unlock(); +ArbitrationTimedLock::~ArbitrationTimedLock() { + mutex_.unlock(); } #endif } // namespace facebook::velox::memory diff --git a/velox/common/memory/ArbitrationParticipant.h b/velox/common/memory/ArbitrationParticipant.h index 1a4a1e1f29fc..6fa123cdf0ae 100644 --- a/velox/common/memory/ArbitrationParticipant.h +++ b/velox/common/memory/ArbitrationParticipant.h @@ -49,26 +49,16 @@ class ScopedArbitrationParticipant; /// automatically be applied. /// /// NOTE: TSAN is incompatible with std::timed_mutex when used with timeout. So -/// in TSAN build a trivial implementation is implemented. -#ifdef TSAN_BUILD -class ArbitrationOperationTimedLock { +/// in TSAN build a trivial lock is implemented. +class ArbitrationTimedLock { public: - explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex); - ~ArbitrationOperationTimedLock(); + ArbitrationTimedLock(std::timed_mutex& mutex, uint64_t timeoutNs); + ~ArbitrationTimedLock(); private: std::timed_mutex& mutex_; }; -#else -class ArbitrationOperationTimedLock { - public: - explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex); - ~ArbitrationOperationTimedLock(); - private: - std::unique_lock timedLock_; -}; -#endif /// Manages the memory arbitration operations on a query memory pool. It also /// tracks the arbitration stats during the query memory pool's lifecycle. class ArbitrationParticipant diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index 90832ed22b7c..eac6d149f485 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -449,12 +449,8 @@ bool MemoryArbitrator::Stats::operator<=(const Stats& other) const { return !(*this > other); } -MemoryArbitrationContext::MemoryArbitrationContext( - const MemoryPool* requestor, - ArbitrationOperation* _op) - : type(Type::kLocal), requestorName(requestor->name()), op(_op) { - VELOX_CHECK_NOT_NULL(op); -} +MemoryArbitrationContext::MemoryArbitrationContext(const MemoryPool* requestor) + : type(Type::kLocal), requestorName(requestor->name()) {} std::string MemoryArbitrationContext::typeName( MemoryArbitrationContext::Type type) { @@ -469,10 +465,8 @@ std::string MemoryArbitrationContext::typeName( } ScopedMemoryArbitrationContext::ScopedMemoryArbitrationContext( - const MemoryPool* requestor, - ArbitrationOperation* op) - : savedArbitrationCtx_(arbitrationCtx), - currentArbitrationCtx_(requestor, op) { + const MemoryPool* requestor) + : savedArbitrationCtx_(arbitrationCtx), currentArbitrationCtx_(requestor) { arbitrationCtx = ¤tArbitrationCtx_; } diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 0120c6b91a0d..969fbb0fa5a9 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -421,13 +421,9 @@ struct MemoryArbitrationContext { /// global memory arbitration type. const std::string requestorName; - ArbitrationOperation* const op; + explicit MemoryArbitrationContext(const MemoryPool* requestor); - MemoryArbitrationContext( - const MemoryPool* requestor, - ArbitrationOperation* _op); - - MemoryArbitrationContext() : type(Type::kGlobal), op(nullptr) {} + MemoryArbitrationContext() : type(Type::kGlobal) {} }; /// Object used to set/restore the memory arbitration context when a thread is @@ -439,9 +435,7 @@ class ScopedMemoryArbitrationContext { explicit ScopedMemoryArbitrationContext( const MemoryArbitrationContext* context); - ScopedMemoryArbitrationContext( - const MemoryPool* requestor, - ArbitrationOperation* op); + explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor); ~ScopedMemoryArbitrationContext(); diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index 4c8f1426e984..4f7224866464 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -1341,7 +1341,7 @@ SharedArbitrator::ScopedArbitration::ScopedArbitration( ArbitrationOperation* operation) : arbitrator_(arbitrator), operation_(operation), - arbitrationCtx_(operation->participant()->pool(), operation), + arbitrationCtx_(operation->participant()->pool()), startTime_(std::chrono::steady_clock::now()) { VELOX_CHECK_NOT_NULL(arbitrator_); VELOX_CHECK_NOT_NULL(operation_); diff --git a/velox/common/memory/tests/ArbitrationParticipantTest.cpp b/velox/common/memory/tests/ArbitrationParticipantTest.cpp index 376a31c7effc..5bb23d2c3729 100644 --- a/velox/common/memory/tests/ArbitrationParticipantTest.cpp +++ b/velox/common/memory/tests/ArbitrationParticipantTest.cpp @@ -1419,7 +1419,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, reclaimLock) { folly::EventCount reclaim1CompletedWait; std::thread reclaimThread1([&]() { memory::MemoryReclaimer::Stats stats; - ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000, stats), 0); + ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000'000'000, stats), 0); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); reclaim1CompletedFlag = true; reclaim1CompletedWait.notifyAll(); @@ -1454,7 +1454,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, reclaimLock) { folly::EventCount reclaim2CompletedWait; std::thread reclaimThread2([&]() { memory::MemoryReclaimer::Stats stats; - ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000, stats), 0); + ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000'000'000, stats), 0); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); reclaim2CompletedFlag = true; reclaim2CompletedWait.notifyAll(); @@ -1896,65 +1896,31 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperationTimedLock) { }; struct TestData { - std::string type; uint64_t lockHoldTimeNs; uint64_t opTimeoutNs; }; std::timed_mutex mutex; std::vector testDataVec{ - {"local", 1'000'000'000UL, 2'000'000'000UL}, - {"local", 2'000'000'000UL, 1'000'000'000UL}, - {"global", 1'000'000'000UL, 2'000'000'000UL}, - {"global", 2'000'000'000UL, 1'000'000'000UL}, - {"none", 1'000'000'000UL, 2'000'000'000UL}}; + {1'000'000'000UL, 2'000'000'000UL}, {2'000'000'000UL, 1'000'000'000UL}}; for (auto& testData : testDataVec) { - ScopedArbitrationParticipant scopedArbitrationParticipant( - participant, participantPool); - ArbitrationOperation operation( - std::move(scopedArbitrationParticipant), 1024, testData.opTimeoutNs); - if (testData.type == "local") { - MemoryArbitrationContext ctx(participantPool.get(), &operation); - ScopedMemoryArbitrationContext scopedCtx(&ctx); - - folly::EventCount lockWait; - std::atomic_bool lockWaitFlag{true}; - auto lockHolder = createLockHolderThread( - mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag); - std::unique_ptr timedLock{nullptr}; - lockWait.await([&]() { return !lockWaitFlag.load(); }); - if (testData.lockHoldTimeNs < testData.opTimeoutNs) { - timedLock = std::make_unique(mutex); - ASSERT_FALSE(mutex.try_lock()); - } else { - VELOX_ASSERT_THROW( - std::make_unique(mutex), - "Memory arbitration lock timed out"); - } - lockHolder.join(); - } else if (testData.type == "global") { - MemoryArbitrationContext ctx; - ScopedMemoryArbitrationContext scopedCtx(&ctx); - - folly::EventCount lockWait; - std::atomic_bool lockWaitFlag{true}; - auto lockHolder = createLockHolderThread( - mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag); - lockWait.await([&]() { return !lockWaitFlag.load(); }); - ArbitrationOperationTimedLock timedLock(mutex); + folly::EventCount lockWait; + std::atomic_bool lockWaitFlag{true}; + auto lockHolder = createLockHolderThread( + mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag); + std::unique_ptr timedLock{nullptr}; + lockWait.await([&]() { return !lockWaitFlag.load(); }); + if (testData.lockHoldTimeNs < testData.opTimeoutNs) { + timedLock = + std::make_unique(mutex, testData.opTimeoutNs); ASSERT_FALSE(mutex.try_lock()); - lockHolder.join(); } else { - folly::EventCount lockWait; - std::atomic_bool lockWaitFlag{true}; - auto lockHolder = createLockHolderThread( - mutex, testData.lockHoldTimeNs, lockWait, lockWaitFlag); - lockWait.await([&]() { return !lockWaitFlag.load(); }); - ArbitrationOperationTimedLock timedLock(mutex); - ASSERT_FALSE(mutex.try_lock()); - lockHolder.join(); + VELOX_ASSERT_THROW( + std::make_unique(mutex, testData.opTimeoutNs), + "Memory arbitration lock timed out"); } + lockHolder.join(); } } #endif diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 881cff43b843..1befc83a1a8f 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -990,19 +990,13 @@ TEST_F(MemoryReclaimerTest, arbitrationContext) { ASSERT_FALSE(isSpillMemoryPool(leafChild2.get())); ASSERT_TRUE(memoryArbitrationContext() == nullptr); { - auto arbitrationStructs = - test::ArbitrationTestStructs::createArbitrationTestStructs(leafChild1); - ScopedMemoryArbitrationContext arbitrationContext( - leafChild1.get(), arbitrationStructs.operation.get()); + ScopedMemoryArbitrationContext arbitrationContext(leafChild1.get()); ASSERT_TRUE(memoryArbitrationContext() != nullptr); ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild1->name()); } ASSERT_TRUE(memoryArbitrationContext() == nullptr); { - auto arbitrationStructs = - test::ArbitrationTestStructs::createArbitrationTestStructs(leafChild2); - ScopedMemoryArbitrationContext arbitrationContext( - leafChild2.get(), arbitrationStructs.operation.get()); + ScopedMemoryArbitrationContext arbitrationContext(leafChild2.get()); ASSERT_TRUE(memoryArbitrationContext() != nullptr); ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild2->name()); } @@ -1010,21 +1004,13 @@ TEST_F(MemoryReclaimerTest, arbitrationContext) { std::thread nonAbitrationThread([&]() { ASSERT_TRUE(memoryArbitrationContext() == nullptr); { - auto arbitrationStructs = - test::ArbitrationTestStructs::createArbitrationTestStructs( - leafChild1); - ScopedMemoryArbitrationContext arbitrationContext( - leafChild1.get(), arbitrationStructs.operation.get()); + ScopedMemoryArbitrationContext arbitrationContext(leafChild1.get()); ASSERT_TRUE(memoryArbitrationContext() != nullptr); ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild1->name()); } ASSERT_TRUE(memoryArbitrationContext() == nullptr); { - auto arbitrationStructs = - test::ArbitrationTestStructs::createArbitrationTestStructs( - leafChild2); - ScopedMemoryArbitrationContext arbitrationContext( - leafChild2.get(), arbitrationStructs.operation.get()); + ScopedMemoryArbitrationContext arbitrationContext(leafChild2.get()); ASSERT_TRUE(memoryArbitrationContext() != nullptr); ASSERT_EQ(memoryArbitrationContext()->requestorName, leafChild2->name()); } diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index fa0d94f92239..28e7732624a4 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -3888,10 +3888,7 @@ TEST_P(MemoryPoolTest, overuseUnderArbitration) { ASSERT_FALSE(child->maybeReserve(2 * kMaxSize)); ASSERT_EQ(child->usedBytes(), 0); ASSERT_EQ(child->reservedBytes(), 0); - auto arbitrationTestStructs = - test::ArbitrationTestStructs::createArbitrationTestStructs(root); - ScopedMemoryArbitrationContext scopedMemoryArbitration( - root.get(), arbitrationTestStructs.operation.get()); + ScopedMemoryArbitrationContext scopedMemoryArbitration(root.get()); ASSERT_TRUE(underMemoryArbitration()); ASSERT_TRUE(child->maybeReserve(2 * kMaxSize)); ASSERT_EQ(child->usedBytes(), 0); diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 37de542b675d..2b34f2f81ea7 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -1735,11 +1735,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) { const auto oldReservedBytes = writerPool->reservedBytes(); const auto oldUsedBytes = writerPool->usedBytes(); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - writerPool); - memory::ScopedMemoryArbitrationContext arbitrationCtx( - writerPool.get(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get()); writerPool->reclaim(1L << 30, 0, stats); } ASSERT_EQ(stats.numNonReclaimableAttempts, 0); @@ -1778,11 +1774,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) { writer->testingNonReclaimableSection() = false; stats.numNonReclaimableAttempts = 0; { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - writerPool); - memory::ScopedMemoryArbitrationContext arbitrationCtx( - writerPool.get(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get()); const auto reclaimedBytes = writerPool->reclaim(1L << 30, 0, stats); ASSERT_GT(reclaimedBytes, 0); } @@ -2124,11 +2116,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) { *writerPool, reclaimableBytes)); ASSERT_GT(reclaimableBytes, 0); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - writerPool); - memory::ScopedMemoryArbitrationContext arbitrationCtx( - writerPool.get(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get()); ASSERT_GT(writerPool->reclaim(1L << 30, 0, stats), 0); } ASSERT_GT(stats.reclaimExecTimeUs, 0); @@ -2138,11 +2126,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) { *writerPool, reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - writerPool); - memory::ScopedMemoryArbitrationContext arbitrationCtx( - writerPool.get(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext arbitrationCtx(writerPool.get()); ASSERT_EQ(writerPool->reclaim(1L << 30, 0, stats), 0); } ASSERT_EQ(stats.numNonReclaimableAttempts, 0); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 3f7e65995236..6f55a5876fdd 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -2168,11 +2168,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { if (testData.expectedReclaimable) { { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), 0, @@ -2185,11 +2181,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { ASSERT_EQ(op->pool()->usedBytes(), 0); } else { { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); VELOX_ASSERT_THROW( op->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), @@ -2305,11 +2297,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { const auto usedMemory = op->pool()->usedBytes(); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), 0, @@ -2554,11 +2542,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemory = op->pool()->usedBytes(); - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), 0, @@ -2570,11 +2554,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { reclaimerStats_.reset(); } else { ASSERT_EQ(reclaimableBytes, 0); - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); VELOX_ASSERT_THROW( op->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), @@ -3197,10 +3177,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) { { MemoryReclaimer::Stats stats; SuspendedSection suspendedSection(driver); - auto arbitrationStructs = memory::test::ArbitrationTestStructs:: - createArbitrationTestStructs(op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); task->pool()->reclaim(kMaxBytes, 0, stats); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index cabe33d1fa87..9d20e90cc3a3 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5784,11 +5784,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { if (testData.expectedReclaimable) { { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), 0, @@ -5928,11 +5924,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { ASSERT_GT(reclaimableBytes, 0); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), 0, @@ -6187,11 +6179,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemoryBytes = op->pool()->usedBytes(); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), 0, @@ -6272,11 +6260,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { } auto* driver = op->testingOperatorCtx()->driver(); auto task = driver->task(); - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); SuspendedSection suspendedSection(driver); auto taskPauseWait = task->requestPause(); taskPauseWait.wait(); @@ -6343,11 +6327,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { const auto usedMemoryBytes = op->pool()->usedBytes(); reclaimerStats_.reset(); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), 0, diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 9880f3102f64..19bed7fe8ef6 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -265,11 +265,7 @@ TEST_F(MemoryReclaimerTest, parallelMemoryReclaimer) { static_cast(leafPools.back()->reclaimer())); } - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - rootPool); - memory::ScopedMemoryArbitrationContext context( - rootPool.get(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext context(rootPool.get()); memory::MemoryReclaimer::Stats stats; rootPool->reclaim(testData.bytesToReclaim, 0, stats); for (int i = 0; i < memoryReclaimers.size(); ++i) { diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index a193f9df51bc..76800d50ccf3 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -652,11 +652,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { if (testData.expectedReclaimable) { { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), 0, @@ -782,11 +778,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { ASSERT_GT(reclaimableBytes, 0); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), 0, @@ -1037,11 +1029,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { ASSERT_GT(reclaimableBytes, 0); reclaimerStats_.reset(); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim(reclaimableBytes, 0, reclaimerStats_); } ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); @@ -1050,11 +1038,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { } else { ASSERT_EQ(reclaimableBytes, 0); { - auto arbitrationStructs = - memory::test::ArbitrationTestStructs::createArbitrationTestStructs( - op->pool()->shared_from_this()); - memory::ScopedMemoryArbitrationContext ctx( - op->pool(), arbitrationStructs.operation.get()); + memory::ScopedMemoryArbitrationContext ctx(op->pool()); VELOX_ASSERT_THROW( op->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),