Skip to content

Commit

Permalink
Allow awaitable chaining.
Browse files Browse the repository at this point in the history
Corral best practices advocate for using coroutines for encapsulating
asynchronous control flow, but each invocation of a coroutine requires a frame
allocation on heap, which introduces some overhead. In certain, most trivial,
circumstances one may want to save on such an allocation.

Some most trivial use cases can be optimized by having a non-coroutine function
returning `Task<>` which may do some synchronous stuff and then delegate
to a coroutine. This technique cannot be used, however, if one needs
to transform a value returned by an awaitable, or for only slightly less
trivial use cases "run these two awaitables in this order".

For example, in this example:

   Task<int> fourty_two() {
        // ...complex asynchronous calculations...
        co_return 42;
    }
    Task<int> fourty_three() {
         int ret = co_await fourty_two();
         co_return ret + 1;
     }
-- the latter coroutine only exists to arrange a trivial synchronous operation
to be performed after complex work has been done, and it comes with its own
frame allocation.

This diff adds a new combiner, `corral::then()`, which can be used to pack
a sequence of awaitables into one awaitable. This would allow rewriting
the above example as

    Awaitable<int> auto fourty_three() {
        return fourty_two() | then([](int x) { return just(x + 1); });
    }

Note that argument function to `then()` is supposed to return an awaitable,
so `just()` (or `noop()`) is required if the lambda only does synchronous
transformations, like in the example above.

If the first operation returns a value, its lifetime is extended until
the second awaitable completes, allowing writing something like this:

    Semaphore sem;
    Awaitable<void> auto reallyDoStuff();
    Awaitable<void> auto doStuff() {
        return sem.lock() | then(&reallyDoStuff);
    }

In this example semaphore lock will be held while `reallyDoStuff` runs,
and released afterwards.

`async::noop()` and `async::just()` have been rewritten to return (trivial)
awaitables, which can be casted to `Task<>` if necessary;
so `Awaitable<int> auto giveMeInt() { return just(42); }` won't cause
any heap allocations.

Any more complex control flow primitives -- conditions and loops -- have not
been supported and are unlikely to be going to, since that quickly makes code
unreadable. Using coroutines is still recommended for such use cases,
as resulting code is typically much easier to read.
  • Loading branch information
dprokoptsev committed Sep 16, 2024
1 parent 77647fa commit 1cbda08
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 15 deletions.
42 changes: 38 additions & 4 deletions corral/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,40 @@ namespace detail {
template <class T> Task<T> Promise<T>::get_return_object() {
return Task<T>(*this);
}

/// A non-cancellable awaitable which is immediately ready, producing a
/// value of type T. It can also be implicitly converted to a Task<T>.
template <class T> class ReadyAwaitable {
public:
explicit ReadyAwaitable(T&& value) : value_(std::forward<T>(value)) {}

bool await_early_cancel() const noexcept { return false; }
bool await_ready() const noexcept { return true; }
bool await_suspend(Handle) { return false; }
bool await_cancel(Handle) const { return false; }
bool await_must_resume() const noexcept { return true; }
T await_resume() && { return std::forward<T>(value_); }

template <std::constructible_from<T> U> operator Task<U>() && {
return Task<U>(*new StubPromise<U>(std::forward<T>(value_)));
}

private:
T value_;
};

template <> class ReadyAwaitable<void> {
public:
bool await_early_cancel() const noexcept { return false; }
bool await_ready() const noexcept { return true; }
bool await_suspend(Handle) { return false; }
bool await_cancel(Handle) const { return false; }
bool await_must_resume() const noexcept { return true; }
void await_resume() && {}

operator Task<void>() { return Task<void>(StubPromise<void>::instance()); }
};

} // namespace detail

/// A no-op task. Always await_ready(), and co_await'ing on it is a no-op
Expand All @@ -80,13 +114,13 @@ template <class T> Task<T> Promise<T>::get_return_object() {
/// };
///
/// saving on coroutine frame allocation (compared to `{ co_return; }`).
inline Task<void> noop() {
return Task<void>(detail::StubPromise<void>::instance());
inline Awaitable<void> auto noop() {
return detail::ReadyAwaitable<void>();
}

/// Create a task that immediately returns a given value when co_await'ed.
template <class T> Task<T> just(T value) {
return Task<T>(*new detail::StubPromise<T>(std::forward<T>(value)));
template <class T> Awaitable<T> auto just(T value) {
return detail::ReadyAwaitable<T>(std::forward<T>(value));
}

} // namespace corral
8 changes: 5 additions & 3 deletions corral/concepts.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ concept ImmediateAwaitable = requires(T t, const T ct, Handle h) {
template <class T, class Ret = detail::Unspecified>
concept Awaitable =
detail::ImmediateAwaitable<T, Ret>
|| requires(T t) { { t.operator co_await() } -> detail::ImmediateAwaitable<Ret>; }
|| requires(T t) { { operator co_await(t) } -> detail::ImmediateAwaitable<Ret>; }
|| detail::ThisIsAwaitableTrustMe<T, Ret>;
|| requires(T t) {
{ std::forward<T>(t).operator co_await() } -> detail::ImmediateAwaitable<Ret>;
} || requires(T t) {
{ operator co_await(std::forward<T>(t)) } -> detail::ImmediateAwaitable<Ret>;
} || detail::ThisIsAwaitableTrustMe<T, Ret>;

template <class R, class Ret = detail::Unspecified>
concept AwaitableRange = requires(R r) {
Expand Down
258 changes: 258 additions & 0 deletions corral/detail/Sequence.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// This file is part of corral, a lightweight C++20 coroutine library.
//
// Copyright (c) 2024 Hudson River Trading LLC <[email protected]>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//
// SPDX-License-Identifier: MIT

#pragma once

#include "../concepts.h"
#include "../config.h"
#include "frames.h"
#include "utility.h"

namespace corral::detail {

template <Awaitable First, class ThenFn> class Sequence : private ProxyFrame {
static decltype(auto) getSecond(ThenFn& fn,
AwaitableReturnType<First>& first) {
if constexpr (requires { fn(std::move(first)); }) {
return fn(std::move(first));
} else if constexpr (requires { fn(first); }) {
return fn(first);
} else {
return fn();
}
}

using Second =
decltype(getSecond(std::declval<ThenFn&>(),
std::declval<AwaitableReturnType<First>&>()));

public:
Sequence(First first, ThenFn thenFn)
: first_(std::move(first)),
firstAw_(getAwaitable(std::forward<First>(first_))),
thenFn_(std::move(thenFn)) {}

Sequence(Sequence&&) = default;

bool await_ready() const noexcept { return false; }

void await_set_executor(Executor* e) noexcept {
second_ = e;
firstAw_.await_set_executor(e);
}

auto await_early_cancel() noexcept {
cancelling_ = true;
return firstAw_.await_early_cancel();
}

void await_suspend(Handle h) {
CORRAL_TRACE(" ...sequence %p yielding to...", this);
parent_ = h;
if (firstAw_.await_ready()) {
kickOffSecond();
} else {
this->resumeFn = +[](CoroutineFrame* frame) {
auto* self = static_cast<Sequence*>(frame);
self->kickOffSecond();
};
firstAw_.await_suspend(this->toHandle()).resume();
}
}

bool await_cancel(Handle h) noexcept {
CORRAL_TRACE("sequence %p (%s stage) cancellation requested", this,
inFirstStage() ? "first" : "second");
cancelling_ = true;
if (inFirstStage()) {
return firstAw_.await_cancel(this->toHandle());
} else if (inSecondStage()) {
return second().aw.await_cancel(h);
} else {
return false; // will carry out cancellation later
}
}

bool await_must_resume() const noexcept {
// Note that await_must_resume() is called by our parent when we
// resume them after a cancellation that did not complete synchronously.
// To understand the logic in this method, consider all the places
// where we call parent_.resume(). In particular, if we're still
// inFirstStage(), we must have hit the cancellation check at the
// beginning of kickOffSecond(), which means we've already verified
// that the first stage await_must_resume() returned false, and we
// should return false here without consulting the awaitable further.
// Similarly, if we're in neither the first nor the second stage,
// the second stage must have completed via early cancellation.
bool ret = std::holds_alternative<std::exception_ptr>(second_) ||
(inSecondStage() && second().aw.await_must_resume());
if (!ret && inSecondStage()) {
// Destroy the second stage, which will release any resources
// it might have held
second_.template emplace<std::monostate>();
}
return ret;
}

decltype(auto) await_resume() {
ScopeGuard guard([this] {
// Destroy the second stage and the return value of the first stage
second_.template emplace<std::monostate>();
});

if (auto ex = std::get_if<std::exception_ptr>(&second_)) {
std::rethrow_exception(*ex);
} else {
return second().aw.await_resume();
}
}

void await_introspect(auto& c) const noexcept {
if (inFirstStage()) {
firstAw_.await_introspect(c);
} else if (inSecondStage()) {
second().aw.await_introspect(c);
} else {
c.node("sequence (degenerate)");
}
}

private:
// Explicitly provide a template argument, so immediate awaitables
// would resolve to Second&& instead of Second.
// For the same reason, don't use AwaitableType<> here.
using SecondAwaitable =
decltype(getAwaitable<Second&&>(std::declval<Second>()));

struct SecondStage {
[[no_unique_address]] AwaitableReturnType<First> firstValue;
[[no_unique_address]] Second obj;
[[no_unique_address]] AwaitableAdapter<SecondAwaitable> aw;

explicit SecondStage(Sequence* c)
: firstValue(std::move(c->firstAw_).await_resume()),
obj(getSecond(c->thenFn_, firstValue)),
aw(getAwaitable<Second&&>(std::forward<Second>(obj))) {}
};

bool inFirstStage() const noexcept {
return std::holds_alternative<Executor*>(second_);
}
bool inSecondStage() const noexcept {
return std::holds_alternative<SecondStage>(second_);
}

SecondStage& second() noexcept { return std::get<SecondStage>(second_); }
const SecondStage& second() const noexcept {
return std::get<SecondStage>(second_);
}

void kickOffSecond() noexcept {
if (cancelling_ && !firstAw_.await_must_resume()) {
CORRAL_TRACE("sequence %p (cancelling) first stage completed, "
"confirming cancellation",
this);
parent_.resume();
return;
}

CORRAL_TRACE("sequence %p%s first stage completed, continuing with...",
this, cancelling_ ? " (cancelling)" : "");
CORRAL_ASSERT(inFirstStage());
Executor* ex = std::get<Executor*>(second_);

// Mark first stage as completed
// (this is necessary if thenFn_() attempts to cancel us)
second_.template emplace<std::monostate>();

try {
second_.template emplace<SecondStage>(this);
} catch (...) {
second_.template emplace<std::exception_ptr>(
std::current_exception());
parent_.resume();
return;
}

if (cancelling_) {
if (second().aw.await_early_cancel()) {
second_.template emplace<std::monostate>();

parent_.resume();
return;
}
}

if (second().aw.await_ready()) {
parent_.resume();
} else {
second().aw.await_set_executor(ex);
second().aw.await_suspend(parent_).resume();
}
}

private:
Handle parent_;
[[no_unique_address]] First first_;
[[no_unique_address]] AwaitableAdapter<AwaitableType<First>> firstAw_;

[[no_unique_address]] ThenFn thenFn_;
mutable std::variant<Executor*, // running first stage
SecondStage, // running second stage,
std::monostate, // running neither (either constructing
// second stage, or it confirmed early
// cancellation)
std::exception_ptr> // first stage threw an exception
second_;
bool cancelling_ = false;
};


template <class ThenFn> class SequenceBuilder {
public:
explicit SequenceBuilder(ThenFn fn) : fn_(std::move(fn)) {}

template <Awaitable First>
requires(std::invocable<ThenFn, AwaitableReturnType<First>&> ||
std::invocable<ThenFn, AwaitableReturnType<First> &&> ||
std::invocable<ThenFn>)
friend auto operator|(First&& first, SequenceBuilder&& builder) {
return Sequence(std::forward<First>(first), std::move(builder.fn_));
}

// Allow right associativity of SequenceBuilder's
template <class ThirdFn>
auto operator|(SequenceBuilder<ThirdFn>&& next) && {
return corral::detail::SequenceBuilder(
[fn = std::move(fn_),
next = std::move(next)]<class T>(T&& value) mutable {
return fn(std::forward<T>(value)) | std::move(next);
});
}

private:
ThenFn fn_;
};

} // namespace corral::detail
2 changes: 1 addition & 1 deletion corral/detail/task_awaitables.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ template <class T, class Self> class TaskAwaitableBase {

bool await_ready() const noexcept {
return promise_->checkImmediateResult(
const_cast<Self*>(static_cast<const Self*>(this)));
const_cast<Self*>(static_cast<const Self*>(this)));
}

Handle await_suspend(Handle h) {
Expand Down
8 changes: 3 additions & 5 deletions corral/detail/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ struct TaskTag {};

/// Like std::conditional, but for templates.
template <bool If,
template <class...>
class Then,
template <class...>
class Else>
template <class...> class Then,
template <class...> class Else>
struct ConditionalTmpl {
template <class... Args> using With = Then<Args...>;
};
Expand Down Expand Up @@ -588,7 +586,7 @@ template <class Aw> struct AwaitableAdapter {
return checker_.readyReturned(awaitable_.await_ready());
}

Handle await_suspend(Handle h) {
[[nodiscard]] Handle await_suspend(Handle h) {
#ifdef CORRAL_AWAITABLE_STATE_DEBUG
try {
return awaitSuspend(awaitable_, checker_.aboutToSuspend(h));
Expand Down
Loading

0 comments on commit 1cbda08

Please sign in to comment.