From 0ca51ec16676b443e272b86f3c048c6a532904f8 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 9 Jul 2024 18:42:47 +0300 Subject: [PATCH] abort_source: subscription: allow calling on_abort explicitly Today, when calling `subscribe` after abort has alredy been requested on an `abort_source` we return a default- initialized subscription that signifies that subscription failed. With this change, `subscribe` would return an unlinked subscription, that still holds the required callback. `bool(subscription)` stil elaluates to `false` in the same way, since the subscription is unlinked, but, it supports `on_abort`, that calls the subscribed callback, at a later time, the same way as if abort is requested at a later time. Thise allows the users of this interface to implement a unified abort path that can either be called by the abort_source, if subscribe happened before abort was requested, or called by the user `on_abort` call, if abort weas never requested, or if it was requested before `subscribe` was called. Signed-off-by: Benny Halevy --- include/seastar/core/abort_source.hh | 18 ++++--- tests/unit/abort_source_test.cc | 70 ++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/include/seastar/core/abort_source.hh b/include/seastar/core/abort_source.hh index b0c34f946fe..28f7fcead1d 100644 --- a/include/seastar/core/abort_source.hh +++ b/include/seastar/core/abort_source.hh @@ -66,21 +66,31 @@ public: class subscription : public bi::list_base_hook> { friend class abort_source; - subscription_callback_type _target; + subscription_callback_type _target = noop_handler; explicit subscription(abort_source& as, subscription_callback_type target) : _target(std::move(target)) { + if (!as.abort_requested()) { as._subscriptions.push_back(*this); + } } struct naive_cb_tag {}; // to disambiguate constructors explicit subscription(naive_cb_tag, abort_source& as, naive_subscription_callback_type naive_cb) : _target([cb = std::move(naive_cb)] (const std::optional&) noexcept { cb(); }) { + if (!as.abort_requested()) { as._subscriptions.push_back(*this); + } } + public: + static void noop_handler(const std::optional&) noexcept {} + + /// the subscribed callback is called at most once void on_abort(const std::optional& ex) noexcept { - _target(ex); + unlink(); + auto target = std::exchange(_target, noop_handler); + target(ex); } public: @@ -119,7 +129,6 @@ private: auto subs = std::move(_subscriptions); while (!subs.empty()) { subscription& s = subs.front(); - s.unlink(); s.on_abort(ex); } } @@ -140,9 +149,6 @@ public: std::is_nothrow_invocable_r_v) [[nodiscard]] optimized_optional subscribe(Func&& f) { - if (abort_requested()) { - return { }; - } if constexpr (std::is_invocable_v) { return { subscription(*this, std::forward(f)) }; } else { diff --git a/tests/unit/abort_source_test.cc b/tests/unit/abort_source_test.cc index fae40e904cc..fb3ba1573e0 100644 --- a/tests/unit/abort_source_test.cc +++ b/tests/unit/abort_source_test.cc @@ -195,3 +195,73 @@ SEASTAR_THREAD_TEST_CASE(test_request_abort_twice) { as.request_abort(); BOOST_REQUIRE_THROW(as.check(), std::runtime_error); } + +SEASTAR_THREAD_TEST_CASE(test_on_abort_call_after_abort) { + std::exception_ptr signalled_ex; + auto as = abort_source(); + auto sub = as.subscribe([&] (const std::optional& ex) noexcept { + BOOST_REQUIRE(!signalled_ex); + signalled_ex = *ex; + }); + BOOST_REQUIRE_EQUAL(bool(sub), true); + BOOST_REQUIRE(signalled_ex == nullptr); + + // on_abort should trigger the subscribed callback + as.request_abort_ex(std::make_exception_ptr(std::runtime_error("signaled"))); + BOOST_REQUIRE_EQUAL(bool(sub), false); + BOOST_REQUIRE(signalled_ex != nullptr); + BOOST_REQUIRE_THROW(std::rethrow_exception(signalled_ex), std::runtime_error); + + // on_abort is single-shot + signalled_ex = nullptr; + sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled"))); + BOOST_REQUIRE(signalled_ex == nullptr); +} + +SEASTAR_THREAD_TEST_CASE(test_on_abort_call_before_abort) { + std::exception_ptr signalled_ex; + auto as = abort_source(); + auto sub = as.subscribe([&] (const std::optional& ex) noexcept { + BOOST_REQUIRE(!signalled_ex); + signalled_ex = *ex; + }); + BOOST_REQUIRE_EQUAL(bool(sub), true); + BOOST_REQUIRE(signalled_ex == nullptr); + + // on_abort should trigger the subscribed callback + sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled"))); + BOOST_REQUIRE_EQUAL(bool(sub), false); + BOOST_REQUIRE(signalled_ex != nullptr); + BOOST_REQUIRE_THROW(std::rethrow_exception(signalled_ex), std::runtime_error); + + // subscription is single-shot + signalled_ex = nullptr; + as.request_abort_ex(std::make_exception_ptr(std::runtime_error("signaled"))); + BOOST_REQUIRE(signalled_ex == nullptr); +} + +SEASTAR_THREAD_TEST_CASE(test_subscribe_aborted_source) { + std::exception_ptr signalled_ex; + auto as = abort_source(); + as.request_abort(); + auto sub = as.subscribe([&] (const std::optional& ex) noexcept { + BOOST_REQUIRE(!signalled_ex); + signalled_ex = *ex; + }); + + // subscription is expected to evaluate to false + // if abort_source was already aborted + BOOST_REQUIRE_EQUAL(bool(sub), false); + BOOST_REQUIRE(signalled_ex == nullptr); + + // on_abort should trigger the subscribed callback + // if abort_source was already aborted + sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled"))); + BOOST_REQUIRE(signalled_ex != nullptr); + BOOST_REQUIRE_THROW(std::rethrow_exception(signalled_ex), std::runtime_error); + + // on_abort is single-shot + signalled_ex = nullptr; + sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled"))); + BOOST_REQUIRE(signalled_ex == nullptr); +}