Skip to content

Commit

Permalink
Corral: add try/finally syntax.
Browse files Browse the repository at this point in the history
Corral's currently advertised approach to asynchronous cancellation:

    AsyncFile f = co_await AsyncFile::open("...");
    co_await anyOf(
        [&]() -> Task<> { /* do stuff with f */ },
        untilCancelledAnd(f.close()));

-- has a downside: upon cancellation from the outside, both children of `anyOf()`
get cancelled simultaneously, which may bring problems if the body of "do stuff"
clause features asynchronous cancellation and requires the file to remain
in scope until its own cancellation completes.

The naïve way of fixing this:

    Event done;
    co_await anyOf([&]() -> Task<> {
            ScopeGuard guard([&done]{ done.trigger(); };
            // do stuff with f
        }, untilCancelledAnd([&]() -> Task<> {
            co_await done;
            co_await f.close();
        });

— will deadlock, since anyOf() will require both children to fully cancel
before it can resume and the parent can proceed with the destruction
of `anyOf` awaitable — which will trigger the destruction of children
and running the scope guard. We therefore need another combiner which would
resemble the try/finally semantics more closely.

This diff adds `corral::detail::TryFinally`, which has a few important
differences from `anyOf(a, untilCancelledAnd(b))`:

  * it sequences cancellation of its children: when cancelled from the outside
    it cancels its try-block coroutine and waits until its cancellation
    completes before starting the finally-block coroutine;

  * it also fully destroys the try-block, including any local variables
    and lambda captures, thereby giving any synchronous scope guard a chance
    to trigger, before starting the finally-block.

Similarly to `CORRAL_WITH_NURSERY()` macro, we can creatively (ab)use
`co_yield` and macros to allow more user-friendly syntax:

    AsyncFile f = co_await AsyncFile::open("...");
    CORRAL_TRY {
        // work with file
    }
    CORRAL_FINALLY {
        co_await f.close();
    };

Similarly to C++ destructors, a finally-block is allowed to throw an exception,
but not if there is already an exception in flight.
  • Loading branch information
dprokoptsev committed Jun 3, 2024
1 parent bbb81f0 commit 8983280
Show file tree
Hide file tree
Showing 9 changed files with 385 additions and 39 deletions.
2 changes: 1 addition & 1 deletion corral/Nursery.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ class Nursery::Scope : public detail::NurseryScopeBase,
nursery_.parent_ = h;
Task<detail::NurseryBodyRetval> body = callable_(nursery_);
CORRAL_TRACE(" ... nursery %p starting with task %p", &nursery_,
body.promise_);
body.promise_.get());
return nursery_.addTask(std::move(body), this);
}

Expand Down
36 changes: 9 additions & 27 deletions corral/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
namespace corral {
class Nursery;


/// An async task backed by a C++20 coroutine.
///
/// This type shows up primarily as the return value of async functions.
Expand All @@ -41,43 +42,24 @@ template <class T = void> class [[nodiscard]] Task : public detail::TaskTag {
using ReturnType = T;

Task() = default;
Task(Task<T>&& c) noexcept : promise_(std::exchange(c.promise_, nullptr)) {}
explicit Task(detail::Promise<T>& promise) : promise_(&promise) {}

Task& operator=(Task<T>&& c) noexcept {
if (this != &c) {
destroy();
promise_ = std::exchange(c.promise_, nullptr);
}
return *this;
}

explicit operator bool() const { return promise_ != nullptr; }

~Task() { destroy(); }
explicit operator bool() const { return promise_.get() != nullptr; }

/// co_await'ing on a task starts it and suspends the caller until its
/// completion.
auto operator co_await() { return detail::TaskAwaitable<T>(promise_); }

private:
void destroy() {
auto promise = std::exchange(promise_, nullptr);
if constexpr (std::is_same_v<T, void>) {
if (promise == detail::noopPromise()) {
return;
}
}
if (promise) {
promise->destroy();
}
auto operator co_await() {
return detail::TaskAwaitable<T>(promise_.get());
}

detail::Promise<T>* release() { return std::exchange(promise_, nullptr); }
private:
detail::Promise<T>* release() { return promise_.release(); }

private:
detail::Promise<T>* promise_ = nullptr;
detail::PromisePtr<T> promise_;

friend class Nursery;
template <class, class> friend class detail::TryFinally;
};

namespace detail {
Expand Down
16 changes: 16 additions & 0 deletions corral/detail/Promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -514,4 +514,20 @@ inline Promise<void>* noopPromise() {
return &p;
}

struct DestroyPromise {
template <class T> void operator()(Promise<T>* p) const {
if constexpr (std::is_same_v<T, void>) {
if (p == detail::noopPromise()) {
return;
}
}
if (p) {
p->destroy();
}
}
};

template <class T>
using PromisePtr = std::unique_ptr<Promise<T>, DestroyPromise>;

} // namespace corral::detail
2 changes: 2 additions & 0 deletions corral/detail/task_awaitables.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,6 @@ class TaskAwaitable final : public TaskResultStorage<T>,
}
};

template <class, class> class TryFinally;

} // namespace corral::detail
182 changes: 182 additions & 0 deletions corral/detail/wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -887,4 +887,186 @@ class AllOfRange : public MuxRange<AllOfRange<Range>, Range> {
};


template <class BodyLambda, class GuardLambda>
class TryFinally final : private TaskParent<void>, public NurseryScopeBase {
public:
TryFinally(BodyLambda body, GuardLambda guard)
: guardLambda_(std::move(guard)) {
new (&bodyLambda_) BodyLambda(std::move(body));
try {
body_.reset(bodyLambda_().release());
} catch (...) {
exception_ = std::current_exception();
bodyLambda_.~BodyLambda();
}
}

~TryFinally() { CORRAL_ASSERT(!body_); }

void await_set_executor(Executor* ex) noexcept {
executor_ = ex;
if (body_) {
body_->setExecutor(ex);
}
}

bool await_ready() const noexcept { return false; }

bool await_early_cancel() const noexcept {
if (body_) {
body_->cancel();
}
return false;
}

Handle await_suspend(Handle h) {
parent_ = h;
if (body_) {
CORRAL_TRACE(" ... try-finally: pr %p", body_.get());
return body_->start(this, parent_);
} else if (beginGuard()) {
CORRAL_TRACE(" ... try-finally (early-failed): guard %p",
guard_.get());
return guard_->start(this, parent_);
} else {
CORRAL_TRACE(" ... try-finally (done)");
return h;
}
}

bool await_cancel(Handle) noexcept {
if (body_) {
body_->cancel();
} else {
// We're already in guard phase, which must run to completion
}
return false;
}

bool await_must_resume() const noexcept {
return completed_ || exception_ != nullptr;
}

void await_resume() {
if (exception_) {
std::rethrow_exception(exception_);
}
}

void await_introspect(detail::TaskTreeCollector& c) const noexcept {
if (body_) {
body_->await_introspect(c);
} else if (guard_) {
guard_->await_introspect(c);
}
}

private:
void storeSuccess() override {
if (body_) {
completed_ = true;
}
}

void storeException() noexcept override {
if (exception_) {
std::terminate(); // multiple exceptions in flight
} else {
exception_ = std::current_exception();
}
}

bool beginGuard() {
body_.reset();
bodyLambda_.~BodyLambda();

try {
guard_.reset(guardLambda_().release());
guard_->setExecutor(executor_);
return true;
} catch (...) {
exception_ = std::current_exception();
guard_.reset();
return false;
}
}

Handle continuation(BasePromise* p) noexcept override {
if (p == body_.get()) {
if (beginGuard()) {
CORRAL_TRACE("pr %p done, starting guard %p", body_.get(),
guard_.get());
return guard_->start(this, parent_);
} else {
CORRAL_TRACE("pr %p done, guard early-failed", body_.get());
return std::exchange(parent_, noopHandle());
}
} else if (p == guard_.get()) {
CORRAL_TRACE("guard %p done", guard_.get());
return std::exchange(parent_, noopHandle());
} else {
CORRAL_ASSERT(!"unexpected continuation");
return noopHandle();
}
}

private:
PromisePtr<void> body_;
union {
// In scope iff body_ holds a non-null value
[[no_unique_address]] BodyLambda bodyLambda_;
};

PromisePtr<void> guard_;
[[no_unique_address]] GuardLambda guardLambda_;

Handle parent_;
std::exception_ptr exception_;
bool completed_ = false;
Executor* executor_ = nullptr;
};

/// A factory for creating a `try-finally` block through
/// `co_await corral::try_(...).finally(...)` syntax.
template <class BodyLambda> class TryFinallyFactory {
BodyLambda bodyLambda_;

public:
explicit TryFinallyFactory(BodyLambda&& bodyLambda)
: bodyLambda_(std::forward<BodyLambda>(bodyLambda)) {}

template <class GuardLambda>
requires(std::is_invocable_r_v<Task<void>, GuardLambda>)
auto finally(GuardLambda&& guardLambda) {
return TryFinally<BodyLambda, decltype(guardLambda)>(
std::move(bodyLambda_), std::forward<GuardLambda>(guardLambda));
}
};

/// A factory for creating a `try-finally` block through
/// `CORRAL_TRY { ... } CORRAL_FINALLY { ... };` syntax.
class TryFinallyMacroFactory {
template <class BodyLambda> class Body {
BodyLambda bodyLambda_;

public:
Body(BodyLambda bodyLambda) : bodyLambda_(std::move(bodyLambda)) {}

template <class GuardLambda>
requires(std::is_invocable_r_v<Task<void>, GuardLambda>)
auto operator%(GuardLambda&& guardLambda) {
return TryFinally<BodyLambda, decltype(guardLambda)>(
std::move(bodyLambda_),
std::forward<GuardLambda>(guardLambda));
}
};

public:
template <class BodyLambda>
requires(std::is_invocable_r_v<Task<void>, BodyLambda>)
auto operator%(BodyLambda&& bodyLambda) {
return Body<BodyLambda>(std::forward<BodyLambda>(bodyLambda));
}
};

} // namespace corral::detail
31 changes: 31 additions & 0 deletions corral/wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,35 @@ template <AwaitableRange<> Range> auto mostOf(Range&& range) {
return detail::MostOfRange<Range>(std::forward<Range>(range));
}

/// A try/finally block allowing both try and finally blocks to be asynchronous,
/// useful instead of a scope guard if the cleanup code is asynchronous.
///
/// AsyncThing thing = co_await AsyncThing::create();
/// co_await corral::try_([&]() -> corral::Task<> {
/// co_await workWithThing();
/// }).finally([&]() -> corral::Task<> {
/// co_await thing.destroy();
/// });
///
/// Unlike anyOf() etc, the try block is fully destroyed (triggering any scope
/// guards etc) before the finally block begins executing.
template <class TryBlock>
requires(std::is_invocable_r_v<Task<void>, TryBlock>)
auto try_(TryBlock&& tryBlock) {
return detail::TryFinallyFactory(std::forward<TryBlock>(tryBlock));
}

/// Same as above but with a different, slightly more laconic, syntax:
///
/// AsyncThing thing = co_await AsyncThing::create();
/// CORRAL_TRY {
/// co_await workWithThing();
/// } CORRAL_FINALLY {
/// co_await thing.destroy();
/// }; // <-- the semicolon is required here
#define CORRAL_TRY \
co_yield ::corral::detail::TryFinallyMacroFactory{} % [&]() \
-> ::corral::Task<void>
#define CORRAL_FINALLY % [&]() -> ::corral::Task<void>

} // namespace corral
39 changes: 28 additions & 11 deletions doc/01_getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,8 @@ cancelling the `acceptorLoop()` task shown above will also cancel any
In certain cases, resource cleanup needs to be asynchronous as well,
making it infeasible to use RAII. For such cases, corral provides an
`untilCancelledAnd()` wrapper, which starts its argument child task when
the wrapper is cancelled and doesn't confirm the cancellation until
the child completes. Typically it would be used via `anyOf()`:
equivalent of try/finally block which allows both clauses to be
asynchronous:
```cpp
struct AsyncFD {
Expand All @@ -403,9 +402,11 @@ struct AsyncFD {
corral::Task<void> workWithAsyncFD() {
AsyncFD fd = co_await AsyncFD::open("...");
co_await corral::anyOf([&]() -> corral::Task<> {
co_await try_([&]() -> Task<void> {
// do something with fd
}, corral::untilCancelledAnd(fd.close()));
}).finally([&]() -> Task<void> {
co_await fd.close();
});
}
```

Expand All @@ -414,6 +415,21 @@ via exception, or is cancelled from the outside, `fd` will be closed
asynchronously, and the outer task will not get resumed until the close
completes.

Another syntax for the above is available, which creatively (ab)uses
C++ macros:

```cpp
corral::Task<void> workWithAsyncFD() {
AsyncFD fd = co_await AsyncFD::open("...");
CORRAL_TRY {
// do something with fd
}
CORRAL_FINALLY {
co_await fd.close();
}; // <- the trailing semicolon is required here
}
```

Because merely entering or returning from an async function is not a
cancellation point _in itself_, the above code snippet can be
rewritten with more tasks without any changes to semantics
Expand All @@ -423,17 +439,18 @@ and without adding any additional cancellation points:
corral::Task<AsyncFD> openAsyncFD() {
co_return co_await AsyncFD::open("...");
}
corral::Task<void> useAsyncFD(AsyncFD&) { /*...*/ }
corral::Task<void> consumeAsyncFD(AsyncFD fd) {
CORRAL_TRY { /* ... */ }
CORRAL_FINALLY { co_await fd.close(); };
}

corral::Task<void> workWithAsyncFD() {
AsyncFD fd = co_await openAsyncFD();
co_await corral::anyOf(
useAsyncFD(fd),
corral::untilCancelledAnd(fd.close()));
co_await consumeAsyncFD(std::move(fd));
}
```
The task passed to `untilCancelledAnd()` will not itself see the
cancellation that caused it to be started; if it's doing something that
If try/finally block was cancelled from the outside, the finally-clause
will not itself see the cancellation. If it's doing something that
might block indefinitely, it should impose an internal timeout to avoid
deadlocking the program.
Loading

0 comments on commit 8983280

Please sign in to comment.