From a8a23f84de56ac594da51b21694b18e67754745a Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 10 Jan 2025 14:01:00 -0800 Subject: [PATCH 1/5] wip: WasiCtx has a WasiExecutor --- crates/wasi/src/ctx.rs | 68 +++++++++++++++++++++++++++++++------ crates/wasi/src/lib.rs | 20 +++++++---- crates/wasi/src/preview0.rs | 9 ++--- crates/wasi/src/preview1.rs | 40 +++++++++++----------- 4 files changed, 97 insertions(+), 40 deletions(-) diff --git a/crates/wasi/src/ctx.rs b/crates/wasi/src/ctx.rs index 693876ea896a..6fdad0b288d3 100644 --- a/crates/wasi/src/ctx.rs +++ b/crates/wasi/src/ctx.rs @@ -463,7 +463,7 @@ impl WasiCtxBuilder { /// Panics if this method is called twice. Each [`WasiCtxBuilder`] can be /// used to create only a single [`WasiCtx`]. Repeated usage of this method /// is not allowed and should use a second builder instead. - pub fn build(&mut self) -> WasiCtx { + pub fn build(&mut self) -> WasiCtx { assert!(!self.built); let Self { @@ -500,6 +500,7 @@ impl WasiCtxBuilder { monotonic_clock, allowed_network_uses, allow_blocking_current_thread, + _executor: std::marker::PhantomData, } } @@ -519,7 +520,7 @@ impl WasiCtxBuilder { /// usage of this method is not allowed and should use a second builder /// instead. #[cfg(feature = "preview1")] - pub fn build_p1(&mut self) -> crate::preview1::WasiP1Ctx { + pub fn build_p1(&mut self) -> crate::preview1::WasiP1Ctx { let wasi = self.build(); crate::preview1::WasiP1Ctx::new(wasi) } @@ -545,7 +546,7 @@ impl WasiCtxBuilder { /// # Example /// /// ``` -/// use wasmtime_wasi::{WasiCtx, ResourceTable, WasiView, WasiCtxBuilder}; +/// use wasmtime_wasi::{WasiCtx, ResourceTable, WasiView, Tokio, WasiCtxBuilder}; /// /// struct MyState { /// ctx: WasiCtx, @@ -553,7 +554,8 @@ impl WasiCtxBuilder { /// } /// /// impl WasiView for MyState { -/// fn ctx(&mut self) -> &mut WasiCtx { &mut self.ctx } +/// type Executor = Tokio; +/// fn ctx(&mut self) -> &mut WasiCtx { &mut self.ctx } /// fn table(&mut self) -> &mut ResourceTable { &mut self.table } /// } /// @@ -572,6 +574,7 @@ impl WasiCtxBuilder { /// } /// ``` pub trait WasiView: Send { + type Executor: WasiExecutor; /// Yields mutable access to the internal resource management that this /// context contains. /// @@ -582,23 +585,25 @@ pub trait WasiView: Send { /// Yields mutable access to the configuration used for this context. /// /// The returned type is created through [`WasiCtxBuilder`]. - fn ctx(&mut self) -> &mut WasiCtx; + fn ctx(&mut self) -> &mut WasiCtx; } impl WasiView for &mut T { + type Executor = T::Executor; fn table(&mut self) -> &mut ResourceTable { T::table(self) } - fn ctx(&mut self) -> &mut WasiCtx { + fn ctx(&mut self) -> &mut WasiCtx { T::ctx(self) } } impl WasiView for Box { + type Executor = T::Executor; fn table(&mut self) -> &mut ResourceTable { T::table(self) } - fn ctx(&mut self) -> &mut WasiCtx { + fn ctx(&mut self) -> &mut WasiCtx { T::ctx(self) } } @@ -619,14 +624,56 @@ impl WasiView for Box { pub struct WasiImpl(pub T); impl WasiView for WasiImpl { + type Executor = T::Executor; fn table(&mut self) -> &mut ResourceTable { T::table(&mut self.0) } - fn ctx(&mut self) -> &mut WasiCtx { + fn ctx(&mut self) -> &mut WasiCtx { T::ctx(&mut self.0) } } +pub trait WasiExecutor: Send { + fn run_blocking(body: F) -> impl std::future::Future + Send + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; +} +pub trait WasiSyncExecutor: WasiExecutor { + fn block_on(f: F) -> F::Output + where + F: Future; +} + +pub struct Tokio; +impl WasiExecutor for Tokio { + async fn run_blocking(body: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + todo!() + } +} +pub struct Standalone; +impl WasiExecutor for Standalone { + async fn run_blocking(body: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + todo!() + } +} +impl WasiSyncExecutor for Standalone { + fn block_on(f: F) -> F::Output + where + F: Future, + { + todo!() + } +} + /// Per-[`Store`] state which holds state necessary to implement WASI from this /// crate. /// @@ -639,7 +686,7 @@ impl WasiView for WasiImpl { /// bindgen-generated traits. /// /// [`Store`]: wasmtime::Store -pub struct WasiCtx { +pub struct WasiCtx { pub(crate) random: Box, pub(crate) insecure_random: Box, pub(crate) insecure_random_seed: u128, @@ -654,9 +701,10 @@ pub struct WasiCtx { pub(crate) socket_addr_check: SocketAddrCheck, pub(crate) allowed_network_uses: AllowedNetworkUses, pub(crate) allow_blocking_current_thread: bool, + pub(crate) _executor: std::marker::PhantomData, } -impl WasiCtx { +impl WasiCtx { /// Convenience function for calling [`WasiCtxBuilder::new`]. pub fn builder() -> WasiCtxBuilder { WasiCtxBuilder::new() diff --git a/crates/wasi/src/lib.rs b/crates/wasi/src/lib.rs index a19c6c42ba6d..0a7534c96781 100644 --- a/crates/wasi/src/lib.rs +++ b/crates/wasi/src/lib.rs @@ -208,7 +208,7 @@ mod udp; mod write_stream; pub use self::clocks::{HostMonotonicClock, HostWallClock}; -pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiImpl, WasiView}; +pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiExecutor, WasiImpl, WasiSyncExecutor, WasiView}; pub use self::error::{I32Exit, TrappableError}; pub use self::filesystem::{DirPerms, FileInputStream, FilePerms, FsError, FsResult}; pub use self::network::{Network, SocketAddrUse, SocketError, SocketResult}; @@ -347,7 +347,7 @@ pub fn add_to_linker_with_options_async( /// ``` /// use wasmtime::{Engine, Result, Store, Config}; /// use wasmtime::component::{ResourceTable, Linker}; -/// use wasmtime_wasi::{WasiCtx, WasiView, WasiCtxBuilder}; +/// use wasmtime_wasi::{WasiCtx, WasiView, WasiCtxBuilder, Standalone}; /// /// fn main() -> Result<()> { /// let engine = Engine::default(); @@ -383,18 +383,24 @@ pub fn add_to_linker_with_options_async( /// fn table(&mut self) -> &mut ResourceTable { &mut self.table } /// } /// ``` -pub fn add_to_linker_sync( - linker: &mut wasmtime::component::Linker, -) -> anyhow::Result<()> { +pub fn add_to_linker_sync(linker: &mut wasmtime::component::Linker) -> anyhow::Result<()> +where + T: WasiView, + ::Executor: WasiSyncExecutor, +{ let options = crate::bindings::sync::LinkOptions::default(); add_to_linker_with_options_sync(linker, &options) } /// Similar to [`add_to_linker_sync`], but with the ability to enable unstable features. -pub fn add_to_linker_with_options_sync( +pub fn add_to_linker_with_options_sync( linker: &mut wasmtime::component::Linker, options: &crate::bindings::sync::LinkOptions, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +where + T: WasiView, + ::Executor: WasiSyncExecutor, +{ let l = linker; let closure = type_annotate::(|t| WasiImpl(t)); diff --git a/crates/wasi/src/preview0.rs b/crates/wasi/src/preview0.rs index 038b98b40df2..3081f8db753e 100644 --- a/crates/wasi/src/preview0.rs +++ b/crates/wasi/src/preview0.rs @@ -7,18 +7,19 @@ use crate::preview0::types::Error; use crate::preview1::types as snapshot1_types; use crate::preview1::wasi_snapshot_preview1::WasiSnapshotPreview1 as Snapshot1; use crate::preview1::WasiP1Ctx; +use crate::{WasiExecutor, WasiSyncExecutor}; use wiggle::{GuestError, GuestMemory, GuestPtr}; -pub fn add_to_linker_async( +pub fn add_to_linker_async( linker: &mut wasmtime::Linker, - f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, + f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, ) -> anyhow::Result<()> { wasi_unstable::add_to_linker(linker, f) } -pub fn add_to_linker_sync( +pub fn add_to_linker_sync( linker: &mut wasmtime::Linker, - f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, + f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, ) -> anyhow::Result<()> { sync::add_wasi_unstable_to_linker(linker, f) } diff --git a/crates/wasi/src/preview1.rs b/crates/wasi/src/preview1.rs index a4aec9b58110..354c6bce2639 100644 --- a/crates/wasi/src/preview1.rs +++ b/crates/wasi/src/preview1.rs @@ -73,7 +73,8 @@ use crate::bindings::{ io::streams, }; use crate::{ - FsError, IsATTY, ResourceTable, StreamError, StreamResult, WasiCtx, WasiImpl, WasiView, + FsError, IsATTY, ResourceTable, StreamError, StreamResult, WasiCtx, WasiExecutor, WasiImpl, + WasiSyncExecutor, WasiView, }; use anyhow::{bail, Context}; use std::collections::{BTreeMap, HashSet}; @@ -137,14 +138,14 @@ use crate::bindings::random::random::Host as _; /// Ok(()) /// } /// ``` -pub struct WasiP1Ctx { +pub struct WasiP1Ctx { table: ResourceTable, - wasi: WasiCtx, + wasi: WasiCtx, adapter: WasiPreview1Adapter, } -impl WasiP1Ctx { - pub(crate) fn new(wasi: WasiCtx) -> Self { +impl WasiP1Ctx { + pub(crate) fn new(wasi: WasiCtx) -> Self { Self { table: ResourceTable::new(), wasi, @@ -157,11 +158,12 @@ impl WasiP1Ctx { } } -impl WasiView for WasiP1Ctx { +impl WasiView for WasiP1Ctx { + type Executor = E; fn table(&mut self) -> &mut ResourceTable { &mut self.table } - fn ctx(&mut self) -> &mut WasiCtx { + fn ctx(&mut self) -> &mut WasiCtx { &mut self.wasi } } @@ -297,7 +299,7 @@ impl DerefMut for Descriptors { impl Descriptors { /// Initializes [Self] using `preopens` - fn new(mut host: WasiImpl<&mut WasiP1Ctx>) -> Result { + fn new(mut host: WasiImpl<&mut WasiP1Ctx>) -> Result { let mut descriptors = Self::default(); descriptors.push(Descriptor::Stdin { stream: host @@ -424,12 +426,12 @@ impl WasiPreview1Adapter { // of the [`WasiPreview1View`] to provide means to return mutably and immutably borrowed [`Descriptors`] // without having to rely on something like `Arc>`, while also being able to // call methods like [`Descriptor::is_file`] and hiding complexity from preview1 method implementations. -struct Transaction<'a> { - view: &'a mut WasiP1Ctx, +struct Transaction<'a, E> { + view: &'a mut WasiP1Ctx, descriptors: Descriptors, } -impl Drop for Transaction<'_> { +impl Drop for Transaction<'_, E> { /// Record changes in the [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] fn drop(&mut self) { let descriptors = mem::take(&mut self.descriptors); @@ -437,7 +439,7 @@ impl Drop for Transaction<'_> { } } -impl Transaction<'_> { +impl Transaction<'_, E> { /// Borrows [`Descriptor`] corresponding to `fd`. /// /// # Errors @@ -517,10 +519,10 @@ impl Transaction<'_> { } } -impl WasiP1Ctx { +impl WasiP1Ctx { /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] /// and returns [`Transaction`] on success - fn transact(&mut self) -> Result, types::Error> { + fn transact(&mut self) -> Result, types::Error> { let descriptors = if let Some(descriptors) = self.adapter.descriptors.take() { descriptors } else { @@ -729,9 +731,9 @@ enum FdWrite { /// Ok(()) /// } /// ``` -pub fn add_to_linker_async( +pub fn add_to_linker_async( linker: &mut wasmtime::Linker, - f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, + f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, ) -> anyhow::Result<()> { crate::preview1::wasi_snapshot_preview1::add_to_linker(linker, f) } @@ -801,9 +803,9 @@ pub fn add_to_linker_async( /// Ok(()) /// } /// ``` -pub fn add_to_linker_sync( +pub fn add_to_linker_sync( linker: &mut wasmtime::Linker, - f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, + f: impl Fn(&mut T) -> &mut WasiP1Ctx + Copy + Send + Sync + 'static, ) -> anyhow::Result<()> { crate::preview1::sync::add_wasi_snapshot_preview1_to_linker(linker, f) } @@ -1140,7 +1142,7 @@ fn first_non_empty_iovec( // Implement the WasiSnapshotPreview1 trait using only the traits that are // required for T, i.e., in terms of the preview 2 wit interface, and state // stored in the WasiPreview1Adapter struct. -impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx { +impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx { #[instrument(skip(self, memory))] fn args_get( &mut self, From 47250b2bb3916f324bd6a825582e2e26f7d9f6d1 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 13 Jan 2025 10:48:20 -0800 Subject: [PATCH 2/5] checkpoint --- crates/wasi/src/ctx.rs | 16 ++--- crates/wasi/src/filesystem.rs | 85 +++++++++++++++++-------- crates/wasi/src/host/filesystem.rs | 75 ++++++++++++---------- crates/wasi/src/host/filesystem/sync.rs | 74 +++++++++++---------- crates/wasi/src/preview1.rs | 82 ++++++++++++++---------- 5 files changed, 197 insertions(+), 135 deletions(-) diff --git a/crates/wasi/src/ctx.rs b/crates/wasi/src/ctx.rs index 6fdad0b288d3..47f0f283dcfb 100644 --- a/crates/wasi/src/ctx.rs +++ b/crates/wasi/src/ctx.rs @@ -38,13 +38,13 @@ use wasmtime::component::ResourceTable; /// ``` /// /// [`Store`]: wasmtime::Store -pub struct WasiCtxBuilder { +pub struct WasiCtxBuilder { stdin: Box, stdout: Box, stderr: Box, env: Vec<(String, String)>, args: Vec, - preopens: Vec<(Dir, String)>, + preopens: Vec<(Dir, String)>, socket_addr_check: SocketAddrCheck, random: Box, insecure_random: Box, @@ -56,7 +56,7 @@ pub struct WasiCtxBuilder { built: bool, } -impl WasiCtxBuilder { +impl WasiCtxBuilder { /// Creates a builder for a new context with default parameters set. /// /// The current defaults are: @@ -463,7 +463,7 @@ impl WasiCtxBuilder { /// Panics if this method is called twice. Each [`WasiCtxBuilder`] can be /// used to create only a single [`WasiCtx`]. Repeated usage of this method /// is not allowed and should use a second builder instead. - pub fn build(&mut self) -> WasiCtx { + pub fn build(&mut self) -> WasiCtx { assert!(!self.built); let Self { @@ -520,7 +520,7 @@ impl WasiCtxBuilder { /// usage of this method is not allowed and should use a second builder /// instead. #[cfg(feature = "preview1")] - pub fn build_p1(&mut self) -> crate::preview1::WasiP1Ctx { + pub fn build_p1(&mut self) -> crate::preview1::WasiP1Ctx { let wasi = self.build(); crate::preview1::WasiP1Ctx::new(wasi) } @@ -633,7 +633,7 @@ impl WasiView for WasiImpl { } } -pub trait WasiExecutor: Send { +pub trait WasiExecutor: Send + Sync + 'static { fn run_blocking(body: F) -> impl std::future::Future + Send where F: FnOnce() -> R + Send + 'static, @@ -694,7 +694,7 @@ pub struct WasiCtx { pub(crate) monotonic_clock: Box, pub(crate) env: Vec<(String, String)>, pub(crate) args: Vec, - pub(crate) preopens: Vec<(Dir, String)>, + pub(crate) preopens: Vec<(Dir, String)>, pub(crate) stdin: Box, pub(crate) stdout: Box, pub(crate) stderr: Box, @@ -706,7 +706,7 @@ pub struct WasiCtx { impl WasiCtx { /// Convenience function for calling [`WasiCtxBuilder::new`]. - pub fn builder() -> WasiCtxBuilder { + pub fn builder() -> WasiCtxBuilder { WasiCtxBuilder::new() } } diff --git a/crates/wasi/src/filesystem.rs b/crates/wasi/src/filesystem.rs index 2c656b6997e5..27d4f3e99e38 100644 --- a/crates/wasi/src/filesystem.rs +++ b/crates/wasi/src/filesystem.rs @@ -2,10 +2,12 @@ use crate::bindings::filesystem::types; use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle}; use crate::{ HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, TrappableError, + WasiExecutor, }; use anyhow::anyhow; use bytes::{Bytes, BytesMut}; use std::io; +use std::marker::PhantomData; use std::mem; use std::sync::Arc; @@ -25,20 +27,20 @@ impl From for FsError { } } -pub enum Descriptor { - File(File), - Dir(Dir), +pub enum Descriptor { + File(File), + Dir(Dir), } -impl Descriptor { - pub fn file(&self) -> Result<&File, types::ErrorCode> { +impl Descriptor { + pub fn file(&self) -> Result<&File, types::ErrorCode> { match self { Descriptor::File(f) => Ok(f), Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor), } } - pub fn dir(&self) -> Result<&Dir, types::ErrorCode> { + pub fn dir(&self) -> Result<&Dir, types::ErrorCode> { match self { Descriptor::Dir(d) => Ok(d), Descriptor::File(_) => Err(types::ErrorCode::NotDirectory), @@ -76,8 +78,7 @@ bitflags::bitflags! { } } -#[derive(Clone)] -pub struct File { +pub struct File { /// The operating system File this struct is mediating access to. /// /// Wrapped in an Arc because the same underlying file is used for @@ -97,9 +98,22 @@ pub struct File { pub open_mode: OpenMode, allow_blocking_current_thread: bool, + _executor: PhantomData, } -impl File { +impl Clone for File { + fn clone(&self) -> Self { + File { + file: self.file.clone(), + perms: self.perms.clone(), + open_mode: self.open_mode.clone(), + allow_blocking_current_thread: self.allow_blocking_current_thread, + _executor: PhantomData, + } + } +} + +impl File { pub fn new( file: cap_std::fs::File, perms: FilePerms, @@ -111,6 +125,7 @@ impl File { perms, open_mode, allow_blocking_current_thread, + _executor: PhantomData, } } @@ -177,8 +192,7 @@ bitflags::bitflags! { } } -#[derive(Clone)] -pub struct Dir { +pub struct Dir { /// The operating system file descriptor this struct is mediating access /// to. /// @@ -202,9 +216,23 @@ pub struct Dir { pub open_mode: OpenMode, allow_blocking_current_thread: bool, + _executor: PhantomData, } -impl Dir { +impl Clone for Dir { + fn clone(&self) -> Self { + Dir { + dir: self.dir.clone(), + perms: self.perms.clone(), + file_perms: self.file_perms.clone(), + open_mode: self.open_mode.clone(), + allow_blocking_current_thread: self.allow_blocking_current_thread, + _executor: PhantomData, + } + } +} + +impl Dir { pub fn new( dir: cap_std::fs::Dir, perms: DirPerms, @@ -218,9 +246,11 @@ impl Dir { file_perms, open_mode, allow_blocking_current_thread, + _executor: PhantomData, } } - +} +impl Dir { /// Execute the blocking `body` function. /// /// Depending on how the WasiCtx was configured, the body may either be: @@ -249,8 +279,8 @@ impl Dir { } } -pub struct FileInputStream { - file: File, +pub struct FileInputStream { + file: File, position: u64, state: ReadState, } @@ -261,15 +291,16 @@ enum ReadState { Error(io::Error), Closed, } -impl FileInputStream { - pub fn new(file: &File, position: u64) -> Self { +impl FileInputStream { + pub fn new(file: &File, position: u64) -> Self { Self { file: file.clone(), position, state: ReadState::Idle, } } - +} +impl FileInputStream { fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState { use system_interface::fs::FileIoExt; @@ -300,7 +331,7 @@ impl FileInputStream { } } #[async_trait::async_trait] -impl HostInputStream for FileInputStream { +impl HostInputStream for FileInputStream { fn read(&mut self, size: usize) -> StreamResult { match &mut self.state { ReadState::Idle => { @@ -368,7 +399,7 @@ impl HostInputStream for FileInputStream { } } #[async_trait::async_trait] -impl Subscribe for FileInputStream { +impl Subscribe for FileInputStream { async fn ready(&mut self) { if let ReadState::Idle = self.state { // The guest hasn't initiated any read, but is nonetheless waiting @@ -392,8 +423,8 @@ pub(crate) enum FileOutputMode { Append, } -pub(crate) struct FileOutputStream { - file: File, +pub(crate) struct FileOutputStream { + file: File, mode: FileOutputMode, state: OutputState, } @@ -408,8 +439,8 @@ enum OutputState { Closed, } -impl FileOutputStream { - pub fn write_at(file: &File, position: u64) -> Self { +impl FileOutputStream { + pub fn write_at(file: &File, position: u64) -> Self { Self { file: file.clone(), mode: FileOutputMode::Position(position), @@ -417,7 +448,7 @@ impl FileOutputStream { } } - pub fn append(file: &File) -> Self { + pub fn append(file: &File) -> Self { Self { file: file.clone(), mode: FileOutputMode::Append, @@ -467,7 +498,7 @@ impl FileOutputStream { const FILE_WRITE_CAPACITY: usize = 1024 * 1024; #[async_trait::async_trait] -impl HostOutputStream for FileOutputStream { +impl HostOutputStream for FileOutputStream { fn write(&mut self, buf: Bytes) -> Result<(), StreamError> { match self.state { OutputState::Ready => {} @@ -566,7 +597,7 @@ impl HostOutputStream for FileOutputStream { } #[async_trait::async_trait] -impl Subscribe for FileOutputStream { +impl Subscribe for FileOutputStream { async fn ready(&mut self) { if let OutputState::Waiting(task) = &mut self.state { self.state = match task.await { diff --git a/crates/wasi/src/host/filesystem.rs b/crates/wasi/src/host/filesystem.rs index 2d31e5c53fc4..4d4750cdd178 100644 --- a/crates/wasi/src/host/filesystem.rs +++ b/crates/wasi/src/host/filesystem.rs @@ -7,7 +7,7 @@ use crate::bindings::io::streams::{InputStream, OutputStream}; use crate::filesystem::{ Descriptor, Dir, File, FileInputStream, FileOutputStream, OpenMode, ReaddirIterator, }; -use crate::{DirPerms, FilePerms, FsError, FsResult, WasiImpl, WasiView}; +use crate::{DirPerms, FilePerms, FsError, FsResult, WasiExecutor, WasiImpl, WasiView}; use anyhow::Context; use wasmtime::component::Resource; @@ -19,7 +19,7 @@ where { fn get_directories( &mut self, - ) -> Result, String)>, anyhow::Error> { + ) -> Result>, String)>, anyhow::Error> { let mut results = Vec::new(); for (dir, name) in self.ctx().preopens.clone() { let fd = self @@ -62,7 +62,7 @@ where { async fn advise( &mut self, - fd: Resource, + fd: Resource>, offset: types::Filesize, len: types::Filesize, advice: types::Advice, @@ -85,7 +85,7 @@ where Ok(()) } - async fn sync_data(&mut self, fd: Resource) -> FsResult<()> { + async fn sync_data(&mut self, fd: Resource>) -> FsResult<()> { let descriptor = self.table().get(&fd)?; match descriptor { @@ -114,7 +114,7 @@ where async fn get_flags( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { use system_interface::fs::{FdFlags, GetSetFdFlags}; use types::DescriptorFlags; @@ -162,7 +162,7 @@ where async fn get_type( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { let descriptor = self.table().get(&fd)?; @@ -177,7 +177,7 @@ where async fn set_size( &mut self, - fd: Resource, + fd: Resource>, size: types::Filesize, ) -> FsResult<()> { let f = self.table().get(&fd)?.file()?; @@ -190,7 +190,7 @@ where async fn set_times( &mut self, - fd: Resource, + fd: Resource>, atim: types::NewTimestamp, mtim: types::NewTimestamp, ) -> FsResult<()> { @@ -221,7 +221,7 @@ where async fn read( &mut self, - fd: Resource, + fd: Resource>, len: types::Filesize, offset: types::Filesize, ) -> FsResult<(Vec, bool)> { @@ -259,7 +259,7 @@ where async fn write( &mut self, - fd: Resource, + fd: Resource>, buf: Vec, offset: types::Filesize, ) -> FsResult { @@ -281,7 +281,7 @@ where async fn read_directory( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult> { let table = self.table(); let d = table.get(&fd)?.dir()?; @@ -344,7 +344,7 @@ where Ok(table.push(ReaddirIterator::new(entries))?) } - async fn sync(&mut self, fd: Resource) -> FsResult<()> { + async fn sync(&mut self, fd: Resource>) -> FsResult<()> { let descriptor = self.table().get(&fd)?; match descriptor { @@ -373,7 +373,7 @@ where async fn create_directory_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult<()> { let table = self.table(); @@ -385,7 +385,10 @@ where Ok(()) } - async fn stat(&mut self, fd: Resource) -> FsResult { + async fn stat( + &mut self, + fd: Resource>, + ) -> FsResult { let descriptor = self.table().get(&fd)?; match descriptor { Descriptor::File(f) => { @@ -403,7 +406,7 @@ where async fn stat_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: types::PathFlags, path: String, ) -> FsResult { @@ -423,7 +426,7 @@ where async fn set_times_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: types::PathFlags, path: String, atim: types::NewTimestamp, @@ -462,11 +465,11 @@ where async fn link_at( &mut self, - fd: Resource, + fd: Resource>, // TODO delete the path flags from this function old_path_flags: types::PathFlags, old_path: String, - new_descriptor: Resource, + new_descriptor: Resource>, new_path: String, ) -> FsResult<()> { let table = self.table(); @@ -490,12 +493,12 @@ where async fn open_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: types::PathFlags, path: String, oflags: types::OpenFlags, flags: types::DescriptorFlags, - ) -> FsResult> { + ) -> FsResult>> { use cap_fs_ext::{FollowSymlinks, OpenOptionsFollowExt, OpenOptionsMaybeDirExt}; use system_interface::fs::{FdFlags, GetSetFdFlags}; use types::{DescriptorFlags, OpenFlags}; @@ -631,7 +634,7 @@ where } } - fn drop(&mut self, fd: Resource) -> anyhow::Result<()> { + fn drop(&mut self, fd: Resource>) -> anyhow::Result<()> { let table = self.table(); // The Drop will close the file/dir, but if the close syscall @@ -646,7 +649,7 @@ where async fn readlink_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult { let table = self.table(); @@ -663,7 +666,7 @@ where async fn remove_directory_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult<()> { let table = self.table(); @@ -676,9 +679,9 @@ where async fn rename_at( &mut self, - fd: Resource, + fd: Resource>, old_path: String, - new_fd: Resource, + new_fd: Resource>, new_path: String, ) -> FsResult<()> { let table = self.table(); @@ -698,7 +701,7 @@ where async fn symlink_at( &mut self, - fd: Resource, + fd: Resource>, src_path: String, dest_path: String, ) -> FsResult<()> { @@ -717,7 +720,7 @@ where async fn unlink_file_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult<()> { use cap_fs_ext::DirExt; @@ -733,7 +736,7 @@ where fn read_via_stream( &mut self, - fd: Resource, + fd: Resource>, offset: types::Filesize, ) -> FsResult> { // Trap if fd lookup fails: @@ -754,7 +757,7 @@ where fn write_via_stream( &mut self, - fd: Resource, + fd: Resource>, offset: types::Filesize, ) -> FsResult> { // Trap if fd lookup fails: @@ -776,7 +779,7 @@ where fn append_via_stream( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult> { // Trap if fd lookup fails: let f = self.table().get(&fd)?.file()?; @@ -797,8 +800,8 @@ where async fn is_same_object( &mut self, - a: Resource, - b: Resource, + a: Resource>, + b: Resource>, ) -> anyhow::Result { use cap_fs_ext::MetadataExt; let descriptor_a = self.table().get(&a)?; @@ -824,7 +827,7 @@ where } async fn metadata_hash( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { let descriptor_a = self.table().get(&fd)?; let meta = get_descriptor_metadata(descriptor_a).await?; @@ -832,7 +835,7 @@ where } async fn metadata_hash_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: types::PathFlags, path: String, ) -> FsResult { @@ -871,7 +874,9 @@ where } } -async fn get_descriptor_metadata(fd: &types::Descriptor) -> FsResult { +async fn get_descriptor_metadata( + fd: &types::Descriptor, +) -> FsResult { match fd { Descriptor::File(f) => { // No permissions check on metadata: if opened, allowed to stat it diff --git a/crates/wasi/src/host/filesystem/sync.rs b/crates/wasi/src/host/filesystem/sync.rs index 5de6b051921d..f364322bef3f 100644 --- a/crates/wasi/src/host/filesystem/sync.rs +++ b/crates/wasi/src/host/filesystem/sync.rs @@ -2,12 +2,13 @@ use crate::bindings::filesystem::types as async_filesystem; use crate::bindings::sync::filesystem::types as sync_filesystem; use crate::bindings::sync::io::streams; use crate::runtime::in_tokio; -use crate::{FsError, FsResult, WasiImpl, WasiView}; +use crate::{FsError, FsResult, WasiImpl, WasiSyncExecutor, WasiView}; use wasmtime::component::Resource; impl sync_filesystem::Host for WasiImpl where T: WasiView, + T::Executor: WasiSyncExecutor, { fn convert_error_code(&mut self, err: FsError) -> anyhow::Result { Ok(async_filesystem::Host::convert_error_code(self, err)?.into()) @@ -24,10 +25,11 @@ where impl sync_filesystem::HostDescriptor for WasiImpl where T: WasiView, + T::Executor: WasiSyncExecutor, { fn advise( &mut self, - fd: Resource, + fd: Resource>, offset: sync_filesystem::Filesize, len: sync_filesystem::Filesize, advice: sync_filesystem::Advice, @@ -37,27 +39,30 @@ where }) } - fn sync_data(&mut self, fd: Resource) -> FsResult<()> { + fn sync_data( + &mut self, + fd: Resource>, + ) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::sync_data(self, fd).await }) } fn get_flags( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { Ok(in_tokio(async { async_filesystem::HostDescriptor::get_flags(self, fd).await })?.into()) } fn get_type( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { Ok(in_tokio(async { async_filesystem::HostDescriptor::get_type(self, fd).await })?.into()) } fn set_size( &mut self, - fd: Resource, + fd: Resource>, size: sync_filesystem::Filesize, ) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::set_size(self, fd, size).await }) @@ -65,7 +70,7 @@ where fn set_times( &mut self, - fd: Resource, + fd: Resource>, atim: sync_filesystem::NewTimestamp, mtim: sync_filesystem::NewTimestamp, ) -> FsResult<()> { @@ -76,7 +81,7 @@ where fn read( &mut self, - fd: Resource, + fd: Resource>, len: sync_filesystem::Filesize, offset: sync_filesystem::Filesize, ) -> FsResult<(Vec, bool)> { @@ -85,7 +90,7 @@ where fn write( &mut self, - fd: Resource, + fd: Resource>, buf: Vec, offset: sync_filesystem::Filesize, ) -> FsResult { @@ -94,18 +99,18 @@ where fn read_directory( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult> { in_tokio(async { async_filesystem::HostDescriptor::read_directory(self, fd).await }) } - fn sync(&mut self, fd: Resource) -> FsResult<()> { + fn sync(&mut self, fd: Resource>) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::sync(self, fd).await }) } fn create_directory_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult<()> { in_tokio(async { @@ -115,14 +120,14 @@ where fn stat( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { Ok(in_tokio(async { async_filesystem::HostDescriptor::stat(self, fd).await })?.into()) } fn stat_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: sync_filesystem::PathFlags, path: String, ) -> FsResult { @@ -134,7 +139,7 @@ where fn set_times_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: sync_filesystem::PathFlags, path: String, atim: sync_filesystem::NewTimestamp, @@ -155,11 +160,11 @@ where fn link_at( &mut self, - fd: Resource, + fd: Resource>, // TODO delete the path flags from this function old_path_flags: sync_filesystem::PathFlags, old_path: String, - new_descriptor: Resource, + new_descriptor: Resource>, new_path: String, ) -> FsResult<()> { in_tokio(async { @@ -177,12 +182,12 @@ where fn open_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: sync_filesystem::PathFlags, path: String, oflags: sync_filesystem::OpenFlags, flags: sync_filesystem::DescriptorFlags, - ) -> FsResult> { + ) -> FsResult>> { in_tokio(async { async_filesystem::HostDescriptor::open_at( self, @@ -196,13 +201,16 @@ where }) } - fn drop(&mut self, fd: Resource) -> anyhow::Result<()> { + fn drop( + &mut self, + fd: Resource>, + ) -> anyhow::Result<()> { async_filesystem::HostDescriptor::drop(self, fd) } fn readlink_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult { in_tokio(async { async_filesystem::HostDescriptor::readlink_at(self, fd, path).await }) @@ -210,7 +218,7 @@ where fn remove_directory_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult<()> { in_tokio(async { @@ -220,9 +228,9 @@ where fn rename_at( &mut self, - fd: Resource, + fd: Resource>, old_path: String, - new_fd: Resource, + new_fd: Resource>, new_path: String, ) -> FsResult<()> { in_tokio(async { @@ -232,7 +240,7 @@ where fn symlink_at( &mut self, - fd: Resource, + fd: Resource>, src_path: String, dest_path: String, ) -> FsResult<()> { @@ -243,7 +251,7 @@ where fn unlink_file_at( &mut self, - fd: Resource, + fd: Resource>, path: String, ) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::unlink_file_at(self, fd, path).await }) @@ -251,7 +259,7 @@ where fn read_via_stream( &mut self, - fd: Resource, + fd: Resource>, offset: sync_filesystem::Filesize, ) -> FsResult> { Ok(async_filesystem::HostDescriptor::read_via_stream( @@ -261,7 +269,7 @@ where fn write_via_stream( &mut self, - fd: Resource, + fd: Resource>, offset: sync_filesystem::Filesize, ) -> FsResult> { Ok(async_filesystem::HostDescriptor::write_via_stream( @@ -271,7 +279,7 @@ where fn append_via_stream( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult> { Ok(async_filesystem::HostDescriptor::append_via_stream( self, fd, @@ -280,14 +288,14 @@ where fn is_same_object( &mut self, - a: Resource, - b: Resource, + a: Resource>, + b: Resource>, ) -> anyhow::Result { in_tokio(async { async_filesystem::HostDescriptor::is_same_object(self, a, b).await }) } fn metadata_hash( &mut self, - fd: Resource, + fd: Resource>, ) -> FsResult { Ok( in_tokio(async { async_filesystem::HostDescriptor::metadata_hash(self, fd).await })? @@ -296,7 +304,7 @@ where } fn metadata_hash_at( &mut self, - fd: Resource, + fd: Resource>, path_flags: sync_filesystem::PathFlags, path: String, ) -> FsResult { diff --git a/crates/wasi/src/preview1.rs b/crates/wasi/src/preview1.rs index 354c6bce2639..c2124701e1ba 100644 --- a/crates/wasi/src/preview1.rs +++ b/crates/wasi/src/preview1.rs @@ -141,7 +141,7 @@ use crate::bindings::random::random::Host as _; pub struct WasiP1Ctx { table: ResourceTable, wasi: WasiCtx, - adapter: WasiPreview1Adapter, + adapter: WasiPreview1Adapter, } impl WasiP1Ctx { @@ -169,9 +169,9 @@ impl WasiView for WasiP1Ctx { } #[derive(Debug)] -struct File { +struct File { /// The handle to the preview2 descriptor of type [`crate::filesystem::Descriptor::File`]. - fd: Resource, + fd: Resource>, /// The current-position pointer. position: Arc, @@ -248,7 +248,7 @@ impl BlockingMode { } #[derive(Debug)] -enum Descriptor { +enum Descriptor { Stdin { stream: Resource, isatty: IsATTY, @@ -263,43 +263,58 @@ enum Descriptor { }, /// A fd of type [`crate::filesystem::Descriptor::Dir`] Directory { - fd: Resource, + fd: Resource>, /// The path this directory was preopened as. /// `None` means this directory was opened using `open-at`. preopen_path: Option, }, /// A fd of type [`crate::filesystem::Descriptor::File`] - File(File), + File(File), } -#[derive(Debug, Default)] -struct WasiPreview1Adapter { - descriptors: Option, +#[derive(Debug)] +struct WasiPreview1Adapter { + descriptors: Option>, +} +impl Default for WasiPreview1Adapter { + fn default() -> Self { + Self { + descriptors: Default::default(), + } + } } -#[derive(Debug, Default)] -struct Descriptors { - used: BTreeMap, +#[derive(Debug)] +struct Descriptors { + used: BTreeMap>, free: Vec, } +impl Default for Descriptors { + fn default() -> Self { + Self { + used: BTreeMap::new(), + free: Default::default(), + } + } +} -impl Deref for Descriptors { - type Target = BTreeMap; +impl Deref for Descriptors { + type Target = BTreeMap>; fn deref(&self) -> &Self::Target { &self.used } } -impl DerefMut for Descriptors { +impl DerefMut for Descriptors { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.used } } -impl Descriptors { +impl Descriptors { /// Initializes [Self] using `preopens` - fn new(mut host: WasiImpl<&mut WasiP1Ctx>) -> Result { + fn new(mut host: WasiImpl<&mut WasiP1Ctx>) -> Result { let mut descriptors = Self::default(); descriptors.push(Descriptor::Stdin { stream: host @@ -390,7 +405,7 @@ impl Descriptors { } /// Removes the [Descriptor] corresponding to `fd` - fn remove(&mut self, fd: types::Fd) -> Option { + fn remove(&mut self, fd: types::Fd) -> Option> { let fd = fd.into(); let desc = self.used.remove(&fd)?; self.free.push(fd); @@ -400,7 +415,7 @@ impl Descriptors { /// Pushes the [Descriptor] returning corresponding number. /// This operation will try to reuse numbers previously removed via [`Self::remove`] /// and rely on [`Self::unused`] if no free numbers are recorded - fn push(&mut self, desc: Descriptor) -> Result { + fn push(&mut self, desc: Descriptor) -> Result { let fd = if let Some(fd) = self.free.pop() { fd } else { @@ -411,7 +426,7 @@ impl Descriptors { } } -impl WasiPreview1Adapter { +impl WasiPreview1Adapter { fn new() -> Self { Self::default() } @@ -428,7 +443,7 @@ impl WasiPreview1Adapter { // call methods like [`Descriptor::is_file`] and hiding complexity from preview1 method implementations. struct Transaction<'a, E> { view: &'a mut WasiP1Ctx, - descriptors: Descriptors, + descriptors: Descriptors, } impl Drop for Transaction<'_, E> { @@ -439,13 +454,13 @@ impl Drop for Transaction<'_, E> { } } -impl Transaction<'_, E> { +impl Transaction<'_, E> { /// Borrows [`Descriptor`] corresponding to `fd`. /// /// # Errors /// /// Returns [`types::Errno::Badf`] if no [`Descriptor`] is found - fn get_descriptor(&self, fd: types::Fd) -> Result<&Descriptor> { + fn get_descriptor(&self, fd: types::Fd) -> Result<&Descriptor> { let fd = fd.into(); let desc = self.descriptors.get(&fd).ok_or(types::Errno::Badf)?; Ok(desc) @@ -453,7 +468,7 @@ impl Transaction<'_, E> { /// Borrows [`File`] corresponding to `fd` /// if it describes a [`Descriptor::File`] - fn get_file(&self, fd: types::Fd) -> Result<&File> { + fn get_file(&self, fd: types::Fd) -> Result<&File> { let fd = fd.into(); match self.descriptors.get(&fd) { Some(Descriptor::File(file)) => Ok(file), @@ -463,7 +478,7 @@ impl Transaction<'_, E> { /// Mutably borrows [`File`] corresponding to `fd` /// if it describes a [`Descriptor::File`] - fn get_file_mut(&mut self, fd: types::Fd) -> Result<&mut File> { + fn get_file_mut(&mut self, fd: types::Fd) -> Result<&mut File> { let fd = fd.into(); match self.descriptors.get_mut(&fd) { Some(Descriptor::File(file)) => Ok(file), @@ -477,7 +492,7 @@ impl Transaction<'_, E> { /// # Errors /// /// Returns [`types::Errno::Spipe`] if the descriptor corresponds to stdio - fn get_seekable(&self, fd: types::Fd) -> Result<&File> { + fn get_seekable(&self, fd: types::Fd) -> Result<&File> { let fd = fd.into(); match self.descriptors.get(&fd) { Some(Descriptor::File(file)) => Ok(file), @@ -492,7 +507,7 @@ impl Transaction<'_, E> { } /// Returns [`filesystem::Descriptor`] corresponding to `fd` - fn get_fd(&self, fd: types::Fd) -> Result> { + fn get_fd(&self, fd: types::Fd) -> Result>> { match self.get_descriptor(fd)? { Descriptor::File(File { fd, .. }) => Ok(fd.borrowed()), Descriptor::Directory { fd, .. } => Ok(fd.borrowed()), @@ -504,13 +519,13 @@ impl Transaction<'_, E> { /// Returns [`filesystem::Descriptor`] corresponding to `fd` /// if it describes a [`Descriptor::File`] - fn get_file_fd(&self, fd: types::Fd) -> Result> { + fn get_file_fd(&self, fd: types::Fd) -> Result>> { self.get_file(fd).map(|File { fd, .. }| fd.borrowed()) } /// Returns [`filesystem::Descriptor`] corresponding to `fd` /// if it describes a [`Descriptor::Directory`] - fn get_dir_fd(&self, fd: types::Fd) -> Result> { + fn get_dir_fd(&self, fd: types::Fd) -> Result>> { let fd = fd.into(); match self.descriptors.get(&fd) { Some(Descriptor::Directory { fd, .. }) => Ok(fd.borrowed()), @@ -537,7 +552,10 @@ impl WasiP1Ctx { /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] /// and returns [`filesystem::Descriptor`] corresponding to `fd` - fn get_fd(&mut self, fd: types::Fd) -> Result, types::Error> { + fn get_fd( + &mut self, + fd: types::Fd, + ) -> Result>, types::Error> { let st = self.transact()?; let fd = st.get_fd(fd)?; Ok(fd) @@ -549,7 +567,7 @@ impl WasiP1Ctx { fn get_file_fd( &mut self, fd: types::Fd, - ) -> Result, types::Error> { + ) -> Result>, types::Error> { let st = self.transact()?; let fd = st.get_file_fd(fd)?; Ok(fd) @@ -562,7 +580,7 @@ impl WasiP1Ctx { fn get_dir_fd( &mut self, fd: types::Fd, - ) -> Result, types::Error> { + ) -> Result>, types::Error> { let st = self.transact()?; let fd = st.get_dir_fd(fd)?; Ok(fd) From baf1c1bcffaac42a2bb76306b06db4a136ff794c Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 13 Jan 2025 11:03:25 -0800 Subject: [PATCH 3/5] dont infect the resource types with executor because bindgen cant cope with that. will have to make some of the methods have generic, but the obvious type will bubble down at those use sites --- crates/wasi/src/ctx.rs | 14 ++--- crates/wasi/src/filesystem.rs | 82 ++++++++----------------- crates/wasi/src/host/filesystem.rs | 73 ++++++++++------------ crates/wasi/src/host/filesystem/sync.rs | 70 ++++++++++----------- crates/wasi/src/preview1.rs | 82 ++++++++++--------------- 5 files changed, 131 insertions(+), 190 deletions(-) diff --git a/crates/wasi/src/ctx.rs b/crates/wasi/src/ctx.rs index 47f0f283dcfb..07d048e897ca 100644 --- a/crates/wasi/src/ctx.rs +++ b/crates/wasi/src/ctx.rs @@ -38,13 +38,13 @@ use wasmtime::component::ResourceTable; /// ``` /// /// [`Store`]: wasmtime::Store -pub struct WasiCtxBuilder { +pub struct WasiCtxBuilder { stdin: Box, stdout: Box, stderr: Box, env: Vec<(String, String)>, args: Vec, - preopens: Vec<(Dir, String)>, + preopens: Vec<(Dir, String)>, socket_addr_check: SocketAddrCheck, random: Box, insecure_random: Box, @@ -56,7 +56,7 @@ pub struct WasiCtxBuilder { built: bool, } -impl WasiCtxBuilder { +impl WasiCtxBuilder { /// Creates a builder for a new context with default parameters set. /// /// The current defaults are: @@ -463,7 +463,7 @@ impl WasiCtxBuilder { /// Panics if this method is called twice. Each [`WasiCtxBuilder`] can be /// used to create only a single [`WasiCtx`]. Repeated usage of this method /// is not allowed and should use a second builder instead. - pub fn build(&mut self) -> WasiCtx { + pub fn build(&mut self) -> WasiCtx { assert!(!self.built); let Self { @@ -520,7 +520,7 @@ impl WasiCtxBuilder { /// usage of this method is not allowed and should use a second builder /// instead. #[cfg(feature = "preview1")] - pub fn build_p1(&mut self) -> crate::preview1::WasiP1Ctx { + pub fn build_p1(&mut self) -> crate::preview1::WasiP1Ctx { let wasi = self.build(); crate::preview1::WasiP1Ctx::new(wasi) } @@ -694,7 +694,7 @@ pub struct WasiCtx { pub(crate) monotonic_clock: Box, pub(crate) env: Vec<(String, String)>, pub(crate) args: Vec, - pub(crate) preopens: Vec<(Dir, String)>, + pub(crate) preopens: Vec<(Dir, String)>, pub(crate) stdin: Box, pub(crate) stdout: Box, pub(crate) stderr: Box, @@ -706,7 +706,7 @@ pub struct WasiCtx { impl WasiCtx { /// Convenience function for calling [`WasiCtxBuilder::new`]. - pub fn builder() -> WasiCtxBuilder { + pub fn builder() -> WasiCtxBuilder { WasiCtxBuilder::new() } } diff --git a/crates/wasi/src/filesystem.rs b/crates/wasi/src/filesystem.rs index 27d4f3e99e38..e5b6fd5af9fe 100644 --- a/crates/wasi/src/filesystem.rs +++ b/crates/wasi/src/filesystem.rs @@ -7,7 +7,6 @@ use crate::{ use anyhow::anyhow; use bytes::{Bytes, BytesMut}; use std::io; -use std::marker::PhantomData; use std::mem; use std::sync::Arc; @@ -27,20 +26,20 @@ impl From for FsError { } } -pub enum Descriptor { - File(File), - Dir(Dir), +pub enum Descriptor { + File(File), + Dir(Dir), } -impl Descriptor { - pub fn file(&self) -> Result<&File, types::ErrorCode> { +impl Descriptor { + pub fn file(&self) -> Result<&File, types::ErrorCode> { match self { Descriptor::File(f) => Ok(f), Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor), } } - pub fn dir(&self) -> Result<&Dir, types::ErrorCode> { + pub fn dir(&self) -> Result<&Dir, types::ErrorCode> { match self { Descriptor::Dir(d) => Ok(d), Descriptor::File(_) => Err(types::ErrorCode::NotDirectory), @@ -78,7 +77,8 @@ bitflags::bitflags! { } } -pub struct File { +#[derive(Clone)] +pub struct File { /// The operating system File this struct is mediating access to. /// /// Wrapped in an Arc because the same underlying file is used for @@ -98,22 +98,9 @@ pub struct File { pub open_mode: OpenMode, allow_blocking_current_thread: bool, - _executor: PhantomData, } -impl Clone for File { - fn clone(&self) -> Self { - File { - file: self.file.clone(), - perms: self.perms.clone(), - open_mode: self.open_mode.clone(), - allow_blocking_current_thread: self.allow_blocking_current_thread, - _executor: PhantomData, - } - } -} - -impl File { +impl File { pub fn new( file: cap_std::fs::File, perms: FilePerms, @@ -125,7 +112,6 @@ impl File { perms, open_mode, allow_blocking_current_thread, - _executor: PhantomData, } } @@ -192,7 +178,8 @@ bitflags::bitflags! { } } -pub struct Dir { +#[derive(Clone)] +pub struct Dir { /// The operating system file descriptor this struct is mediating access /// to. /// @@ -216,23 +203,9 @@ pub struct Dir { pub open_mode: OpenMode, allow_blocking_current_thread: bool, - _executor: PhantomData, -} - -impl Clone for Dir { - fn clone(&self) -> Self { - Dir { - dir: self.dir.clone(), - perms: self.perms.clone(), - file_perms: self.file_perms.clone(), - open_mode: self.open_mode.clone(), - allow_blocking_current_thread: self.allow_blocking_current_thread, - _executor: PhantomData, - } - } } -impl Dir { +impl Dir { pub fn new( dir: cap_std::fs::Dir, perms: DirPerms, @@ -246,11 +219,8 @@ impl Dir { file_perms, open_mode, allow_blocking_current_thread, - _executor: PhantomData, } } -} -impl Dir { /// Execute the blocking `body` function. /// /// Depending on how the WasiCtx was configured, the body may either be: @@ -279,8 +249,8 @@ impl Dir { } } -pub struct FileInputStream { - file: File, +pub struct FileInputStream { + file: File, position: u64, state: ReadState, } @@ -291,8 +261,8 @@ enum ReadState { Error(io::Error), Closed, } -impl FileInputStream { - pub fn new(file: &File, position: u64) -> Self { +impl FileInputStream { + pub fn new(file: &File, position: u64) -> Self { Self { file: file.clone(), position, @@ -300,7 +270,7 @@ impl FileInputStream { } } } -impl FileInputStream { +impl FileInputStream { fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState { use system_interface::fs::FileIoExt; @@ -331,7 +301,7 @@ impl FileInputStream { } } #[async_trait::async_trait] -impl HostInputStream for FileInputStream { +impl HostInputStream for FileInputStream { fn read(&mut self, size: usize) -> StreamResult { match &mut self.state { ReadState::Idle => { @@ -399,7 +369,7 @@ impl HostInputStream for FileInputStream { } } #[async_trait::async_trait] -impl Subscribe for FileInputStream { +impl Subscribe for FileInputStream { async fn ready(&mut self) { if let ReadState::Idle = self.state { // The guest hasn't initiated any read, but is nonetheless waiting @@ -423,8 +393,8 @@ pub(crate) enum FileOutputMode { Append, } -pub(crate) struct FileOutputStream { - file: File, +pub(crate) struct FileOutputStream { + file: File, mode: FileOutputMode, state: OutputState, } @@ -439,8 +409,8 @@ enum OutputState { Closed, } -impl FileOutputStream { - pub fn write_at(file: &File, position: u64) -> Self { +impl FileOutputStream { + pub fn write_at(file: &File, position: u64) -> Self { Self { file: file.clone(), mode: FileOutputMode::Position(position), @@ -448,7 +418,7 @@ impl FileOutputStream { } } - pub fn append(file: &File) -> Self { + pub fn append(file: &File) -> Self { Self { file: file.clone(), mode: FileOutputMode::Append, @@ -498,7 +468,7 @@ impl FileOutputStream { const FILE_WRITE_CAPACITY: usize = 1024 * 1024; #[async_trait::async_trait] -impl HostOutputStream for FileOutputStream { +impl HostOutputStream for FileOutputStream { fn write(&mut self, buf: Bytes) -> Result<(), StreamError> { match self.state { OutputState::Ready => {} @@ -597,7 +567,7 @@ impl HostOutputStream for FileOutputStream { } #[async_trait::async_trait] -impl Subscribe for FileOutputStream { +impl Subscribe for FileOutputStream { async fn ready(&mut self) { if let OutputState::Waiting(task) = &mut self.state { self.state = match task.await { diff --git a/crates/wasi/src/host/filesystem.rs b/crates/wasi/src/host/filesystem.rs index 4d4750cdd178..3520127fc6e4 100644 --- a/crates/wasi/src/host/filesystem.rs +++ b/crates/wasi/src/host/filesystem.rs @@ -19,7 +19,7 @@ where { fn get_directories( &mut self, - ) -> Result>, String)>, anyhow::Error> { + ) -> Result, String)>, anyhow::Error> { let mut results = Vec::new(); for (dir, name) in self.ctx().preopens.clone() { let fd = self @@ -62,7 +62,7 @@ where { async fn advise( &mut self, - fd: Resource>, + fd: Resource, offset: types::Filesize, len: types::Filesize, advice: types::Advice, @@ -85,7 +85,7 @@ where Ok(()) } - async fn sync_data(&mut self, fd: Resource>) -> FsResult<()> { + async fn sync_data(&mut self, fd: Resource) -> FsResult<()> { let descriptor = self.table().get(&fd)?; match descriptor { @@ -114,7 +114,7 @@ where async fn get_flags( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { use system_interface::fs::{FdFlags, GetSetFdFlags}; use types::DescriptorFlags; @@ -162,7 +162,7 @@ where async fn get_type( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { let descriptor = self.table().get(&fd)?; @@ -177,7 +177,7 @@ where async fn set_size( &mut self, - fd: Resource>, + fd: Resource, size: types::Filesize, ) -> FsResult<()> { let f = self.table().get(&fd)?.file()?; @@ -190,7 +190,7 @@ where async fn set_times( &mut self, - fd: Resource>, + fd: Resource, atim: types::NewTimestamp, mtim: types::NewTimestamp, ) -> FsResult<()> { @@ -221,7 +221,7 @@ where async fn read( &mut self, - fd: Resource>, + fd: Resource, len: types::Filesize, offset: types::Filesize, ) -> FsResult<(Vec, bool)> { @@ -259,7 +259,7 @@ where async fn write( &mut self, - fd: Resource>, + fd: Resource, buf: Vec, offset: types::Filesize, ) -> FsResult { @@ -281,7 +281,7 @@ where async fn read_directory( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult> { let table = self.table(); let d = table.get(&fd)?.dir()?; @@ -344,7 +344,7 @@ where Ok(table.push(ReaddirIterator::new(entries))?) } - async fn sync(&mut self, fd: Resource>) -> FsResult<()> { + async fn sync(&mut self, fd: Resource) -> FsResult<()> { let descriptor = self.table().get(&fd)?; match descriptor { @@ -373,7 +373,7 @@ where async fn create_directory_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult<()> { let table = self.table(); @@ -385,10 +385,7 @@ where Ok(()) } - async fn stat( - &mut self, - fd: Resource>, - ) -> FsResult { + async fn stat(&mut self, fd: Resource) -> FsResult { let descriptor = self.table().get(&fd)?; match descriptor { Descriptor::File(f) => { @@ -406,7 +403,7 @@ where async fn stat_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: types::PathFlags, path: String, ) -> FsResult { @@ -426,7 +423,7 @@ where async fn set_times_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: types::PathFlags, path: String, atim: types::NewTimestamp, @@ -465,11 +462,11 @@ where async fn link_at( &mut self, - fd: Resource>, + fd: Resource, // TODO delete the path flags from this function old_path_flags: types::PathFlags, old_path: String, - new_descriptor: Resource>, + new_descriptor: Resource, new_path: String, ) -> FsResult<()> { let table = self.table(); @@ -493,12 +490,12 @@ where async fn open_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: types::PathFlags, path: String, oflags: types::OpenFlags, flags: types::DescriptorFlags, - ) -> FsResult>> { + ) -> FsResult> { use cap_fs_ext::{FollowSymlinks, OpenOptionsFollowExt, OpenOptionsMaybeDirExt}; use system_interface::fs::{FdFlags, GetSetFdFlags}; use types::{DescriptorFlags, OpenFlags}; @@ -634,7 +631,7 @@ where } } - fn drop(&mut self, fd: Resource>) -> anyhow::Result<()> { + fn drop(&mut self, fd: Resource) -> anyhow::Result<()> { let table = self.table(); // The Drop will close the file/dir, but if the close syscall @@ -649,7 +646,7 @@ where async fn readlink_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult { let table = self.table(); @@ -666,7 +663,7 @@ where async fn remove_directory_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult<()> { let table = self.table(); @@ -679,9 +676,9 @@ where async fn rename_at( &mut self, - fd: Resource>, + fd: Resource, old_path: String, - new_fd: Resource>, + new_fd: Resource, new_path: String, ) -> FsResult<()> { let table = self.table(); @@ -701,7 +698,7 @@ where async fn symlink_at( &mut self, - fd: Resource>, + fd: Resource, src_path: String, dest_path: String, ) -> FsResult<()> { @@ -720,7 +717,7 @@ where async fn unlink_file_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult<()> { use cap_fs_ext::DirExt; @@ -736,7 +733,7 @@ where fn read_via_stream( &mut self, - fd: Resource>, + fd: Resource, offset: types::Filesize, ) -> FsResult> { // Trap if fd lookup fails: @@ -757,7 +754,7 @@ where fn write_via_stream( &mut self, - fd: Resource>, + fd: Resource, offset: types::Filesize, ) -> FsResult> { // Trap if fd lookup fails: @@ -779,7 +776,7 @@ where fn append_via_stream( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult> { // Trap if fd lookup fails: let f = self.table().get(&fd)?.file()?; @@ -800,8 +797,8 @@ where async fn is_same_object( &mut self, - a: Resource>, - b: Resource>, + a: Resource, + b: Resource, ) -> anyhow::Result { use cap_fs_ext::MetadataExt; let descriptor_a = self.table().get(&a)?; @@ -827,7 +824,7 @@ where } async fn metadata_hash( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { let descriptor_a = self.table().get(&fd)?; let meta = get_descriptor_metadata(descriptor_a).await?; @@ -835,7 +832,7 @@ where } async fn metadata_hash_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: types::PathFlags, path: String, ) -> FsResult { @@ -874,9 +871,7 @@ where } } -async fn get_descriptor_metadata( - fd: &types::Descriptor, -) -> FsResult { +async fn get_descriptor_metadata(fd: &types::Descriptor) -> FsResult { match fd { Descriptor::File(f) => { // No permissions check on metadata: if opened, allowed to stat it diff --git a/crates/wasi/src/host/filesystem/sync.rs b/crates/wasi/src/host/filesystem/sync.rs index f364322bef3f..85925144863e 100644 --- a/crates/wasi/src/host/filesystem/sync.rs +++ b/crates/wasi/src/host/filesystem/sync.rs @@ -29,7 +29,7 @@ where { fn advise( &mut self, - fd: Resource>, + fd: Resource, offset: sync_filesystem::Filesize, len: sync_filesystem::Filesize, advice: sync_filesystem::Advice, @@ -39,30 +39,27 @@ where }) } - fn sync_data( - &mut self, - fd: Resource>, - ) -> FsResult<()> { + fn sync_data(&mut self, fd: Resource) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::sync_data(self, fd).await }) } fn get_flags( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { Ok(in_tokio(async { async_filesystem::HostDescriptor::get_flags(self, fd).await })?.into()) } fn get_type( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { Ok(in_tokio(async { async_filesystem::HostDescriptor::get_type(self, fd).await })?.into()) } fn set_size( &mut self, - fd: Resource>, + fd: Resource, size: sync_filesystem::Filesize, ) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::set_size(self, fd, size).await }) @@ -70,7 +67,7 @@ where fn set_times( &mut self, - fd: Resource>, + fd: Resource, atim: sync_filesystem::NewTimestamp, mtim: sync_filesystem::NewTimestamp, ) -> FsResult<()> { @@ -81,7 +78,7 @@ where fn read( &mut self, - fd: Resource>, + fd: Resource, len: sync_filesystem::Filesize, offset: sync_filesystem::Filesize, ) -> FsResult<(Vec, bool)> { @@ -90,7 +87,7 @@ where fn write( &mut self, - fd: Resource>, + fd: Resource, buf: Vec, offset: sync_filesystem::Filesize, ) -> FsResult { @@ -99,18 +96,18 @@ where fn read_directory( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult> { in_tokio(async { async_filesystem::HostDescriptor::read_directory(self, fd).await }) } - fn sync(&mut self, fd: Resource>) -> FsResult<()> { + fn sync(&mut self, fd: Resource) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::sync(self, fd).await }) } fn create_directory_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult<()> { in_tokio(async { @@ -120,14 +117,14 @@ where fn stat( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { Ok(in_tokio(async { async_filesystem::HostDescriptor::stat(self, fd).await })?.into()) } fn stat_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: sync_filesystem::PathFlags, path: String, ) -> FsResult { @@ -139,7 +136,7 @@ where fn set_times_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: sync_filesystem::PathFlags, path: String, atim: sync_filesystem::NewTimestamp, @@ -160,11 +157,11 @@ where fn link_at( &mut self, - fd: Resource>, + fd: Resource, // TODO delete the path flags from this function old_path_flags: sync_filesystem::PathFlags, old_path: String, - new_descriptor: Resource>, + new_descriptor: Resource, new_path: String, ) -> FsResult<()> { in_tokio(async { @@ -182,12 +179,12 @@ where fn open_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: sync_filesystem::PathFlags, path: String, oflags: sync_filesystem::OpenFlags, flags: sync_filesystem::DescriptorFlags, - ) -> FsResult>> { + ) -> FsResult> { in_tokio(async { async_filesystem::HostDescriptor::open_at( self, @@ -201,16 +198,13 @@ where }) } - fn drop( - &mut self, - fd: Resource>, - ) -> anyhow::Result<()> { + fn drop(&mut self, fd: Resource) -> anyhow::Result<()> { async_filesystem::HostDescriptor::drop(self, fd) } fn readlink_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult { in_tokio(async { async_filesystem::HostDescriptor::readlink_at(self, fd, path).await }) @@ -218,7 +212,7 @@ where fn remove_directory_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult<()> { in_tokio(async { @@ -228,9 +222,9 @@ where fn rename_at( &mut self, - fd: Resource>, + fd: Resource, old_path: String, - new_fd: Resource>, + new_fd: Resource, new_path: String, ) -> FsResult<()> { in_tokio(async { @@ -240,7 +234,7 @@ where fn symlink_at( &mut self, - fd: Resource>, + fd: Resource, src_path: String, dest_path: String, ) -> FsResult<()> { @@ -251,7 +245,7 @@ where fn unlink_file_at( &mut self, - fd: Resource>, + fd: Resource, path: String, ) -> FsResult<()> { in_tokio(async { async_filesystem::HostDescriptor::unlink_file_at(self, fd, path).await }) @@ -259,7 +253,7 @@ where fn read_via_stream( &mut self, - fd: Resource>, + fd: Resource, offset: sync_filesystem::Filesize, ) -> FsResult> { Ok(async_filesystem::HostDescriptor::read_via_stream( @@ -269,7 +263,7 @@ where fn write_via_stream( &mut self, - fd: Resource>, + fd: Resource, offset: sync_filesystem::Filesize, ) -> FsResult> { Ok(async_filesystem::HostDescriptor::write_via_stream( @@ -279,7 +273,7 @@ where fn append_via_stream( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult> { Ok(async_filesystem::HostDescriptor::append_via_stream( self, fd, @@ -288,14 +282,14 @@ where fn is_same_object( &mut self, - a: Resource>, - b: Resource>, + a: Resource, + b: Resource, ) -> anyhow::Result { in_tokio(async { async_filesystem::HostDescriptor::is_same_object(self, a, b).await }) } fn metadata_hash( &mut self, - fd: Resource>, + fd: Resource, ) -> FsResult { Ok( in_tokio(async { async_filesystem::HostDescriptor::metadata_hash(self, fd).await })? @@ -304,7 +298,7 @@ where } fn metadata_hash_at( &mut self, - fd: Resource>, + fd: Resource, path_flags: sync_filesystem::PathFlags, path: String, ) -> FsResult { diff --git a/crates/wasi/src/preview1.rs b/crates/wasi/src/preview1.rs index c2124701e1ba..354c6bce2639 100644 --- a/crates/wasi/src/preview1.rs +++ b/crates/wasi/src/preview1.rs @@ -141,7 +141,7 @@ use crate::bindings::random::random::Host as _; pub struct WasiP1Ctx { table: ResourceTable, wasi: WasiCtx, - adapter: WasiPreview1Adapter, + adapter: WasiPreview1Adapter, } impl WasiP1Ctx { @@ -169,9 +169,9 @@ impl WasiView for WasiP1Ctx { } #[derive(Debug)] -struct File { +struct File { /// The handle to the preview2 descriptor of type [`crate::filesystem::Descriptor::File`]. - fd: Resource>, + fd: Resource, /// The current-position pointer. position: Arc, @@ -248,7 +248,7 @@ impl BlockingMode { } #[derive(Debug)] -enum Descriptor { +enum Descriptor { Stdin { stream: Resource, isatty: IsATTY, @@ -263,58 +263,43 @@ enum Descriptor { }, /// A fd of type [`crate::filesystem::Descriptor::Dir`] Directory { - fd: Resource>, + fd: Resource, /// The path this directory was preopened as. /// `None` means this directory was opened using `open-at`. preopen_path: Option, }, /// A fd of type [`crate::filesystem::Descriptor::File`] - File(File), + File(File), } -#[derive(Debug)] -struct WasiPreview1Adapter { - descriptors: Option>, -} -impl Default for WasiPreview1Adapter { - fn default() -> Self { - Self { - descriptors: Default::default(), - } - } +#[derive(Debug, Default)] +struct WasiPreview1Adapter { + descriptors: Option, } -#[derive(Debug)] -struct Descriptors { - used: BTreeMap>, +#[derive(Debug, Default)] +struct Descriptors { + used: BTreeMap, free: Vec, } -impl Default for Descriptors { - fn default() -> Self { - Self { - used: BTreeMap::new(), - free: Default::default(), - } - } -} -impl Deref for Descriptors { - type Target = BTreeMap>; +impl Deref for Descriptors { + type Target = BTreeMap; fn deref(&self) -> &Self::Target { &self.used } } -impl DerefMut for Descriptors { +impl DerefMut for Descriptors { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.used } } -impl Descriptors { +impl Descriptors { /// Initializes [Self] using `preopens` - fn new(mut host: WasiImpl<&mut WasiP1Ctx>) -> Result { + fn new(mut host: WasiImpl<&mut WasiP1Ctx>) -> Result { let mut descriptors = Self::default(); descriptors.push(Descriptor::Stdin { stream: host @@ -405,7 +390,7 @@ impl Descriptors { } /// Removes the [Descriptor] corresponding to `fd` - fn remove(&mut self, fd: types::Fd) -> Option> { + fn remove(&mut self, fd: types::Fd) -> Option { let fd = fd.into(); let desc = self.used.remove(&fd)?; self.free.push(fd); @@ -415,7 +400,7 @@ impl Descriptors { /// Pushes the [Descriptor] returning corresponding number. /// This operation will try to reuse numbers previously removed via [`Self::remove`] /// and rely on [`Self::unused`] if no free numbers are recorded - fn push(&mut self, desc: Descriptor) -> Result { + fn push(&mut self, desc: Descriptor) -> Result { let fd = if let Some(fd) = self.free.pop() { fd } else { @@ -426,7 +411,7 @@ impl Descriptors { } } -impl WasiPreview1Adapter { +impl WasiPreview1Adapter { fn new() -> Self { Self::default() } @@ -443,7 +428,7 @@ impl WasiPreview1Adapter { // call methods like [`Descriptor::is_file`] and hiding complexity from preview1 method implementations. struct Transaction<'a, E> { view: &'a mut WasiP1Ctx, - descriptors: Descriptors, + descriptors: Descriptors, } impl Drop for Transaction<'_, E> { @@ -454,13 +439,13 @@ impl Drop for Transaction<'_, E> { } } -impl Transaction<'_, E> { +impl Transaction<'_, E> { /// Borrows [`Descriptor`] corresponding to `fd`. /// /// # Errors /// /// Returns [`types::Errno::Badf`] if no [`Descriptor`] is found - fn get_descriptor(&self, fd: types::Fd) -> Result<&Descriptor> { + fn get_descriptor(&self, fd: types::Fd) -> Result<&Descriptor> { let fd = fd.into(); let desc = self.descriptors.get(&fd).ok_or(types::Errno::Badf)?; Ok(desc) @@ -468,7 +453,7 @@ impl Transaction<'_, E> { /// Borrows [`File`] corresponding to `fd` /// if it describes a [`Descriptor::File`] - fn get_file(&self, fd: types::Fd) -> Result<&File> { + fn get_file(&self, fd: types::Fd) -> Result<&File> { let fd = fd.into(); match self.descriptors.get(&fd) { Some(Descriptor::File(file)) => Ok(file), @@ -478,7 +463,7 @@ impl Transaction<'_, E> { /// Mutably borrows [`File`] corresponding to `fd` /// if it describes a [`Descriptor::File`] - fn get_file_mut(&mut self, fd: types::Fd) -> Result<&mut File> { + fn get_file_mut(&mut self, fd: types::Fd) -> Result<&mut File> { let fd = fd.into(); match self.descriptors.get_mut(&fd) { Some(Descriptor::File(file)) => Ok(file), @@ -492,7 +477,7 @@ impl Transaction<'_, E> { /// # Errors /// /// Returns [`types::Errno::Spipe`] if the descriptor corresponds to stdio - fn get_seekable(&self, fd: types::Fd) -> Result<&File> { + fn get_seekable(&self, fd: types::Fd) -> Result<&File> { let fd = fd.into(); match self.descriptors.get(&fd) { Some(Descriptor::File(file)) => Ok(file), @@ -507,7 +492,7 @@ impl Transaction<'_, E> { } /// Returns [`filesystem::Descriptor`] corresponding to `fd` - fn get_fd(&self, fd: types::Fd) -> Result>> { + fn get_fd(&self, fd: types::Fd) -> Result> { match self.get_descriptor(fd)? { Descriptor::File(File { fd, .. }) => Ok(fd.borrowed()), Descriptor::Directory { fd, .. } => Ok(fd.borrowed()), @@ -519,13 +504,13 @@ impl Transaction<'_, E> { /// Returns [`filesystem::Descriptor`] corresponding to `fd` /// if it describes a [`Descriptor::File`] - fn get_file_fd(&self, fd: types::Fd) -> Result>> { + fn get_file_fd(&self, fd: types::Fd) -> Result> { self.get_file(fd).map(|File { fd, .. }| fd.borrowed()) } /// Returns [`filesystem::Descriptor`] corresponding to `fd` /// if it describes a [`Descriptor::Directory`] - fn get_dir_fd(&self, fd: types::Fd) -> Result>> { + fn get_dir_fd(&self, fd: types::Fd) -> Result> { let fd = fd.into(); match self.descriptors.get(&fd) { Some(Descriptor::Directory { fd, .. }) => Ok(fd.borrowed()), @@ -552,10 +537,7 @@ impl WasiP1Ctx { /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] /// and returns [`filesystem::Descriptor`] corresponding to `fd` - fn get_fd( - &mut self, - fd: types::Fd, - ) -> Result>, types::Error> { + fn get_fd(&mut self, fd: types::Fd) -> Result, types::Error> { let st = self.transact()?; let fd = st.get_fd(fd)?; Ok(fd) @@ -567,7 +549,7 @@ impl WasiP1Ctx { fn get_file_fd( &mut self, fd: types::Fd, - ) -> Result>, types::Error> { + ) -> Result, types::Error> { let st = self.transact()?; let fd = st.get_file_fd(fd)?; Ok(fd) @@ -580,7 +562,7 @@ impl WasiP1Ctx { fn get_dir_fd( &mut self, fd: types::Fd, - ) -> Result>, types::Error> { + ) -> Result, types::Error> { let st = self.transact()?; let fd = st.get_dir_fd(fd)?; Ok(fd) From 0df00003bc23ce1076c48aa3098769f3dd537fbe Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 13 Jan 2025 16:52:44 -0800 Subject: [PATCH 4/5] replace allow_blocking_current_thread, move spawn and spawn_blocking into WasiExecutor trait --- crates/wasi/src/ctx.rs | 89 +---------- crates/wasi/src/filesystem.rs | 93 ++++-------- crates/wasi/src/host/filesystem.rs | 135 ++++++++++------- crates/wasi/src/host/filesystem/sync.rs | 4 +- crates/wasi/src/host/tcp.rs | 7 +- crates/wasi/src/ip_name_lookup.rs | 5 +- crates/wasi/src/lib.rs | 6 +- crates/wasi/src/pipe.rs | 14 +- crates/wasi/src/preview0.rs | 2 +- crates/wasi/src/preview1.rs | 63 +++----- crates/wasi/src/runtime.rs | 192 ++++++++++++++++-------- crates/wasi/src/tcp.rs | 57 ++++--- crates/wasi/src/write_stream.rs | 5 +- 13 files changed, 329 insertions(+), 343 deletions(-) diff --git a/crates/wasi/src/ctx.rs b/crates/wasi/src/ctx.rs index 07d048e897ca..a7e7ef6bd919 100644 --- a/crates/wasi/src/ctx.rs +++ b/crates/wasi/src/ctx.rs @@ -5,7 +5,9 @@ use crate::{ }, filesystem::{Dir, OpenMode}, network::{SocketAddrCheck, SocketAddrUse}, - pipe, random, stdio, + pipe, random, + runtime::WasiExecutor, + stdio, stdio::{StdinStream, StdoutStream}, DirPerms, FilePerms, }; @@ -52,7 +54,6 @@ pub struct WasiCtxBuilder { wall_clock: Box, monotonic_clock: Box, allowed_network_uses: AllowedNetworkUses, - allow_blocking_current_thread: bool, built: bool, } @@ -101,7 +102,6 @@ impl WasiCtxBuilder { wall_clock: wall_clock(), monotonic_clock: monotonic_clock(), allowed_network_uses: AllowedNetworkUses::default(), - allow_blocking_current_thread: false, built: false, } } @@ -175,37 +175,6 @@ impl WasiCtxBuilder { self.inherit_stdin().inherit_stdout().inherit_stderr() } - /// Configures whether or not blocking operations made through this - /// `WasiCtx` are allowed to block the current thread. - /// - /// WASI is currently implemented on top of the Rust - /// [Tokio](https://tokio.rs/) library. While most WASI APIs are - /// non-blocking some are instead blocking from the perspective of - /// WebAssembly. For example opening a file is a blocking operation with - /// respect to WebAssembly but it's implemented as an asynchronous operation - /// on the host. This is currently done with Tokio's - /// [`spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html). - /// - /// When WebAssembly is used in a synchronous context, for example when - /// [`Config::async_support`] is disabled, then this asynchronous operation - /// is quickly turned back into a synchronous operation with a `block_on` in - /// Rust. This switching back-and-forth between a blocking a non-blocking - /// context can have overhead, and this option exists to help alleviate this - /// overhead. - /// - /// This option indicates that for WASI functions that are blocking from the - /// perspective of WebAssembly it's ok to block the native thread as well. - /// This means that this back-and-forth between async and sync won't happen - /// and instead blocking operations are performed on-thread (such as opening - /// a file). This can improve the performance of WASI operations when async - /// support is disabled. - /// - /// [`Config::async_support`]: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.async_support - pub fn allow_blocking_current_thread(&mut self, enable: bool) -> &mut Self { - self.allow_blocking_current_thread = enable; - self - } - /// Appends multiple environment variables at once for this builder. /// /// All environment variables are appended to the list of environment @@ -340,13 +309,7 @@ impl WasiCtxBuilder { open_mode |= OpenMode::WRITE; } self.preopens.push(( - Dir::new( - dir, - dir_perms, - file_perms, - open_mode, - self.allow_blocking_current_thread, - ), + Dir::new(dir, dir_perms, file_perms, open_mode), guest_path.as_ref().to_owned(), )); Ok(self) @@ -480,7 +443,6 @@ impl WasiCtxBuilder { wall_clock, monotonic_clock, allowed_network_uses, - allow_blocking_current_thread, built: _, } = mem::replace(self, Self::new()); self.built = true; @@ -499,7 +461,6 @@ impl WasiCtxBuilder { wall_clock, monotonic_clock, allowed_network_uses, - allow_blocking_current_thread, _executor: std::marker::PhantomData, } } @@ -633,47 +594,6 @@ impl WasiView for WasiImpl { } } -pub trait WasiExecutor: Send + Sync + 'static { - fn run_blocking(body: F) -> impl std::future::Future + Send - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static; -} -pub trait WasiSyncExecutor: WasiExecutor { - fn block_on(f: F) -> F::Output - where - F: Future; -} - -pub struct Tokio; -impl WasiExecutor for Tokio { - async fn run_blocking(body: F) -> R - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - todo!() - } -} -pub struct Standalone; -impl WasiExecutor for Standalone { - async fn run_blocking(body: F) -> R - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - todo!() - } -} -impl WasiSyncExecutor for Standalone { - fn block_on(f: F) -> F::Output - where - F: Future, - { - todo!() - } -} - /// Per-[`Store`] state which holds state necessary to implement WASI from this /// crate. /// @@ -700,7 +620,6 @@ pub struct WasiCtx { pub(crate) stderr: Box, pub(crate) socket_addr_check: SocketAddrCheck, pub(crate) allowed_network_uses: AllowedNetworkUses, - pub(crate) allow_blocking_current_thread: bool, pub(crate) _executor: std::marker::PhantomData, } diff --git a/crates/wasi/src/filesystem.rs b/crates/wasi/src/filesystem.rs index e5b6fd5af9fe..6c154164abab 100644 --- a/crates/wasi/src/filesystem.rs +++ b/crates/wasi/src/filesystem.rs @@ -1,8 +1,7 @@ use crate::bindings::filesystem::types; -use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle}; +use crate::runtime::{AbortOnDropJoinHandle, WasiExecutor}; use crate::{ HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, TrappableError, - WasiExecutor, }; use anyhow::anyhow; use bytes::{Bytes, BytesMut}; @@ -96,22 +95,14 @@ pub struct File { /// doesn't presently provide a cross-platform equivalent of reading the /// oflags back out using fcntl. pub open_mode: OpenMode, - - allow_blocking_current_thread: bool, } impl File { - pub fn new( - file: cap_std::fs::File, - perms: FilePerms, - open_mode: OpenMode, - allow_blocking_current_thread: bool, - ) -> Self { + pub fn new(file: cap_std::fs::File, perms: FilePerms, open_mode: OpenMode) -> Self { Self { file: Arc::new(file), perms, open_mode, - allow_blocking_current_thread, } } @@ -126,38 +117,24 @@ impl File { /// /// Intentionally blocking the executor thread might seem unorthodox, but is /// not actually a problem for specific workloads. See: - /// - [`crate::WasiCtxBuilder::allow_blocking_current_thread`] /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973) /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190) - pub(crate) async fn run_blocking(&self, body: F) -> R + pub(crate) async fn run_blocking(&self, body: F) -> R where F: FnOnce(&cap_std::fs::File) -> R + Send + 'static, R: Send + 'static, { - match self.as_blocking_file() { - Some(file) => body(file), - None => self.spawn_blocking(body).await, - } + let file = self.file.clone(); + E::run_blocking(move || body(&file)).await } - pub(crate) fn spawn_blocking(&self, body: F) -> AbortOnDropJoinHandle + pub(crate) fn spawn_blocking(&self, body: F) -> AbortOnDropJoinHandle where F: FnOnce(&cap_std::fs::File) -> R + Send + 'static, R: Send + 'static, { let f = self.file.clone(); - spawn_blocking(move || body(&f)) - } - - /// Returns `Some` when the current thread is allowed to block in filesystem - /// operations, and otherwise returns `None` to indicate that - /// `spawn_blocking` must be used. - pub(crate) fn as_blocking_file(&self) -> Option<&cap_std::fs::File> { - if self.allow_blocking_current_thread { - Some(&self.file) - } else { - None - } + E::spawn_blocking(move || body(&f)) } } @@ -201,8 +178,6 @@ pub struct Dir { /// doesn't presently provide a cross-platform equivalent of reading the /// oflags back out using fcntl. pub open_mode: OpenMode, - - allow_blocking_current_thread: bool, } impl Dir { @@ -211,19 +186,17 @@ impl Dir { perms: DirPerms, file_perms: FilePerms, open_mode: OpenMode, - allow_blocking_current_thread: bool, ) -> Self { Dir { dir: Arc::new(dir), perms, file_perms, open_mode, - allow_blocking_current_thread, } } /// Execute the blocking `body` function. /// - /// Depending on how the WasiCtx was configured, the body may either be: + /// Depending on the WasiCtx's Executor type parameter, the body may either be: /// - Executed directly on the current thread. In this case the `async` /// signature of this method is effectively a lie and the returned /// Future will always be immediately Ready. Or: @@ -232,27 +205,23 @@ impl Dir { /// /// Intentionally blocking the executor thread might seem unorthodox, but is /// not actually a problem for specific workloads. See: - /// - [`crate::WasiCtxBuilder::allow_blocking_current_thread`] /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973) /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190) - pub(crate) async fn run_blocking(&self, body: F) -> R + pub(crate) async fn run_blocking(&self, body: F) -> R where F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static, R: Send + 'static, { - if self.allow_blocking_current_thread { - body(&self.dir) - } else { - let d = self.dir.clone(); - spawn_blocking(move || body(&d)).await - } + let d = self.dir.clone(); + E::run_blocking(move || body(&d)).await } } -pub struct FileInputStream { +pub struct FileInputStream { file: File, position: u64, state: ReadState, + _executor: std::marker::PhantomData, } enum ReadState { Idle, @@ -261,16 +230,17 @@ enum ReadState { Error(io::Error), Closed, } -impl FileInputStream { +impl FileInputStream { pub fn new(file: &File, position: u64) -> Self { Self { file: file.clone(), position, state: ReadState::Idle, + _executor: std::marker::PhantomData, } } } -impl FileInputStream { +impl FileInputStream { fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState { use system_interface::fs::FileIoExt; @@ -301,7 +271,7 @@ impl FileInputStream { } } #[async_trait::async_trait] -impl HostInputStream for FileInputStream { +impl HostInputStream for FileInputStream { fn read(&mut self, size: usize) -> StreamResult { match &mut self.state { ReadState::Idle => { @@ -312,7 +282,7 @@ impl HostInputStream for FileInputStream { let p = self.position; self.state = ReadState::Waiting( self.file - .spawn_blocking(move |f| Self::blocking_read(f, p, size)), + .spawn_blocking::(move |f| Self::blocking_read(f, p, size)), ); Ok(Bytes::new()) } @@ -343,7 +313,7 @@ impl HostInputStream for FileInputStream { let p = self.position; self.state = self .file - .run_blocking(move |f| Self::blocking_read(f, p, size)) + .run_blocking::(move |f| Self::blocking_read(f, p, size)) .await; } @@ -369,7 +339,7 @@ impl HostInputStream for FileInputStream { } } #[async_trait::async_trait] -impl Subscribe for FileInputStream { +impl Subscribe for FileInputStream { async fn ready(&mut self) { if let ReadState::Idle = self.state { // The guest hasn't initiated any read, but is nonetheless waiting @@ -377,10 +347,10 @@ impl Subscribe for FileInputStream { const DEFAULT_READ_SIZE: usize = 4096; let p = self.position; - self.state = ReadState::Waiting( - self.file - .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)), - ); + self.state = + ReadState::Waiting(self.file.spawn_blocking::(move |f| { + Self::blocking_read(f, p, DEFAULT_READ_SIZE) + })); } self.wait_ready().await @@ -393,10 +363,11 @@ pub(crate) enum FileOutputMode { Append, } -pub(crate) struct FileOutputStream { +pub(crate) struct FileOutputStream { file: File, mode: FileOutputMode, state: OutputState, + _executor: std::marker::PhantomData, } enum OutputState { @@ -409,12 +380,13 @@ enum OutputState { Closed, } -impl FileOutputStream { +impl FileOutputStream { pub fn write_at(file: &File, position: u64) -> Self { Self { file: file.clone(), mode: FileOutputMode::Position(position), state: OutputState::Ready, + _executor: std::marker::PhantomData, } } @@ -423,6 +395,7 @@ impl FileOutputStream { file: file.clone(), mode: FileOutputMode::Append, state: OutputState::Ready, + _executor: std::marker::PhantomData, } } @@ -468,7 +441,7 @@ impl FileOutputStream { const FILE_WRITE_CAPACITY: usize = 1024 * 1024; #[async_trait::async_trait] -impl HostOutputStream for FileOutputStream { +impl HostOutputStream for FileOutputStream { fn write(&mut self, buf: Bytes) -> Result<(), StreamError> { match self.state { OutputState::Ready => {} @@ -484,7 +457,7 @@ impl HostOutputStream for FileOutputStream { let m = self.mode; self.state = OutputState::Waiting( self.file - .spawn_blocking(move |f| Self::blocking_write(f, buf, m)), + .spawn_blocking::(move |f| Self::blocking_write(f, buf, m)), ); Ok(()) } @@ -506,7 +479,7 @@ impl HostOutputStream for FileOutputStream { let m = self.mode; match self .file - .run_blocking(move |f| Self::blocking_write(f, buf, m)) + .run_blocking::(move |f| Self::blocking_write(f, buf, m)) .await { Ok(nwritten) => { @@ -567,7 +540,7 @@ impl HostOutputStream for FileOutputStream { } #[async_trait::async_trait] -impl Subscribe for FileOutputStream { +impl Subscribe for FileOutputStream { async fn ready(&mut self) { if let OutputState::Waiting(task) = &mut self.state { self.state = match task.await { diff --git a/crates/wasi/src/host/filesystem.rs b/crates/wasi/src/host/filesystem.rs index 3520127fc6e4..9dac52856c55 100644 --- a/crates/wasi/src/host/filesystem.rs +++ b/crates/wasi/src/host/filesystem.rs @@ -7,7 +7,8 @@ use crate::bindings::io::streams::{InputStream, OutputStream}; use crate::filesystem::{ Descriptor, Dir, File, FileInputStream, FileOutputStream, OpenMode, ReaddirIterator, }; -use crate::{DirPerms, FilePerms, FsError, FsResult, WasiExecutor, WasiImpl, WasiView}; +use crate::runtime::WasiExecutor; +use crate::{DirPerms, FilePerms, FsError, FsResult, WasiImpl, WasiView}; use anyhow::Context; use wasmtime::component::Resource; @@ -80,7 +81,7 @@ where }; let f = self.table().get(&fd)?.file()?; - f.run_blocking(move |f| f.advise(offset, len, advice)) + f.run_blocking::(move |f| f.advise(offset, len, advice)) .await?; Ok(()) } @@ -90,7 +91,7 @@ where match descriptor { Descriptor::File(f) => { - match f.run_blocking(|f| f.sync_data()).await { + match f.run_blocking::(|f| f.sync_data()).await { Ok(()) => Ok(()), // On windows, `sync_data` uses `FileFlushBuffers` which fails with // `ERROR_ACCESS_DENIED` if the file is not upen for writing. Ignore @@ -106,8 +107,10 @@ where } } Descriptor::Dir(d) => { - d.run_blocking(|d| Ok(d.open(std::path::Component::CurDir)?.sync_data()?)) - .await + d.run_blocking::(|d| { + Ok(d.open(std::path::Component::CurDir)?.sync_data()?) + }) + .await } } } @@ -136,7 +139,9 @@ where let descriptor = self.table().get(&fd)?; match descriptor { Descriptor::File(f) => { - let flags = f.run_blocking(|f| f.get_fd_flags()).await?; + let flags = f + .run_blocking::(|f| f.get_fd_flags()) + .await?; let mut flags = get_from_fdflags(flags); if f.open_mode.contains(OpenMode::READ) { flags |= DescriptorFlags::READ; @@ -147,7 +152,9 @@ where Ok(flags) } Descriptor::Dir(d) => { - let flags = d.run_blocking(|d| d.get_fd_flags()).await?; + let flags = d + .run_blocking::(|d| d.get_fd_flags()) + .await?; let mut flags = get_from_fdflags(flags); if d.open_mode.contains(OpenMode::READ) { flags |= DescriptorFlags::READ; @@ -168,7 +175,9 @@ where match descriptor { Descriptor::File(f) => { - let meta = f.run_blocking(|f| f.metadata()).await?; + let meta = f + .run_blocking::(|f| f.metadata()) + .await?; Ok(descriptortype_from(meta.file_type())) } Descriptor::Dir(_) => Ok(types::DescriptorType::Directory), @@ -184,7 +193,8 @@ where if !f.perms.contains(FilePerms::WRITE) { Err(ErrorCode::NotPermitted)?; } - f.run_blocking(move |f| f.set_len(size)).await?; + f.run_blocking::(move |f| f.set_len(size)) + .await?; Ok(()) } @@ -204,7 +214,8 @@ where } let atim = systemtimespec_from(atim)?; let mtim = systemtimespec_from(mtim)?; - f.run_blocking(|f| f.set_times(atim, mtim)).await?; + f.run_blocking::(|f| f.set_times(atim, mtim)) + .await?; Ok(()) } Descriptor::Dir(d) => { @@ -213,7 +224,8 @@ where } let atim = systemtimespec_from(atim)?; let mtim = systemtimespec_from(mtim)?; - d.run_blocking(|d| d.set_times(atim, mtim)).await?; + d.run_blocking::(|d| d.set_times(atim, mtim)) + .await?; Ok(()) } } @@ -236,7 +248,7 @@ where } let (mut buffer, r) = f - .run_blocking(move |f| { + .run_blocking::(move |f| { let mut buffer = vec![0; len.try_into().unwrap_or(usize::MAX)]; let r = f.read_vectored_at(&mut [IoSliceMut::new(&mut buffer)], offset); (buffer, r) @@ -273,7 +285,9 @@ where } let bytes_written = f - .run_blocking(move |f| f.write_vectored_at(&[IoSlice::new(&buf)], offset)) + .run_blocking::(move |f| { + f.write_vectored_at(&[IoSlice::new(&buf)], offset) + }) .await?; Ok(types::Filesize::try_from(bytes_written).expect("usize fits in Filesize")) @@ -300,7 +314,7 @@ where } let entries = d - .run_blocking(|d| { + .run_blocking::(|d| { // Both `entries` and `metadata` perform syscalls, which is why they are done // within this `block` call, rather than delay calculating the metadata // for entries when they're demanded later in the iterator chain. @@ -349,7 +363,7 @@ where match descriptor { Descriptor::File(f) => { - match f.run_blocking(|f| f.sync_all()).await { + match f.run_blocking::(|f| f.sync_all()).await { Ok(()) => Ok(()), // On windows, `sync_data` uses `FileFlushBuffers` which fails with // `ERROR_ACCESS_DENIED` if the file is not upen for writing. Ignore @@ -365,8 +379,10 @@ where } } Descriptor::Dir(d) => { - d.run_blocking(|d| Ok(d.open(std::path::Component::CurDir)?.sync_all()?)) - .await + d.run_blocking::(|d| { + Ok(d.open(std::path::Component::CurDir)?.sync_all()?) + }) + .await } } } @@ -381,7 +397,8 @@ where if !d.perms.contains(DirPerms::MUTATE) { return Err(ErrorCode::NotPermitted.into()); } - d.run_blocking(move |d| d.create_dir(&path)).await?; + d.run_blocking::(move |d| d.create_dir(&path)) + .await?; Ok(()) } @@ -390,12 +407,16 @@ where match descriptor { Descriptor::File(f) => { // No permissions check on stat: if opened, allowed to stat it - let meta = f.run_blocking(|f| f.metadata()).await?; + let meta = f + .run_blocking::(|f| f.metadata()) + .await?; Ok(descriptorstat_from(meta)) } Descriptor::Dir(d) => { // No permissions check on stat: if opened, allowed to stat it - let meta = d.run_blocking(|d| d.dir_metadata()).await?; + let meta = d + .run_blocking::(|d| d.dir_metadata()) + .await?; Ok(descriptorstat_from(meta)) } } @@ -414,9 +435,11 @@ where } let meta = if symlink_follow(path_flags) { - d.run_blocking(move |d| d.metadata(&path)).await? + d.run_blocking::(move |d| d.metadata(&path)) + .await? } else { - d.run_blocking(move |d| d.symlink_metadata(&path)).await? + d.run_blocking::(move |d| d.symlink_metadata(&path)) + .await? }; Ok(descriptorstat_from(meta)) } @@ -439,7 +462,7 @@ where let atim = systemtimespec_from(atim)?; let mtim = systemtimespec_from(mtim)?; if symlink_follow(path_flags) { - d.run_blocking(move |d| { + d.run_blocking::(move |d| { d.set_times( &path, atim.map(cap_fs_ext::SystemTimeSpec::from_std), @@ -448,7 +471,7 @@ where }) .await?; } else { - d.run_blocking(move |d| { + d.run_blocking::(move |d| { d.set_symlink_times( &path, atim.map(cap_fs_ext::SystemTimeSpec::from_std), @@ -483,7 +506,9 @@ where } let new_dir_handle = std::sync::Arc::clone(&new_dir.dir); old_dir - .run_blocking(move |d| d.hard_link(&old_path, &new_dir_handle, &new_path)) + .run_blocking::(move |d| { + d.hard_link(&old_path, &new_dir_handle, &new_path) + }) .await?; Ok(()) } @@ -500,7 +525,6 @@ where use system_interface::fs::{FdFlags, GetSetFdFlags}; use types::{DescriptorFlags, OpenFlags}; - let allow_blocking_current_thread = self.ctx().allow_blocking_current_thread; let table = self.table(); let d = table.get(&fd)?.dir()?; if !d.perms.contains(DirPerms::READ) { @@ -593,7 +617,7 @@ where } let opened = d - .run_blocking::<_, std::io::Result>(move |d| { + .run_blocking::>(move |d| { let mut opened = d.open_with(&path, &opts)?; if opened.metadata()?.is_dir() { Ok(OpenResult::Dir(cap_std::fs::Dir::from_std_file( @@ -617,15 +641,11 @@ where d.perms, d.file_perms, open_mode, - allow_blocking_current_thread, )))?), - OpenResult::File(file) => Ok(table.push(Descriptor::File(File::new( - file, - d.file_perms, - open_mode, - allow_blocking_current_thread, - )))?), + OpenResult::File(file) => { + Ok(table.push(Descriptor::File(File::new(file, d.file_perms, open_mode)))?) + } OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()), } @@ -654,7 +674,9 @@ where if !d.perms.contains(DirPerms::READ) { return Err(ErrorCode::NotPermitted.into()); } - let link = d.run_blocking(move |d| d.read_link(&path)).await?; + let link = d + .run_blocking::(move |d| d.read_link(&path)) + .await?; Ok(link .into_os_string() .into_string() @@ -671,7 +693,10 @@ where if !d.perms.contains(DirPerms::MUTATE) { return Err(ErrorCode::NotPermitted.into()); } - Ok(d.run_blocking(move |d| d.remove_dir(&path)).await?) + Ok( + d.run_blocking::(move |d| d.remove_dir(&path)) + .await?, + ) } async fn rename_at( @@ -692,7 +717,9 @@ where } let new_dir_handle = std::sync::Arc::clone(&new_dir.dir); Ok(old_dir - .run_blocking(move |d| d.rename(&old_path, &new_dir_handle, &new_path)) + .run_blocking::(move |d| { + d.rename(&old_path, &new_dir_handle, &new_path) + }) .await?) } @@ -711,8 +738,10 @@ where if !d.perms.contains(DirPerms::MUTATE) { return Err(ErrorCode::NotPermitted.into()); } - Ok(d.run_blocking(move |d| d.symlink(&src_path, &dest_path)) - .await?) + Ok( + d.run_blocking::(move |d| d.symlink(&src_path, &dest_path)) + .await?, + ) } async fn unlink_file_at( @@ -727,8 +756,10 @@ where if !d.perms.contains(DirPerms::MUTATE) { return Err(ErrorCode::NotPermitted.into()); } - Ok(d.run_blocking(move |d| d.remove_file_or_symlink(&path)) - .await?) + Ok( + d.run_blocking::(move |d| d.remove_file_or_symlink(&path)) + .await?, + ) } fn read_via_stream( @@ -744,7 +775,7 @@ where } // Create a stream view for it. - let reader: InputStream = Box::new(FileInputStream::new(f, offset)); + let reader: InputStream = Box::new(FileInputStream::::new(f, offset)); // Insert the stream view into the table. Trap if the table is full. let index = self.table().push(reader)?; @@ -765,7 +796,7 @@ where } // Create a stream view for it. - let writer = FileOutputStream::write_at(f, offset); + let writer = FileOutputStream::::write_at(f, offset); let writer: OutputStream = Box::new(writer); // Insert the stream view into the table. Trap if the table is full. @@ -786,7 +817,7 @@ where } // Create a stream view for it. - let appender = FileOutputStream::append(f); + let appender = FileOutputStream::::append(f); let appender: OutputStream = Box::new(appender); // Insert the stream view into the table. Trap if the table is full. @@ -802,9 +833,9 @@ where ) -> anyhow::Result { use cap_fs_ext::MetadataExt; let descriptor_a = self.table().get(&a)?; - let meta_a = get_descriptor_metadata(descriptor_a).await?; + let meta_a = get_descriptor_metadata::(descriptor_a).await?; let descriptor_b = self.table().get(&b)?; - let meta_b = get_descriptor_metadata(descriptor_b).await?; + let meta_b = get_descriptor_metadata::(descriptor_b).await?; if meta_a.dev() == meta_b.dev() && meta_a.ino() == meta_b.ino() { // MetadataHashValue does not derive eq, so use a pair of // comparisons to check equality: @@ -827,7 +858,7 @@ where fd: Resource, ) -> FsResult { let descriptor_a = self.table().get(&fd)?; - let meta = get_descriptor_metadata(descriptor_a).await?; + let meta = get_descriptor_metadata::(descriptor_a).await?; Ok(calculate_metadata_hash(&meta)) } async fn metadata_hash_at( @@ -840,7 +871,7 @@ where let d = table.get(&fd)?.dir()?; // No permissions check on metadata: if dir opened, allowed to stat it let meta = d - .run_blocking(move |d| { + .run_blocking::(move |d| { if symlink_follow(path_flags) { d.metadata(path) } else { @@ -871,15 +902,17 @@ where } } -async fn get_descriptor_metadata(fd: &types::Descriptor) -> FsResult { +async fn get_descriptor_metadata( + fd: &types::Descriptor, +) -> FsResult { match fd { Descriptor::File(f) => { // No permissions check on metadata: if opened, allowed to stat it - Ok(f.run_blocking(|f| f.metadata()).await?) + Ok(f.run_blocking::(|f| f.metadata()).await?) } Descriptor::Dir(d) => { // No permissions check on metadata: if opened, allowed to stat it - Ok(d.run_blocking(|d| d.dir_metadata()).await?) + Ok(d.run_blocking::(|d| d.dir_metadata()).await?) } } } diff --git a/crates/wasi/src/host/filesystem/sync.rs b/crates/wasi/src/host/filesystem/sync.rs index 85925144863e..6d837dfc9e9b 100644 --- a/crates/wasi/src/host/filesystem/sync.rs +++ b/crates/wasi/src/host/filesystem/sync.rs @@ -1,8 +1,8 @@ use crate::bindings::filesystem::types as async_filesystem; use crate::bindings::sync::filesystem::types as sync_filesystem; use crate::bindings::sync::io::streams; -use crate::runtime::in_tokio; -use crate::{FsError, FsResult, WasiImpl, WasiSyncExecutor, WasiView}; +use crate::runtime::{in_tokio, WasiSyncExecutor}; +use crate::{FsError, FsResult, WasiImpl, WasiView}; use wasmtime::component::Resource; impl sync_filesystem::Host for WasiImpl diff --git a/crates/wasi/src/host/tcp.rs b/crates/wasi/src/host/tcp.rs index 160cb8a757c0..b9fdd1fa8aad 100644 --- a/crates/wasi/src/host/tcp.rs +++ b/crates/wasi/src/host/tcp.rs @@ -69,6 +69,7 @@ where Ok(()) } + // FIXME: make this async fn fn finish_connect( &mut self, this: Resource, @@ -76,7 +77,7 @@ where let table = self.table(); let socket = table.get_mut(&this)?; - let (input, output) = socket.finish_connect()?; + let (input, output) = socket.finish_connect::()?; let input_stream = self.table().push_child(input, &this)?; let output_stream = self.table().push_child(output, &this)?; @@ -110,7 +111,7 @@ where let table = self.table(); let socket = table.get_mut(&this)?; - let (tcp_socket, input, output) = socket.accept()?; + let (tcp_socket, input, output) = socket.accept::()?; let tcp_socket = self.table().push(tcp_socket)?; let input_stream = self.table().push_child(input, &tcp_socket)?; @@ -297,7 +298,7 @@ where ShutdownType::Send => std::net::Shutdown::Write, ShutdownType::Both => std::net::Shutdown::Both, }; - socket.shutdown(how) + socket.shutdown::(how) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { diff --git a/crates/wasi/src/ip_name_lookup.rs b/crates/wasi/src/ip_name_lookup.rs index e013cefb01fc..c85a44a7b3ef 100644 --- a/crates/wasi/src/ip_name_lookup.rs +++ b/crates/wasi/src/ip_name_lookup.rs @@ -2,7 +2,7 @@ use crate::bindings::sockets::ip_name_lookup::{Host, HostResolveAddressStream}; use crate::bindings::sockets::network::{ErrorCode, IpAddress, Network}; use crate::host::network::util; use crate::poll::{subscribe, Pollable, Subscribe}; -use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle}; +use crate::runtime::{AbortOnDropJoinHandle, WasiExecutor}; use crate::{SocketError, WasiImpl, WasiView}; use anyhow::Result; use std::mem; @@ -22,6 +22,7 @@ pub enum ResolveAddressStream { impl Host for WasiImpl where T: WasiView, + T::Executor: WasiExecutor, { fn resolve_addresses( &mut self, @@ -36,7 +37,7 @@ where return Err(ErrorCode::PermanentResolverFailure.into()); } - let task = spawn_blocking(move || blocking_resolve(&host)); + let task = ::Executor::spawn_blocking(move || blocking_resolve(&host)); let resource = self.table().push(ResolveAddressStream::Waiting(task))?; Ok(resource) } diff --git a/crates/wasi/src/lib.rs b/crates/wasi/src/lib.rs index 0a7534c96781..173fc720f536 100644 --- a/crates/wasi/src/lib.rs +++ b/crates/wasi/src/lib.rs @@ -208,7 +208,7 @@ mod udp; mod write_stream; pub use self::clocks::{HostMonotonicClock, HostWallClock}; -pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiExecutor, WasiImpl, WasiSyncExecutor, WasiView}; +pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiImpl, WasiView}; pub use self::error::{I32Exit, TrappableError}; pub use self::filesystem::{DirPerms, FileInputStream, FilePerms, FsError, FsResult}; pub use self::network::{Network, SocketAddrUse, SocketError, SocketResult}; @@ -386,7 +386,7 @@ pub fn add_to_linker_with_options_async( pub fn add_to_linker_sync(linker: &mut wasmtime::component::Linker) -> anyhow::Result<()> where T: WasiView, - ::Executor: WasiSyncExecutor, + ::Executor: runtime::WasiSyncExecutor, { let options = crate::bindings::sync::LinkOptions::default(); add_to_linker_with_options_sync(linker, &options) @@ -399,7 +399,7 @@ pub fn add_to_linker_with_options_sync( ) -> anyhow::Result<()> where T: WasiView, - ::Executor: WasiSyncExecutor, + ::Executor: runtime::WasiSyncExecutor, { let l = linker; let closure = type_annotate::(|t| WasiImpl(t)); diff --git a/crates/wasi/src/pipe.rs b/crates/wasi/src/pipe.rs index e1fe36ce5217..96f2f61bbfb0 100644 --- a/crates/wasi/src/pipe.rs +++ b/crates/wasi/src/pipe.rs @@ -8,6 +8,7 @@ //! but the virtual pipes can be instantiated with any `Read` or `Write` type. //! use crate::poll::Subscribe; +use crate::runtime::WasiExecutor; use crate::{HostInputStream, HostOutputStream, StreamError}; use anyhow::anyhow; use bytes::Bytes; @@ -119,9 +120,11 @@ pub struct AsyncReadStream { impl AsyncReadStream { /// Create a [`AsyncReadStream`]. In order to use the [`HostInputStream`] impl /// provided by this struct, the argument must impl [`tokio::io::AsyncRead`]. - pub fn new(mut reader: T) -> Self { + pub fn new( + mut reader: T, + ) -> Self { let (sender, receiver) = mpsc::channel(1); - let join_handle = crate::runtime::spawn(async move { + let join_handle = E::spawn(async move { loop { use tokio::io::AsyncReadExt; let mut buf = bytes::BytesMut::with_capacity(4096); @@ -281,6 +284,7 @@ impl Subscribe for ClosedOutputStream { #[cfg(test)] mod test { use super::*; + use crate::runtime::Tokio; use std::time::Duration; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -322,7 +326,7 @@ mod test { #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn empty_read_stream() { - let mut reader = AsyncReadStream::new(tokio::io::empty()); + let mut reader = AsyncReadStream::new::<_, Tokio>(tokio::io::empty()); // In a multi-threaded context, the value of state is not deterministic -- the spawned // reader task may run on a different thread. @@ -342,7 +346,7 @@ mod test { #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn infinite_read_stream() { - let mut reader = AsyncReadStream::new(tokio::io::repeat(0)); + let mut reader = AsyncReadStream::new::<_, Tokio>(tokio::io::repeat(0)); let bs = reader.read(10).unwrap(); if bs.is_empty() { @@ -470,7 +474,7 @@ mod test { // behavior at some point, but this test shows the behavior as it is implemented: async fn backpressure_read_stream() { let (r, mut w) = simplex(16 * 1024); // Make sure this buffer isn't a bottleneck - let mut reader = AsyncReadStream::new(r); + let mut reader = AsyncReadStream::new::<_, Tokio>(r); let writer_task = tokio::task::spawn(async move { // Write twice as much as we can buffer up in an AsyncReadStream: diff --git a/crates/wasi/src/preview0.rs b/crates/wasi/src/preview0.rs index 3081f8db753e..93e8aa146847 100644 --- a/crates/wasi/src/preview0.rs +++ b/crates/wasi/src/preview0.rs @@ -7,7 +7,7 @@ use crate::preview0::types::Error; use crate::preview1::types as snapshot1_types; use crate::preview1::wasi_snapshot_preview1::WasiSnapshotPreview1 as Snapshot1; use crate::preview1::WasiP1Ctx; -use crate::{WasiExecutor, WasiSyncExecutor}; +use crate::runtime::{WasiExecutor, WasiSyncExecutor}; use wiggle::{GuestError, GuestMemory, GuestPtr}; pub fn add_to_linker_async( diff --git a/crates/wasi/src/preview1.rs b/crates/wasi/src/preview1.rs index 354c6bce2639..b1259fd24e6a 100644 --- a/crates/wasi/src/preview1.rs +++ b/crates/wasi/src/preview1.rs @@ -72,9 +72,9 @@ use crate::bindings::{ filesystem::{preopens::Host as _, types as filesystem}, io::streams, }; +use crate::runtime::{WasiExecutor, WasiSyncExecutor}; use crate::{ - FsError, IsATTY, ResourceTable, StreamError, StreamResult, WasiCtx, WasiExecutor, WasiImpl, - WasiSyncExecutor, WasiView, + FsError, IsATTY, ResourceTable, StreamError, StreamResult, WasiCtx, WasiImpl, WasiView, }; use anyhow::{bail, Context}; use std::collections::{BTreeMap, HashSet}; @@ -607,19 +607,8 @@ impl WasiP1Ctx { (false, FdWrite::At(pos)) => f.write_at(&buf, pos), (false, FdWrite::AtCur) => f.write_at(&buf, pos), }; - - let nwritten = match f.as_blocking_file() { - // If we can block then skip the copy out of wasm memory and - // write directly to `f`. - Some(f) => do_write(f, &memory.as_cow(buf)?), - // ... otherwise copy out of wasm memory and use - // `spawn_blocking` to do this write in a thread that can - // block. - None => { - let buf = memory.to_vec(buf)?; - f.run_blocking(move |f| do_write(f, &buf)).await - } - }; + let buf = memory.to_vec(buf)?; + let nwritten = f.run_blocking::(move |f| do_write(f, &buf)).await; let nwritten = nwritten.map_err(|e| StreamError::LastOperationFailed(e.into()))?; @@ -1641,32 +1630,20 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx let pos = position.load(Ordering::Relaxed); let file = self.table().get(&fd)?.file()?; let iov = first_non_empty_iovec(memory, iovs)?; - let bytes_read = match (file.as_blocking_file(), memory.as_slice_mut(iov)?) { - // Try to read directly into wasm memory where possible - // when the current thread can block and additionally wasm - // memory isn't shared. - (Some(file), Some(mut buf)) => file - .read_at(&mut buf, pos) - .map_err(|e| StreamError::LastOperationFailed(e.into()))?, - // ... otherwise fall back to performing the read on a - // blocking thread and which copies the data back into wasm - // memory. - (_, buf) => { - drop(buf); - let mut buf = vec![0; iov.len() as usize]; - let buf = file - .run_blocking(move |file| -> Result<_, types::Error> { - let bytes_read = file - .read_at(&mut buf, pos) - .map_err(|e| StreamError::LastOperationFailed(e.into()))?; - buf.truncate(bytes_read); - Ok(buf) - }) - .await?; - let iov = iov.get_range(0..u32::try_from(buf.len())?).unwrap(); - memory.copy_from_slice(&buf, iov)?; - buf.len() - } + let bytes_read = { + let mut buf = vec![0; iov.len() as usize]; + let buf = file + .run_blocking::(move |file| -> Result<_, types::Error> { + let bytes_read = file + .read_at(&mut buf, pos) + .map_err(|e| StreamError::LastOperationFailed(e.into()))?; + buf.truncate(bytes_read); + Ok(buf) + }) + .await?; + let iov = iov.get_range(0..u32::try_from(buf.len())?).unwrap(); + memory.copy_from_slice(&buf, iov)?; + buf.len() }; let pos = pos @@ -2280,9 +2257,9 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx if !clocksub .flags .contains(types::Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME) - && self.ctx().allow_blocking_current_thread { - std::thread::sleep(std::time::Duration::from_nanos(clocksub.timeout)); + tokio::time::sleep(std::time::Duration::from_nanos(clocksub.timeout).into()) + .await; memory.write( events, types::Event { diff --git a/crates/wasi/src/runtime.rs b/crates/wasi/src/runtime.rs index 8115f8daa59c..b0eea4871d53 100644 --- a/crates/wasi/src/runtime.rs +++ b/crates/wasi/src/runtime.rs @@ -19,87 +19,149 @@ //! Each of these facilities should be used by dependencies of wasmtime-wasi //! which when implementing component bindings. +mod task; + use std::future::Future; use std::pin::Pin; use std::sync::LazyLock; use std::task::{Context, Poll}; -pub(crate) static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_time() - .enable_io() - .build() - .unwrap() -}); +pub use task::AbortOnDropJoinHandle; -/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when -/// the handle is dropped. -/// -/// This behavior makes it easier to tie a worker task to the lifetime of a Resource -/// by keeping this handle owned by the Resource. -#[derive(Debug)] -pub struct AbortOnDropJoinHandle(tokio::task::JoinHandle); -impl AbortOnDropJoinHandle { - /// Abort the task and wait for it to finish. Optionally returns the result - /// of the task if it ran to completion prior to being aborted. - pub(crate) async fn cancel(mut self) -> Option { - self.0.abort(); - - match (&mut self.0).await { - Ok(value) => Some(value), - Err(err) if err.is_cancelled() => None, - Err(err) => std::panic::resume_unwind(err.into_panic()), - } - } +pub trait WasiExecutor: Send + Sync + 'static { + /// Configures whether or not blocking operations made through this + /// `WasiExecutor` are allowed to block the current thread. + /// + /// Both `WasiExecutor` impls are is currently built on top of the Rust + /// [Tokio](https://tokio.rs/) library. While most WASI APIs are + /// non-blocking, some are instead blocking from the perspective of + /// WebAssembly. For example opening a file is a blocking operation with + /// respect to WebAssembly but it's implemented as an asynchronous operation + /// on the host. This is currently done with Tokio's + /// [`spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html). + /// + /// When WebAssembly is used in a synchronous context, for example when + /// [`Config::async_support`] is disabled, then this asynchronous operation + /// is quickly turned back into a synchronous operation with a `block_on` in + /// Rust (specifically the `WasiSyncExecutor::block_on`, which is a proxy + /// for tokio's in the provided `Standalone` impl). This switching + /// back-and-forth between a blocking a non-blocking context can have + /// overhead, and this option exists to help alleviate this overhead. + /// + /// This option indicates that for WASI functions that are blocking from the + /// perspective of WebAssembly it's ok to block the native thread as well. + /// This means that this back-and-forth between async and sync won't happen + /// and instead blocking operations are performed on-thread (such as opening + /// a file). This can improve the performance of WASI operations when async + /// support is disabled. + /// + /// [`Config::async_support`]: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.async_support + fn run_blocking(body: F) -> impl std::future::Future + Send + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; + + fn spawn(f: F) -> AbortOnDropJoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static; + + fn spawn_blocking(f: F) -> AbortOnDropJoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; } -impl Drop for AbortOnDropJoinHandle { - fn drop(&mut self) { - self.0.abort() - } + +pub trait WasiSyncExecutor: WasiExecutor { + fn block_on(f: F) -> F::Output + where + F: Future; } -impl std::ops::Deref for AbortOnDropJoinHandle { - type Target = tokio::task::JoinHandle; - fn deref(&self) -> &Self::Target { - &self.0 + +pub struct Tokio; +impl WasiExecutor for Tokio { + async fn run_blocking(body: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + Self::spawn_blocking(move || body()).await } -} -impl std::ops::DerefMut for AbortOnDropJoinHandle { - fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle { - &mut self.0 + + fn spawn(f: F) -> AbortOnDropJoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let j = tokio::task::spawn(f); + AbortOnDropJoinHandle::from(j) } -} -impl From> for AbortOnDropJoinHandle { - fn from(jh: tokio::task::JoinHandle) -> Self { - AbortOnDropJoinHandle(jh) + + fn spawn_blocking(f: F) -> AbortOnDropJoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let j = tokio::task::spawn_blocking(f); + AbortOnDropJoinHandle::from(j) } } -impl Future for AbortOnDropJoinHandle { - type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.as_mut().0).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")), - } +pub struct Standalone; +impl WasiExecutor for Standalone { + /// Execute the blocking `body` function. + /// + /// This implementation runs the blocking `body` directly on the current + /// thread. In this case the `async` signature of this method is + /// effectively a lie and the returned Future will always be immediately + /// Ready. + /// + /// Intentionally blocking the executor thread might seem unorthodox, but + /// is not actually a problem for specific workloads. See: + /// - [`crate::WasiExecutor::run_blocking`] + /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973) + /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190) + async fn run_blocking(body: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + body() } -} -pub fn spawn(f: F) -> AbortOnDropJoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f)); - AbortOnDropJoinHandle(j) -} + fn spawn(f: F) -> AbortOnDropJoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f)); + AbortOnDropJoinHandle::from(j) + } -pub fn spawn_blocking(f: F) -> AbortOnDropJoinHandle -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f)); - AbortOnDropJoinHandle(j) + fn spawn_blocking(f: F) -> AbortOnDropJoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f)); + AbortOnDropJoinHandle::from(j) + } } +impl WasiSyncExecutor for Standalone { + fn block_on(f: F) -> F::Output + where + F: Future, + { + in_tokio(f) + } +} + +pub(crate) static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .unwrap() +}); pub fn in_tokio(f: F) -> F::Output { match tokio::runtime::Handle::try_current() { diff --git a/crates/wasi/src/tcp.rs b/crates/wasi/src/tcp.rs index 5b05ebb25339..c0f62d1d0f80 100644 --- a/crates/wasi/src/tcp.rs +++ b/crates/wasi/src/tcp.rs @@ -1,7 +1,7 @@ use crate::bindings::sockets::tcp::ErrorCode; use crate::host::network; use crate::network::SocketAddressFamily; -use crate::runtime::{with_ambient_tokio_runtime, AbortOnDropJoinHandle}; +use crate::runtime::{with_ambient_tokio_runtime, AbortOnDropJoinHandle, WasiExecutor}; use crate::{ HostInputStream, HostOutputStream, InputStream, OutputStream, SocketError, SocketResult, StreamError, Subscribe, @@ -263,11 +263,12 @@ impl TcpSocket { Ok(()) } - pub fn finish_connect(&mut self) -> SocketResult<(InputStream, OutputStream)> { + pub fn finish_connect(&mut self) -> SocketResult<(InputStream, OutputStream)> { let previous_state = std::mem::replace(&mut self.tcp_state, TcpState::Closed); let result = match previous_state { TcpState::ConnectReady(result) => result, TcpState::Connecting(mut future) => { + // FIXME: this function needs to become async! let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); match with_ambient_tokio_runtime(|| future.as_mut().poll(&mut cx)) { Poll::Ready(result) => result, @@ -294,7 +295,7 @@ impl TcpSocket { writer: writer.clone(), }; let input: InputStream = Box::new(TcpReadStream(reader)); - let output: OutputStream = Box::new(TcpWriteStream(writer)); + let output: OutputStream = Box::new(TcpWriteStream::::new(writer)); Ok((input, output)) } Err(err) => { @@ -360,7 +361,8 @@ impl TcpSocket { } } - pub fn accept(&mut self) -> SocketResult<(Self, InputStream, OutputStream)> { + // FIXME: make this async fn as well + pub fn accept(&mut self) -> SocketResult<(Self, InputStream, OutputStream)> { let TcpState::Listening { listener, pending_accept, @@ -369,6 +371,7 @@ impl TcpSocket { return Err(ErrorCode::InvalidState.into()); }; + // FIXME: await on this properly let result = match pending_accept.take() { Some(result) => result, None => { @@ -446,7 +449,7 @@ impl TcpSocket { let writer = Arc::new(Mutex::new(TcpWriter::new(client.clone()))); let input: InputStream = Box::new(TcpReadStream(reader.clone())); - let output: OutputStream = Box::new(TcpWriteStream(writer.clone())); + let output: OutputStream = Box::new(TcpWriteStream::::new(writer.clone())); let tcp_socket = TcpSocket::from_state( TcpState::Connected { stream: client, @@ -638,7 +641,7 @@ impl TcpSocket { Ok(()) } - pub fn shutdown(&self, how: Shutdown) -> SocketResult<()> { + pub fn shutdown(&self, how: Shutdown) -> SocketResult<()> { let TcpState::Connected { reader, writer, .. } = &self.tcp_state else { return Err(ErrorCode::InvalidState.into()); }; @@ -648,7 +651,7 @@ impl TcpSocket { } if let Shutdown::Both | Shutdown::Write = how { - try_lock_for_socket(writer)?.shutdown(); + try_lock_for_socket(writer)?.shutdown::(); } Ok(()) @@ -800,11 +803,11 @@ impl TcpWriter { /// Write `bytes` in a background task, remembering the task handle for use in a future call to /// `write_ready` - fn background_write(&mut self, mut bytes: bytes::Bytes) { + fn background_write(&mut self, mut bytes: bytes::Bytes) { assert!(matches!(self.state, WriteState::Ready)); let stream = self.stream.clone(); - self.state = WriteState::Writing(crate::runtime::spawn(async move { + self.state = WriteState::Writing(E::spawn(async move { // Note: we are not using the AsyncWrite impl here, and instead using the TcpStream // primitive try_write, which goes directly to attempt a write with mio. This has // two advantages: 1. this operation takes a &TcpStream instead of a &mut TcpStream @@ -825,7 +828,7 @@ impl TcpWriter { })); } - fn write(&mut self, mut bytes: bytes::Bytes) -> Result<(), StreamError> { + fn write(&mut self, mut bytes: bytes::Bytes) -> Result<(), StreamError> { match self.state { WriteState::Ready => {} WriteState::Closed => return Err(StreamError::Closed), @@ -844,7 +847,7 @@ impl TcpWriter { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // As `try_write` indicated that it would have blocked, we'll perform the write // in the background to allow us to return immediately. - self.background_write(bytes); + self.background_write::(bytes); return Ok(()); } @@ -899,7 +902,7 @@ impl TcpWriter { Ok(SOCKET_READY_SIZE) } - fn shutdown(&mut self) { + fn shutdown(&mut self) { self.state = match mem::replace(&mut self.state, WriteState::Closed) { // No write in progress, immediately shut down: WriteState::Ready => { @@ -910,7 +913,7 @@ impl TcpWriter { // Schedule the shutdown after the current write has finished: WriteState::Writing(write) => { let stream = self.stream.clone(); - WriteState::Closing(crate::runtime::spawn(async move { + WriteState::Closing(E::spawn(async move { let result = write.await; native_shutdown(&stream, Shutdown::Write); result @@ -951,31 +954,43 @@ impl TcpWriter { } } -struct TcpWriteStream(Arc>); +struct TcpWriteStream { + writer: Arc>, + _executor: std::marker::PhantomData, +} + +impl TcpWriteStream { + pub fn new(writer: Arc>) -> Self { + Self { + writer, + _executor: std::marker::PhantomData, + } + } +} #[async_trait::async_trait] -impl HostOutputStream for TcpWriteStream { +impl HostOutputStream for TcpWriteStream { fn write(&mut self, bytes: bytes::Bytes) -> Result<(), StreamError> { - try_lock_for_stream(&self.0)?.write(bytes) + try_lock_for_stream(&self.writer)?.write::(bytes) } fn flush(&mut self) -> Result<(), StreamError> { - try_lock_for_stream(&self.0)?.flush() + try_lock_for_stream(&self.writer)?.flush() } fn check_write(&mut self) -> Result { - try_lock_for_stream(&self.0)?.check_write() + try_lock_for_stream(&self.writer)?.check_write() } async fn cancel(&mut self) { - self.0.lock().await.cancel().await + self.writer.lock().await.cancel().await } } #[async_trait::async_trait] -impl Subscribe for TcpWriteStream { +impl Subscribe for TcpWriteStream { async fn ready(&mut self) { - self.0.lock().await.ready().await + self.writer.lock().await.ready().await } } diff --git a/crates/wasi/src/write_stream.rs b/crates/wasi/src/write_stream.rs index fe3658662ff2..f7b60e8dfa13 100644 --- a/crates/wasi/src/write_stream.rs +++ b/crates/wasi/src/write_stream.rs @@ -1,3 +1,4 @@ +use crate::runtime::WasiExecutor; use crate::{HostOutputStream, StreamError, Subscribe}; use anyhow::anyhow; use bytes::Bytes; @@ -145,14 +146,14 @@ pub struct AsyncWriteStream { impl AsyncWriteStream { /// Create a [`AsyncWriteStream`]. In order to use the [`HostOutputStream`] impl /// provided by this struct, the argument must impl [`tokio::io::AsyncWrite`]. - pub fn new( + pub fn new( write_budget: usize, writer: T, ) -> Self { let worker = Arc::new(Worker::new(write_budget)); let w = Arc::clone(&worker); - let join_handle = crate::runtime::spawn(async move { w.work(writer).await }); + let join_handle = E::spawn(async move { w.work(writer).await }); AsyncWriteStream { worker, From ec66dbcb6c72df1261b059ec9bb6f4b962a25a61 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 13 Jan 2025 16:53:38 -0800 Subject: [PATCH 5/5] code motion: AbortOnDropJoinHandle lives in a mod just for now at some point soon this will get some other abstraction so we aren't tied directly to tokio for tasks --- crates/wasi/src/runtime/task.rs | 53 +++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 crates/wasi/src/runtime/task.rs diff --git a/crates/wasi/src/runtime/task.rs b/crates/wasi/src/runtime/task.rs new file mode 100644 index 000000000000..389998b2f6ce --- /dev/null +++ b/crates/wasi/src/runtime/task.rs @@ -0,0 +1,53 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when +/// the handle is dropped. +/// +/// This behavior makes it easier to tie a worker task to the lifetime of a Resource +/// by keeping this handle owned by the Resource. +#[derive(Debug)] +pub struct AbortOnDropJoinHandle(tokio::task::JoinHandle); +impl AbortOnDropJoinHandle { + /// Abort the task and wait for it to finish. Optionally returns the result + /// of the task if it ran to completion prior to being aborted. + pub(crate) async fn cancel(mut self) -> Option { + self.0.abort(); + + match (&mut self.0).await { + Ok(value) => Some(value), + Err(err) if err.is_cancelled() => None, + Err(err) => std::panic::resume_unwind(err.into_panic()), + } + } +} +impl Drop for AbortOnDropJoinHandle { + fn drop(&mut self) { + self.0.abort() + } +} +impl std::ops::Deref for AbortOnDropJoinHandle { + type Target = tokio::task::JoinHandle; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl std::ops::DerefMut for AbortOnDropJoinHandle { + fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle { + &mut self.0 + } +} +impl From> for AbortOnDropJoinHandle { + fn from(jh: tokio::task::JoinHandle) -> Self { + AbortOnDropJoinHandle(jh) + } +} +impl Future for AbortOnDropJoinHandle { + type Output = T; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.as_mut().0).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")), + } + } +}