Skip to content

Commit

Permalink
feat(go)!: rework async I/O
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Aug 26, 2024
1 parent abaef30 commit 2a6c15c
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 1,198 deletions.
499 changes: 179 additions & 320 deletions crates/wit-bindgen-go/src/interface.rs

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions examples/go/streams-server/cmd/streams-server-nats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import (
"os/signal"
"syscall"

"github.com/nats-io/nats.go"
server "wrpc.io/examples/go/streams-server/bindings"
"wrpc.io/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler"
wrpc "wrpc.io/go"
wrpcnats "wrpc.io/go/nats"
"github.com/nats-io/nats.go"
)

type Handler struct{}

func (Handler) Echo(ctx context.Context, req *handler.Req) (wrpc.ReceiveCompleter[[]uint64], wrpc.ReadCompleter, error) {
func (Handler) Echo(ctx context.Context, req *handler.Req) (wrpc.ReceiveCloser[[]uint64], wrpc.ReadCompleter, error) {
slog.InfoContext(ctx, "handling `wrpc-examples:streams/handler.echo`")
return req.Numbers, req.Bytes, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ func ReadFutureStatus(r ByteReader) (bool, error) {
}

// ReadFuture reads a future from `r`
func ReadFuture[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[T], error) {
func ReadFuture[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (Receiver[T], error) {
slog.Debug("reading future status byte")
ok, err := ReadFutureStatus(r)
if err != nil {
return nil, err
}
if !ok {
r, err = r.Index(path...)
r, err := r.Index(path...)
if err != nil {
return nil, fmt.Errorf("failed to get future reader: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions go/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"sync"
"sync/atomic"

wrpc "wrpc.io/go"
"github.com/nats-io/nats.go"
wrpc "wrpc.io/go"
)

// Client is a thin wrapper around *nats.Conn, which is able to serve and invoke wRPC functions
Expand Down Expand Up @@ -170,7 +170,7 @@ func (w *paramWriter) WriteByte(b byte) error {
return nil
}

func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriter, error) {
func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriteCloser, error) {
return &paramWriter{
nc: w.nc,
init: w.init,
Expand Down Expand Up @@ -231,7 +231,7 @@ func (w *resultWriter) Close() error {
return nil
}

func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriter, error) {
func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriteCloser, error) {
return &resultWriter{nc: w.nc, tx: indexPath(w.tx, path...)}, nil
}

Expand Down Expand Up @@ -304,7 +304,7 @@ func (r *streamReader) Close() (err error) {
return nil
}

func (r *streamReader) Index(path ...uint32) (wrpc.IndexReader, error) {
func (r *streamReader) Index(path ...uint32) (wrpc.IndexReadCloser, error) {
r.nestRef.Add(1)
s := indexPath(r.path, path...)
r.nestMu.Lock()
Expand Down
115 changes: 31 additions & 84 deletions go/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,6 @@ import (
"math"
)

type CompleteReader struct {
io.Reader
}

func (*CompleteReader) IsComplete() bool {
return true
}

func NewCompleteReader(r io.Reader) *CompleteReader {
return &CompleteReader{r}
}

type CompleteByteReader struct {
ByteReader
}

func (*CompleteByteReader) IsComplete() bool {
return true
}

func NewCompleteByteReader(r ByteReader) *CompleteByteReader {
return &CompleteByteReader{r}
}

type PendingReader struct {
io.Reader
}

func (*PendingReader) IsComplete() bool {
return false
}

func NewPendingReader(r io.Reader) *PendingReader {
return &PendingReader{r}
}

type PendingByteReader struct {
ByteReader
}

func (*PendingByteReader) IsComplete() bool {
return false
}

func NewPendingByteReader(r ByteReader) *PendingByteReader {
return &PendingByteReader{r}
}

type ByteStreamWriter struct {
r io.Reader
chunk []byte
Expand All @@ -71,9 +23,9 @@ func (v *ByteStreamWriter) WriteTo(w ByteWriter) (err error) {
defer func() {
if fErr := buf.Flush(); fErr != nil {
if err == nil {
err = fmt.Errorf("failed to flush writer: %w", fErr)
err = fmt.Errorf("failed to flush pending byte stream writer: %w", fErr)
} else {
slog.Warn("failed to flush writer", "err", fErr)
slog.Warn("failed to flush pending byte stream writer", "err", fErr)
}
}
}()
Expand Down Expand Up @@ -108,7 +60,7 @@ func (v *ByteStreamWriter) WriteTo(w ByteWriter) (err error) {
}

type ByteStreamReader struct {
r ByteReadCompleter
r ByteReadCloser
buf uint32
}

Expand Down Expand Up @@ -140,19 +92,11 @@ func (r *ByteStreamReader) Read(p []byte) (int, error) {
return rn, nil
}

func (r *ByteStreamReader) IsComplete() bool {
return r.r.IsComplete()
}

func (r *ByteStreamReader) Close() error {
c, ok := r.r.(io.Closer)
if ok {
return c.Close()
}
return nil
return r.r.Close()
}

func NewByteStreamReader(r ByteReadCompleter) *ByteStreamReader {
func NewByteStreamReader(r ByteReadCloser) *ByteStreamReader {
return &ByteStreamReader{
r: r,
}
Expand All @@ -177,37 +121,37 @@ func ReadStreamStatus(r ByteReader) (bool, error) {
}

// ReadByteStream reads a stream of bytes from `r`
func ReadByteStream(r IndexReader, path ...uint32) (ReadCompleter, error) {
func ReadByteStream(r IndexReader, path ...uint32) (io.Reader, error) {
slog.Debug("reading byte stream status byte")
ok, err := ReadStreamStatus(r)
if err != nil {
return nil, err
}
if !ok {
r, err = r.Index(path...)
r, err := r.Index(path...)
if err != nil {
return nil, fmt.Errorf("failed to index reader: %w", err)
}
return NewByteStreamReader(NewPendingByteReader(r)), nil
return NewByteStreamReader(r), nil
}
slog.Debug("reading ready byte stream")
buf, err := ReadByteList(r)
if err != nil {
return nil, fmt.Errorf("failed to read bytes: %w", err)
}
slog.Debug("read ready byte stream", "len", len(buf))
return NewCompleteReader(bytes.NewReader(buf)), nil
return bytes.NewReader(buf), nil
}

// ReadStream reads a stream from `r`
func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[[]T], error) {
func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (Receiver[[]T], error) {
slog.Debug("reading stream status byte")
ok, err := ReadStreamStatus(r)
if err != nil {
return nil, err
}
if !ok {
r, err = r.Index(path...)
r, err := r.Index(path...)
if err != nil {
return nil, fmt.Errorf("failed to index reader: %w", err)
}
Expand Down Expand Up @@ -239,24 +183,27 @@ func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...ui
return NewCompleteReceiver(vs), nil
}

func WriteByteStream(r ReadCompleter, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) {
if r.IsComplete() {
slog.Debug("writing byte stream `stream::ready` status byte")
if err := w.WriteByte(1); err != nil {
return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err)
}
var buf bytes.Buffer
slog.Debug("reading ready byte stream contents")
n, err := io.CopyBuffer(&buf, r, chunk)
if err != nil {
return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err)
}
slog.Debug("writing ready byte stream contents", "len", n)
return nil, WriteByteList(buf.Bytes(), w)
}
func WriteByteStream(r io.Reader, w IndexWriter, chunk []byte, path ...uint32) (err error) {
slog.Debug("writing byte stream `stream::pending` status byte")
if err := w.WriteByte(0); err != nil {
return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err)
return fmt.Errorf("failed to write `stream::pending` byte: %w", err)
}
return &ByteStreamWriter{r, chunk}, nil
wi, err := w.Index(path...)
if err != nil {
return fmt.Errorf("failed to index reader: %w", err)
}
s := &ByteStreamWriter{r, chunk}
defer func() {
if cErr := wi.Close(); cErr != nil {
if err == nil {
err = fmt.Errorf("failed to close pending byte stream: %w", cErr)
} else {
slog.Warn("failed to close pending byte stream", "err", cErr)
}
}
}()
if err := s.WriteTo(wi); err != nil {
return fmt.Errorf("failed to write stream contents: %w", err)
}
return nil
}
56 changes: 7 additions & 49 deletions go/wrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ type IndexReader interface {
io.Reader
io.ByteReader

Index[IndexReader]
Index[IndexReadCloser]
}

type IndexWriter interface {
io.Writer
io.ByteWriter

Index[IndexWriter]
Index[IndexWriteCloser]
}

type IndexReadCloser interface {
Expand All @@ -87,43 +87,13 @@ type ByteReader interface {
io.Reader
}

type Completer interface {
IsComplete() bool
}

type Receiver[T any] interface {
Receive() (T, error)
}

type ReceiveCompleter[T any] interface {
Receiver[T]
Completer
}

type ReadCompleter interface {
io.Reader
Completer
}

type ByteReadCompleter interface {
type ByteReadCloser interface {
ByteReader
Completer
}

type PendingReceiver[T any] struct {
Receiver[T]
}

func (r *PendingReceiver[T]) Receive() (T, error) {
return r.Receiver.Receive()
}

func (*PendingReceiver[T]) IsComplete() bool {
return false
}

func NewPendingReceiver[T any](rx Receiver[T]) *PendingReceiver[T] {
return &PendingReceiver[T]{rx}
io.Closer
}

type CompleteReceiver[T any] struct {
Expand All @@ -141,35 +111,23 @@ func (r *CompleteReceiver[T]) Receive() (T, error) {
return r.v, nil
}

func (*CompleteReceiver[T]) IsComplete() bool {
return true
}

func NewCompleteReceiver[T any](v T) *CompleteReceiver[T] {
return &CompleteReceiver[T]{v, true}
}

type DecodeReceiver[T any] struct {
r IndexReader
r IndexReadCloser
decode func(IndexReader) (T, error)
}

func (r *DecodeReceiver[T]) Receive() (T, error) {
return r.decode(r.r)
}

func (*DecodeReceiver[T]) IsComplete() bool {
return false
}

func (r *DecodeReceiver[T]) Close() error {
c, ok := r.r.(io.Closer)
if ok {
return c.Close()
}
return nil
return r.r.Close()
}

func NewDecodeReceiver[T any](r IndexReader, decode func(IndexReader) (T, error)) *DecodeReceiver[T] {
func NewDecodeReceiver[T any](r IndexReadCloser, decode func(IndexReader) (T, error)) *DecodeReceiver[T] {
return &DecodeReceiver[T]{r, decode}
}
7 changes: 4 additions & 3 deletions tests/go/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ package integration
import (
"bytes"
"context"
"io"
"log/slog"

wrpc "wrpc.io/go"
)

type AsyncHandler struct{}

func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (wrpc.ReadCompleter, wrpc.ReceiveCompleter[[][]string], error) {
func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (io.Reader, wrpc.Receiver[[][]string], error) {
slog.DebugContext(ctx, "handling `with-streams`", "complete", complete)
buf := bytes.NewBuffer([]byte("test"))
str := wrpc.NewCompleteReceiver([][]string{{"foo", "bar"}, {"baz"}})
if complete {
return wrpc.NewCompleteReader(buf), str, nil
return buf, str, nil
} else {
return wrpc.NewPendingByteReader(buf), wrpc.NewPendingReceiver(str), nil
return buf, str, nil
}
}
Loading

0 comments on commit 2a6c15c

Please sign in to comment.