From 6b5863d28534f1c67e35e420fd05dfa863c0fe88 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 1 Sep 2023 11:04:58 -0700 Subject: [PATCH] New backpressure and flushing scheme for output-stream --- wit/streams.wit | 143 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 48 deletions(-) diff --git a/wit/streams.wit b/wit/streams.wit index 98df181..e2631f6 100644 --- a/wit/streams.wit +++ b/wit/streams.wit @@ -134,58 +134,115 @@ interface streams { /// This [represents a resource](https://github.com/WebAssembly/WASI/blob/main/docs/WitInWasi.md#Resources). type output-stream = u32 - /// Perform a non-blocking write of bytes to a stream. + /// An error for output-stream operations. /// - /// This function returns a `u64` and a `stream-status`. The `u64` indicates - /// the number of bytes from `buf` that were written, which may be less than - /// the length of `buf`. The `stream-status` indicates if further writes to - /// the stream are expected to be read. + /// Contrary to input-streams, a closed output-stream is reported using + /// an error. + enum write-error { + /// The last operation (a write or flush) failed before completion. + last-operation-failed, + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed + } + /// Check readiness for writing. This function never blocks. + /// + /// Returns the number of bytes permitted for the next call to `write`, + /// or an error. Calling `write` with more bytes than this function has + /// permitted will trap. + /// + /// When this function returns 0 bytes, the `subscribe-to-output-stream` + /// pollable will become ready when this function will report at least + /// 1 byte, or an error. + check-write: func( + this: output-stream + ) -> result + + /// Perform a write. This function never blocks. /// - /// When the returned `stream-status` is `open`, the `u64` return value may - /// be less than the length of `buf`. This indicates that no more bytes may - /// be written to the stream promptly. In that case the - /// `subscribe-to-output-stream` pollable will indicate when additional bytes - /// may be promptly written. + /// Precondition: check-write gave permit of Ok(n) and contents has a + /// length of less than or equal to n. Otherwise, this function will trap. /// - /// Writing an empty list must return a non-error result with `0` for the - /// `u64` return value, and the current `stream-status`. + /// returns Err(closed) without writing if the stream has closed since + /// the last call to check-write provided a permit. write: func( this: output-stream, - /// Data to write - buf: list - ) -> result> + contents: list + ) -> result<_, write-error> - /// Blocking write of bytes to a stream. + /// Perform a write of up to 4096 bytes, and then flush the stream. Block + /// until all of these operations are complete, or an error occurs. /// - /// This is similar to `write`, except that it blocks until at least one - /// byte can be written. - blocking-write: func( - this: output-stream, - /// Data to write - buf: list - ) -> result> + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe-to-output-stream`, `write`, and `flush`, and is implemented + /// with the following pseudo-code: + /// + /// ```text + /// let pollable = subscribe-to-output-stream(this); + /// while !contents.is_empty() { + /// // Wait for the stream to become writable + /// poll-oneoff(pollable); + /// let Ok(n) = check-write(this); // eliding error handling + /// let len = min(n, contents.len()); + /// let (chunk, rest) = contents.split_at(len); + /// write(this, chunk); // eliding error handling + /// contents = rest; + /// } + /// flush(this); + /// // Wait for completion of `flush` + /// poll-oneoff(pollable); + /// // Check for any errors that arose during `flush` + /// let _ = check-write(this); // eliding error handling + /// ``` + blocking-write-and-flush: func( + this: output-stream, + contents: list + ) -> result<_, write-error> - /// Write multiple zero-bytes to a stream. + /// Request to flush buffered output. This function never blocks. /// - /// This function returns a `u64` indicating the number of zero-bytes - /// that were written; it may be less than `len`. Equivelant to a call to - /// `write` with a list of zeroes of the given length. - write-zeroes: func( + /// This tells the output-stream that the caller intends any buffered + /// output to be flushed. the output which is expected to be flushed + /// is all that has been passed to `write` prior to this call. + /// + /// Upon calling this function, the `output-stream` will not accept any + /// writes (`check-write` will return `ok(0)`) until the flush has + /// completed. The `subscribe-to-output-stream` pollable will become ready + /// when the flush has completed and the stream can accept more writes. + flush: func( this: output-stream, - /// The number of zero-bytes to write - len: u64 - ) -> result> + ) -> result<_, write-error> + + /// Request to flush buffered output, and block until flush completes + /// and stream is ready for writing again. + blocking-flush: func( + this: output-stream, + ) -> result<_, write-error> + + /// Create a `pollable` which will resolve once the output-stream + /// is ready for more writing, or an error has occured. When this + /// pollable is ready, `check-write` will return `ok(n)` with n>0, or an + /// error. + /// + /// If the stream is closed, this pollable is always ready immediately. + /// + /// The created `pollable` is a child resource of the `output-stream`. + /// Implementations may trap if the `output-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe-to-output-stream: func(this: output-stream) -> pollable - /// Write multiple zero bytes to a stream, with blocking. + /// Write zeroes to a stream. /// - /// This is similar to `write-zeroes`, except that it blocks until at least - /// one byte can be written. Equivelant to a call to `blocking-write` with - /// a list of zeroes of the given length. - blocking-write-zeroes: func( + /// this should be used precisely like `write` with the exact same + /// preconditions (must use check-write first), but instead of + /// passing a list of bytes, you simply pass the number of zero-bytes + /// that should be written. + write-zeroes: func( this: output-stream, - /// The number of zero bytes to write + /// The number of zero-bytes to write len: u64 - ) -> result> + ) -> result<_, write-error> /// Read from one stream and write to another. /// @@ -232,16 +289,6 @@ interface streams { src: input-stream ) -> result> - /// Create a `pollable` which will resolve once either the specified stream - /// is ready to accept bytes or the `stream-state` has become closed. - /// - /// Once the stream-state is closed, this pollable is always ready - /// immediately. - /// - /// The created `pollable` is a child resource of the `output-stream`. - /// Implementations may trap if the `output-stream` is dropped before - /// all derived `pollable`s created with this function are dropped. - subscribe-to-output-stream: func(this: output-stream) -> pollable /// Dispose of the specified `output-stream`, after which it may no longer /// be used.