From 5ba49b194b9139f5483167e1ca662aa7d2012410 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Mon, 26 Aug 2024 21:06:43 +0200 Subject: [PATCH] feat(go)!: rework async I/O - Remove `Completer` abstraction and just always send async values as "pending" in Go. We may want to revisit this in the future with a better approach (e.g. encoding nested async values concurrently and using a `select` to synchronise at encoding position) - Expose `Close` for stream and future types, although for now the callers have to assert for it. This is not the final solution, just an intermediate state. More breaking changes are incoming for this functionality Signed-off-by: Roman Volosatovs --- crates/wit-bindgen-go/src/interface.rs | 499 +++++-------- .../cmd/hello-client-nats/main.go | 2 +- .../cmd/hello-server-nats/main.go | 2 +- .../cmd/resources-server-nats/main.go | 4 +- .../streams/handler/bindings.wrpc.go | 344 +++------ .../cmd/streams-client-nats/main.go | 2 +- .../streams/handler/bindings.wrpc.go | 658 +++++------------- .../cmd/streams-server-nats/main.go | 5 +- go/future.go | 4 +- go/nats/client.go | 8 +- go/stream.go | 115 +-- go/wrpc.go | 56 +- tests/go/async.go | 7 +- tests/go/async_test.go | 41 +- tests/go/cmd/sync-server-nats/main.go | 2 +- tests/go/resources.go | 2 +- tests/go/resources_test.go | 5 +- tests/go/sync_test.go | 5 +- tests/go/types_test.go | 3 +- 19 files changed, 559 insertions(+), 1205 deletions(-) diff --git a/crates/wit-bindgen-go/src/interface.rs b/crates/wit-bindgen-go/src/interface.rs index 19a2d386..8454e788 100644 --- a/crates/wit-bindgen-go/src/interface.rs +++ b/crates/wit-bindgen-go/src/interface.rs @@ -910,13 +910,14 @@ impl InterfaceGenerator<'_> { match ty { Some(ty) if is_list_of(self.resolve, Type::U8, ty) => { let bytes = self.deps.bytes(); + let io = self.deps.io(); let fmt = self.deps.fmt(); let slog = self.deps.slog(); let wrpc = self.deps.wrpc(); uwriteln!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReadCompleter, error) {{ + r#"func(r {wrpc}.IndexReader, path ...uint32) ({io}.Reader, error) {{ {slog}.Debug("reading byte list future status byte") status, err := r.ReadByte() if err != nil {{ @@ -924,13 +925,11 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} - return {wrpc}.NewByteStreamReader({wrpc}.NewPendingByteReader(r)), nil + return {wrpc}.NewByteStreamReader(r), nil case 1: {slog}.Debug("reading ready byte list future contents") buf, err := "# @@ -943,7 +942,7 @@ impl InterfaceGenerator<'_> { return nil, {fmt}.Errorf("failed to read ready byte list future contents: %w", err) }} {slog}.Debug("read ready byte list future contents", "len", len(buf)) - return {wrpc}.NewCompleteReader({bytes}.NewReader(buf)), nil + return {bytes}.NewReader(buf), nil default: return nil, {fmt}.Errorf("invalid byte list future status byte %d", status) }} @@ -963,7 +962,7 @@ impl InterfaceGenerator<'_> { uwrite!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReceiveCompleter["# + r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.Receiver["# ); self.print_opt_ty(ty, true); uwrite!( @@ -976,11 +975,9 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("# ); @@ -1032,13 +1029,14 @@ impl InterfaceGenerator<'_> { match element { Some(ty) if is_ty(self.resolve, Type::U8, ty) => { let bytes = self.deps.bytes(); + let io = self.deps.io(); let fmt = self.deps.fmt(); let slog = self.deps.slog(); let wrpc = self.deps.wrpc(); uwriteln!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReadCompleter, error) {{ + r#"func(r {wrpc}.IndexReader, path ...uint32) ({io}.Reader, error) {{ {slog}.Debug("reading byte stream status byte") status, err := r.ReadByte() if err != nil {{ @@ -1046,13 +1044,11 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} - return {wrpc}.NewByteStreamReader({wrpc}.NewPendingByteReader(r)), nil + return {wrpc}.NewByteStreamReader(r), nil case 1: {slog}.Debug("reading ready byte stream contents") buf, err := "# @@ -1065,7 +1061,7 @@ impl InterfaceGenerator<'_> { return nil, {fmt}.Errorf("failed to read ready byte stream contents: %w", err) }} {slog}.Debug("read ready byte stream contents", "len", len(buf)) - return {wrpc}.NewCompleteReader({bytes}.NewReader(buf)), nil + return {bytes}.NewReader(buf), nil default: return nil, {fmt}.Errorf("invalid stream status byte %d", status) }} @@ -1088,7 +1084,7 @@ impl InterfaceGenerator<'_> { uwrite!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReceiveCompleter["# + r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.Receiver["# ); self.print_list(ty); uwrite!( @@ -1101,11 +1097,9 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} var total uint32 return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("# @@ -1928,7 +1922,6 @@ impl InterfaceGenerator<'_> { fn print_write_future(&mut self, ty: &Option, name: &str, writer: &str) { match ty { Some(ty) if is_list_of(self.resolve, Type::U8, ty) => { - let bytes = self.deps.bytes(); let fmt = self.deps.fmt(); let io = self.deps.io(); let math = self.deps.math(); @@ -1936,72 +1929,41 @@ impl InterfaceGenerator<'_> { let wrpc = self.deps.wrpc(); uwrite!( self.src, - r#"func(v {wrpc}.ReadCompleter, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + r#"func(v {io}.Reader, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ + {slog}.Debug("writing byte list future `future::pending` status byte") + if err = w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready byte list future: %w", cErr) + err = {fmt}.Errorf("failed to close pending byte list future: %w", cErr) }} else {{ - slog.Warn("failed to close ready byte list future", "err", cErr) + {slog}.Warn("failed to close pending byte list future", "err", cErr) }} }} }} }}() - {slog}.Debug("writing byte list future `future::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `future::ready` byte: %w", err) - }} - {slog}.Debug("reading ready byte list future contents") - var buf {bytes}.Buffer - var n int64 - n, err = {io}.Copy(&buf, v) + {slog}.Debug("reading pending byte list future contents") + chunk, err := {io}.ReadAll(chunk) if err != nil {{ - return nil, {fmt}.Errorf("failed to read ready byte list future contents: %w", err) + return {fmt}.Errorf("failed to read pending byte list future: %w", err) }} - {slog}.Debug("writing ready byte list future contents", "len", n) - if err = {wrpc}.WriteByteList(buf.Bytes(), w); err != nil {{ - return nil, {fmt}.Errorf("failed to write ready byte list future contents: %w", err) + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("pending byte list future length of %d overflows a 32-bit integer", n) }} - return nil, nil - }} else {{ - {slog}.Debug("writing byte list future `future::pending` status byte") - if err = w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + {slog}.Debug("writing pending byte list future length", "len", n) + if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending byte list future length of %d: %w", n, err) }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending byte list future: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending byte list future", "err", cErr) - }} - }} - }} - }}() - {slog}.Debug("reading pending byte list future contents") - chunk, err := {io}.ReadAll(chunk) - if err != nil {{ - return {fmt}.Errorf("failed to read pending byte list future: %w", err) - }} - if n > {math}.MaxUint32 {{ - return {fmt}.Errorf("pending byte list future length of %d overflows a 32-bit integer", n) - }} - {slog}.Debug("writing pending byte list future length", "len", n) - if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ - return {fmt}.Errorf("failed to write pending byte list future length of %d: %w", n, err) - }} - _, err = w.Write(chunk[:n]) - if err != nil {{ - return {fmt}.Errorf("failed to write pending byte list future contents: %w", err) - }} - }}, nil - }} + _, err = w.Write(chunk[:n]) + if err != nil {{ + return {fmt}.Errorf("failed to write pending byte list future contents: %w", err) + }} + }}, nil }}({name}, {writer})"#, ); } @@ -2010,83 +1972,48 @@ impl InterfaceGenerator<'_> { let io = self.deps.io(); let slog = self.deps.slog(); let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(v {wrpc}.ReceiveCompleter[",); + uwrite!(self.src, "func(v {wrpc}.Receiver[",); self.print_opt_ty(ty, true); uwrite!( self.src, r#"], w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + {slog}.Debug("writing future `future::pending` status byte") + if err := w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready future: %w", cErr) + err = {fmt}.Errorf("failed to close pending future: %w", cErr) }} else {{ - slog.Warn("failed to close ready future", "err", cErr) + {slog}.Warn("failed to close pending future", "err", cErr) }} }} }} }}() - {slog}.Debug("writing future `future::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `future::ready` byte: %w", err) - }} - {slog}.Debug("receiving ready future contents") + {slog}.Debug("receiving outgoing pending future contents") rx, err := v.Receive() - if err != nil && err != {io}.EOF {{ - return nil, {fmt}.Errorf("failed to receive ready future contents: %w", err) + if err != nil {{ + return {fmt}.Errorf("failed to receive outgoing pending future: %w", err) }} - {slog}.Debug("writing ready future contents") - write, err := "#, + {slog}.Debug("writing pending future element") + write, err :="#, ); self.print_write_ty(ty, "rx", "w"); uwrite!( self.src, r#" if err != nil {{ - return nil, {fmt}.Errorf("failed to write ready future contents: %w", err) + return {fmt}.Errorf("failed to write pending future element: %w", err) }} - return write, nil - }} else {{ - {slog}.Debug("writing future `future::pending` status byte") - if err := w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + if write != nil {{ + return write(w) }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending future: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending future", "err", cErr) - }} - }} - }} - }}() - {slog}.Debug("receiving outgoing pending future contents") - rx, err := v.Receive() - if err != nil {{ - return {fmt}.Errorf("failed to receive outgoing pending future: %w", err) - }} - {slog}.Debug("writing pending future element") - write, err :="#, - ); - self.print_write_ty(ty, "rx", "w"); - uwrite!( - self.src, - r#" - if err != nil {{ - return {fmt}.Errorf("failed to write pending future element: %w", err) - }} - if write != nil {{ - return write(w) - }} - return nil - }}, nil - }} + return nil + }}, nil }}({name}, {writer})"#, ); } @@ -2097,7 +2024,6 @@ impl InterfaceGenerator<'_> { fn print_write_stream(&mut self, Stream { element, .. }: &Stream, name: &str, writer: &str) { match element { Some(ty) if is_ty(self.resolve, Type::U8, ty) => { - let bytes = self.deps.bytes(); let fmt = self.deps.fmt(); let io = self.deps.io(); let math = self.deps.math(); @@ -2105,85 +2031,54 @@ impl InterfaceGenerator<'_> { let wrpc = self.deps.wrpc(); uwrite!( self.src, - r#"func(v {wrpc}.ReadCompleter, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + r#"func(v {io}.Reader, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ + {slog}.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready byte stream: %w", cErr) + err = {fmt}.Errorf("failed to close pending byte stream: %w", cErr) }} else {{ - slog.Warn("failed to close ready byte stream", "err", cErr) + {slog}.Warn("failed to close pending byte stream", "err", cErr) }} }} }} }}() - {slog}.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `stream::ready` byte: %w", err) - }} - {slog}.Debug("reading ready byte stream contents") - var buf {bytes}.Buffer - var n int64 - n, err = {io}.Copy(&buf, v) - if err != nil {{ - return nil, {fmt}.Errorf("failed to read ready byte stream contents: %w", err) - }} - {slog}.Debug("writing ready byte stream contents", "len", n) - if err = {wrpc}.WriteByteList(buf.Bytes(), w); err != nil {{ - return nil, {fmt}.Errorf("failed to write ready byte stream contents: %w", err) - }} - return nil, nil - }} else {{ - {slog}.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending byte stream: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending byte stream", "err", cErr) - }} - }} - }} - }}() - chunk := make([]byte, 8096) - for {{ - var end bool - {slog}.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == {io}.EOF {{ - end = true - {slog}.Debug("pending byte stream reached EOF") - }} else if err != nil {{ - return {fmt}.Errorf("failed to read pending byte stream chunk: %w", err) - }} - if n > {math}.MaxUint32 {{ - return {fmt}.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - }} - {slog}.Debug("writing pending byte stream chunk length", "len", n) - if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ - return {fmt}.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - }} - _, err = w.Write(chunk[:n]) - if err != nil {{ - return {fmt}.Errorf("failed to write pending byte stream chunk contents: %w", err) - }} - if end {{ - if err := w.WriteByte(0); err != nil {{ - return {fmt}.Errorf("failed to write pending byte stream end byte: %w", err) - }} - return nil + chunk := make([]byte, 8096) + for {{ + var end bool + {slog}.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == {io}.EOF {{ + end = true + {slog}.Debug("pending byte stream reached EOF") + }} else if err != nil {{ + return {fmt}.Errorf("failed to read pending byte stream chunk: %w", err) + }} + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + }} + {slog}.Debug("writing pending byte stream chunk length", "len", n) + if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + }} + _, err = w.Write(chunk[:n]) + if err != nil {{ + return {fmt}.Errorf("failed to write pending byte stream chunk contents: %w", err) + }} + if end {{ + if err := w.WriteByte(0); err != nil {{ + return {fmt}.Errorf("failed to write pending byte stream end byte: %w", err) }} + return nil }} - }}, nil - }} + }} + }}, nil }}({name}, {writer})"#, ); } @@ -2196,140 +2091,91 @@ impl InterfaceGenerator<'_> { let slog = self.deps.slog(); let sync = self.deps.sync(); let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(v {wrpc}.ReceiveCompleter[",); + uwrite!(self.src, "func(v {wrpc}.Receiver[",); self.print_list(ty); uwrite!( self.src, r#"], w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + {slog}.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready stream: %w", cErr) + err = {fmt}.Errorf("failed to close pending stream: %w", cErr) }} else {{ - slog.Warn("failed to close ready stream", "err", cErr) + {slog}.Warn("failed to close pending stream", "err", cErr) }} }} }} }}() - {slog}.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `stream::ready` byte: %w", err) - }} - {slog}.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != {io}.EOF {{ - return nil, {fmt}.Errorf("failed to receive ready stream contents: %w", err) - }} - if err != {io}.EOF && len(vs) > 0 {{ - for {{ - chunk, err := v.Receive() - if err != nil && err != {io}.EOF {{ - return nil, {fmt}.Errorf("failed to receive ready stream contents: %w", err) - }} - if len(chunk) > 0 {{ - vs = append(vs, chunk...) - }} - if err == {io}.EOF {{ - break - }} + var wg {sync}.WaitGroup + var wgErr {atomic}.Value + var total uint32 + for {{ + var end bool + {slog}.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == {io}.EOF {{ + end = true + {slog}.Debug("outgoing pending stream reached EOF") + }} else if err != nil {{ + return {fmt}.Errorf("failed to receive outgoing pending stream chunk: %w", err) }} - }} - {slog}.Debug("writing ready stream contents", "len", len(vs)) - write, err := "#, - ); - self.print_write_list(ty, "vs", "w"); - uwrite!( - self.src, - r#" - if err != nil {{ - return nil, {fmt}.Errorf("failed to write ready stream contents: %w", err) - }} - return write, nil - }} else {{ - {slog}.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending stream: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending stream", "err", cErr) - }} - }} - }} - }}() - var wg {sync}.WaitGroup - var wgErr {atomic}.Value - var total uint32 - for {{ - var end bool - {slog}.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == {io}.EOF {{ - end = true - {slog}.Debug("outgoing pending stream reached EOF") - }} else if err != nil {{ - return {fmt}.Errorf("failed to receive outgoing pending stream chunk: %w", err) - }} - if n > {math}.MaxUint32 {{ - return {fmt}.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - }} - if {math}.MaxUint32 - uint32(n) < total {{ - return {errors}.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - }} - {slog}.Debug("writing pending stream chunk length", "len", n) - if err = {wrpc}.WriteUint32(uint32(n), w); err != nil {{ - return {fmt}.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - }} - for _, v := range chunk {{ - {slog}.Debug("writing pending stream element", "i", total) - write, err :="#, + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) + }} + if {math}.MaxUint32 - uint32(n) < total {{ + return {errors}.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") + }} + {slog}.Debug("writing pending stream chunk length", "len", n) + if err = {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending stream chunk length of %d: %w", n, err) + }} + for _, v := range chunk {{ + {slog}.Debug("writing pending stream element", "i", total) + write, err :="#, ); self.print_write_ty(ty, "v", "w"); uwrite!( self.src, r#" + if err != nil {{ + return {fmt}.Errorf("failed to write pending stream chunk element %d: %w", total, err) + }} + if write != nil {{ + wg.Add(1) + w, err := w.Index(total) if err != nil {{ - return {fmt}.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return {fmt}.Errorf("failed to index writer: %w", err) }} - if write != nil {{ - wg.Add(1) - w, err := w.Index(total) - if err != nil {{ - return {fmt}.Errorf("failed to index writer: %w", err) + go func() {{ + defer wg.Done() + if err := write(w); err != nil {{ + wgErr.Store(err) }} - go func() {{ - defer wg.Done() - if err := write(w); err != nil {{ - wgErr.Store(err) - }} - }}() - }} - total++ + }}() }} - if end {{ - if err := w.WriteByte(0); err != nil {{ - return {fmt}.Errorf("failed to write pending stream end byte: %w", err) - }} - wg.Wait() - err := wgErr.Load() - if err == nil {{ - return nil - }} - return err.(error) + total++ + }} + if end {{ + if err := w.WriteByte(0); err != nil {{ + return {fmt}.Errorf("failed to write pending stream end byte: %w", err) + }} + wg.Wait() + err := wgErr.Load() + if err == nil {{ + return nil }} + return err.(error) }} - }}, nil - }} + }} + }}, nil }}({name}, {writer})"#, ); } @@ -3298,23 +3144,36 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) { } fn print_future(&mut self, ty: &Option) { - let wrpc = self.deps.wrpc(); - self.push_str(wrpc); - self.push_str(".ReceiveCompleter["); - let ty = ty.expect("futures with no element types are not supported"); - self.print_opt_ty(&ty, true); - self.push_str("]"); + match ty { + Some(ty) if is_ty(self.resolve, Type::U8, ty) => { + let io = self.deps.io(); + self.push_str(io); + self.push_str(".Reader"); + } + Some(ty) => { + let wrpc = self.deps.wrpc(); + self.push_str(wrpc); + self.push_str(".Receiver["); + self.print_opt_ty(ty, true); + self.push_str("]"); + } + None => { + panic!("futures with no element types are not supported") + } + } } fn print_stream(&mut self, Stream { element, .. }: &Stream) { - let wrpc = self.deps.wrpc(); - self.push_str(wrpc); match element { Some(ty) if is_ty(self.resolve, Type::U8, ty) => { - self.push_str(".ReadCompleter"); + let io = self.deps.io(); + self.push_str(io); + self.push_str(".Reader"); } Some(ty) => { - self.push_str(".ReceiveCompleter["); + let wrpc = self.deps.wrpc(); + self.push_str(wrpc); + self.push_str(".Receiver["); self.print_list(ty); self.push_str("]"); } diff --git a/examples/go/hello-client/cmd/hello-client-nats/main.go b/examples/go/hello-client/cmd/hello-client-nats/main.go index 448e5a77..88a20f4d 100644 --- a/examples/go/hello-client/cmd/hello-client-nats/main.go +++ b/examples/go/hello-client/cmd/hello-client-nats/main.go @@ -7,9 +7,9 @@ import ( "log/slog" "os" + "github.com/nats-io/nats.go" "wrpc.io/examples/go/hello-client/bindings/wrpc_examples/hello/handler" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) func run() (err error) { diff --git a/examples/go/hello-server/cmd/hello-server-nats/main.go b/examples/go/hello-server/cmd/hello-server-nats/main.go index ba296244..b608b974 100644 --- a/examples/go/hello-server/cmd/hello-server-nats/main.go +++ b/examples/go/hello-server/cmd/hello-server-nats/main.go @@ -9,9 +9,9 @@ import ( "os/signal" "syscall" + "github.com/nats-io/nats.go" server "wrpc.io/examples/go/hello-server/bindings" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) type Handler struct{} diff --git a/examples/go/resources-server/cmd/resources-server-nats/main.go b/examples/go/resources-server/cmd/resources-server-nats/main.go index 910a9431..a6edd945 100644 --- a/examples/go/resources-server/cmd/resources-server-nats/main.go +++ b/examples/go/resources-server/cmd/resources-server-nats/main.go @@ -10,12 +10,12 @@ import ( "sync" "syscall" + "github.com/google/uuid" + "github.com/nats-io/nats.go" server "wrpc.io/examples/go/resources-server/bindings" "wrpc.io/examples/go/resources-server/bindings/exports/wrpc_examples/resources/resources" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" - "github.com/google/uuid" - "github.com/nats-io/nats.go" ) type Foo struct { diff --git a/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go b/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go index 714b98e7..693d680b 100644 --- a/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go +++ b/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go @@ -16,8 +16,8 @@ import ( ) type Req struct { - Numbers wrpc.ReceiveCompleter[[]uint64] - Bytes wrpc.ReadCompleter + Numbers wrpc.Receiver[[]uint64] + Bytes io.Reader } func (v *Req) String() string { return "Req" } @@ -25,196 +25,91 @@ func (v *Req) String() string { return "Req" } func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) slog.Debug("writing field", "name", "numbers") - write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { + write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready stream: %w", cErr) + err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { - slog.Warn("failed to close ready stream", "err", cErr) + slog.Warn("failed to close pending stream", "err", cErr) } } } }() - slog.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if err != io.EOF && len(vs) > 0 { - for { - chunk, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if len(chunk) > 0 { - vs = append(vs, chunk...) - } - if err == io.EOF { - break - } + var wg sync.WaitGroup + var wgErr atomic.Value + var total uint32 + for { + var end bool + slog.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == io.EOF { + end = true + slog.Debug("outgoing pending stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } - } - slog.Debug("writing ready stream contents", "len", len(vs)) - write, err := func(v []uint64, w interface { - io.ByteWriter - io.Writer - }) (write func(wrpc.IndexWriter) error, err error) { - n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) + return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing list length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) + if math.MaxUint32-uint32(n) < total { + return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") } - slog.Debug("writing list elements") - writes := make(map[uint32]func(wrpc.IndexWriter) error, n) - for i, e := range v { + slog.Debug("writing pending stream chunk length", "len", n) + if err = wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) + } + for _, v := range chunk { + slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err - }(e, w) + }(v, w) if err != nil { - return nil, fmt.Errorf("failed to write list element %d: %w", i, err) + return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { - writes[uint32(i)] = write - } - } - if len(writes) > 0 { - return func(w wrpc.IndexWriter) error { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - }, nil - } - return nil, nil - }(vs, w) - if err != nil { - return nil, fmt.Errorf("failed to write ready stream contents: %w", err) - } - return write, nil - } else { - slog.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending stream: %w", cErr) - } else { - slog.Warn("failed to close pending stream", "err", cErr) - } - } - } - }() - var wg sync.WaitGroup - var wgErr atomic.Value - var total uint32 - for { - var end bool - slog.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == io.EOF { - end = true - slog.Debug("outgoing pending stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - } - if math.MaxUint32-uint32(n) < total { - return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - } - slog.Debug("writing pending stream chunk length", "len", n) - if err = wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - } - for _, v := range chunk { - slog.Debug("writing pending stream element", "i", total) - write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err = w.Write(b[:i]) - return err - }(v, w) + wg.Add(1) + w, err := w.Index(total) if err != nil { - return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return fmt.Errorf("failed to index writer: %w", err) } - if write != nil { - wg.Add(1) - w, err := w.Index(total) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + go func() { + defer wg.Done() + if err := write(w); err != nil { + wgErr.Store(err) } - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - total++ + }() } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending stream end byte: %w", err) - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) + total++ + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending stream end byte: %w", err) } + wg.Wait() + err := wgErr.Load() + if err == nil { + return nil + } + return err.(error) } - }, nil - } + } + }, nil }(v.Numbers, w) if err != nil { return nil, fmt.Errorf("failed to write `numbers` field: %w", err) @@ -223,88 +118,57 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err writes[0] = write0 } slog.Debug("writing field", "name", "bytes") - write1, err := func(v wrpc.ReadCompleter, w interface { + write1, err := func(v io.Reader, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready byte stream: %w", cErr) + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { - slog.Warn("failed to close ready byte stream", "err", cErr) + slog.Warn("failed to close pending byte stream", "err", cErr) } } } }() - slog.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("reading ready byte stream contents") - var buf bytes.Buffer - var n int64 - n, err = io.Copy(&buf, v) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - if err = wrpc.WriteByteList(buf.Bytes(), w); err != nil { - return nil, fmt.Errorf("failed to write ready byte stream contents: %w", err) - } - return nil, nil - } else { - slog.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending byte stream: %w", cErr) - } else { - slog.Warn("failed to close pending byte stream", "err", cErr) - } - } - } - }() - chunk := make([]byte, 8096) - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil + chunk := make([]byte, 8096) + for { + var end bool + slog.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == io.EOF { + end = true + slog.Debug("pending byte stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to read pending byte stream chunk: %w", err) + } + if n > math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } + return nil } - }, nil - } + } + }, nil }(v.Bytes, w) if err != nil { return nil, fmt.Errorf("failed to write `bytes` field: %w", err) @@ -341,7 +205,7 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err } return nil, nil } -func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.ReceiveCompleter[[]uint64], r1__ wrpc.ReadCompleter, writeErrs__ <-chan error, err__ error) { +func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receiver[[]uint64], r1__ io.Reader, writeErrs__ <-chan error, err__ error) { var buf__ bytes.Buffer var writeCount__ uint32 write0__, err__ := (r).WriteToIndex(&buf__) @@ -400,7 +264,7 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive if cErr__ := w__.Close(); cErr__ != nil { slog.DebugContext(ctx__, "failed to close outgoing stream", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", cErr__) } - r0__, err__ = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReceiveCompleter[[]uint64], error) { + r0__, err__ = func(r wrpc.IndexReader, path ...uint32) (wrpc.Receiver[[]uint64], error) { slog.Debug("reading stream status byte") status, err := r.ReadByte() if err != nil { @@ -408,11 +272,9 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } var total uint32 return wrpc.NewDecodeReceiver(r, func(r wrpc.IndexReader) ([]uint64, error) { @@ -553,7 +415,7 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive err__ = fmt.Errorf("failed to read result 0: %w", err__) return } - r1__, err__ = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReadCompleter, error) { + r1__, err__ = func(r wrpc.IndexReader, path ...uint32) (io.Reader, error) { slog.Debug("reading byte stream status byte") status, err := r.ReadByte() if err != nil { @@ -561,13 +423,11 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } - return wrpc.NewByteStreamReader(wrpc.NewPendingByteReader(r)), nil + return wrpc.NewByteStreamReader(r), nil case 1: slog.Debug("reading ready byte stream contents") buf, err := @@ -608,7 +468,7 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) } slog.Debug("read ready byte stream contents", "len", len(buf)) - return wrpc.NewCompleteReader(bytes.NewReader(buf)), nil + return bytes.NewReader(buf), nil default: return nil, fmt.Errorf("invalid stream status byte %d", status) } diff --git a/examples/go/streams-client/cmd/streams-client-nats/main.go b/examples/go/streams-client/cmd/streams-client-nats/main.go index e0ad925a..986ef614 100644 --- a/examples/go/streams-client/cmd/streams-client-nats/main.go +++ b/examples/go/streams-client/cmd/streams-client-nats/main.go @@ -10,9 +10,9 @@ import ( "sync" "time" + "github.com/nats-io/nats.go" "wrpc.io/examples/go/streams-client/bindings/wrpc_examples/streams/handler" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) type ThrottleStream[T any] struct { diff --git a/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go b/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go index 8020e25a..2ed29a4b 100644 --- a/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go +++ b/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go @@ -16,8 +16,8 @@ import ( ) type Req struct { - Numbers wrpc.ReceiveCompleter[[]uint64] - Bytes wrpc.ReadCompleter + Numbers wrpc.Receiver[[]uint64] + Bytes io.Reader } func (v *Req) String() string { return "Req" } @@ -25,196 +25,91 @@ func (v *Req) String() string { return "Req" } func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) slog.Debug("writing field", "name", "numbers") - write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { + write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready stream: %w", cErr) + err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { - slog.Warn("failed to close ready stream", "err", cErr) + slog.Warn("failed to close pending stream", "err", cErr) } } } }() - slog.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if err != io.EOF && len(vs) > 0 { - for { - chunk, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if len(chunk) > 0 { - vs = append(vs, chunk...) - } - if err == io.EOF { - break - } + var wg sync.WaitGroup + var wgErr atomic.Value + var total uint32 + for { + var end bool + slog.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == io.EOF { + end = true + slog.Debug("outgoing pending stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } - } - slog.Debug("writing ready stream contents", "len", len(vs)) - write, err := func(v []uint64, w interface { - io.ByteWriter - io.Writer - }) (write func(wrpc.IndexWriter) error, err error) { - n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) + return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) + } + if math.MaxUint32-uint32(n) < total { + return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing list length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) + slog.Debug("writing pending stream chunk length", "len", n) + if err = wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) } - slog.Debug("writing list elements") - writes := make(map[uint32]func(wrpc.IndexWriter) error, n) - for i, e := range v { + for _, v := range chunk { + slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err - }(e, w) + }(v, w) if err != nil { - return nil, fmt.Errorf("failed to write list element %d: %w", i, err) + return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { - writes[uint32(i)] = write - } - } - if len(writes) > 0 { - return func(w wrpc.IndexWriter) error { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - }, nil - } - return nil, nil - }(vs, w) - if err != nil { - return nil, fmt.Errorf("failed to write ready stream contents: %w", err) - } - return write, nil - } else { - slog.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending stream: %w", cErr) - } else { - slog.Warn("failed to close pending stream", "err", cErr) - } - } - } - }() - var wg sync.WaitGroup - var wgErr atomic.Value - var total uint32 - for { - var end bool - slog.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == io.EOF { - end = true - slog.Debug("outgoing pending stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - } - if math.MaxUint32-uint32(n) < total { - return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - } - slog.Debug("writing pending stream chunk length", "len", n) - if err = wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - } - for _, v := range chunk { - slog.Debug("writing pending stream element", "i", total) - write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err = w.Write(b[:i]) - return err - }(v, w) + wg.Add(1) + w, err := w.Index(total) if err != nil { - return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return fmt.Errorf("failed to index writer: %w", err) } - if write != nil { - wg.Add(1) - w, err := w.Index(total) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + go func() { + defer wg.Done() + if err := write(w); err != nil { + wgErr.Store(err) } - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - total++ + }() } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending stream end byte: %w", err) - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) + total++ + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending stream end byte: %w", err) + } + wg.Wait() + err := wgErr.Load() + if err == nil { + return nil } + return err.(error) } - }, nil - } + } + }, nil }(v.Numbers, w) if err != nil { return nil, fmt.Errorf("failed to write `numbers` field: %w", err) @@ -223,88 +118,57 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err writes[0] = write0 } slog.Debug("writing field", "name", "bytes") - write1, err := func(v wrpc.ReadCompleter, w interface { + write1, err := func(v io.Reader, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready byte stream: %w", cErr) + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { - slog.Warn("failed to close ready byte stream", "err", cErr) + slog.Warn("failed to close pending byte stream", "err", cErr) } } } }() - slog.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("reading ready byte stream contents") - var buf bytes.Buffer - var n int64 - n, err = io.Copy(&buf, v) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - if err = wrpc.WriteByteList(buf.Bytes(), w); err != nil { - return nil, fmt.Errorf("failed to write ready byte stream contents: %w", err) - } - return nil, nil - } else { - slog.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending byte stream: %w", cErr) - } else { - slog.Warn("failed to close pending byte stream", "err", cErr) - } - } - } - }() - chunk := make([]byte, 8096) - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil + chunk := make([]byte, 8096) + for { + var end bool + slog.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == io.EOF { + end = true + slog.Debug("pending byte stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to read pending byte stream chunk: %w", err) + } + if n > math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } + return nil } - }, nil - } + } + }, nil }(v.Bytes, w) if err != nil { return nil, fmt.Errorf("failed to write `bytes` field: %w", err) @@ -343,7 +207,7 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err } type Handler interface { - Echo(ctx__ context.Context, r *Req) (wrpc.ReceiveCompleter[[]uint64], wrpc.ReadCompleter, error) + Echo(ctx__ context.Context, r *Req) (wrpc.Receiver[[]uint64], io.Reader, error) } func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { @@ -368,7 +232,7 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { v := &Req{} var err error slog.Debug("reading field", "name", "numbers") - v.Numbers, err = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReceiveCompleter[[]uint64], error) { + v.Numbers, err = func(r wrpc.IndexReader, path ...uint32) (wrpc.Receiver[[]uint64], error) { slog.Debug("reading stream status byte") status, err := r.ReadByte() if err != nil { @@ -376,11 +240,9 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } var total uint32 return wrpc.NewDecodeReceiver(r, func(r wrpc.IndexReader) ([]uint64, error) { @@ -521,7 +383,7 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to read `numbers` field: %w", err) } slog.Debug("reading field", "name", "bytes") - v.Bytes, err = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReadCompleter, error) { + v.Bytes, err = func(r wrpc.IndexReader, path ...uint32) (io.Reader, error) { slog.Debug("reading byte stream status byte") status, err := r.ReadByte() if err != nil { @@ -529,13 +391,11 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } - return wrpc.NewByteStreamReader(wrpc.NewPendingByteReader(r)), nil + return wrpc.NewByteStreamReader(r), nil case 1: slog.Debug("reading ready byte stream contents") buf, err := @@ -576,7 +436,7 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) } slog.Debug("read ready byte stream contents", "len", len(buf)) - return wrpc.NewCompleteReader(bytes.NewReader(buf)), nil + return bytes.NewReader(buf), nil default: return nil, fmt.Errorf("invalid stream status byte %d", status) } @@ -607,196 +467,91 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) - write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { + write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready stream: %w", cErr) + err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { - slog.Warn("failed to close ready stream", "err", cErr) + slog.Warn("failed to close pending stream", "err", cErr) } } } }() - slog.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if err != io.EOF && len(vs) > 0 { - for { - chunk, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if len(chunk) > 0 { - vs = append(vs, chunk...) - } - if err == io.EOF { - break - } + var wg sync.WaitGroup + var wgErr atomic.Value + var total uint32 + for { + var end bool + slog.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == io.EOF { + end = true + slog.Debug("outgoing pending stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } - } - slog.Debug("writing ready stream contents", "len", len(vs)) - write, err := func(v []uint64, w interface { - io.ByteWriter - io.Writer - }) (write func(wrpc.IndexWriter) error, err error) { - n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) + return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing list length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) + if math.MaxUint32-uint32(n) < total { + return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") + } + slog.Debug("writing pending stream chunk length", "len", n) + if err = wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) } - slog.Debug("writing list elements") - writes := make(map[uint32]func(wrpc.IndexWriter) error, n) - for i, e := range v { + for _, v := range chunk { + slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err - }(e, w) + }(v, w) if err != nil { - return nil, fmt.Errorf("failed to write list element %d: %w", i, err) + return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { - writes[uint32(i)] = write - } - } - if len(writes) > 0 { - return func(w wrpc.IndexWriter) error { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - }, nil - } - return nil, nil - }(vs, w) - if err != nil { - return nil, fmt.Errorf("failed to write ready stream contents: %w", err) - } - return write, nil - } else { - slog.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending stream: %w", cErr) - } else { - slog.Warn("failed to close pending stream", "err", cErr) - } - } - } - }() - var wg sync.WaitGroup - var wgErr atomic.Value - var total uint32 - for { - var end bool - slog.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == io.EOF { - end = true - slog.Debug("outgoing pending stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - } - if math.MaxUint32-uint32(n) < total { - return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - } - slog.Debug("writing pending stream chunk length", "len", n) - if err = wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - } - for _, v := range chunk { - slog.Debug("writing pending stream element", "i", total) - write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err = w.Write(b[:i]) - return err - }(v, w) + wg.Add(1) + w, err := w.Index(total) if err != nil { - return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return fmt.Errorf("failed to index writer: %w", err) } - if write != nil { - wg.Add(1) - w, err := w.Index(total) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + go func() { + defer wg.Done() + if err := write(w); err != nil { + wgErr.Store(err) } - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - total++ + }() } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending stream end byte: %w", err) - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) + total++ + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending stream end byte: %w", err) } + wg.Wait() + err := wgErr.Load() + if err == nil { + return nil + } + return err.(error) } - }, nil - } + } + }, nil }(r0, &buf) if err != nil { slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:streams/handler", "name", "echo", "err", err) @@ -805,88 +560,57 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { if write0 != nil { writes[0] = write0 } - write1, err := func(v wrpc.ReadCompleter, w interface { + write1, err := func(v io.Reader, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready byte stream: %w", cErr) + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { - slog.Warn("failed to close ready byte stream", "err", cErr) + slog.Warn("failed to close pending byte stream", "err", cErr) } } } }() - slog.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("reading ready byte stream contents") - var buf bytes.Buffer - var n int64 - n, err = io.Copy(&buf, v) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - if err = wrpc.WriteByteList(buf.Bytes(), w); err != nil { - return nil, fmt.Errorf("failed to write ready byte stream contents: %w", err) - } - return nil, nil - } else { - slog.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending byte stream: %w", cErr) - } else { - slog.Warn("failed to close pending byte stream", "err", cErr) - } - } - } - }() - chunk := make([]byte, 8096) - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil + chunk := make([]byte, 8096) + for { + var end bool + slog.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == io.EOF { + end = true + slog.Debug("pending byte stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to read pending byte stream chunk: %w", err) + } + if n > math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } + return nil } - }, nil - } + } + }, nil }(r1, &buf) if err != nil { slog.WarnContext(ctx, "failed to write result value", "i", 1, "wrpc-examples:streams/handler", "name", "echo", "err", err) diff --git a/examples/go/streams-server/cmd/streams-server-nats/main.go b/examples/go/streams-server/cmd/streams-server-nats/main.go index e0ae1cc7..0e545479 100644 --- a/examples/go/streams-server/cmd/streams-server-nats/main.go +++ b/examples/go/streams-server/cmd/streams-server-nats/main.go @@ -3,22 +3,23 @@ package main import ( "context" "fmt" + "io" "log" "log/slog" "os" "os/signal" "syscall" + "github.com/nats-io/nats.go" server "wrpc.io/examples/go/streams-server/bindings" "wrpc.io/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) type Handler struct{} -func (Handler) Echo(ctx context.Context, req *handler.Req) (wrpc.ReceiveCompleter[[]uint64], wrpc.ReadCompleter, error) { +func (Handler) Echo(ctx context.Context, req *handler.Req) (wrpc.Receiver[[]uint64], io.Reader, error) { slog.InfoContext(ctx, "handling `wrpc-examples:streams/handler.echo`") return req.Numbers, req.Bytes, nil } diff --git a/go/future.go b/go/future.go index f3dd5167..07eff96b 100644 --- a/go/future.go +++ b/go/future.go @@ -24,14 +24,14 @@ func ReadFutureStatus(r ByteReader) (bool, error) { } // ReadFuture reads a future from `r` -func ReadFuture[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[T], error) { +func ReadFuture[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (Receiver[T], error) { slog.Debug("reading future status byte") ok, err := ReadFutureStatus(r) if err != nil { return nil, err } if !ok { - r, err = r.Index(path...) + r, err := r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to get future reader: %w", err) } diff --git a/go/nats/client.go b/go/nats/client.go index 88a332d4..5316c1a8 100644 --- a/go/nats/client.go +++ b/go/nats/client.go @@ -8,8 +8,8 @@ import ( "sync" "sync/atomic" - wrpc "wrpc.io/go" "github.com/nats-io/nats.go" + wrpc "wrpc.io/go" ) // Client is a thin wrapper around *nats.Conn, which is able to serve and invoke wRPC functions @@ -170,7 +170,7 @@ func (w *paramWriter) WriteByte(b byte) error { return nil } -func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriter, error) { +func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriteCloser, error) { return ¶mWriter{ nc: w.nc, init: w.init, @@ -231,7 +231,7 @@ func (w *resultWriter) Close() error { return nil } -func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriter, error) { +func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriteCloser, error) { return &resultWriter{nc: w.nc, tx: indexPath(w.tx, path...)}, nil } @@ -304,7 +304,7 @@ func (r *streamReader) Close() (err error) { return nil } -func (r *streamReader) Index(path ...uint32) (wrpc.IndexReader, error) { +func (r *streamReader) Index(path ...uint32) (wrpc.IndexReadCloser, error) { r.nestRef.Add(1) s := indexPath(r.path, path...) r.nestMu.Lock() diff --git a/go/stream.go b/go/stream.go index 05175853..7f6b1183 100644 --- a/go/stream.go +++ b/go/stream.go @@ -10,54 +10,6 @@ import ( "math" ) -type CompleteReader struct { - io.Reader -} - -func (*CompleteReader) IsComplete() bool { - return true -} - -func NewCompleteReader(r io.Reader) *CompleteReader { - return &CompleteReader{r} -} - -type CompleteByteReader struct { - ByteReader -} - -func (*CompleteByteReader) IsComplete() bool { - return true -} - -func NewCompleteByteReader(r ByteReader) *CompleteByteReader { - return &CompleteByteReader{r} -} - -type PendingReader struct { - io.Reader -} - -func (*PendingReader) IsComplete() bool { - return false -} - -func NewPendingReader(r io.Reader) *PendingReader { - return &PendingReader{r} -} - -type PendingByteReader struct { - ByteReader -} - -func (*PendingByteReader) IsComplete() bool { - return false -} - -func NewPendingByteReader(r ByteReader) *PendingByteReader { - return &PendingByteReader{r} -} - type ByteStreamWriter struct { r io.Reader chunk []byte @@ -71,9 +23,9 @@ func (v *ByteStreamWriter) WriteTo(w ByteWriter) (err error) { defer func() { if fErr := buf.Flush(); fErr != nil { if err == nil { - err = fmt.Errorf("failed to flush writer: %w", fErr) + err = fmt.Errorf("failed to flush pending byte stream writer: %w", fErr) } else { - slog.Warn("failed to flush writer", "err", fErr) + slog.Warn("failed to flush pending byte stream writer", "err", fErr) } } }() @@ -108,7 +60,7 @@ func (v *ByteStreamWriter) WriteTo(w ByteWriter) (err error) { } type ByteStreamReader struct { - r ByteReadCompleter + r ByteReadCloser buf uint32 } @@ -140,19 +92,11 @@ func (r *ByteStreamReader) Read(p []byte) (int, error) { return rn, nil } -func (r *ByteStreamReader) IsComplete() bool { - return r.r.IsComplete() -} - func (r *ByteStreamReader) Close() error { - c, ok := r.r.(io.Closer) - if ok { - return c.Close() - } - return nil + return r.r.Close() } -func NewByteStreamReader(r ByteReadCompleter) *ByteStreamReader { +func NewByteStreamReader(r ByteReadCloser) *ByteStreamReader { return &ByteStreamReader{ r: r, } @@ -177,18 +121,18 @@ func ReadStreamStatus(r ByteReader) (bool, error) { } // ReadByteStream reads a stream of bytes from `r` -func ReadByteStream(r IndexReader, path ...uint32) (ReadCompleter, error) { +func ReadByteStream(r IndexReader, path ...uint32) (io.Reader, error) { slog.Debug("reading byte stream status byte") ok, err := ReadStreamStatus(r) if err != nil { return nil, err } if !ok { - r, err = r.Index(path...) + r, err := r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to index reader: %w", err) } - return NewByteStreamReader(NewPendingByteReader(r)), nil + return NewByteStreamReader(r), nil } slog.Debug("reading ready byte stream") buf, err := ReadByteList(r) @@ -196,18 +140,18 @@ func ReadByteStream(r IndexReader, path ...uint32) (ReadCompleter, error) { return nil, fmt.Errorf("failed to read bytes: %w", err) } slog.Debug("read ready byte stream", "len", len(buf)) - return NewCompleteReader(bytes.NewReader(buf)), nil + return bytes.NewReader(buf), nil } // ReadStream reads a stream from `r` -func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[[]T], error) { +func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (Receiver[[]T], error) { slog.Debug("reading stream status byte") ok, err := ReadStreamStatus(r) if err != nil { return nil, err } if !ok { - r, err = r.Index(path...) + r, err := r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to index reader: %w", err) } @@ -239,24 +183,27 @@ func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...ui return NewCompleteReceiver(vs), nil } -func WriteByteStream(r ReadCompleter, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) { - if r.IsComplete() { - slog.Debug("writing byte stream `stream::ready` status byte") - if err := w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - var buf bytes.Buffer - slog.Debug("reading ready byte stream contents") - n, err := io.CopyBuffer(&buf, r, chunk) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - return nil, WriteByteList(buf.Bytes(), w) - } +func WriteByteStream(r io.Reader, w IndexWriter, chunk []byte, path ...uint32) (err error) { slog.Debug("writing byte stream `stream::pending` status byte") if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + return fmt.Errorf("failed to write `stream::pending` byte: %w", err) } - return &ByteStreamWriter{r, chunk}, nil + wi, err := w.Index(path...) + if err != nil { + return fmt.Errorf("failed to index reader: %w", err) + } + s := &ByteStreamWriter{r, chunk} + defer func() { + if cErr := wi.Close(); cErr != nil { + if err == nil { + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) + } else { + slog.Warn("failed to close pending byte stream", "err", cErr) + } + } + }() + if err := s.WriteTo(wi); err != nil { + return fmt.Errorf("failed to write stream contents: %w", err) + } + return nil } diff --git a/go/wrpc.go b/go/wrpc.go index e6a34e3c..a611886e 100644 --- a/go/wrpc.go +++ b/go/wrpc.go @@ -57,14 +57,14 @@ type IndexReader interface { io.Reader io.ByteReader - Index[IndexReader] + Index[IndexReadCloser] } type IndexWriter interface { io.Writer io.ByteWriter - Index[IndexWriter] + Index[IndexWriteCloser] } type IndexReadCloser interface { @@ -87,43 +87,13 @@ type ByteReader interface { io.Reader } -type Completer interface { - IsComplete() bool -} - type Receiver[T any] interface { Receive() (T, error) } -type ReceiveCompleter[T any] interface { - Receiver[T] - Completer -} - -type ReadCompleter interface { - io.Reader - Completer -} - -type ByteReadCompleter interface { +type ByteReadCloser interface { ByteReader - Completer -} - -type PendingReceiver[T any] struct { - Receiver[T] -} - -func (r *PendingReceiver[T]) Receive() (T, error) { - return r.Receiver.Receive() -} - -func (*PendingReceiver[T]) IsComplete() bool { - return false -} - -func NewPendingReceiver[T any](rx Receiver[T]) *PendingReceiver[T] { - return &PendingReceiver[T]{rx} + io.Closer } type CompleteReceiver[T any] struct { @@ -141,16 +111,12 @@ func (r *CompleteReceiver[T]) Receive() (T, error) { return r.v, nil } -func (*CompleteReceiver[T]) IsComplete() bool { - return true -} - func NewCompleteReceiver[T any](v T) *CompleteReceiver[T] { return &CompleteReceiver[T]{v, true} } type DecodeReceiver[T any] struct { - r IndexReader + r IndexReadCloser decode func(IndexReader) (T, error) } @@ -158,18 +124,10 @@ func (r *DecodeReceiver[T]) Receive() (T, error) { return r.decode(r.r) } -func (*DecodeReceiver[T]) IsComplete() bool { - return false -} - func (r *DecodeReceiver[T]) Close() error { - c, ok := r.r.(io.Closer) - if ok { - return c.Close() - } - return nil + return r.r.Close() } -func NewDecodeReceiver[T any](r IndexReader, decode func(IndexReader) (T, error)) *DecodeReceiver[T] { +func NewDecodeReceiver[T any](r IndexReadCloser, decode func(IndexReader) (T, error)) *DecodeReceiver[T] { return &DecodeReceiver[T]{r, decode} } diff --git a/tests/go/async.go b/tests/go/async.go index 8f98debb..ac42b2c1 100644 --- a/tests/go/async.go +++ b/tests/go/async.go @@ -5,6 +5,7 @@ package integration import ( "bytes" "context" + "io" "log/slog" wrpc "wrpc.io/go" @@ -12,13 +13,13 @@ import ( type AsyncHandler struct{} -func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (wrpc.ReadCompleter, wrpc.ReceiveCompleter[[][]string], error) { +func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (io.Reader, wrpc.Receiver[[][]string], error) { slog.DebugContext(ctx, "handling `with-streams`", "complete", complete) buf := bytes.NewBuffer([]byte("test")) str := wrpc.NewCompleteReceiver([][]string{{"foo", "bar"}, {"baz"}}) if complete { - return wrpc.NewCompleteReader(buf), str, nil + return buf, str, nil } else { - return wrpc.NewPendingByteReader(buf), wrpc.NewPendingReceiver(str), nil + return buf, str, nil } } diff --git a/tests/go/async_test.go b/tests/go/async_test.go index 37e1f342..b760afed 100644 --- a/tests/go/async_test.go +++ b/tests/go/async_test.go @@ -10,12 +10,12 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" "wrpc.io/tests/go/bindings/async_client/wrpc_test/integration/async" "wrpc.io/tests/go/bindings/async_server" "wrpc.io/tests/go/internal" - "github.com/nats-io/nats.go" ) func TestAsync(t *testing.T) { @@ -66,11 +66,10 @@ func TestAsync(t *testing.T) { t.Errorf("expected: `test`, got: %s", string(b)) return } - // TODO: Close - //if err := byteRx.Close(); err != nil { - // t.Errorf("failed to close byte reader: %s", err) - // return - //} + if err := byteRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close byte reader: %s", err) + return + } ss, err := stringListRx.Receive() if err != nil { @@ -87,11 +86,10 @@ func TestAsync(t *testing.T) { t.Errorf("ready list should have returned (nil, io.EOF), got: (%#v, %v)", ss, err) return } - // TODO: Close - //if err := stringListRx.Close(); err != nil { - // t.Errorf("failed to close string list receiver: %s", err) - // return - //} + if err := stringListRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close string list receiver: %s", err) + return + } } { @@ -110,11 +108,10 @@ func TestAsync(t *testing.T) { t.Errorf("expected: `test`, got: %s", string(b)) return } - // TODO: Close - //if err := byteRx.Close(); err != nil { - // t.Errorf("failed to close byte reader: %s", err) - // return - //} + if err := byteRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close byte reader: %s", err) + return + } ss, err := stringListRx.Receive() if err != nil { @@ -131,15 +128,17 @@ func TestAsync(t *testing.T) { t.Errorf("ready list should have returned (nil, io.EOF), got: (%#v, %v)", ss, err) return } - // TODO: Close - //if err := stringListRx.Close(); err != nil { - // t.Errorf("failed to close string list receiver: %s", err) - // return - //} + if err := stringListRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close string list receiver: %s", err) + return + } } if err = stop(); err != nil { t.Errorf("failed to stop serving `async-server` world: %s", err) return } + if nc.NumSubscriptions() != 0 { + t.Errorf("NATS subscriptions leaked: %d active after client stop", nc.NumSubscriptions()) + } } diff --git a/tests/go/cmd/sync-server-nats/main.go b/tests/go/cmd/sync-server-nats/main.go index b37291fc..313e3972 100644 --- a/tests/go/cmd/sync-server-nats/main.go +++ b/tests/go/cmd/sync-server-nats/main.go @@ -8,10 +8,10 @@ import ( "os/signal" "syscall" + "github.com/nats-io/nats.go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" "wrpc.io/tests/go/bindings/sync_server" - "github.com/nats-io/nats.go" ) func run(url string) error { diff --git a/tests/go/resources.go b/tests/go/resources.go index 43d0bf68..5f59caf4 100644 --- a/tests/go/resources.go +++ b/tests/go/resources.go @@ -7,9 +7,9 @@ import ( "fmt" "sync" + "github.com/google/uuid" wrpc "wrpc.io/go" "wrpc.io/tests/go/bindings/resources_server/exports/wrpc_test/integration/resources" - "github.com/google/uuid" ) type Foo struct { diff --git a/tests/go/resources_test.go b/tests/go/resources_test.go index ff4a8753..90567c38 100644 --- a/tests/go/resources_test.go +++ b/tests/go/resources_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" @@ -15,7 +16,6 @@ import ( "wrpc.io/tests/go/bindings/resources_client/wrpc_test/integration/resources" "wrpc.io/tests/go/bindings/resources_server" "wrpc.io/tests/go/internal" - "github.com/nats-io/nats.go" ) func TestResources(t *testing.T) { @@ -138,4 +138,7 @@ func TestResources(t *testing.T) { t.Errorf("failed to stop serving `resources-server` world: %s", err) return } + if nc.NumSubscriptions() != 0 { + t.Errorf("NATS subscriptions leaked: %d active after client stop", nc.NumSubscriptions()) + } } diff --git a/tests/go/sync_test.go b/tests/go/sync_test.go index 0a62b4a5..994ee683 100644 --- a/tests/go/sync_test.go +++ b/tests/go/sync_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" @@ -16,7 +17,6 @@ import ( "wrpc.io/tests/go/bindings/sync_client/wrpc_test/integration/sync" "wrpc.io/tests/go/bindings/sync_server" "wrpc.io/tests/go/internal" - "github.com/nats-io/nats.go" ) func TestSync(t *testing.T) { @@ -228,4 +228,7 @@ func TestSync(t *testing.T) { t.Errorf("failed to stop serving `sync-server` world: %s", err) return } + if nc.NumSubscriptions() != 0 { + t.Errorf("NATS subscriptions leaked: %d active after client stop", nc.NumSubscriptions()) + } } diff --git a/tests/go/types_test.go b/tests/go/types_test.go index 609ea59e..fdfffc15 100644 --- a/tests/go/types_test.go +++ b/tests/go/types_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - wrpc "wrpc.io/go" "wrpc.io/tests/go/bindings/types/wrpc_test/integration/get_types" ) @@ -16,7 +15,7 @@ type indexReader struct { *bytes.Buffer } -func (r *indexReader) Index(path ...uint32) (wrpc.IndexReader, error) { +func (r *indexReader) Index(path ...uint32) (wrpc.IndexReadCloser, error) { panic("not implemented") }