From 1984bf8df0b2dd8ff59ca52006a503c7a1ac2a5b Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Mon, 4 Dec 2023 19:24:26 +0900
Subject: [PATCH 01/20] wit: add wasi-io as a dependency
---
wit/deps.toml | 1 +
1 file changed, 1 insertion(+)
create mode 100644 wit/deps.toml
diff --git a/wit/deps.toml b/wit/deps.toml
new file mode 100644
index 0000000..b178cb2
--- /dev/null
+++ b/wit/deps.toml
@@ -0,0 +1 @@
+io = "https://github.com/WebAssembly/wasi-io/archive/main.tar.gz"
From 5e194fb430d4382a34418f88d857397cc75c4590 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Mon, 4 Dec 2023 19:26:25 +0900
Subject: [PATCH 02/20] .gitignore: remove *.lock
* i don't know why this was added in the first place.
* i want to commit deps.lock and similar files.
---
.gitignore | 1 -
1 file changed, 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index 9cea445..bd292a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1 @@
*/target/*
-*.lock
\ No newline at end of file
From 66a96fffe26ae31b23e71f238a41c7f55e8417df Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Mon, 4 Dec 2023 19:28:12 +0900
Subject: [PATCH 03/20] run wit-deps
---
wit/deps.lock | 4 +
wit/deps/io/error.wit | 34 ++++++
wit/deps/io/poll.wit | 41 +++++++
wit/deps/io/streams.wit | 251 ++++++++++++++++++++++++++++++++++++++++
wit/deps/io/world.wit | 6 +
5 files changed, 336 insertions(+)
create mode 100644 wit/deps.lock
create mode 100644 wit/deps/io/error.wit
create mode 100644 wit/deps/io/poll.wit
create mode 100644 wit/deps/io/streams.wit
create mode 100644 wit/deps/io/world.wit
diff --git a/wit/deps.lock b/wit/deps.lock
new file mode 100644
index 0000000..0a05da0
--- /dev/null
+++ b/wit/deps.lock
@@ -0,0 +1,4 @@
+[io]
+url = "https://github.com/WebAssembly/wasi-io/archive/main.tar.gz"
+sha256 = "b622db2755978a49d18d35d84d75f66b2b1ed23d7bf413e5c9e152e190cc7d4b"
+sha512 = "d19c9004e75bf3ebe3e34cff498c3d7fee04cd57a7fba7ed12a0c5ad842ba5715c009de77a152c57da0500f6ca0986b6791b6f022829bdd5a024f7bc114c2ff6"
diff --git a/wit/deps/io/error.wit b/wit/deps/io/error.wit
new file mode 100644
index 0000000..31918ac
--- /dev/null
+++ b/wit/deps/io/error.wit
@@ -0,0 +1,34 @@
+package wasi:io@0.2.0-rc-2023-11-10;
+
+
+interface error {
+ /// A resource which represents some error information.
+ ///
+ /// The only method provided by this resource is `to-debug-string`,
+ /// which provides some human-readable information about the error.
+ ///
+ /// In the `wasi:io` package, this resource is returned through the
+ /// `wasi:io/streams/stream-error` type.
+ ///
+ /// To provide more specific error information, other interfaces may
+ /// provide functions to further "downcast" this error into more specific
+ /// error information. For example, `error`s returned in streams derived
+ /// from filesystem types to be described using the filesystem's own
+ /// error-code type, using the function
+ /// `wasi:filesystem/types/filesystem-error-code`, which takes a parameter
+ /// `borrow` and returns
+ /// `option`.
+ ///
+ /// The set of functions which can "downcast" an `error` into a more
+ /// concrete type is open.
+ resource error {
+ /// Returns a string that is suitable to assist humans in debugging
+ /// this error.
+ ///
+ /// WARNING: The returned string should not be consumed mechanically!
+ /// It may change across platforms, hosts, or other implementation
+ /// details. Parsing this string is a major platform-compatibility
+ /// hazard.
+ to-debug-string: func() -> string;
+ }
+}
diff --git a/wit/deps/io/poll.wit b/wit/deps/io/poll.wit
new file mode 100644
index 0000000..81b1cab
--- /dev/null
+++ b/wit/deps/io/poll.wit
@@ -0,0 +1,41 @@
+package wasi:io@0.2.0-rc-2023-11-10;
+
+/// A poll API intended to let users wait for I/O events on multiple handles
+/// at once.
+interface poll {
+ /// `pollable` represents a single I/O event which may be ready, or not.
+ resource pollable {
+
+ /// Return the readiness of a pollable. This function never blocks.
+ ///
+ /// Returns `true` when the pollable is ready, and `false` otherwise.
+ ready: func() -> bool;
+
+ /// `block` returns immediately if the pollable is ready, and otherwise
+ /// blocks until ready.
+ ///
+ /// This function is equivalent to calling `poll.poll` on a list
+ /// containing only this pollable.
+ block: func();
+ }
+
+ /// Poll for completion on a set of pollables.
+ ///
+ /// This function takes a list of pollables, which identify I/O sources of
+ /// interest, and waits until one or more of the events is ready for I/O.
+ ///
+ /// The result `list` contains one or more indices of handles in the
+ /// argument list that is ready for I/O.
+ ///
+ /// If the list contains more elements than can be indexed with a `u32`
+ /// value, this function traps.
+ ///
+ /// A timeout can be implemented by adding a pollable from the
+ /// wasi-clocks API to the list.
+ ///
+ /// This function does not return a `result`; polling in itself does not
+ /// do any I/O so it doesn't fail. If any of the I/O sources identified by
+ /// the pollables has an error, it is indicated by marking the source as
+ /// being reaedy for I/O.
+ poll: func(in: list>) -> list;
+}
diff --git a/wit/deps/io/streams.wit b/wit/deps/io/streams.wit
new file mode 100644
index 0000000..f6f7fe0
--- /dev/null
+++ b/wit/deps/io/streams.wit
@@ -0,0 +1,251 @@
+package wasi:io@0.2.0-rc-2023-11-10;
+
+/// WASI I/O is an I/O abstraction API which is currently focused on providing
+/// stream types.
+///
+/// In the future, the component model is expected to add built-in stream types;
+/// when it does, they are expected to subsume this API.
+interface streams {
+ use error.{error};
+ use poll.{pollable};
+
+ /// An error for input-stream and output-stream operations.
+ variant stream-error {
+ /// The last operation (a write or flush) failed before completion.
+ ///
+ /// More information is available in the `error` payload.
+ last-operation-failed(error),
+ /// The stream is closed: no more input will be accepted by the
+ /// stream. A closed output-stream will return this error on all
+ /// future operations.
+ closed
+ }
+
+ /// An input bytestream.
+ ///
+ /// `input-stream`s are *non-blocking* to the extent practical on underlying
+ /// platforms. I/O operations always return promptly; if fewer bytes are
+ /// promptly available than requested, they return the number of bytes promptly
+ /// available, which could even be zero. To wait for data to be available,
+ /// use the `subscribe` function to obtain a `pollable` which can be polled
+ /// for using `wasi:io/poll`.
+ resource input-stream {
+ /// Perform a non-blocking read from the stream.
+ ///
+ /// This function returns a list of bytes containing the read data,
+ /// when successful. The returned list will contain up to `len` bytes;
+ /// it may return fewer than requested, but not more. The list is
+ /// empty when no bytes are available for reading at this time. The
+ /// pollable given by `subscribe` will be ready when more bytes are
+ /// available.
+ ///
+ /// This function fails with a `stream-error` when the operation
+ /// encounters an error, giving `last-operation-failed`, or when the
+ /// stream is closed, giving `closed`.
+ ///
+ /// When the caller gives a `len` of 0, it represents a request to
+ /// read 0 bytes. If the stream is still open, this call should
+ /// succeed and return an empty list, or otherwise fail with `closed`.
+ ///
+ /// The `len` parameter is a `u64`, which could represent a list of u8 which
+ /// is not possible to allocate in wasm32, or not desirable to allocate as
+ /// as a return value by the callee. The callee may return a list of bytes
+ /// less than `len` in size while more bytes are available for reading.
+ read: func(
+ /// The maximum number of bytes to read
+ len: u64
+ ) -> result, stream-error>;
+
+ /// Read bytes from a stream, after blocking until at least one byte can
+ /// be read. Except for blocking, behavior is identical to `read`.
+ blocking-read: func(
+ /// The maximum number of bytes to read
+ len: u64
+ ) -> result, stream-error>;
+
+ /// Skip bytes from a stream. Returns number of bytes skipped.
+ ///
+ /// Behaves identical to `read`, except instead of returning a list
+ /// of bytes, returns the number of bytes consumed from the stream.
+ skip: func(
+ /// The maximum number of bytes to skip.
+ len: u64,
+ ) -> result;
+
+ /// Skip bytes from a stream, after blocking until at least one byte
+ /// can be skipped. Except for blocking behavior, identical to `skip`.
+ blocking-skip: func(
+ /// The maximum number of bytes to skip.
+ len: u64,
+ ) -> result;
+
+ /// Create a `pollable` which will resolve once either the specified stream
+ /// has bytes available to read or the other end of the stream has been
+ /// closed.
+ /// The created `pollable` is a child resource of the `input-stream`.
+ /// Implementations may trap if the `input-stream` is dropped before
+ /// all derived `pollable`s created with this function are dropped.
+ subscribe: func() -> pollable;
+ }
+
+
+ /// An output bytestream.
+ ///
+ /// `output-stream`s are *non-blocking* to the extent practical on
+ /// underlying platforms. Except where specified otherwise, I/O operations also
+ /// always return promptly, after the number of bytes that can be written
+ /// promptly, which could even be zero. To wait for the stream to be ready to
+ /// accept data, the `subscribe` function to obtain a `pollable` which can be
+ /// polled for using `wasi:io/poll`.
+ resource output-stream {
+ /// Check readiness for writing. This function never blocks.
+ ///
+ /// Returns the number of bytes permitted for the next call to `write`,
+ /// or an error. Calling `write` with more bytes than this function has
+ /// permitted will trap.
+ ///
+ /// When this function returns 0 bytes, the `subscribe` pollable will
+ /// become ready when this function will report at least 1 byte, or an
+ /// error.
+ check-write: func() -> result;
+
+ /// Perform a write. This function never blocks.
+ ///
+ /// Precondition: check-write gave permit of Ok(n) and contents has a
+ /// length of less than or equal to n. Otherwise, this function will trap.
+ ///
+ /// returns Err(closed) without writing if the stream has closed since
+ /// the last call to check-write provided a permit.
+ write: func(
+ contents: list
+ ) -> result<_, stream-error>;
+
+ /// Perform a write of up to 4096 bytes, and then flush the stream. Block
+ /// until all of these operations are complete, or an error occurs.
+ ///
+ /// This is a convenience wrapper around the use of `check-write`,
+ /// `subscribe`, `write`, and `flush`, and is implemented with the
+ /// following pseudo-code:
+ ///
+ /// ```text
+ /// let pollable = this.subscribe();
+ /// while !contents.is_empty() {
+ /// // Wait for the stream to become writable
+ /// pollable.block();
+ /// let Ok(n) = this.check-write(); // eliding error handling
+ /// let len = min(n, contents.len());
+ /// let (chunk, rest) = contents.split_at(len);
+ /// this.write(chunk ); // eliding error handling
+ /// contents = rest;
+ /// }
+ /// this.flush();
+ /// // Wait for completion of `flush`
+ /// pollable.block();
+ /// // Check for any errors that arose during `flush`
+ /// let _ = this.check-write(); // eliding error handling
+ /// ```
+ blocking-write-and-flush: func(
+ contents: list
+ ) -> result<_, stream-error>;
+
+ /// Request to flush buffered output. This function never blocks.
+ ///
+ /// This tells the output-stream that the caller intends any buffered
+ /// output to be flushed. the output which is expected to be flushed
+ /// is all that has been passed to `write` prior to this call.
+ ///
+ /// Upon calling this function, the `output-stream` will not accept any
+ /// writes (`check-write` will return `ok(0)`) until the flush has
+ /// completed. The `subscribe` pollable will become ready when the
+ /// flush has completed and the stream can accept more writes.
+ flush: func() -> result<_, stream-error>;
+
+ /// Request to flush buffered output, and block until flush completes
+ /// and stream is ready for writing again.
+ blocking-flush: func() -> result<_, stream-error>;
+
+ /// Create a `pollable` which will resolve once the output-stream
+ /// is ready for more writing, or an error has occured. When this
+ /// pollable is ready, `check-write` will return `ok(n)` with n>0, or an
+ /// error.
+ ///
+ /// If the stream is closed, this pollable is always ready immediately.
+ ///
+ /// The created `pollable` is a child resource of the `output-stream`.
+ /// Implementations may trap if the `output-stream` is dropped before
+ /// all derived `pollable`s created with this function are dropped.
+ subscribe: func() -> pollable;
+
+ /// Write zeroes to a stream.
+ ///
+ /// This should be used precisely like `write` with the exact same
+ /// preconditions (must use check-write first), but instead of
+ /// passing a list of bytes, you simply pass the number of zero-bytes
+ /// that should be written.
+ write-zeroes: func(
+ /// The number of zero-bytes to write
+ len: u64
+ ) -> result<_, stream-error>;
+
+ /// Perform a write of up to 4096 zeroes, and then flush the stream.
+ /// Block until all of these operations are complete, or an error
+ /// occurs.
+ ///
+ /// This is a convenience wrapper around the use of `check-write`,
+ /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with
+ /// the following pseudo-code:
+ ///
+ /// ```text
+ /// let pollable = this.subscribe();
+ /// while num_zeroes != 0 {
+ /// // Wait for the stream to become writable
+ /// pollable.block();
+ /// let Ok(n) = this.check-write(); // eliding error handling
+ /// let len = min(n, num_zeroes);
+ /// this.write-zeroes(len); // eliding error handling
+ /// num_zeroes -= len;
+ /// }
+ /// this.flush();
+ /// // Wait for completion of `flush`
+ /// pollable.block();
+ /// // Check for any errors that arose during `flush`
+ /// let _ = this.check-write(); // eliding error handling
+ /// ```
+ blocking-write-zeroes-and-flush: func(
+ /// The number of zero-bytes to write
+ len: u64
+ ) -> result<_, stream-error>;
+
+ /// Read from one stream and write to another.
+ ///
+ /// The behavior of splice is equivelant to:
+ /// 1. calling `check-write` on the `output-stream`
+ /// 2. calling `read` on the `input-stream` with the smaller of the
+ /// `check-write` permitted length and the `len` provided to `splice`
+ /// 3. calling `write` on the `output-stream` with that read data.
+ ///
+ /// Any error reported by the call to `check-write`, `read`, or
+ /// `write` ends the splice and reports that error.
+ ///
+ /// This function returns the number of bytes transferred; it may be less
+ /// than `len`.
+ splice: func(
+ /// The stream to read from
+ src: borrow,
+ /// The number of bytes to splice
+ len: u64,
+ ) -> result;
+
+ /// Read from one stream and write to another, with blocking.
+ ///
+ /// This is similar to `splice`, except that it blocks until the
+ /// `output-stream` is ready for writing, and the `input-stream`
+ /// is ready for reading, before performing the `splice`.
+ blocking-splice: func(
+ /// The stream to read from
+ src: borrow,
+ /// The number of bytes to splice
+ len: u64,
+ ) -> result;
+ }
+}
diff --git a/wit/deps/io/world.wit b/wit/deps/io/world.wit
new file mode 100644
index 0000000..8243da2
--- /dev/null
+++ b/wit/deps/io/world.wit
@@ -0,0 +1,6 @@
+package wasi:io@0.2.0-rc-2023-11-10;
+
+world imports {
+ import streams;
+ import poll;
+}
From ba781114ef538e83a2385f89bacd4872a727f395 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Mon, 4 Dec 2023 19:28:57 +0900
Subject: [PATCH 04/20] wit: add pool.subscribe
---
wit/deps/buffer-pool/buffer-pool.wit | 3 +++
1 file changed, 3 insertions(+)
diff --git a/wit/deps/buffer-pool/buffer-pool.wit b/wit/deps/buffer-pool/buffer-pool.wit
index e4f8468..c34f38d 100644
--- a/wit/deps/buffer-pool/buffer-pool.wit
+++ b/wit/deps/buffer-pool/buffer-pool.wit
@@ -3,6 +3,7 @@ package wasi:buffer-pool;
/// sensor frame/buffer management I/F
interface buffer-pool {
+ use wasi:io/poll@0.2.0-rc-2023-11-10.{pollable};
use data-types.{data-type};
enum buffer-error {
@@ -98,6 +99,8 @@ interface buffer-pool {
block-read-frame: func()-> result;
poll-read-frame: func()-> result;
+ subscribe: func() -> pollable;
+
get-statistics: func()-> result;
}
}
From 7b81464d29958954bd7c5bdca8431eb3ee49d9f6 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Mon, 4 Dec 2023 19:29:30 +0900
Subject: [PATCH 05/20] host: add pool.subscribe
---
host_wasmtime/src/main.rs | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index cf72760..bb85893 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -10,6 +10,7 @@ use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::ambient_authority;
use wasmtime_wasi::preview2::DirPerms;
use wasmtime_wasi::preview2::FilePerms;
+use wasmtime_wasi::preview2::Pollable;
use wasmtime_wasi::preview2::Table;
use wasmtime_wasi::preview2::WasiCtx;
use wasmtime_wasi::preview2::WasiCtxBuilder;
@@ -32,6 +33,7 @@ wasmtime::component::bindgen!({
with: {
"wasi:buffer-pool/buffer-pool/pool": Pool,
"wasi:sensor/sensor/device": Device,
+ "wasi:io/poll": wasmtime_wasi::preview2::bindings::io::poll,
},
});
@@ -133,6 +135,13 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
))
}
+ fn subscribe(
+ &mut self,
+ res: Resource,
+ ) -> Result> {
+ todo!()
+ }
+
fn get_statistics(
&mut self,
res: Resource,
From 510bc1a5a659b3d93dc1314bf06800e81d149ee8 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Mon, 4 Dec 2023 19:38:02 +0900
Subject: [PATCH 06/20] regen md
---
sensing.md | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 65 insertions(+), 2 deletions(-)
diff --git a/sensing.md b/sensing.md
index a954551..6f25b2a 100644
--- a/sensing.md
+++ b/sensing.md
@@ -5,6 +5,7 @@
interface wasi:buffer-pool/data-types
interface wasi:sensor/property
interface wasi:sensor/sensor
+interface wasi:io/poll@0.2.0-rc-2023-11-10
interface wasi:buffer-pool/buffer-pool
@@ -231,12 +232,65 @@ this might power on the device.
+
+A poll API intended to let users wait for I/O events on multiple handles
+at once.
+
+Types
+
+pollable
represents a single I/O event which may be ready, or not.
+Functions
+
+Return the readiness of a pollable. This function never blocks.
+Returns true
when the pollable is ready, and false
otherwise.
+Params
+
+Return values
+
+
+block
returns immediately if the pollable is ready, and otherwise
+blocks until ready.
+This function is equivalent to calling poll.poll
on a list
+containing only this pollable.
+Params
+
+
+Poll for completion on a set of pollables.
+This function takes a list of pollables, which identify I/O sources of
+interest, and waits until one or more of the events is ready for I/O.
+The result list<u32>
contains one or more indices of handles in the
+argument list that is ready for I/O.
+If the list contains more elements than can be indexed with a u32
+value, this function traps.
+A timeout can be implemented by adding a pollable from the
+wasi-clocks API to the list.
+This function does not return a result
; polling in itself does not
+do any I/O so it doesn't fail. If any of the I/O sources identified by
+the pollables has an error, it is indicated by marking the source as
+being reaedy for I/O.
+Params
+
+Return values
+
sensor frame/buffer management I/F
Types
-
-data-type
+
+pollable
+
+#### `type data-type`
+[`data-type`](#data_type)
#### `enum buffer-error`
Enum Cases
@@ -399,6 +453,15 @@ for other buffering modes, this is ignored.
+
+Params
+
+Return values
+
Params
From 8632dcae4c9f6b448476f15ebb1496ef29f047f8 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 12:19:11 +0900
Subject: [PATCH 07/20] host: use tokio mpsc
note: while this compiles, it's stiil broken.
---
host_wasmtime/Cargo.toml | 1 +
host_wasmtime/src/pool.rs | 12 ++++++------
2 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/host_wasmtime/Cargo.toml b/host_wasmtime/Cargo.toml
index 31c724d..39f25b1 100644
--- a/host_wasmtime/Cargo.toml
+++ b/host_wasmtime/Cargo.toml
@@ -11,6 +11,7 @@ fraction = "0.14"
tracing = { version = "0.1.40", features = ["max_level_trace"] }
tracing-subscriber = "0.3.18"
image = { version = "0.24.7", default-features = false }
+tokio = { version = "1.26.0", default-features = false }
# released versions of nokhwa is a bit broken for avfoundation.
# https://github.com/l1npengtul/nokhwa/pull/151
diff --git a/host_wasmtime/src/pool.rs b/host_wasmtime/src/pool.rs
index c73f607..2790208 100644
--- a/host_wasmtime/src/pool.rs
+++ b/host_wasmtime/src/pool.rs
@@ -1,10 +1,10 @@
use anyhow::Error;
use anyhow::Result;
-use std::sync::mpsc::sync_channel;
-use std::sync::mpsc::Receiver;
-use std::sync::mpsc::SyncSender;
use std::sync::Mutex;
use std::time::Instant;
+use tokio::sync::mpsc::channel;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::Sender;
use super::*;
use traits::BufferPool;
@@ -16,7 +16,7 @@ use wasi::buffer_pool::buffer_pool::PoolStatistics;
struct SimplePoolSequencer {
sequence_number: u64,
boottime: Instant,
- sender: SyncSender<(u64, u64, Box)>,
+ sender: Sender<(u64, u64, Box)>,
/* stats */
enqueued: u64,
@@ -41,7 +41,7 @@ impl SimplePool {
BufferingMode::BufferingDiscard => mode,
_ => return Err(BufferError::NotSupported),
};
- let (sender, receiver) = sync_channel(num);
+ let (sender, receiver) = channel(num);
Ok(Self {
sequencer: Mutex::new(SimplePoolSequencer {
sequence_number: 0,
@@ -80,7 +80,7 @@ impl BufferPool for SimplePool {
fn dequeue(&self) -> (u64, u64, Box) {
let mut receiver = self.receiver.lock().unwrap();
receiver.dequeued += 1;
- receiver.receiver.recv().unwrap()
+ receiver.receiver.blocking_recv().unwrap()
}
fn get_statistics(&self) -> Result {
let seq = self.sequencer.lock().unwrap();
From 4164ea1e6fa81db0185b283a829a28527c0fe428 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 12:47:43 +0900
Subject: [PATCH 08/20] wit: tweak poll-read-frame and retire block-read-frame
---
wit/deps/buffer-pool/buffer-pool.wit | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/wit/deps/buffer-pool/buffer-pool.wit b/wit/deps/buffer-pool/buffer-pool.wit
index c34f38d..a57f4b7 100644
--- a/wit/deps/buffer-pool/buffer-pool.wit
+++ b/wit/deps/buffer-pool/buffer-pool.wit
@@ -96,8 +96,11 @@ interface buffer-pool {
/// name: the name of the pool. you can use this for device.start().
create: static func(mode:buffering-mode, size:u32, buffer-num:u32, name:string) ->result;
- block-read-frame: func()-> result;
- poll-read-frame: func()-> result;
+ /// try to read frames.
+ /// this function returns 0 frames when
+ /// - max-results = 0
+ /// - or, no frames are immediately available
+ read-frames: func(max-results: u32)-> result, buffer-error>;
subscribe: func() -> pollable;
From d1a6ee115e7af089a682daa71e7d5ad80a479737 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 12:48:58 +0900
Subject: [PATCH 09/20] host: tweak poll-read-frame and retire block-read-frame
---
host_wasmtime/src/dummy_device.rs | 2 +-
host_wasmtime/src/main.rs | 27 ++++++++++-----------------
host_wasmtime/src/nokhwa.rs | 2 +-
host_wasmtime/src/pool.rs | 15 +++++++++++----
host_wasmtime/src/traits.rs | 4 ++--
5 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/host_wasmtime/src/dummy_device.rs b/host_wasmtime/src/dummy_device.rs
index 67b49a6..887fad2 100644
--- a/host_wasmtime/src/dummy_device.rs
+++ b/host_wasmtime/src/dummy_device.rs
@@ -91,7 +91,7 @@ impl SensorDevice for DummyDevice {
thread::sleep(next_frame - now);
}
next_frame += config.frame_duration;
- match pool.enqueue(Box::new(data), None) {
+ match pool.try_enqueue(Box::new(data), None) {
Ok(_) => println!("DummyDevice generated frame enqueued"),
_ => println!("DummyDevice generated frame dropped"),
}
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index bb85893..9da4e64 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -103,36 +103,29 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
Ok(Ok(idx))
}
- fn block_read_frame(
+ fn read_frames(
&mut self,
res: Resource,
+ max_results: u32,
) -> Result<
Result<
- wasi::buffer_pool::buffer_pool::FrameInfo,
+ Vec,
wasi::buffer_pool::buffer_pool::BufferError,
>,
> {
let pool = self.table().get_resource_mut(&res)?;
- let (sequence_number, timestamp, data) = pool.pool.dequeue();
+ if max_results == 0 {
+ return Ok(Ok(vec![]));
+ }
+ let Some((sequence_number, timestamp, data)) = pool.pool.try_dequeue() else {
+ return Ok(Ok(vec![]));
+ };
let frame = wasi::buffer_pool::buffer_pool::FrameInfo {
sequence_number: sequence_number,
timestamp: timestamp,
data: vec![*data],
};
- Ok(Ok(frame))
- }
- fn poll_read_frame(
- &mut self,
- res: Resource,
- ) -> Result<
- Result<
- wasi::buffer_pool::buffer_pool::FrameInfo,
- wasi::buffer_pool::buffer_pool::BufferError,
- >,
- > {
- Ok(Err(
- wasi::buffer_pool::buffer_pool::BufferError::NotSupported,
- ))
+ Ok(Ok(vec![frame]))
}
fn subscribe(
diff --git a/host_wasmtime/src/nokhwa.rs b/host_wasmtime/src/nokhwa.rs
index 9bd900c..fdcf77d 100644
--- a/host_wasmtime/src/nokhwa.rs
+++ b/host_wasmtime/src/nokhwa.rs
@@ -90,7 +90,7 @@ impl SensorDevice for NokhwaDevice {
let data = wasi::buffer_pool::buffer_pool::FrameData::ByValue(
wasi::buffer_pool::data_types::DataType::Image(image),
);
- match pool.enqueue(Box::new(data), None) {
+ match pool.try_enqueue(Box::new(data), None) {
Ok(_) => println!("NokhwaDevice frame enqueued"),
_ => println!("NokhwaDevice frame dropped"),
}
diff --git a/host_wasmtime/src/pool.rs b/host_wasmtime/src/pool.rs
index 2790208..973550c 100644
--- a/host_wasmtime/src/pool.rs
+++ b/host_wasmtime/src/pool.rs
@@ -3,6 +3,7 @@ use anyhow::Result;
use std::sync::Mutex;
use std::time::Instant;
use tokio::sync::mpsc::channel;
+use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
@@ -59,7 +60,7 @@ impl SimplePool {
}
impl BufferPool for SimplePool {
- fn enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error> {
+ fn try_enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error> {
let mut seq = self.sequencer.lock().unwrap();
let timestamp = match timestamp {
Some(t) => t,
@@ -77,10 +78,16 @@ impl BufferPool for SimplePool {
}
Ok(())
}
- fn dequeue(&self) -> (u64, u64, Box) {
+ fn try_dequeue(&self) -> Option<(u64, u64, Box)> {
let mut receiver = self.receiver.lock().unwrap();
- receiver.dequeued += 1;
- receiver.receiver.blocking_recv().unwrap()
+ match receiver.receiver.try_recv() {
+ Ok(ok) => {
+ receiver.dequeued += 1;
+ Some(ok)
+ }
+ Err(TryRecvError::Empty) => None,
+ _ => panic!("disconnected"),
+ }
}
fn get_statistics(&self) -> Result {
let seq = self.sequencer.lock().unwrap();
diff --git a/host_wasmtime/src/traits.rs b/host_wasmtime/src/traits.rs
index 0ed3dcf..1540a27 100644
--- a/host_wasmtime/src/traits.rs
+++ b/host_wasmtime/src/traits.rs
@@ -9,8 +9,8 @@ use wasi::sensor::property::PropertyValue;
use wasi::sensor::sensor::DeviceError;
pub trait BufferPool {
- fn enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error>;
- fn dequeue(&self) -> (u64, u64, Box);
+ fn try_enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error>;
+ fn try_dequeue(&self) -> Option<(u64, u64, Box)>;
fn get_statistics(&self) -> Result;
}
From 49650673234d4a60b513f37ab35bd507e719d5c6 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 12:56:47 +0900
Subject: [PATCH 10/20] guest: use read_frames
---
guest_rust/src/lib.rs | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/guest_rust/src/lib.rs b/guest_rust/src/lib.rs
index f81f2fc..d851d3c 100644
--- a/guest_rust/src/lib.rs
+++ b/guest_rust/src/lib.rs
@@ -195,8 +195,15 @@ fn main2() -> Result<()> {
println!("starting sensor {:?}", sensor);
sensor.start(&pool_name)?;
for _ in 0..60 {
- let frame = pool.block_read_frame()?;
- process_frame(&frame)?;
+ loop {
+ let frames = pool.read_frames(1)?;
+ for ref frame in &frames {
+ process_frame(frame)?;
+ }
+ if frames.len() > 0 {
+ break;
+ }
+ }
}
let stats = pool.get_statistics()?;
println!("pool statistics: {:?}", stats);
From 9e740b7f1f20bc34a44bed80ac3727eeb76983e8 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:39:04 +0900
Subject: [PATCH 11/20] guest: use poll
---
guest_rust/src/lib.rs | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/guest_rust/src/lib.rs b/guest_rust/src/lib.rs
index d851d3c..4c0f8cf 100644
--- a/guest_rust/src/lib.rs
+++ b/guest_rust/src/lib.rs
@@ -194,15 +194,15 @@ fn main2() -> Result<()> {
println!("starting sensor {:?}", sensor);
sensor.start(&pool_name)?;
+ let poll = pool.subscribe();
for _ in 0..60 {
loop {
+ poll.block();
let frames = pool.read_frames(1)?;
+ assert!(frames.len() == 1);
for ref frame in &frames {
process_frame(frame)?;
}
- if frames.len() > 0 {
- break;
- }
}
}
let stats = pool.get_statistics()?;
From bebb6b1d6532db14d8983b389f33b46e2d7a6e12 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:39:30 +0900
Subject: [PATCH 12/20] host: implement poll
---
host_wasmtime/Cargo.toml | 1 +
host_wasmtime/src/main.rs | 17 ++++++++++++++++-
host_wasmtime/src/pool.rs | 25 ++++++++++++++++++-------
host_wasmtime/src/traits.rs | 2 ++
4 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/host_wasmtime/Cargo.toml b/host_wasmtime/Cargo.toml
index 39f25b1..6d36a06 100644
--- a/host_wasmtime/Cargo.toml
+++ b/host_wasmtime/Cargo.toml
@@ -11,6 +11,7 @@ fraction = "0.14"
tracing = { version = "0.1.40", features = ["max_level_trace"] }
tracing-subscriber = "0.3.18"
image = { version = "0.24.7", default-features = false }
+async-trait = "0.1.71"
tokio = { version = "1.26.0", default-features = false }
# released versions of nokhwa is a bit broken for avfoundation.
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index 9da4e64..00dffea 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -11,6 +11,7 @@ use wasmtime_wasi::ambient_authority;
use wasmtime_wasi::preview2::DirPerms;
use wasmtime_wasi::preview2::FilePerms;
use wasmtime_wasi::preview2::Pollable;
+use wasmtime_wasi::preview2::Subscribe;
use wasmtime_wasi::preview2::Table;
use wasmtime_wasi::preview2::WasiCtx;
use wasmtime_wasi::preview2::WasiCtxBuilder;
@@ -44,8 +45,21 @@ trait WasiSensorView {
pub struct Pool {
name: String,
+ next_frame: Option<(u64, u64, Box)>,
pool: Arc,
}
+
+#[async_trait::async_trait]
+impl Subscribe for Pool {
+ async fn ready(&mut self) {
+ if self.next_frame.is_some() {
+ return;
+ }
+ let frame = self.pool.dequeue().await;
+ self.next_frame = Some(frame);
+ }
+}
+
pub struct Device {
device: Box,
}
@@ -97,6 +111,7 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
let pool = Arc::new(pool);
let idx = self.table().push_resource(Pool {
name: name.clone(),
+ next_frame: None,
pool: pool.clone(),
})?;
self.pools().insert(name, pool);
@@ -132,7 +147,7 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
&mut self,
res: Resource,
) -> Result> {
- todo!()
+ wasmtime_wasi::preview2::subscribe(self.table(), res)
}
fn get_statistics(
diff --git a/host_wasmtime/src/pool.rs b/host_wasmtime/src/pool.rs
index 973550c..bdc4ec1 100644
--- a/host_wasmtime/src/pool.rs
+++ b/host_wasmtime/src/pool.rs
@@ -1,11 +1,11 @@
use anyhow::Error;
use anyhow::Result;
-use std::sync::Mutex;
use std::time::Instant;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
+use tokio::sync::Mutex;
use super::*;
use traits::BufferPool;
@@ -59,9 +59,10 @@ impl SimplePool {
}
}
+#[async_trait::async_trait]
impl BufferPool for SimplePool {
fn try_enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error> {
- let mut seq = self.sequencer.lock().unwrap();
+ let mut seq = self.sequencer.blocking_lock();
let timestamp = match timestamp {
Some(t) => t,
_ => seq.boottime.elapsed().as_nanos() as u64,
@@ -79,22 +80,32 @@ impl BufferPool for SimplePool {
Ok(())
}
fn try_dequeue(&self) -> Option<(u64, u64, Box)> {
- let mut receiver = self.receiver.lock().unwrap();
+ let mut receiver = self.receiver.blocking_lock();
match receiver.receiver.try_recv() {
- Ok(ok) => {
+ Ok(frame) => {
receiver.dequeued += 1;
- Some(ok)
+ Some(frame)
}
Err(TryRecvError::Empty) => None,
_ => panic!("disconnected"),
}
}
+ async fn dequeue(&self) -> (u64, u64, Box) {
+ let mut receiver = self.receiver.lock().await;
+ match receiver.receiver.recv().await {
+ Some(frame) => {
+ receiver.dequeued += 1;
+ frame
+ }
+ _ => panic!("disconnected"),
+ }
+ }
fn get_statistics(&self) -> Result {
- let seq = self.sequencer.lock().unwrap();
+ let seq = self.sequencer.blocking_lock();
let enqueued = seq.enqueued;
let dropped = seq.dropped;
drop(seq);
- let receiver = self.receiver.lock().unwrap();
+ let receiver = self.receiver.blocking_lock();
let dequeued = receiver.dequeued;
drop(receiver);
Ok(PoolStatistics {
diff --git a/host_wasmtime/src/traits.rs b/host_wasmtime/src/traits.rs
index 1540a27..15b026c 100644
--- a/host_wasmtime/src/traits.rs
+++ b/host_wasmtime/src/traits.rs
@@ -8,9 +8,11 @@ use wasi::sensor::property::PropertyKey;
use wasi::sensor::property::PropertyValue;
use wasi::sensor::sensor::DeviceError;
+#[async_trait::async_trait]
pub trait BufferPool {
fn try_enqueue(&self, frame: Box, timestamp: Option) -> Result<(), Error>;
fn try_dequeue(&self) -> Option<(u64, u64, Box)>;
+ async fn dequeue(&self) -> (u64, u64, Box);
fn get_statistics(&self) -> Result;
}
From a09a060e45956d9b2b9eb2ff07455288da2954da Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:44:08 +0900
Subject: [PATCH 13/20] bump wasmtime version to 15.0.1
---
.github/workflows/wasmtime.yml | 12 ++++++------
guest_c/build.sh | 2 +-
guest_rust/build.sh | 2 +-
host_wasmtime/Cargo.toml | 4 ++--
4 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/.github/workflows/wasmtime.yml b/.github/workflows/wasmtime.yml
index cf56be6..1eba807 100644
--- a/.github/workflows/wasmtime.yml
+++ b/.github/workflows/wasmtime.yml
@@ -43,16 +43,16 @@ jobs:
- name: Install wasmtime (ubuntu)
if: startsWith(matrix.os, 'ubuntu-')
run: |
- curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasmtime-v14.0.4-x86_64-linux.tar.xz
- tar xvJf wasmtime-v14.0.4-x86_64-linux.tar.xz
- echo "WASMTIME=$(pwd -P)/wasmtime-v14.0.4-x86_64-linux/wasmtime" >> ${GITHUB_ENV}
+ curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasmtime-v15.0.1-x86_64-linux.tar.xz
+ tar xvJf wasmtime-v15.0.1-x86_64-linux.tar.xz
+ echo "WASMTIME=$(pwd -P)/wasmtime-v15.0.1-x86_64-linux/wasmtime" >> ${GITHUB_ENV}
- name: Install wasmtime (macOS)
if: startsWith(matrix.os, 'macos-')
run: |
- curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasmtime-v14.0.4-x86_64-macos.tar.xz
- tar xvzf wasmtime-v14.0.4-x86_64-macos.tar.xz
- echo "WASMTIME=$(pwd -P)/wasmtime-v14.0.4-x86_64-macos/wasmtime" >> ${GITHUB_ENV}
+ curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasmtime-v15.0.1-x86_64-macos.tar.xz
+ tar xvzf wasmtime-v15.0.1-x86_64-macos.tar.xz
+ echo "WASMTIME=$(pwd -P)/wasmtime-v15.0.1-x86_64-macos/wasmtime" >> ${GITHUB_ENV}
- name: Build guest (Rust)
run: |
diff --git a/guest_c/build.sh b/guest_c/build.sh
index 3185e5d..3afa19d 100755
--- a/guest_c/build.sh
+++ b/guest_c/build.sh
@@ -1,7 +1,7 @@
#! /bin/sh
set -e
if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then
- curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasi_snapshot_preview1.reactor.wasm
+ curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasi_snapshot_preview1.reactor.wasm
fi
if [ ! -d libjpeg ]; then
diff --git a/guest_rust/build.sh b/guest_rust/build.sh
index da793b9..98e3482 100755
--- a/guest_rust/build.sh
+++ b/guest_rust/build.sh
@@ -1,7 +1,7 @@
#! /bin/sh
set -e
if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then
- curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasi_snapshot_preview1.reactor.wasm
+ curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasi_snapshot_preview1.reactor.wasm
fi
release_opt=--release
diff --git a/host_wasmtime/Cargo.toml b/host_wasmtime/Cargo.toml
index 6d36a06..26053cb 100644
--- a/host_wasmtime/Cargo.toml
+++ b/host_wasmtime/Cargo.toml
@@ -20,5 +20,5 @@ tokio = { version = "1.26.0", default-features = false }
nokhwa = {git = "https://github.com/yamt/nokhwa", rev = "0.10+fixes", features = ["input-native", "output-threaded"]}
# preview2 and component-model are still moving targets.
-wasmtime = { version = "14.0.4", default-features = false, features = ["component-model", "cranelift"]}
-wasmtime-wasi = { version = "14.0.4", default-features = false, features = ["preview2", "sync"] }
+wasmtime = { version = "15.0.1", default-features = false, features = ["component-model", "cranelift"]}
+wasmtime-wasi = { version = "15.0.1", default-features = false, features = ["preview2", "sync"] }
From 5b7cb5766d87db166cb84e74ff35a1d8aea601f3 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:45:57 +0900
Subject: [PATCH 14/20] host: update table api after wasmtime version bump
---
host_wasmtime/src/main.rs | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index 00dffea..415c0e2 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -109,7 +109,7 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
> {
let pool = SimplePool::new(mode, size as usize, buffer_num as usize)?;
let pool = Arc::new(pool);
- let idx = self.table().push_resource(Pool {
+ let idx = self.table().push(Pool {
name: name.clone(),
next_frame: None,
pool: pool.clone(),
@@ -128,7 +128,7 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
wasi::buffer_pool::buffer_pool::BufferError,
>,
> {
- let pool = self.table().get_resource_mut(&res)?;
+ let pool = self.table().get_mut(&res)?;
if max_results == 0 {
return Ok(Ok(vec![]));
}
@@ -168,9 +168,9 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
&mut self,
res: Resource,
) -> wasmtime::Result<()> {
- let pool = self.table().get_resource(&res)?;
+ let pool = self.table().get(&res)?;
let name = pool.name.clone();
- self.table().delete_resource(res)?;
+ self.table().delete(res)?;
self.pools().remove(&name);
Ok(())
}
@@ -197,7 +197,7 @@ impl wasi::sensor::sensor::HostDevice for T {
let device = Device {
device: device_impl,
};
- let idx = self.table().push_resource(device)?;
+ let idx = self.table().push(device)?;
Ok(Ok(idx))
}
@@ -215,7 +215,7 @@ impl wasi::sensor::sensor::HostDevice for T {
_ => return Ok(Err(wasi::sensor::sensor::DeviceError::NotFound)),
};
let pool = Arc::clone(pool);
- let device = self.table().get_resource_mut(&res)?;
+ let device = self.table().get_mut(&res)?;
Ok(device.device.start_streaming(pool))
}
fn stop(
@@ -230,7 +230,7 @@ impl wasi::sensor::sensor::HostDevice for T {
key: wasi::sensor::property::PropertyKey,
value: wasi::sensor::property::PropertyValue,
) -> Result> {
- let device = self.table().get_resource_mut(&res)?;
+ let device = self.table().get_mut(&res)?;
Ok(device.device.set_property(key, value))
}
fn get_property(
@@ -239,12 +239,12 @@ impl wasi::sensor::sensor::HostDevice for T {
key: wasi::sensor::property::PropertyKey,
) -> Result>
{
- let device = self.table().get_resource_mut(&res)?;
+ let device = self.table().get_mut(&res)?;
Ok(device.device.get_property(key))
}
fn drop(&mut self, res: Resource) -> wasmtime::Result<()> {
trace!("dropping {:?}", res);
- self.table().delete_resource(res)?;
+ self.table().delete(res)?;
Ok(())
}
}
From 0545d2695e1bdd48c89eae3e466e5eb84850cfbf Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:53:29 +0900
Subject: [PATCH 15/20] guest: restore the loop exit condition
---
guest_rust/src/lib.rs | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/guest_rust/src/lib.rs b/guest_rust/src/lib.rs
index 4c0f8cf..0ad481c 100644
--- a/guest_rust/src/lib.rs
+++ b/guest_rust/src/lib.rs
@@ -195,14 +195,14 @@ fn main2() -> Result<()> {
println!("starting sensor {:?}", sensor);
sensor.start(&pool_name)?;
let poll = pool.subscribe();
- for _ in 0..60 {
- loop {
- poll.block();
- let frames = pool.read_frames(1)?;
- assert!(frames.len() == 1);
- for ref frame in &frames {
- process_frame(frame)?;
- }
+ let mut n = 0;
+ while n < 60 {
+ poll.block();
+ let frames = pool.read_frames(1)?;
+ assert!(frames.len() == 1);
+ for ref frame in &frames {
+ process_frame(frame)?;
+ n += 1;
}
}
let stats = pool.get_statistics()?;
From 4c1d28066ce5ea57a1fa711cde7c97149a21a9f0 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:54:16 +0900
Subject: [PATCH 16/20] host: read-frames: don't forget to check next_frame
---
host_wasmtime/src/main.rs | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index 415c0e2..15ece70 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -132,8 +132,12 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
if max_results == 0 {
return Ok(Ok(vec![]));
}
- let Some((sequence_number, timestamp, data)) = pool.pool.try_dequeue() else {
- return Ok(Ok(vec![]));
+ let (sequence_number, timestamp, data) = match pool.next_frame.take() {
+ Some(frame) => frame,
+ None => match pool.pool.try_dequeue() {
+ Some(frame) => frame,
+ None => return Ok(Ok(vec![])),
+ },
};
let frame = wasi::buffer_pool::buffer_pool::FrameInfo {
sequence_number: sequence_number,
From 25ab9b794a7d02092a95bc8b6aa1ff582e7ac382 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 13:57:21 +0900
Subject: [PATCH 17/20] host: add a few XXX comments
---
host_wasmtime/src/main.rs | 2 ++
1 file changed, 2 insertions(+)
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index 15ece70..a1ad41d 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -55,7 +55,9 @@ impl Subscribe for Pool {
if self.next_frame.is_some() {
return;
}
+ // XXX this confuses the flow-control in the pool by 1 frame
let frame = self.pool.dequeue().await;
+ assert!(self.next_frame.is_none()); /* XXX */
self.next_frame = Some(frame);
}
}
From d9146dc25bf56519af4605c323f30213841e614f Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 14:09:18 +0900
Subject: [PATCH 18/20] guest_c: sync with the latest rust version
---
guest_c/main.c | 31 +++++++++++++++++++++++--------
1 file changed, 23 insertions(+), 8 deletions(-)
diff --git a/guest_c/main.c b/guest_c/main.c
index 336b9c0..f01dc16 100644
--- a/guest_c/main.c
+++ b/guest_c/main.c
@@ -156,24 +156,39 @@ exports_wasi_sensor_interface_main()
sensing_borrow_pool_t borrowed_pool =
wasi_buffer_pool_buffer_pool_borrow_pool(pool);
+ sensing_own_pollable_t poll =
+ wasi_buffer_pool_buffer_pool_method_pool_subscribe(
+ borrowed_pool);
+ sensing_borrow_pollable_t borrowed_poll =
+ wasi_io_0_2_0_rc_2023_11_10_poll_borrow_pollable(poll);
int n = 60;
int i;
- for (i = 0; i < n; i++) {
- wasi_buffer_pool_buffer_pool_frame_info_t frame;
- if (!wasi_buffer_pool_buffer_pool_method_pool_block_read_frame(
- borrowed_pool, &frame, &buffer_error)) {
+ for (i = 0; i < n;) {
+ wasi_io_0_2_0_rc_2023_11_10_poll_method_pollable_block(
+ borrowed_poll);
+ sensing_list_wasi_buffer_pool_buffer_pool_frame_info_t frames;
+ if (!wasi_buffer_pool_buffer_pool_method_pool_read_frames(
+ borrowed_pool, 1, &frames, &buffer_error)) {
fprintf(stderr, "block-read-frame failed (error %u)\n",
(unsigned int)buffer_error);
return false;
}
- fprintf(stderr, "got a frame (%u/%u)\n", i + 1, n);
- if (!process_frame_info(&frame)) {
- return false;
+ size_t j;
+ for (j = 0; j < frames.len; j++) {
+ wasi_buffer_pool_buffer_pool_frame_info_t *frame =
+ &frames.ptr[j];
+ i++;
+ fprintf(stderr, "got a frame (%u/%u)\n", i, n);
+ if (!process_frame_info(frame)) {
+ return false;
+ }
}
- wasi_buffer_pool_buffer_pool_frame_info_free(&frame);
+ sensing_list_wasi_buffer_pool_buffer_pool_frame_info_free(
+ &frames);
}
fprintf(stderr, "cleaning up\n");
+ wasi_io_0_2_0_rc_2023_11_10_poll_pollable_drop_own(poll);
wasi_sensor_sensor_device_drop_own(device);
wasi_buffer_pool_buffer_pool_pool_drop_own(pool);
From 12634912ef1b38fadfb23dfeb98e9e497b41fe92 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Tue, 5 Dec 2023 14:09:47 +0900
Subject: [PATCH 19/20] regen md
---
sensing.md | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/sensing.md b/sensing.md
index 6f25b2a..a27be68 100644
--- a/sensing.md
+++ b/sensing.md
@@ -435,23 +435,21 @@ for other buffering modes, this is ignored.
-
-Params
-
-Return values
+
+try to read frames.
+this function returns 0 frames when
-
Params
Return values
Params
From b6210f235c0e3eb83243120976be8c65ee22f95e Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi
Date: Wed, 6 Dec 2023 19:34:38 +0900
Subject: [PATCH 20/20] host_wasmtime: update table api after wasmtime version
bump
---
host_wasmtime/src/main.rs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/host_wasmtime/src/main.rs b/host_wasmtime/src/main.rs
index a1ad41d..7066013 100644
--- a/host_wasmtime/src/main.rs
+++ b/host_wasmtime/src/main.rs
@@ -165,7 +165,7 @@ impl wasi::buffer_pool::buffer_pool::HostPool for T {
wasi::buffer_pool::buffer_pool::BufferError,
>,
> {
- let pool = self.table().get_resource(&res)?;
+ let pool = self.table().get(&res)?;
let stats = pool.pool.get_statistics()?;
Ok(Ok(stats))
}