diff --git a/.github/workflows/wasmtime.yml b/.github/workflows/wasmtime.yml index cf56be6..1eba807 100644 --- a/.github/workflows/wasmtime.yml +++ b/.github/workflows/wasmtime.yml @@ -43,16 +43,16 @@ jobs: - name: Install wasmtime (ubuntu) if: startsWith(matrix.os, 'ubuntu-') run: | - curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasmtime-v14.0.4-x86_64-linux.tar.xz - tar xvJf wasmtime-v14.0.4-x86_64-linux.tar.xz - echo "WASMTIME=$(pwd -P)/wasmtime-v14.0.4-x86_64-linux/wasmtime" >> ${GITHUB_ENV} + curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasmtime-v15.0.1-x86_64-linux.tar.xz + tar xvJf wasmtime-v15.0.1-x86_64-linux.tar.xz + echo "WASMTIME=$(pwd -P)/wasmtime-v15.0.1-x86_64-linux/wasmtime" >> ${GITHUB_ENV} - name: Install wasmtime (macOS) if: startsWith(matrix.os, 'macos-') run: | - curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasmtime-v14.0.4-x86_64-macos.tar.xz - tar xvzf wasmtime-v14.0.4-x86_64-macos.tar.xz - echo "WASMTIME=$(pwd -P)/wasmtime-v14.0.4-x86_64-macos/wasmtime" >> ${GITHUB_ENV} + curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasmtime-v15.0.1-x86_64-macos.tar.xz + tar xvzf wasmtime-v15.0.1-x86_64-macos.tar.xz + echo "WASMTIME=$(pwd -P)/wasmtime-v15.0.1-x86_64-macos/wasmtime" >> ${GITHUB_ENV} - name: Build guest (Rust) run: | diff --git a/.gitignore b/.gitignore index 9cea445..bd292a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1 @@ */target/* -*.lock \ No newline at end of file diff --git a/guest_c/build.sh b/guest_c/build.sh index 3185e5d..3afa19d 100755 --- a/guest_c/build.sh +++ b/guest_c/build.sh @@ -1,7 +1,7 @@ #! /bin/sh set -e if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then - curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasi_snapshot_preview1.reactor.wasm + curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasi_snapshot_preview1.reactor.wasm fi if [ ! -d libjpeg ]; then diff --git a/guest_c/main.c b/guest_c/main.c index 336b9c0..f01dc16 100644 --- a/guest_c/main.c +++ b/guest_c/main.c @@ -156,24 +156,39 @@ exports_wasi_sensor_interface_main() sensing_borrow_pool_t borrowed_pool = wasi_buffer_pool_buffer_pool_borrow_pool(pool); + sensing_own_pollable_t poll = + wasi_buffer_pool_buffer_pool_method_pool_subscribe( + borrowed_pool); + sensing_borrow_pollable_t borrowed_poll = + wasi_io_0_2_0_rc_2023_11_10_poll_borrow_pollable(poll); int n = 60; int i; - for (i = 0; i < n; i++) { - wasi_buffer_pool_buffer_pool_frame_info_t frame; - if (!wasi_buffer_pool_buffer_pool_method_pool_block_read_frame( - borrowed_pool, &frame, &buffer_error)) { + for (i = 0; i < n;) { + wasi_io_0_2_0_rc_2023_11_10_poll_method_pollable_block( + borrowed_poll); + sensing_list_wasi_buffer_pool_buffer_pool_frame_info_t frames; + if (!wasi_buffer_pool_buffer_pool_method_pool_read_frames( + borrowed_pool, 1, &frames, &buffer_error)) { fprintf(stderr, "block-read-frame failed (error %u)\n", (unsigned int)buffer_error); return false; } - fprintf(stderr, "got a frame (%u/%u)\n", i + 1, n); - if (!process_frame_info(&frame)) { - return false; + size_t j; + for (j = 0; j < frames.len; j++) { + wasi_buffer_pool_buffer_pool_frame_info_t *frame = + &frames.ptr[j]; + i++; + fprintf(stderr, "got a frame (%u/%u)\n", i, n); + if (!process_frame_info(frame)) { + return false; + } } - wasi_buffer_pool_buffer_pool_frame_info_free(&frame); + sensing_list_wasi_buffer_pool_buffer_pool_frame_info_free( + &frames); } fprintf(stderr, "cleaning up\n"); + wasi_io_0_2_0_rc_2023_11_10_poll_pollable_drop_own(poll); wasi_sensor_sensor_device_drop_own(device); wasi_buffer_pool_buffer_pool_pool_drop_own(pool); diff --git a/guest_rust/build.sh b/guest_rust/build.sh index da793b9..98e3482 100755 --- a/guest_rust/build.sh +++ b/guest_rust/build.sh @@ -1,7 +1,7 @@ #! /bin/sh set -e if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then - curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasi_snapshot_preview1.reactor.wasm + curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasi_snapshot_preview1.reactor.wasm fi release_opt=--release diff --git a/guest_rust/src/lib.rs b/guest_rust/src/lib.rs index f81f2fc..0ad481c 100644 --- a/guest_rust/src/lib.rs +++ b/guest_rust/src/lib.rs @@ -194,9 +194,16 @@ fn main2() -> Result<()> { println!("starting sensor {:?}", sensor); sensor.start(&pool_name)?; - for _ in 0..60 { - let frame = pool.block_read_frame()?; - process_frame(&frame)?; + let poll = pool.subscribe(); + let mut n = 0; + while n < 60 { + poll.block(); + let frames = pool.read_frames(1)?; + assert!(frames.len() == 1); + for ref frame in &frames { + process_frame(frame)?; + n += 1; + } } let stats = pool.get_statistics()?; println!("pool statistics: {:?}", stats); diff --git a/host_wasmtime/Cargo.toml b/host_wasmtime/Cargo.toml index 31c724d..26053cb 100644 --- a/host_wasmtime/Cargo.toml +++ b/host_wasmtime/Cargo.toml @@ -11,6 +11,8 @@ fraction = "0.14" tracing = { version = "0.1.40", features = ["max_level_trace"] } tracing-subscriber = "0.3.18" image = { version = "0.24.7", default-features = false } +async-trait = "0.1.71" +tokio = { version = "1.26.0", default-features = false } # released versions of nokhwa is a bit broken for avfoundation. # https://github.com/l1npengtul/nokhwa/pull/151 @@ -18,5 +20,5 @@ image = { version = "0.24.7", default-features = false } nokhwa = {git = "https://github.com/yamt/nokhwa", rev = "0.10+fixes", features = ["input-native", "output-threaded"]} # preview2 and component-model are still moving targets. -wasmtime = { version = "14.0.4", default-features = false, features = ["component-model", "cranelift"]} -wasmtime-wasi = { version = "14.0.4", default-features = false, features = ["preview2", "sync"] } +wasmtime = { version = "15.0.1", default-features = false, features = ["component-model", "cranelift"]} +wasmtime-wasi = { version = "15.0.1", default-features = false, features = ["preview2", "sync"] } diff --git a/host_wasmtime/src/dummy_device.rs b/host_wasmtime/src/dummy_device.rs index 67b49a6..887fad2 100644 --- a/host_wasmtime/src/dummy_device.rs +++ b/host_wasmtime/src/dummy_device.rs @@ -91,7 +91,7 @@ impl SensorDevice for DummyDevice { thread::sleep(next_frame - now); } next_frame += config.frame_duration; - match pool.enqueue(Box::new(data), None) { + match pool.try_enqueue(Box::new(data), None) { Ok(_) => println!("DummyDevice generated frame enqueued"), _ => println!("DummyDevice generated frame dropped"), } diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs index cf72760..7066013 100644 --- a/host_wasmtime/src/main.rs +++ b/host_wasmtime/src/main.rs @@ -10,6 +10,8 @@ use wasmtime::{Config, Engine, Store}; use wasmtime_wasi::ambient_authority; use wasmtime_wasi::preview2::DirPerms; use wasmtime_wasi::preview2::FilePerms; +use wasmtime_wasi::preview2::Pollable; +use wasmtime_wasi::preview2::Subscribe; use wasmtime_wasi::preview2::Table; use wasmtime_wasi::preview2::WasiCtx; use wasmtime_wasi::preview2::WasiCtxBuilder; @@ -32,6 +34,7 @@ wasmtime::component::bindgen!({ with: { "wasi:buffer-pool/buffer-pool/pool": Pool, "wasi:sensor/sensor/device": Device, + "wasi:io/poll": wasmtime_wasi::preview2::bindings::io::poll, }, }); @@ -42,8 +45,23 @@ trait WasiSensorView { pub struct Pool { name: String, + next_frame: Option<(u64, u64, Box)>, pool: Arc, } + +#[async_trait::async_trait] +impl Subscribe for Pool { + async fn ready(&mut self) { + if self.next_frame.is_some() { + return; + } + // XXX this confuses the flow-control in the pool by 1 frame + let frame = self.pool.dequeue().await; + assert!(self.next_frame.is_none()); /* XXX */ + self.next_frame = Some(frame); + } +} + pub struct Device { device: Box, } @@ -93,44 +111,49 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T { > { let pool = SimplePool::new(mode, size as usize, buffer_num as usize)?; let pool = Arc::new(pool); - let idx = self.table().push_resource(Pool { + let idx = self.table().push(Pool { name: name.clone(), + next_frame: None, pool: pool.clone(), })?; self.pools().insert(name, pool); Ok(Ok(idx)) } - fn block_read_frame( + fn read_frames( &mut self, res: Resource, + max_results: u32, ) -> Result< Result< - wasi::buffer_pool::buffer_pool::FrameInfo, + Vec, wasi::buffer_pool::buffer_pool::BufferError, >, > { - let pool = self.table().get_resource_mut(&res)?; - let (sequence_number, timestamp, data) = pool.pool.dequeue(); + let pool = self.table().get_mut(&res)?; + if max_results == 0 { + return Ok(Ok(vec![])); + } + let (sequence_number, timestamp, data) = match pool.next_frame.take() { + Some(frame) => frame, + None => match pool.pool.try_dequeue() { + Some(frame) => frame, + None => return Ok(Ok(vec![])), + }, + }; let frame = wasi::buffer_pool::buffer_pool::FrameInfo { sequence_number: sequence_number, timestamp: timestamp, data: vec![*data], }; - Ok(Ok(frame)) + Ok(Ok(vec![frame])) } - fn poll_read_frame( + + fn subscribe( &mut self, res: Resource, - ) -> Result< - Result< - wasi::buffer_pool::buffer_pool::FrameInfo, - wasi::buffer_pool::buffer_pool::BufferError, - >, - > { - Ok(Err( - wasi::buffer_pool::buffer_pool::BufferError::NotSupported, - )) + ) -> Result> { + wasmtime_wasi::preview2::subscribe(self.table(), res) } fn get_statistics( @@ -142,7 +165,7 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T { wasi::buffer_pool::buffer_pool::BufferError, >, > { - let pool = self.table().get_resource(&res)?; + let pool = self.table().get(&res)?; let stats = pool.pool.get_statistics()?; Ok(Ok(stats)) } @@ -151,9 +174,9 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T { &mut self, res: Resource, ) -> wasmtime::Result<()> { - let pool = self.table().get_resource(&res)?; + let pool = self.table().get(&res)?; let name = pool.name.clone(); - self.table().delete_resource(res)?; + self.table().delete(res)?; self.pools().remove(&name); Ok(()) } @@ -180,7 +203,7 @@ impl wasi::sensor::sensor::HostDevice for T { let device = Device { device: device_impl, }; - let idx = self.table().push_resource(device)?; + let idx = self.table().push(device)?; Ok(Ok(idx)) } @@ -198,7 +221,7 @@ impl wasi::sensor::sensor::HostDevice for T { _ => return Ok(Err(wasi::sensor::sensor::DeviceError::NotFound)), }; let pool = Arc::clone(pool); - let device = self.table().get_resource_mut(&res)?; + let device = self.table().get_mut(&res)?; Ok(device.device.start_streaming(pool)) } fn stop( @@ -213,7 +236,7 @@ impl wasi::sensor::sensor::HostDevice for T { key: wasi::sensor::property::PropertyKey, value: wasi::sensor::property::PropertyValue, ) -> Result> { - let device = self.table().get_resource_mut(&res)?; + let device = self.table().get_mut(&res)?; Ok(device.device.set_property(key, value)) } fn get_property( @@ -222,12 +245,12 @@ impl wasi::sensor::sensor::HostDevice for T { key: wasi::sensor::property::PropertyKey, ) -> Result> { - let device = self.table().get_resource_mut(&res)?; + let device = self.table().get_mut(&res)?; Ok(device.device.get_property(key)) } fn drop(&mut self, res: Resource) -> wasmtime::Result<()> { trace!("dropping {:?}", res); - self.table().delete_resource(res)?; + self.table().delete(res)?; Ok(()) } } diff --git a/host_wasmtime/src/nokhwa.rs b/host_wasmtime/src/nokhwa.rs index 9bd900c..fdcf77d 100644 --- a/host_wasmtime/src/nokhwa.rs +++ b/host_wasmtime/src/nokhwa.rs @@ -90,7 +90,7 @@ impl SensorDevice for NokhwaDevice { let data = wasi::buffer_pool::buffer_pool::FrameData::ByValue( wasi::buffer_pool::data_types::DataType::Image(image), ); - match pool.enqueue(Box::new(data), None) { + match pool.try_enqueue(Box::new(data), None) { Ok(_) => println!("NokhwaDevice frame enqueued"), _ => println!("NokhwaDevice frame dropped"), } diff --git a/host_wasmtime/src/pool.rs b/host_wasmtime/src/pool.rs index c73f607..bdc4ec1 100644 --- a/host_wasmtime/src/pool.rs +++ b/host_wasmtime/src/pool.rs @@ -1,10 +1,11 @@ use anyhow::Error; use anyhow::Result; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::Receiver; -use std::sync::mpsc::SyncSender; -use std::sync::Mutex; use std::time::Instant; +use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; use super::*; use traits::BufferPool; @@ -16,7 +17,7 @@ use wasi::buffer_pool::buffer_pool::PoolStatistics; struct SimplePoolSequencer { sequence_number: u64, boottime: Instant, - sender: SyncSender<(u64, u64, Box)>, + sender: Sender<(u64, u64, Box)>, /* stats */ enqueued: u64, @@ -41,7 +42,7 @@ impl SimplePool { BufferingMode::BufferingDiscard => mode, _ => return Err(BufferError::NotSupported), }; - let (sender, receiver) = sync_channel(num); + let (sender, receiver) = channel(num); Ok(Self { sequencer: Mutex::new(SimplePoolSequencer { sequence_number: 0, @@ -58,9 +59,10 @@ impl SimplePool { } } +#[async_trait::async_trait] impl BufferPool for SimplePool { - fn enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error> { - let mut seq = self.sequencer.lock().unwrap(); + fn try_enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error> { + let mut seq = self.sequencer.blocking_lock(); let timestamp = match timestamp { Some(t) => t, _ => seq.boottime.elapsed().as_nanos() as u64, @@ -77,17 +79,33 @@ impl BufferPool for SimplePool { } Ok(()) } - fn dequeue(&self) -> (u64, u64, Box) { - let mut receiver = self.receiver.lock().unwrap(); - receiver.dequeued += 1; - receiver.receiver.recv().unwrap() + fn try_dequeue(&self) -> Option<(u64, u64, Box)> { + let mut receiver = self.receiver.blocking_lock(); + match receiver.receiver.try_recv() { + Ok(frame) => { + receiver.dequeued += 1; + Some(frame) + } + Err(TryRecvError::Empty) => None, + _ => panic!("disconnected"), + } + } + async fn dequeue(&self) -> (u64, u64, Box) { + let mut receiver = self.receiver.lock().await; + match receiver.receiver.recv().await { + Some(frame) => { + receiver.dequeued += 1; + frame + } + _ => panic!("disconnected"), + } } fn get_statistics(&self) -> Result { - let seq = self.sequencer.lock().unwrap(); + let seq = self.sequencer.blocking_lock(); let enqueued = seq.enqueued; let dropped = seq.dropped; drop(seq); - let receiver = self.receiver.lock().unwrap(); + let receiver = self.receiver.blocking_lock(); let dequeued = receiver.dequeued; drop(receiver); Ok(PoolStatistics { diff --git a/host_wasmtime/src/traits.rs b/host_wasmtime/src/traits.rs index 0ed3dcf..15b026c 100644 --- a/host_wasmtime/src/traits.rs +++ b/host_wasmtime/src/traits.rs @@ -8,9 +8,11 @@ use wasi::sensor::property::PropertyKey; use wasi::sensor::property::PropertyValue; use wasi::sensor::sensor::DeviceError; +#[async_trait::async_trait] pub trait BufferPool { - fn enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error>; - fn dequeue(&self) -> (u64, u64, Box); + fn try_enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error>; + fn try_dequeue(&self) -> Option<(u64, u64, Box)>; + async fn dequeue(&self) -> (u64, u64, Box); fn get_statistics(&self) -> Result; } diff --git a/sensing.md b/sensing.md index a954551..a27be68 100644 --- a/sensing.md +++ b/sensing.md @@ -5,6 +5,7 @@
  • interface wasi:buffer-pool/data-types
  • interface wasi:sensor/property
  • interface wasi:sensor/sensor
  • +
  • interface wasi:io/poll@0.2.0-rc-2023-11-10
  • interface wasi:buffer-pool/buffer-pool
  • @@ -231,12 +232,65 @@ this might power on the device.

    +

    Import interface wasi:io/poll@0.2.0-rc-2023-11-10

    +

    A poll API intended to let users wait for I/O events on multiple handles +at once.

    +
    +

    Types

    +

    resource pollable

    +

    pollable represents a single I/O event which may be ready, or not.

    +

    Functions

    +

    [method]pollable.ready: func

    +

    Return the readiness of a pollable. This function never blocks.

    +

    Returns true when the pollable is ready, and false otherwise.

    +
    Params
    + +
    Return values
    +
      +
    • bool
    • +
    +

    [method]pollable.block: func

    +

    block returns immediately if the pollable is ready, and otherwise +blocks until ready.

    +

    This function is equivalent to calling poll.poll on a list +containing only this pollable.

    +
    Params
    + +

    poll: func

    +

    Poll for completion on a set of pollables.

    +

    This function takes a list of pollables, which identify I/O sources of +interest, and waits until one or more of the events is ready for I/O.

    +

    The result list<u32> contains one or more indices of handles in the +argument list that is ready for I/O.

    +

    If the list contains more elements than can be indexed with a u32 +value, this function traps.

    +

    A timeout can be implemented by adding a pollable from the +wasi-clocks API to the list.

    +

    This function does not return a result; polling in itself does not +do any I/O so it doesn't fail. If any of the I/O sources identified by +the pollables has an error, it is indicated by marking the source as +being reaedy for I/O.

    +
    Params
    + +
    Return values
    +
      +
    • list<u32>
    • +

    Import interface wasi:buffer-pool/buffer-pool

    sensor frame/buffer management I/F


    Types

    -

    type data-type

    -

    data-type

    +

    type pollable

    +

    pollable

    +

    +#### `type data-type` +[`data-type`](#data_type)

    #### `enum buffer-error`

    Enum Cases
    @@ -381,23 +435,30 @@ for other buffering modes, this is ignored.

    -

    [method]pool.block-read-frame: func

    +

    [method]pool.read-frames: func

    +

    try to read frames. +this function returns 0 frames when

    +
      +
    • max-results = 0
    • +
    • or, no frames are immediately available
    • +
    Params
    Return values
    -

    [method]pool.poll-read-frame: func

    +

    [method]pool.subscribe: func

    Params
    Return values

    [method]pool.get-statistics: func

    Params
    diff --git a/wit/deps.lock b/wit/deps.lock new file mode 100644 index 0000000..0a05da0 --- /dev/null +++ b/wit/deps.lock @@ -0,0 +1,4 @@ +[io] +url = "https://github.com/WebAssembly/wasi-io/archive/main.tar.gz" +sha256 = "b622db2755978a49d18d35d84d75f66b2b1ed23d7bf413e5c9e152e190cc7d4b" +sha512 = "d19c9004e75bf3ebe3e34cff498c3d7fee04cd57a7fba7ed12a0c5ad842ba5715c009de77a152c57da0500f6ca0986b6791b6f022829bdd5a024f7bc114c2ff6" diff --git a/wit/deps.toml b/wit/deps.toml new file mode 100644 index 0000000..b178cb2 --- /dev/null +++ b/wit/deps.toml @@ -0,0 +1 @@ +io = "https://github.com/WebAssembly/wasi-io/archive/main.tar.gz" diff --git a/wit/deps/buffer-pool/buffer-pool.wit b/wit/deps/buffer-pool/buffer-pool.wit index e4f8468..a57f4b7 100644 --- a/wit/deps/buffer-pool/buffer-pool.wit +++ b/wit/deps/buffer-pool/buffer-pool.wit @@ -3,6 +3,7 @@ package wasi:buffer-pool; /// sensor frame/buffer management I/F interface buffer-pool { + use wasi:io/poll@0.2.0-rc-2023-11-10.{pollable}; use data-types.{data-type}; enum buffer-error { @@ -95,8 +96,13 @@ interface buffer-pool { /// name: the name of the pool. you can use this for device.start(). create: static func(mode:buffering-mode, size:u32, buffer-num:u32, name:string) ->result; - block-read-frame: func()-> result; - poll-read-frame: func()-> result; + /// try to read frames. + /// this function returns 0 frames when + /// - max-results = 0 + /// - or, no frames are immediately available + read-frames: func(max-results: u32)-> result, buffer-error>; + + subscribe: func() -> pollable; get-statistics: func()-> result; } diff --git a/wit/deps/io/error.wit b/wit/deps/io/error.wit new file mode 100644 index 0000000..31918ac --- /dev/null +++ b/wit/deps/io/error.wit @@ -0,0 +1,34 @@ +package wasi:io@0.2.0-rc-2023-11-10; + + +interface error { + /// A resource which represents some error information. + /// + /// The only method provided by this resource is `to-debug-string`, + /// which provides some human-readable information about the error. + /// + /// In the `wasi:io` package, this resource is returned through the + /// `wasi:io/streams/stream-error` type. + /// + /// To provide more specific error information, other interfaces may + /// provide functions to further "downcast" this error into more specific + /// error information. For example, `error`s returned in streams derived + /// from filesystem types to be described using the filesystem's own + /// error-code type, using the function + /// `wasi:filesystem/types/filesystem-error-code`, which takes a parameter + /// `borrow` and returns + /// `option`. + /// + /// The set of functions which can "downcast" an `error` into a more + /// concrete type is open. + resource error { + /// Returns a string that is suitable to assist humans in debugging + /// this error. + /// + /// WARNING: The returned string should not be consumed mechanically! + /// It may change across platforms, hosts, or other implementation + /// details. Parsing this string is a major platform-compatibility + /// hazard. + to-debug-string: func() -> string; + } +} diff --git a/wit/deps/io/poll.wit b/wit/deps/io/poll.wit new file mode 100644 index 0000000..81b1cab --- /dev/null +++ b/wit/deps/io/poll.wit @@ -0,0 +1,41 @@ +package wasi:io@0.2.0-rc-2023-11-10; + +/// A poll API intended to let users wait for I/O events on multiple handles +/// at once. +interface poll { + /// `pollable` represents a single I/O event which may be ready, or not. + resource pollable { + + /// Return the readiness of a pollable. This function never blocks. + /// + /// Returns `true` when the pollable is ready, and `false` otherwise. + ready: func() -> bool; + + /// `block` returns immediately if the pollable is ready, and otherwise + /// blocks until ready. + /// + /// This function is equivalent to calling `poll.poll` on a list + /// containing only this pollable. + block: func(); + } + + /// Poll for completion on a set of pollables. + /// + /// This function takes a list of pollables, which identify I/O sources of + /// interest, and waits until one or more of the events is ready for I/O. + /// + /// The result `list` contains one or more indices of handles in the + /// argument list that is ready for I/O. + /// + /// If the list contains more elements than can be indexed with a `u32` + /// value, this function traps. + /// + /// A timeout can be implemented by adding a pollable from the + /// wasi-clocks API to the list. + /// + /// This function does not return a `result`; polling in itself does not + /// do any I/O so it doesn't fail. If any of the I/O sources identified by + /// the pollables has an error, it is indicated by marking the source as + /// being reaedy for I/O. + poll: func(in: list>) -> list; +} diff --git a/wit/deps/io/streams.wit b/wit/deps/io/streams.wit new file mode 100644 index 0000000..f6f7fe0 --- /dev/null +++ b/wit/deps/io/streams.wit @@ -0,0 +1,251 @@ +package wasi:io@0.2.0-rc-2023-11-10; + +/// WASI I/O is an I/O abstraction API which is currently focused on providing +/// stream types. +/// +/// In the future, the component model is expected to add built-in stream types; +/// when it does, they are expected to subsume this API. +interface streams { + use error.{error}; + use poll.{pollable}; + + /// An error for input-stream and output-stream operations. + variant stream-error { + /// The last operation (a write or flush) failed before completion. + /// + /// More information is available in the `error` payload. + last-operation-failed(error), + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed + } + + /// An input bytestream. + /// + /// `input-stream`s are *non-blocking* to the extent practical on underlying + /// platforms. I/O operations always return promptly; if fewer bytes are + /// promptly available than requested, they return the number of bytes promptly + /// available, which could even be zero. To wait for data to be available, + /// use the `subscribe` function to obtain a `pollable` which can be polled + /// for using `wasi:io/poll`. + resource input-stream { + /// Perform a non-blocking read from the stream. + /// + /// This function returns a list of bytes containing the read data, + /// when successful. The returned list will contain up to `len` bytes; + /// it may return fewer than requested, but not more. The list is + /// empty when no bytes are available for reading at this time. The + /// pollable given by `subscribe` will be ready when more bytes are + /// available. + /// + /// This function fails with a `stream-error` when the operation + /// encounters an error, giving `last-operation-failed`, or when the + /// stream is closed, giving `closed`. + /// + /// When the caller gives a `len` of 0, it represents a request to + /// read 0 bytes. If the stream is still open, this call should + /// succeed and return an empty list, or otherwise fail with `closed`. + /// + /// The `len` parameter is a `u64`, which could represent a list of u8 which + /// is not possible to allocate in wasm32, or not desirable to allocate as + /// as a return value by the callee. The callee may return a list of bytes + /// less than `len` in size while more bytes are available for reading. + read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Read bytes from a stream, after blocking until at least one byte can + /// be read. Except for blocking, behavior is identical to `read`. + blocking-read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Skip bytes from a stream. Returns number of bytes skipped. + /// + /// Behaves identical to `read`, except instead of returning a list + /// of bytes, returns the number of bytes consumed from the stream. + skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Skip bytes from a stream, after blocking until at least one byte + /// can be skipped. Except for blocking behavior, identical to `skip`. + blocking-skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Create a `pollable` which will resolve once either the specified stream + /// has bytes available to read or the other end of the stream has been + /// closed. + /// The created `pollable` is a child resource of the `input-stream`. + /// Implementations may trap if the `input-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe: func() -> pollable; + } + + + /// An output bytestream. + /// + /// `output-stream`s are *non-blocking* to the extent practical on + /// underlying platforms. Except where specified otherwise, I/O operations also + /// always return promptly, after the number of bytes that can be written + /// promptly, which could even be zero. To wait for the stream to be ready to + /// accept data, the `subscribe` function to obtain a `pollable` which can be + /// polled for using `wasi:io/poll`. + resource output-stream { + /// Check readiness for writing. This function never blocks. + /// + /// Returns the number of bytes permitted for the next call to `write`, + /// or an error. Calling `write` with more bytes than this function has + /// permitted will trap. + /// + /// When this function returns 0 bytes, the `subscribe` pollable will + /// become ready when this function will report at least 1 byte, or an + /// error. + check-write: func() -> result; + + /// Perform a write. This function never blocks. + /// + /// Precondition: check-write gave permit of Ok(n) and contents has a + /// length of less than or equal to n. Otherwise, this function will trap. + /// + /// returns Err(closed) without writing if the stream has closed since + /// the last call to check-write provided a permit. + write: func( + contents: list + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 bytes, and then flush the stream. Block + /// until all of these operations are complete, or an error occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write`, and `flush`, and is implemented with the + /// following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while !contents.is_empty() { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, contents.len()); + /// let (chunk, rest) = contents.split_at(len); + /// this.write(chunk ); // eliding error handling + /// contents = rest; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + blocking-write-and-flush: func( + contents: list + ) -> result<_, stream-error>; + + /// Request to flush buffered output. This function never blocks. + /// + /// This tells the output-stream that the caller intends any buffered + /// output to be flushed. the output which is expected to be flushed + /// is all that has been passed to `write` prior to this call. + /// + /// Upon calling this function, the `output-stream` will not accept any + /// writes (`check-write` will return `ok(0)`) until the flush has + /// completed. The `subscribe` pollable will become ready when the + /// flush has completed and the stream can accept more writes. + flush: func() -> result<_, stream-error>; + + /// Request to flush buffered output, and block until flush completes + /// and stream is ready for writing again. + blocking-flush: func() -> result<_, stream-error>; + + /// Create a `pollable` which will resolve once the output-stream + /// is ready for more writing, or an error has occured. When this + /// pollable is ready, `check-write` will return `ok(n)` with n>0, or an + /// error. + /// + /// If the stream is closed, this pollable is always ready immediately. + /// + /// The created `pollable` is a child resource of the `output-stream`. + /// Implementations may trap if the `output-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe: func() -> pollable; + + /// Write zeroes to a stream. + /// + /// This should be used precisely like `write` with the exact same + /// preconditions (must use check-write first), but instead of + /// passing a list of bytes, you simply pass the number of zero-bytes + /// that should be written. + write-zeroes: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 zeroes, and then flush the stream. + /// Block until all of these operations are complete, or an error + /// occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with + /// the following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while num_zeroes != 0 { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, num_zeroes); + /// this.write-zeroes(len); // eliding error handling + /// num_zeroes -= len; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + blocking-write-zeroes-and-flush: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Read from one stream and write to another. + /// + /// The behavior of splice is equivelant to: + /// 1. calling `check-write` on the `output-stream` + /// 2. calling `read` on the `input-stream` with the smaller of the + /// `check-write` permitted length and the `len` provided to `splice` + /// 3. calling `write` on the `output-stream` with that read data. + /// + /// Any error reported by the call to `check-write`, `read`, or + /// `write` ends the splice and reports that error. + /// + /// This function returns the number of bytes transferred; it may be less + /// than `len`. + splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + + /// Read from one stream and write to another, with blocking. + /// + /// This is similar to `splice`, except that it blocks until the + /// `output-stream` is ready for writing, and the `input-stream` + /// is ready for reading, before performing the `splice`. + blocking-splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + } +} diff --git a/wit/deps/io/world.wit b/wit/deps/io/world.wit new file mode 100644 index 0000000..8243da2 --- /dev/null +++ b/wit/deps/io/world.wit @@ -0,0 +1,6 @@ +package wasi:io@0.2.0-rc-2023-11-10; + +world imports { + import streams; + import poll; +}