Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support error context in stream/error operations #1149

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions crates/guest-rust/rt/src/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub enum Handle {
LocalClosed,
Read,
Write,
// Local end is closed with an error
// NOTE: this is only valid for write ends
WriteClosedErr(u32),
}

/// The current task being polled (or null if none).
Expand Down Expand Up @@ -176,7 +179,7 @@ pub async unsafe fn await_result(
STATUS_RETURNED | STATUS_DONE => {
alloc::dealloc(params, params_layout);
}
_ => unreachable!(),
_ => unreachable!("unrecognized async call status"),
}
}

Expand All @@ -187,26 +190,50 @@ mod results {
pub const CANCELED: u32 = 0;
}

/// Result of awaiting a asynchronous read or write
#[doc(hidden)]
pub enum AsyncWaitResult {
vados-cosmonic marked this conversation as resolved.
Show resolved Hide resolved
/// Used when a value was successfully sent or received
Values(usize),
/// Represents a successful but error-indicating read
Error(u32),
/// Represents a failed read (closed, canceled, etc)
End,
}

impl AsyncWaitResult {
/// Interpret the results from an async operation that is known to *not* be blocked
fn from_nonblocked_async_result(v: u32) -> Self {
match v {
results::CLOSED | results::CANCELED => Self::End,
v => {
if v & results::CLOSED != 0 {
Self::Error(v & !results::CLOSED)
} else {
Self::Values(v as usize)
}
}
}
}
}

/// Await the completion of a future read or write.
#[doc(hidden)]
pub async unsafe fn await_future_result(
import: unsafe extern "C" fn(u32, *mut u8) -> u32,
future: u32,
address: *mut u8,
) -> bool {
) -> AsyncWaitResult {
let result = import(future, address);
match result {
results::BLOCKED => {
assert!(!CURRENT.is_null());
(*CURRENT).todo += 1;
let (tx, rx) = oneshot::channel();
CALLS.insert(future as _, tx);
let v = rx.await.unwrap();
v == 1
AsyncWaitResult::from_nonblocked_async_result(rx.await.unwrap())
}
results::CLOSED | results::CANCELED => false,
1 => true,
_ => unreachable!(),
v => AsyncWaitResult::from_nonblocked_async_result(v),
}
}

Expand All @@ -217,7 +244,7 @@ pub async unsafe fn await_stream_result(
stream: u32,
address: *mut u8,
count: u32,
) -> Option<usize> {
) -> AsyncWaitResult {
let result = import(stream, address, count);
match result {
results::BLOCKED => {
Expand All @@ -227,13 +254,12 @@ pub async unsafe fn await_stream_result(
CALLS.insert(stream as _, tx);
let v = rx.await.unwrap();
if let results::CLOSED | results::CANCELED = v {
None
AsyncWaitResult::End
} else {
Some(usize::try_from(v).unwrap())
AsyncWaitResult::Values(usize::try_from(v).unwrap())
}
}
results::CLOSED | results::CANCELED => None,
v => Some(usize::try_from(v).unwrap()),
v => AsyncWaitResult::from_nonblocked_async_result(v),
}
}

Expand Down Expand Up @@ -310,6 +336,7 @@ pub unsafe fn callback(ctx: *mut u8, event0: i32, event1: i32, event2: i32) -> i
}

/// Represents the Component Model `error-context` type.
#[derive(PartialEq, Eq)]
pub struct ErrorContext {
handle: u32,
}
Expand Down
46 changes: 31 additions & 15 deletions crates/guest-rust/rt/src/async_support/future_support.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate std;

use {
super::ErrorContext,
super::Handle,
futures::{
channel::oneshot,
Expand All @@ -20,10 +21,10 @@ use {
#[doc(hidden)]
pub struct FutureVtable<T> {
pub write: fn(future: u32, value: T) -> Pin<Box<dyn Future<Output = bool>>>,
pub read: fn(future: u32) -> Pin<Box<dyn Future<Output = Option<T>>>>,
pub read: fn(future: u32) -> Pin<Box<dyn Future<Output = Option<Result<T, ErrorContext>>>>>,
pub cancel_write: fn(future: u32),
pub cancel_read: fn(future: u32),
pub close_writable: fn(future: u32),
pub close_writable: fn(future: u32, err_ctx: u32),
pub close_readable: fn(future: u32),
}

Expand Down Expand Up @@ -78,7 +79,8 @@ impl<T> CancelableWrite<T> {
Handle::LocalOpen
| Handle::LocalWaiting(_)
| Handle::Read
| Handle::LocalClosed => unreachable!(),
| Handle::LocalClosed
| Handle::WriteClosedErr(_) => unreachable!(),
Handle::LocalReady(..) => {
entry.insert(Handle::LocalOpen);
}
Expand Down Expand Up @@ -126,7 +128,9 @@ impl<T> FutureWriter<T> {
Poll::Pending
}
Handle::LocalReady(..) => Poll::Pending,
Handle::LocalClosed => Poll::Ready(()),
Handle::LocalClosed | Handle::WriteClosedErr(_) => {
Poll::Ready(())
}
Handle::LocalWaiting(_) | Handle::Read | Handle::Write => {
unreachable!()
}
Expand All @@ -141,7 +145,7 @@ impl<T> FutureWriter<T> {
_ = tx.send(Box::new(v));
Box::pin(future::ready(()))
}
Handle::LocalClosed => Box::pin(future::ready(())),
Handle::LocalClosed | Handle::WriteClosedErr(_) => Box::pin(future::ready(())),
Handle::Read | Handle::LocalReady(..) => unreachable!(),
Handle::Write => Box::pin((vtable.write)(handle, v).map(drop)),
},
Expand All @@ -161,7 +165,12 @@ impl<T> Drop for FutureWriter<T> {
Handle::Read => unreachable!(),
Handle::Write | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_writable)(self.handle);
(self.vtable.close_writable)(self.handle, 0);
}
Handle::WriteClosedErr(err) => {
let err = *err;
entry.remove();
(self.vtable.close_writable)(self.handle, err);
}
},
});
Expand All @@ -171,13 +180,13 @@ impl<T> Drop for FutureWriter<T> {
/// Represents a read operation which may be canceled prior to completion.
pub struct CancelableRead<T: 'static> {
reader: Option<FutureReader<T>>,
future: Pin<Box<dyn Future<Output = Option<T>>>>,
future: Pin<Box<dyn Future<Output = Option<Result<T, ErrorContext>>>>>,
}

impl<T> Future for CancelableRead<T> {
type Output = Option<T>;
type Output = Option<Result<T, ErrorContext>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T, ErrorContext>>> {
let me = self.get_mut();
match me.future.poll_unpin(cx) {
Poll::Ready(v) => {
Expand Down Expand Up @@ -206,7 +215,8 @@ impl<T> CancelableRead<T> {
Handle::LocalOpen
| Handle::LocalReady(..)
| Handle::Write
| Handle::LocalClosed => unreachable!(),
| Handle::LocalClosed
| Handle::WriteClosedErr(_) => unreachable!(),
Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalOpen);
}
Expand Down Expand Up @@ -262,7 +272,8 @@ impl<T> FutureReader<T> {
| Handle::LocalOpen
| Handle::LocalReady(..)
| Handle::LocalWaiting(_)
| Handle::LocalClosed => {
| Handle::LocalClosed
| Handle::WriteClosedErr(_) => {
unreachable!()
}
},
Expand All @@ -286,7 +297,10 @@ impl<T> FutureReader<T> {
Handle::Read | Handle::LocalClosed => {
entry.remove();
}
Handle::LocalReady(..) | Handle::LocalWaiting(_) | Handle::Write => unreachable!(),
Handle::LocalReady(..)
| Handle::LocalWaiting(_)
| Handle::Write
| Handle::WriteClosedErr(_) => unreachable!(),
},
});

Expand All @@ -295,7 +309,7 @@ impl<T> FutureReader<T> {
}

impl<T> IntoFuture for FutureReader<T> {
type Output = Option<T>;
type Output = Option<Result<T, ErrorContext>>;
type IntoFuture = CancelableRead<T>;

/// Convert this object into a `Future` which will resolve when a value is
Expand All @@ -309,7 +323,9 @@ impl<T> IntoFuture for FutureReader<T> {
future: super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
Handle::Write | Handle::LocalWaiting(_) | Handle::WriteClosedErr(_) => {
unreachable!()
}
Handle::Read => Box::pin(async move { (vtable.read)(handle).await })
as Pin<Box<dyn Future<Output = _>>>,
Handle::LocalOpen => {
Expand Down Expand Up @@ -353,7 +369,7 @@ impl<T> Drop for FutureReader<T> {
entry.remove();
(self.vtable.close_readable)(handle);
}
Handle::Write => unreachable!(),
Handle::Write | Handle::WriteClosedErr(_) => unreachable!(),
},
});
}
Expand Down
Loading
Loading