diff --git a/crates/wit-bindgen-go/src/interface.rs b/crates/wit-bindgen-go/src/interface.rs index 19a2d386..8454e788 100644 --- a/crates/wit-bindgen-go/src/interface.rs +++ b/crates/wit-bindgen-go/src/interface.rs @@ -910,13 +910,14 @@ impl InterfaceGenerator<'_> { match ty { Some(ty) if is_list_of(self.resolve, Type::U8, ty) => { let bytes = self.deps.bytes(); + let io = self.deps.io(); let fmt = self.deps.fmt(); let slog = self.deps.slog(); let wrpc = self.deps.wrpc(); uwriteln!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReadCompleter, error) {{ + r#"func(r {wrpc}.IndexReader, path ...uint32) ({io}.Reader, error) {{ {slog}.Debug("reading byte list future status byte") status, err := r.ReadByte() if err != nil {{ @@ -924,13 +925,11 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} - return {wrpc}.NewByteStreamReader({wrpc}.NewPendingByteReader(r)), nil + return {wrpc}.NewByteStreamReader(r), nil case 1: {slog}.Debug("reading ready byte list future contents") buf, err := "# @@ -943,7 +942,7 @@ impl InterfaceGenerator<'_> { return nil, {fmt}.Errorf("failed to read ready byte list future contents: %w", err) }} {slog}.Debug("read ready byte list future contents", "len", len(buf)) - return {wrpc}.NewCompleteReader({bytes}.NewReader(buf)), nil + return {bytes}.NewReader(buf), nil default: return nil, {fmt}.Errorf("invalid byte list future status byte %d", status) }} @@ -963,7 +962,7 @@ impl InterfaceGenerator<'_> { uwrite!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReceiveCompleter["# + r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.Receiver["# ); self.print_opt_ty(ty, true); uwrite!( @@ -976,11 +975,9 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("# ); @@ -1032,13 +1029,14 @@ impl InterfaceGenerator<'_> { match element { Some(ty) if is_ty(self.resolve, Type::U8, ty) => { let bytes = self.deps.bytes(); + let io = self.deps.io(); let fmt = self.deps.fmt(); let slog = self.deps.slog(); let wrpc = self.deps.wrpc(); uwriteln!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReadCompleter, error) {{ + r#"func(r {wrpc}.IndexReader, path ...uint32) ({io}.Reader, error) {{ {slog}.Debug("reading byte stream status byte") status, err := r.ReadByte() if err != nil {{ @@ -1046,13 +1044,11 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} - return {wrpc}.NewByteStreamReader({wrpc}.NewPendingByteReader(r)), nil + return {wrpc}.NewByteStreamReader(r), nil case 1: {slog}.Debug("reading ready byte stream contents") buf, err := "# @@ -1065,7 +1061,7 @@ impl InterfaceGenerator<'_> { return nil, {fmt}.Errorf("failed to read ready byte stream contents: %w", err) }} {slog}.Debug("read ready byte stream contents", "len", len(buf)) - return {wrpc}.NewCompleteReader({bytes}.NewReader(buf)), nil + return {bytes}.NewReader(buf), nil default: return nil, {fmt}.Errorf("invalid stream status byte %d", status) }} @@ -1088,7 +1084,7 @@ impl InterfaceGenerator<'_> { uwrite!( self.src, - r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReceiveCompleter["# + r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.Receiver["# ); self.print_list(ty); uwrite!( @@ -1101,11 +1097,9 @@ impl InterfaceGenerator<'_> { }} switch status {{ case 0: - if len(path) > 0 {{ - r, err = r.Index(path...) - if err != nil {{ - return nil, {fmt}.Errorf("failed to index reader: %w", err) - }} + r, err := r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) }} var total uint32 return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("# @@ -1928,7 +1922,6 @@ impl InterfaceGenerator<'_> { fn print_write_future(&mut self, ty: &Option, name: &str, writer: &str) { match ty { Some(ty) if is_list_of(self.resolve, Type::U8, ty) => { - let bytes = self.deps.bytes(); let fmt = self.deps.fmt(); let io = self.deps.io(); let math = self.deps.math(); @@ -1936,72 +1929,41 @@ impl InterfaceGenerator<'_> { let wrpc = self.deps.wrpc(); uwrite!( self.src, - r#"func(v {wrpc}.ReadCompleter, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + r#"func(v {io}.Reader, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ + {slog}.Debug("writing byte list future `future::pending` status byte") + if err = w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready byte list future: %w", cErr) + err = {fmt}.Errorf("failed to close pending byte list future: %w", cErr) }} else {{ - slog.Warn("failed to close ready byte list future", "err", cErr) + {slog}.Warn("failed to close pending byte list future", "err", cErr) }} }} }} }}() - {slog}.Debug("writing byte list future `future::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `future::ready` byte: %w", err) - }} - {slog}.Debug("reading ready byte list future contents") - var buf {bytes}.Buffer - var n int64 - n, err = {io}.Copy(&buf, v) + {slog}.Debug("reading pending byte list future contents") + chunk, err := {io}.ReadAll(chunk) if err != nil {{ - return nil, {fmt}.Errorf("failed to read ready byte list future contents: %w", err) + return {fmt}.Errorf("failed to read pending byte list future: %w", err) }} - {slog}.Debug("writing ready byte list future contents", "len", n) - if err = {wrpc}.WriteByteList(buf.Bytes(), w); err != nil {{ - return nil, {fmt}.Errorf("failed to write ready byte list future contents: %w", err) + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("pending byte list future length of %d overflows a 32-bit integer", n) }} - return nil, nil - }} else {{ - {slog}.Debug("writing byte list future `future::pending` status byte") - if err = w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + {slog}.Debug("writing pending byte list future length", "len", n) + if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending byte list future length of %d: %w", n, err) }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending byte list future: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending byte list future", "err", cErr) - }} - }} - }} - }}() - {slog}.Debug("reading pending byte list future contents") - chunk, err := {io}.ReadAll(chunk) - if err != nil {{ - return {fmt}.Errorf("failed to read pending byte list future: %w", err) - }} - if n > {math}.MaxUint32 {{ - return {fmt}.Errorf("pending byte list future length of %d overflows a 32-bit integer", n) - }} - {slog}.Debug("writing pending byte list future length", "len", n) - if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ - return {fmt}.Errorf("failed to write pending byte list future length of %d: %w", n, err) - }} - _, err = w.Write(chunk[:n]) - if err != nil {{ - return {fmt}.Errorf("failed to write pending byte list future contents: %w", err) - }} - }}, nil - }} + _, err = w.Write(chunk[:n]) + if err != nil {{ + return {fmt}.Errorf("failed to write pending byte list future contents: %w", err) + }} + }}, nil }}({name}, {writer})"#, ); } @@ -2010,83 +1972,48 @@ impl InterfaceGenerator<'_> { let io = self.deps.io(); let slog = self.deps.slog(); let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(v {wrpc}.ReceiveCompleter[",); + uwrite!(self.src, "func(v {wrpc}.Receiver[",); self.print_opt_ty(ty, true); uwrite!( self.src, r#"], w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + {slog}.Debug("writing future `future::pending` status byte") + if err := w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready future: %w", cErr) + err = {fmt}.Errorf("failed to close pending future: %w", cErr) }} else {{ - slog.Warn("failed to close ready future", "err", cErr) + {slog}.Warn("failed to close pending future", "err", cErr) }} }} }} }}() - {slog}.Debug("writing future `future::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `future::ready` byte: %w", err) - }} - {slog}.Debug("receiving ready future contents") + {slog}.Debug("receiving outgoing pending future contents") rx, err := v.Receive() - if err != nil && err != {io}.EOF {{ - return nil, {fmt}.Errorf("failed to receive ready future contents: %w", err) + if err != nil {{ + return {fmt}.Errorf("failed to receive outgoing pending future: %w", err) }} - {slog}.Debug("writing ready future contents") - write, err := "#, + {slog}.Debug("writing pending future element") + write, err :="#, ); self.print_write_ty(ty, "rx", "w"); uwrite!( self.src, r#" if err != nil {{ - return nil, {fmt}.Errorf("failed to write ready future contents: %w", err) + return {fmt}.Errorf("failed to write pending future element: %w", err) }} - return write, nil - }} else {{ - {slog}.Debug("writing future `future::pending` status byte") - if err := w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `future::pending` byte: %w", err) + if write != nil {{ + return write(w) }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending future: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending future", "err", cErr) - }} - }} - }} - }}() - {slog}.Debug("receiving outgoing pending future contents") - rx, err := v.Receive() - if err != nil {{ - return {fmt}.Errorf("failed to receive outgoing pending future: %w", err) - }} - {slog}.Debug("writing pending future element") - write, err :="#, - ); - self.print_write_ty(ty, "rx", "w"); - uwrite!( - self.src, - r#" - if err != nil {{ - return {fmt}.Errorf("failed to write pending future element: %w", err) - }} - if write != nil {{ - return write(w) - }} - return nil - }}, nil - }} + return nil + }}, nil }}({name}, {writer})"#, ); } @@ -2097,7 +2024,6 @@ impl InterfaceGenerator<'_> { fn print_write_stream(&mut self, Stream { element, .. }: &Stream, name: &str, writer: &str) { match element { Some(ty) if is_ty(self.resolve, Type::U8, ty) => { - let bytes = self.deps.bytes(); let fmt = self.deps.fmt(); let io = self.deps.io(); let math = self.deps.math(); @@ -2105,85 +2031,54 @@ impl InterfaceGenerator<'_> { let wrpc = self.deps.wrpc(); uwrite!( self.src, - r#"func(v {wrpc}.ReadCompleter, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + r#"func(v {io}.Reader, w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ + {slog}.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready byte stream: %w", cErr) + err = {fmt}.Errorf("failed to close pending byte stream: %w", cErr) }} else {{ - slog.Warn("failed to close ready byte stream", "err", cErr) + {slog}.Warn("failed to close pending byte stream", "err", cErr) }} }} }} }}() - {slog}.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `stream::ready` byte: %w", err) - }} - {slog}.Debug("reading ready byte stream contents") - var buf {bytes}.Buffer - var n int64 - n, err = {io}.Copy(&buf, v) - if err != nil {{ - return nil, {fmt}.Errorf("failed to read ready byte stream contents: %w", err) - }} - {slog}.Debug("writing ready byte stream contents", "len", n) - if err = {wrpc}.WriteByteList(buf.Bytes(), w); err != nil {{ - return nil, {fmt}.Errorf("failed to write ready byte stream contents: %w", err) - }} - return nil, nil - }} else {{ - {slog}.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending byte stream: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending byte stream", "err", cErr) - }} - }} - }} - }}() - chunk := make([]byte, 8096) - for {{ - var end bool - {slog}.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == {io}.EOF {{ - end = true - {slog}.Debug("pending byte stream reached EOF") - }} else if err != nil {{ - return {fmt}.Errorf("failed to read pending byte stream chunk: %w", err) - }} - if n > {math}.MaxUint32 {{ - return {fmt}.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - }} - {slog}.Debug("writing pending byte stream chunk length", "len", n) - if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ - return {fmt}.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - }} - _, err = w.Write(chunk[:n]) - if err != nil {{ - return {fmt}.Errorf("failed to write pending byte stream chunk contents: %w", err) - }} - if end {{ - if err := w.WriteByte(0); err != nil {{ - return {fmt}.Errorf("failed to write pending byte stream end byte: %w", err) - }} - return nil + chunk := make([]byte, 8096) + for {{ + var end bool + {slog}.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == {io}.EOF {{ + end = true + {slog}.Debug("pending byte stream reached EOF") + }} else if err != nil {{ + return {fmt}.Errorf("failed to read pending byte stream chunk: %w", err) + }} + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + }} + {slog}.Debug("writing pending byte stream chunk length", "len", n) + if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + }} + _, err = w.Write(chunk[:n]) + if err != nil {{ + return {fmt}.Errorf("failed to write pending byte stream chunk contents: %w", err) + }} + if end {{ + if err := w.WriteByte(0); err != nil {{ + return {fmt}.Errorf("failed to write pending byte stream end byte: %w", err) }} + return nil }} - }}, nil - }} + }} + }}, nil }}({name}, {writer})"#, ); } @@ -2196,140 +2091,91 @@ impl InterfaceGenerator<'_> { let slog = self.deps.slog(); let sync = self.deps.sync(); let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(v {wrpc}.ReceiveCompleter[",); + uwrite!(self.src, "func(v {wrpc}.Receiver[",); self.print_list(ty); uwrite!( self.src, r#"], w interface {{ {io}.ByteWriter; {io}.Writer }}) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.IsComplete() {{ + {slog}.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ if cErr := body.Close(); cErr != nil {{ if err == nil {{ - err = {fmt}.Errorf("failed to close ready stream: %w", cErr) + err = {fmt}.Errorf("failed to close pending stream: %w", cErr) }} else {{ - slog.Warn("failed to close ready stream", "err", cErr) + {slog}.Warn("failed to close pending stream", "err", cErr) }} }} }} }}() - {slog}.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil {{ - return nil, {fmt}.Errorf("failed to write `stream::ready` byte: %w", err) - }} - {slog}.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != {io}.EOF {{ - return nil, {fmt}.Errorf("failed to receive ready stream contents: %w", err) - }} - if err != {io}.EOF && len(vs) > 0 {{ - for {{ - chunk, err := v.Receive() - if err != nil && err != {io}.EOF {{ - return nil, {fmt}.Errorf("failed to receive ready stream contents: %w", err) - }} - if len(chunk) > 0 {{ - vs = append(vs, chunk...) - }} - if err == {io}.EOF {{ - break - }} + var wg {sync}.WaitGroup + var wgErr {atomic}.Value + var total uint32 + for {{ + var end bool + {slog}.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == {io}.EOF {{ + end = true + {slog}.Debug("outgoing pending stream reached EOF") + }} else if err != nil {{ + return {fmt}.Errorf("failed to receive outgoing pending stream chunk: %w", err) }} - }} - {slog}.Debug("writing ready stream contents", "len", len(vs)) - write, err := "#, - ); - self.print_write_list(ty, "vs", "w"); - uwrite!( - self.src, - r#" - if err != nil {{ - return nil, {fmt}.Errorf("failed to write ready stream contents: %w", err) - }} - return write, nil - }} else {{ - {slog}.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil {{ - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - }} - return func(w {wrpc}.IndexWriter) (err error) {{ - defer func() {{ - body, ok := v.({io}.Closer) - if ok {{ - if cErr := body.Close(); cErr != nil {{ - if err == nil {{ - err = {fmt}.Errorf("failed to close pending stream: %w", cErr) - }} else {{ - {slog}.Warn("failed to close pending stream", "err", cErr) - }} - }} - }} - }}() - var wg {sync}.WaitGroup - var wgErr {atomic}.Value - var total uint32 - for {{ - var end bool - {slog}.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == {io}.EOF {{ - end = true - {slog}.Debug("outgoing pending stream reached EOF") - }} else if err != nil {{ - return {fmt}.Errorf("failed to receive outgoing pending stream chunk: %w", err) - }} - if n > {math}.MaxUint32 {{ - return {fmt}.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - }} - if {math}.MaxUint32 - uint32(n) < total {{ - return {errors}.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - }} - {slog}.Debug("writing pending stream chunk length", "len", n) - if err = {wrpc}.WriteUint32(uint32(n), w); err != nil {{ - return {fmt}.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - }} - for _, v := range chunk {{ - {slog}.Debug("writing pending stream element", "i", total) - write, err :="#, + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) + }} + if {math}.MaxUint32 - uint32(n) < total {{ + return {errors}.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") + }} + {slog}.Debug("writing pending stream chunk length", "len", n) + if err = {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending stream chunk length of %d: %w", n, err) + }} + for _, v := range chunk {{ + {slog}.Debug("writing pending stream element", "i", total) + write, err :="#, ); self.print_write_ty(ty, "v", "w"); uwrite!( self.src, r#" + if err != nil {{ + return {fmt}.Errorf("failed to write pending stream chunk element %d: %w", total, err) + }} + if write != nil {{ + wg.Add(1) + w, err := w.Index(total) if err != nil {{ - return {fmt}.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return {fmt}.Errorf("failed to index writer: %w", err) }} - if write != nil {{ - wg.Add(1) - w, err := w.Index(total) - if err != nil {{ - return {fmt}.Errorf("failed to index writer: %w", err) + go func() {{ + defer wg.Done() + if err := write(w); err != nil {{ + wgErr.Store(err) }} - go func() {{ - defer wg.Done() - if err := write(w); err != nil {{ - wgErr.Store(err) - }} - }}() - }} - total++ + }}() }} - if end {{ - if err := w.WriteByte(0); err != nil {{ - return {fmt}.Errorf("failed to write pending stream end byte: %w", err) - }} - wg.Wait() - err := wgErr.Load() - if err == nil {{ - return nil - }} - return err.(error) + total++ + }} + if end {{ + if err := w.WriteByte(0); err != nil {{ + return {fmt}.Errorf("failed to write pending stream end byte: %w", err) + }} + wg.Wait() + err := wgErr.Load() + if err == nil {{ + return nil }} + return err.(error) }} - }}, nil - }} + }} + }}, nil }}({name}, {writer})"#, ); } @@ -3298,23 +3144,36 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) { } fn print_future(&mut self, ty: &Option) { - let wrpc = self.deps.wrpc(); - self.push_str(wrpc); - self.push_str(".ReceiveCompleter["); - let ty = ty.expect("futures with no element types are not supported"); - self.print_opt_ty(&ty, true); - self.push_str("]"); + match ty { + Some(ty) if is_ty(self.resolve, Type::U8, ty) => { + let io = self.deps.io(); + self.push_str(io); + self.push_str(".Reader"); + } + Some(ty) => { + let wrpc = self.deps.wrpc(); + self.push_str(wrpc); + self.push_str(".Receiver["); + self.print_opt_ty(ty, true); + self.push_str("]"); + } + None => { + panic!("futures with no element types are not supported") + } + } } fn print_stream(&mut self, Stream { element, .. }: &Stream) { - let wrpc = self.deps.wrpc(); - self.push_str(wrpc); match element { Some(ty) if is_ty(self.resolve, Type::U8, ty) => { - self.push_str(".ReadCompleter"); + let io = self.deps.io(); + self.push_str(io); + self.push_str(".Reader"); } Some(ty) => { - self.push_str(".ReceiveCompleter["); + let wrpc = self.deps.wrpc(); + self.push_str(wrpc); + self.push_str(".Receiver["); self.print_list(ty); self.push_str("]"); } diff --git a/examples/go/hello-client/cmd/hello-client-nats/main.go b/examples/go/hello-client/cmd/hello-client-nats/main.go index 448e5a77..88a20f4d 100644 --- a/examples/go/hello-client/cmd/hello-client-nats/main.go +++ b/examples/go/hello-client/cmd/hello-client-nats/main.go @@ -7,9 +7,9 @@ import ( "log/slog" "os" + "github.com/nats-io/nats.go" "wrpc.io/examples/go/hello-client/bindings/wrpc_examples/hello/handler" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) func run() (err error) { diff --git a/examples/go/hello-server/cmd/hello-server-nats/main.go b/examples/go/hello-server/cmd/hello-server-nats/main.go index ba296244..b608b974 100644 --- a/examples/go/hello-server/cmd/hello-server-nats/main.go +++ b/examples/go/hello-server/cmd/hello-server-nats/main.go @@ -9,9 +9,9 @@ import ( "os/signal" "syscall" + "github.com/nats-io/nats.go" server "wrpc.io/examples/go/hello-server/bindings" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) type Handler struct{} diff --git a/examples/go/resources-server/cmd/resources-server-nats/main.go b/examples/go/resources-server/cmd/resources-server-nats/main.go index 910a9431..a6edd945 100644 --- a/examples/go/resources-server/cmd/resources-server-nats/main.go +++ b/examples/go/resources-server/cmd/resources-server-nats/main.go @@ -10,12 +10,12 @@ import ( "sync" "syscall" + "github.com/google/uuid" + "github.com/nats-io/nats.go" server "wrpc.io/examples/go/resources-server/bindings" "wrpc.io/examples/go/resources-server/bindings/exports/wrpc_examples/resources/resources" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" - "github.com/google/uuid" - "github.com/nats-io/nats.go" ) type Foo struct { diff --git a/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go b/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go index 714b98e7..693d680b 100644 --- a/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go +++ b/examples/go/streams-client/bindings/wrpc_examples/streams/handler/bindings.wrpc.go @@ -16,8 +16,8 @@ import ( ) type Req struct { - Numbers wrpc.ReceiveCompleter[[]uint64] - Bytes wrpc.ReadCompleter + Numbers wrpc.Receiver[[]uint64] + Bytes io.Reader } func (v *Req) String() string { return "Req" } @@ -25,196 +25,91 @@ func (v *Req) String() string { return "Req" } func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) slog.Debug("writing field", "name", "numbers") - write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { + write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready stream: %w", cErr) + err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { - slog.Warn("failed to close ready stream", "err", cErr) + slog.Warn("failed to close pending stream", "err", cErr) } } } }() - slog.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if err != io.EOF && len(vs) > 0 { - for { - chunk, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if len(chunk) > 0 { - vs = append(vs, chunk...) - } - if err == io.EOF { - break - } + var wg sync.WaitGroup + var wgErr atomic.Value + var total uint32 + for { + var end bool + slog.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == io.EOF { + end = true + slog.Debug("outgoing pending stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } - } - slog.Debug("writing ready stream contents", "len", len(vs)) - write, err := func(v []uint64, w interface { - io.ByteWriter - io.Writer - }) (write func(wrpc.IndexWriter) error, err error) { - n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) + return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing list length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) + if math.MaxUint32-uint32(n) < total { + return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") } - slog.Debug("writing list elements") - writes := make(map[uint32]func(wrpc.IndexWriter) error, n) - for i, e := range v { + slog.Debug("writing pending stream chunk length", "len", n) + if err = wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) + } + for _, v := range chunk { + slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err - }(e, w) + }(v, w) if err != nil { - return nil, fmt.Errorf("failed to write list element %d: %w", i, err) + return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { - writes[uint32(i)] = write - } - } - if len(writes) > 0 { - return func(w wrpc.IndexWriter) error { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - }, nil - } - return nil, nil - }(vs, w) - if err != nil { - return nil, fmt.Errorf("failed to write ready stream contents: %w", err) - } - return write, nil - } else { - slog.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending stream: %w", cErr) - } else { - slog.Warn("failed to close pending stream", "err", cErr) - } - } - } - }() - var wg sync.WaitGroup - var wgErr atomic.Value - var total uint32 - for { - var end bool - slog.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == io.EOF { - end = true - slog.Debug("outgoing pending stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - } - if math.MaxUint32-uint32(n) < total { - return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - } - slog.Debug("writing pending stream chunk length", "len", n) - if err = wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - } - for _, v := range chunk { - slog.Debug("writing pending stream element", "i", total) - write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err = w.Write(b[:i]) - return err - }(v, w) + wg.Add(1) + w, err := w.Index(total) if err != nil { - return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return fmt.Errorf("failed to index writer: %w", err) } - if write != nil { - wg.Add(1) - w, err := w.Index(total) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + go func() { + defer wg.Done() + if err := write(w); err != nil { + wgErr.Store(err) } - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - total++ + }() } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending stream end byte: %w", err) - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) + total++ + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending stream end byte: %w", err) } + wg.Wait() + err := wgErr.Load() + if err == nil { + return nil + } + return err.(error) } - }, nil - } + } + }, nil }(v.Numbers, w) if err != nil { return nil, fmt.Errorf("failed to write `numbers` field: %w", err) @@ -223,88 +118,57 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err writes[0] = write0 } slog.Debug("writing field", "name", "bytes") - write1, err := func(v wrpc.ReadCompleter, w interface { + write1, err := func(v io.Reader, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready byte stream: %w", cErr) + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { - slog.Warn("failed to close ready byte stream", "err", cErr) + slog.Warn("failed to close pending byte stream", "err", cErr) } } } }() - slog.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("reading ready byte stream contents") - var buf bytes.Buffer - var n int64 - n, err = io.Copy(&buf, v) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - if err = wrpc.WriteByteList(buf.Bytes(), w); err != nil { - return nil, fmt.Errorf("failed to write ready byte stream contents: %w", err) - } - return nil, nil - } else { - slog.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending byte stream: %w", cErr) - } else { - slog.Warn("failed to close pending byte stream", "err", cErr) - } - } - } - }() - chunk := make([]byte, 8096) - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil + chunk := make([]byte, 8096) + for { + var end bool + slog.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == io.EOF { + end = true + slog.Debug("pending byte stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to read pending byte stream chunk: %w", err) + } + if n > math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } + return nil } - }, nil - } + } + }, nil }(v.Bytes, w) if err != nil { return nil, fmt.Errorf("failed to write `bytes` field: %w", err) @@ -341,7 +205,7 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err } return nil, nil } -func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.ReceiveCompleter[[]uint64], r1__ wrpc.ReadCompleter, writeErrs__ <-chan error, err__ error) { +func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receiver[[]uint64], r1__ io.Reader, writeErrs__ <-chan error, err__ error) { var buf__ bytes.Buffer var writeCount__ uint32 write0__, err__ := (r).WriteToIndex(&buf__) @@ -400,7 +264,7 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive if cErr__ := w__.Close(); cErr__ != nil { slog.DebugContext(ctx__, "failed to close outgoing stream", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", cErr__) } - r0__, err__ = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReceiveCompleter[[]uint64], error) { + r0__, err__ = func(r wrpc.IndexReader, path ...uint32) (wrpc.Receiver[[]uint64], error) { slog.Debug("reading stream status byte") status, err := r.ReadByte() if err != nil { @@ -408,11 +272,9 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } var total uint32 return wrpc.NewDecodeReceiver(r, func(r wrpc.IndexReader) ([]uint64, error) { @@ -553,7 +415,7 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive err__ = fmt.Errorf("failed to read result 0: %w", err__) return } - r1__, err__ = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReadCompleter, error) { + r1__, err__ = func(r wrpc.IndexReader, path ...uint32) (io.Reader, error) { slog.Debug("reading byte stream status byte") status, err := r.ReadByte() if err != nil { @@ -561,13 +423,11 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } - return wrpc.NewByteStreamReader(wrpc.NewPendingByteReader(r)), nil + return wrpc.NewByteStreamReader(r), nil case 1: slog.Debug("reading ready byte stream contents") buf, err := @@ -608,7 +468,7 @@ func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receive return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) } slog.Debug("read ready byte stream contents", "len", len(buf)) - return wrpc.NewCompleteReader(bytes.NewReader(buf)), nil + return bytes.NewReader(buf), nil default: return nil, fmt.Errorf("invalid stream status byte %d", status) } diff --git a/examples/go/streams-client/cmd/streams-client-nats/main.go b/examples/go/streams-client/cmd/streams-client-nats/main.go index e0ad925a..986ef614 100644 --- a/examples/go/streams-client/cmd/streams-client-nats/main.go +++ b/examples/go/streams-client/cmd/streams-client-nats/main.go @@ -10,9 +10,9 @@ import ( "sync" "time" + "github.com/nats-io/nats.go" "wrpc.io/examples/go/streams-client/bindings/wrpc_examples/streams/handler" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) type ThrottleStream[T any] struct { diff --git a/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go b/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go index 8020e25a..2ed29a4b 100644 --- a/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go +++ b/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go @@ -16,8 +16,8 @@ import ( ) type Req struct { - Numbers wrpc.ReceiveCompleter[[]uint64] - Bytes wrpc.ReadCompleter + Numbers wrpc.Receiver[[]uint64] + Bytes io.Reader } func (v *Req) String() string { return "Req" } @@ -25,196 +25,91 @@ func (v *Req) String() string { return "Req" } func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) slog.Debug("writing field", "name", "numbers") - write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { + write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready stream: %w", cErr) + err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { - slog.Warn("failed to close ready stream", "err", cErr) + slog.Warn("failed to close pending stream", "err", cErr) } } } }() - slog.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if err != io.EOF && len(vs) > 0 { - for { - chunk, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if len(chunk) > 0 { - vs = append(vs, chunk...) - } - if err == io.EOF { - break - } + var wg sync.WaitGroup + var wgErr atomic.Value + var total uint32 + for { + var end bool + slog.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == io.EOF { + end = true + slog.Debug("outgoing pending stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } - } - slog.Debug("writing ready stream contents", "len", len(vs)) - write, err := func(v []uint64, w interface { - io.ByteWriter - io.Writer - }) (write func(wrpc.IndexWriter) error, err error) { - n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) + return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) + } + if math.MaxUint32-uint32(n) < total { + return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing list length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) + slog.Debug("writing pending stream chunk length", "len", n) + if err = wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) } - slog.Debug("writing list elements") - writes := make(map[uint32]func(wrpc.IndexWriter) error, n) - for i, e := range v { + for _, v := range chunk { + slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err - }(e, w) + }(v, w) if err != nil { - return nil, fmt.Errorf("failed to write list element %d: %w", i, err) + return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { - writes[uint32(i)] = write - } - } - if len(writes) > 0 { - return func(w wrpc.IndexWriter) error { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - }, nil - } - return nil, nil - }(vs, w) - if err != nil { - return nil, fmt.Errorf("failed to write ready stream contents: %w", err) - } - return write, nil - } else { - slog.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending stream: %w", cErr) - } else { - slog.Warn("failed to close pending stream", "err", cErr) - } - } - } - }() - var wg sync.WaitGroup - var wgErr atomic.Value - var total uint32 - for { - var end bool - slog.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == io.EOF { - end = true - slog.Debug("outgoing pending stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - } - if math.MaxUint32-uint32(n) < total { - return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - } - slog.Debug("writing pending stream chunk length", "len", n) - if err = wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - } - for _, v := range chunk { - slog.Debug("writing pending stream element", "i", total) - write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err = w.Write(b[:i]) - return err - }(v, w) + wg.Add(1) + w, err := w.Index(total) if err != nil { - return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return fmt.Errorf("failed to index writer: %w", err) } - if write != nil { - wg.Add(1) - w, err := w.Index(total) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + go func() { + defer wg.Done() + if err := write(w); err != nil { + wgErr.Store(err) } - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - total++ + }() } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending stream end byte: %w", err) - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) + total++ + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending stream end byte: %w", err) + } + wg.Wait() + err := wgErr.Load() + if err == nil { + return nil } + return err.(error) } - }, nil - } + } + }, nil }(v.Numbers, w) if err != nil { return nil, fmt.Errorf("failed to write `numbers` field: %w", err) @@ -223,88 +118,57 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err writes[0] = write0 } slog.Debug("writing field", "name", "bytes") - write1, err := func(v wrpc.ReadCompleter, w interface { + write1, err := func(v io.Reader, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready byte stream: %w", cErr) + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { - slog.Warn("failed to close ready byte stream", "err", cErr) + slog.Warn("failed to close pending byte stream", "err", cErr) } } } }() - slog.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("reading ready byte stream contents") - var buf bytes.Buffer - var n int64 - n, err = io.Copy(&buf, v) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - if err = wrpc.WriteByteList(buf.Bytes(), w); err != nil { - return nil, fmt.Errorf("failed to write ready byte stream contents: %w", err) - } - return nil, nil - } else { - slog.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending byte stream: %w", cErr) - } else { - slog.Warn("failed to close pending byte stream", "err", cErr) - } - } - } - }() - chunk := make([]byte, 8096) - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil + chunk := make([]byte, 8096) + for { + var end bool + slog.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == io.EOF { + end = true + slog.Debug("pending byte stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to read pending byte stream chunk: %w", err) + } + if n > math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } + return nil } - }, nil - } + } + }, nil }(v.Bytes, w) if err != nil { return nil, fmt.Errorf("failed to write `bytes` field: %w", err) @@ -343,7 +207,7 @@ func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, err } type Handler interface { - Echo(ctx__ context.Context, r *Req) (wrpc.ReceiveCompleter[[]uint64], wrpc.ReadCompleter, error) + Echo(ctx__ context.Context, r *Req) (wrpc.Receiver[[]uint64], io.Reader, error) } func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { @@ -368,7 +232,7 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { v := &Req{} var err error slog.Debug("reading field", "name", "numbers") - v.Numbers, err = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReceiveCompleter[[]uint64], error) { + v.Numbers, err = func(r wrpc.IndexReader, path ...uint32) (wrpc.Receiver[[]uint64], error) { slog.Debug("reading stream status byte") status, err := r.ReadByte() if err != nil { @@ -376,11 +240,9 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } var total uint32 return wrpc.NewDecodeReceiver(r, func(r wrpc.IndexReader) ([]uint64, error) { @@ -521,7 +383,7 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to read `numbers` field: %w", err) } slog.Debug("reading field", "name", "bytes") - v.Bytes, err = func(r wrpc.IndexReader, path ...uint32) (wrpc.ReadCompleter, error) { + v.Bytes, err = func(r wrpc.IndexReader, path ...uint32) (io.Reader, error) { slog.Debug("reading byte stream status byte") status, err := r.ReadByte() if err != nil { @@ -529,13 +391,11 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } switch status { case 0: - if len(path) > 0 { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to index reader: %w", err) - } + r, err := r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) } - return wrpc.NewByteStreamReader(wrpc.NewPendingByteReader(r)), nil + return wrpc.NewByteStreamReader(r), nil case 1: slog.Debug("reading ready byte stream contents") buf, err := @@ -576,7 +436,7 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) } slog.Debug("read ready byte stream contents", "len", len(buf)) - return wrpc.NewCompleteReader(bytes.NewReader(buf)), nil + return bytes.NewReader(buf), nil default: return nil, fmt.Errorf("invalid stream status byte %d", status) } @@ -607,196 +467,91 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) - write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { + write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready stream: %w", cErr) + err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { - slog.Warn("failed to close ready stream", "err", cErr) + slog.Warn("failed to close pending stream", "err", cErr) } } } }() - slog.Debug("writing stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("receiving ready stream contents") - vs, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if err != io.EOF && len(vs) > 0 { - for { - chunk, err := v.Receive() - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to receive ready stream contents: %w", err) - } - if len(chunk) > 0 { - vs = append(vs, chunk...) - } - if err == io.EOF { - break - } + var wg sync.WaitGroup + var wgErr atomic.Value + var total uint32 + for { + var end bool + slog.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == io.EOF { + end = true + slog.Debug("outgoing pending stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } - } - slog.Debug("writing ready stream contents", "len", len(vs)) - write, err := func(v []uint64, w interface { - io.ByteWriter - io.Writer - }) (write func(wrpc.IndexWriter) error, err error) { - n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) + return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing list length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) + if math.MaxUint32-uint32(n) < total { + return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") + } + slog.Debug("writing pending stream chunk length", "len", n) + if err = wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) } - slog.Debug("writing list elements") - writes := make(map[uint32]func(wrpc.IndexWriter) error, n) - for i, e := range v { + for _, v := range chunk { + slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err - }(e, w) + }(v, w) if err != nil { - return nil, fmt.Errorf("failed to write list element %d: %w", i, err) + return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { - writes[uint32(i)] = write - } - } - if len(writes) > 0 { - return func(w wrpc.IndexWriter) error { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - }, nil - } - return nil, nil - }(vs, w) - if err != nil { - return nil, fmt.Errorf("failed to write ready stream contents: %w", err) - } - return write, nil - } else { - slog.Debug("writing stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending stream: %w", cErr) - } else { - slog.Warn("failed to close pending stream", "err", cErr) - } - } - } - }() - var wg sync.WaitGroup - var wgErr atomic.Value - var total uint32 - for { - var end bool - slog.Debug("receiving outgoing pending stream contents") - chunk, err := v.Receive() - n := len(chunk) - if n == 0 || err == io.EOF { - end = true - slog.Debug("outgoing pending stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) - } - if math.MaxUint32-uint32(n) < total { - return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") - } - slog.Debug("writing pending stream chunk length", "len", n) - if err = wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) - } - for _, v := range chunk { - slog.Debug("writing pending stream element", "i", total) - write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err = w.Write(b[:i]) - return err - }(v, w) + wg.Add(1) + w, err := w.Index(total) if err != nil { - return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) + return fmt.Errorf("failed to index writer: %w", err) } - if write != nil { - wg.Add(1) - w, err := w.Index(total) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + go func() { + defer wg.Done() + if err := write(w); err != nil { + wgErr.Store(err) } - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - total++ + }() } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending stream end byte: %w", err) - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) + total++ + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending stream end byte: %w", err) } + wg.Wait() + err := wgErr.Load() + if err == nil { + return nil + } + return err.(error) } - }, nil - } + } + }, nil }(r0, &buf) if err != nil { slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:streams/handler", "name", "echo", "err", err) @@ -805,88 +560,57 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { if write0 != nil { writes[0] = write0 } - write1, err := func(v wrpc.ReadCompleter, w interface { + write1, err := func(v io.Reader, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { - if v.IsComplete() { + slog.Debug("writing byte stream `stream::pending` status byte") + if err = w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return func(w wrpc.IndexWriter) (err error) { defer func() { body, ok := v.(io.Closer) if ok { if cErr := body.Close(); cErr != nil { if err == nil { - err = fmt.Errorf("failed to close ready byte stream: %w", cErr) + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { - slog.Warn("failed to close ready byte stream", "err", cErr) + slog.Warn("failed to close pending byte stream", "err", cErr) } } } }() - slog.Debug("writing byte stream `stream::ready` status byte") - if err = w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - slog.Debug("reading ready byte stream contents") - var buf bytes.Buffer - var n int64 - n, err = io.Copy(&buf, v) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - if err = wrpc.WriteByteList(buf.Bytes(), w); err != nil { - return nil, fmt.Errorf("failed to write ready byte stream contents: %w", err) - } - return nil, nil - } else { - slog.Debug("writing byte stream `stream::pending` status byte") - if err = w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return func(w wrpc.IndexWriter) (err error) { - defer func() { - body, ok := v.(io.Closer) - if ok { - if cErr := body.Close(); cErr != nil { - if err == nil { - err = fmt.Errorf("failed to close pending byte stream: %w", cErr) - } else { - slog.Warn("failed to close pending byte stream", "err", cErr) - } - } - } - }() - chunk := make([]byte, 8096) - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.Read(chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := wrpc.WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil + chunk := make([]byte, 8096) + for { + var end bool + slog.Debug("reading pending byte stream contents") + n, err := v.Read(chunk) + if err == io.EOF { + end = true + slog.Debug("pending byte stream reached EOF") + } else if err != nil { + return fmt.Errorf("failed to read pending byte stream chunk: %w", err) + } + if n > math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := wrpc.WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } + return nil } - }, nil - } + } + }, nil }(r1, &buf) if err != nil { slog.WarnContext(ctx, "failed to write result value", "i", 1, "wrpc-examples:streams/handler", "name", "echo", "err", err) diff --git a/examples/go/streams-server/cmd/streams-server-nats/main.go b/examples/go/streams-server/cmd/streams-server-nats/main.go index e0ae1cc7..0e545479 100644 --- a/examples/go/streams-server/cmd/streams-server-nats/main.go +++ b/examples/go/streams-server/cmd/streams-server-nats/main.go @@ -3,22 +3,23 @@ package main import ( "context" "fmt" + "io" "log" "log/slog" "os" "os/signal" "syscall" + "github.com/nats-io/nats.go" server "wrpc.io/examples/go/streams-server/bindings" "wrpc.io/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" - "github.com/nats-io/nats.go" ) type Handler struct{} -func (Handler) Echo(ctx context.Context, req *handler.Req) (wrpc.ReceiveCompleter[[]uint64], wrpc.ReadCompleter, error) { +func (Handler) Echo(ctx context.Context, req *handler.Req) (wrpc.Receiver[[]uint64], io.Reader, error) { slog.InfoContext(ctx, "handling `wrpc-examples:streams/handler.echo`") return req.Numbers, req.Bytes, nil } diff --git a/go/future.go b/go/future.go index f3dd5167..07eff96b 100644 --- a/go/future.go +++ b/go/future.go @@ -24,14 +24,14 @@ func ReadFutureStatus(r ByteReader) (bool, error) { } // ReadFuture reads a future from `r` -func ReadFuture[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[T], error) { +func ReadFuture[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (Receiver[T], error) { slog.Debug("reading future status byte") ok, err := ReadFutureStatus(r) if err != nil { return nil, err } if !ok { - r, err = r.Index(path...) + r, err := r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to get future reader: %w", err) } diff --git a/go/nats/client.go b/go/nats/client.go index 88a332d4..5316c1a8 100644 --- a/go/nats/client.go +++ b/go/nats/client.go @@ -8,8 +8,8 @@ import ( "sync" "sync/atomic" - wrpc "wrpc.io/go" "github.com/nats-io/nats.go" + wrpc "wrpc.io/go" ) // Client is a thin wrapper around *nats.Conn, which is able to serve and invoke wRPC functions @@ -170,7 +170,7 @@ func (w *paramWriter) WriteByte(b byte) error { return nil } -func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriter, error) { +func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriteCloser, error) { return ¶mWriter{ nc: w.nc, init: w.init, @@ -231,7 +231,7 @@ func (w *resultWriter) Close() error { return nil } -func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriter, error) { +func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriteCloser, error) { return &resultWriter{nc: w.nc, tx: indexPath(w.tx, path...)}, nil } @@ -304,7 +304,7 @@ func (r *streamReader) Close() (err error) { return nil } -func (r *streamReader) Index(path ...uint32) (wrpc.IndexReader, error) { +func (r *streamReader) Index(path ...uint32) (wrpc.IndexReadCloser, error) { r.nestRef.Add(1) s := indexPath(r.path, path...) r.nestMu.Lock() diff --git a/go/stream.go b/go/stream.go index 05175853..7f6b1183 100644 --- a/go/stream.go +++ b/go/stream.go @@ -10,54 +10,6 @@ import ( "math" ) -type CompleteReader struct { - io.Reader -} - -func (*CompleteReader) IsComplete() bool { - return true -} - -func NewCompleteReader(r io.Reader) *CompleteReader { - return &CompleteReader{r} -} - -type CompleteByteReader struct { - ByteReader -} - -func (*CompleteByteReader) IsComplete() bool { - return true -} - -func NewCompleteByteReader(r ByteReader) *CompleteByteReader { - return &CompleteByteReader{r} -} - -type PendingReader struct { - io.Reader -} - -func (*PendingReader) IsComplete() bool { - return false -} - -func NewPendingReader(r io.Reader) *PendingReader { - return &PendingReader{r} -} - -type PendingByteReader struct { - ByteReader -} - -func (*PendingByteReader) IsComplete() bool { - return false -} - -func NewPendingByteReader(r ByteReader) *PendingByteReader { - return &PendingByteReader{r} -} - type ByteStreamWriter struct { r io.Reader chunk []byte @@ -71,9 +23,9 @@ func (v *ByteStreamWriter) WriteTo(w ByteWriter) (err error) { defer func() { if fErr := buf.Flush(); fErr != nil { if err == nil { - err = fmt.Errorf("failed to flush writer: %w", fErr) + err = fmt.Errorf("failed to flush pending byte stream writer: %w", fErr) } else { - slog.Warn("failed to flush writer", "err", fErr) + slog.Warn("failed to flush pending byte stream writer", "err", fErr) } } }() @@ -108,7 +60,7 @@ func (v *ByteStreamWriter) WriteTo(w ByteWriter) (err error) { } type ByteStreamReader struct { - r ByteReadCompleter + r ByteReadCloser buf uint32 } @@ -140,19 +92,11 @@ func (r *ByteStreamReader) Read(p []byte) (int, error) { return rn, nil } -func (r *ByteStreamReader) IsComplete() bool { - return r.r.IsComplete() -} - func (r *ByteStreamReader) Close() error { - c, ok := r.r.(io.Closer) - if ok { - return c.Close() - } - return nil + return r.r.Close() } -func NewByteStreamReader(r ByteReadCompleter) *ByteStreamReader { +func NewByteStreamReader(r ByteReadCloser) *ByteStreamReader { return &ByteStreamReader{ r: r, } @@ -177,18 +121,18 @@ func ReadStreamStatus(r ByteReader) (bool, error) { } // ReadByteStream reads a stream of bytes from `r` -func ReadByteStream(r IndexReader, path ...uint32) (ReadCompleter, error) { +func ReadByteStream(r IndexReader, path ...uint32) (io.Reader, error) { slog.Debug("reading byte stream status byte") ok, err := ReadStreamStatus(r) if err != nil { return nil, err } if !ok { - r, err = r.Index(path...) + r, err := r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to index reader: %w", err) } - return NewByteStreamReader(NewPendingByteReader(r)), nil + return NewByteStreamReader(r), nil } slog.Debug("reading ready byte stream") buf, err := ReadByteList(r) @@ -196,18 +140,18 @@ func ReadByteStream(r IndexReader, path ...uint32) (ReadCompleter, error) { return nil, fmt.Errorf("failed to read bytes: %w", err) } slog.Debug("read ready byte stream", "len", len(buf)) - return NewCompleteReader(bytes.NewReader(buf)), nil + return bytes.NewReader(buf), nil } // ReadStream reads a stream from `r` -func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[[]T], error) { +func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (Receiver[[]T], error) { slog.Debug("reading stream status byte") ok, err := ReadStreamStatus(r) if err != nil { return nil, err } if !ok { - r, err = r.Index(path...) + r, err := r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to index reader: %w", err) } @@ -239,24 +183,27 @@ func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...ui return NewCompleteReceiver(vs), nil } -func WriteByteStream(r ReadCompleter, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) { - if r.IsComplete() { - slog.Debug("writing byte stream `stream::ready` status byte") - if err := w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - var buf bytes.Buffer - slog.Debug("reading ready byte stream contents") - n, err := io.CopyBuffer(&buf, r, chunk) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - return nil, WriteByteList(buf.Bytes(), w) - } +func WriteByteStream(r io.Reader, w IndexWriter, chunk []byte, path ...uint32) (err error) { slog.Debug("writing byte stream `stream::pending` status byte") if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + return fmt.Errorf("failed to write `stream::pending` byte: %w", err) } - return &ByteStreamWriter{r, chunk}, nil + wi, err := w.Index(path...) + if err != nil { + return fmt.Errorf("failed to index reader: %w", err) + } + s := &ByteStreamWriter{r, chunk} + defer func() { + if cErr := wi.Close(); cErr != nil { + if err == nil { + err = fmt.Errorf("failed to close pending byte stream: %w", cErr) + } else { + slog.Warn("failed to close pending byte stream", "err", cErr) + } + } + }() + if err := s.WriteTo(wi); err != nil { + return fmt.Errorf("failed to write stream contents: %w", err) + } + return nil } diff --git a/go/wrpc.go b/go/wrpc.go index e6a34e3c..a611886e 100644 --- a/go/wrpc.go +++ b/go/wrpc.go @@ -57,14 +57,14 @@ type IndexReader interface { io.Reader io.ByteReader - Index[IndexReader] + Index[IndexReadCloser] } type IndexWriter interface { io.Writer io.ByteWriter - Index[IndexWriter] + Index[IndexWriteCloser] } type IndexReadCloser interface { @@ -87,43 +87,13 @@ type ByteReader interface { io.Reader } -type Completer interface { - IsComplete() bool -} - type Receiver[T any] interface { Receive() (T, error) } -type ReceiveCompleter[T any] interface { - Receiver[T] - Completer -} - -type ReadCompleter interface { - io.Reader - Completer -} - -type ByteReadCompleter interface { +type ByteReadCloser interface { ByteReader - Completer -} - -type PendingReceiver[T any] struct { - Receiver[T] -} - -func (r *PendingReceiver[T]) Receive() (T, error) { - return r.Receiver.Receive() -} - -func (*PendingReceiver[T]) IsComplete() bool { - return false -} - -func NewPendingReceiver[T any](rx Receiver[T]) *PendingReceiver[T] { - return &PendingReceiver[T]{rx} + io.Closer } type CompleteReceiver[T any] struct { @@ -141,16 +111,12 @@ func (r *CompleteReceiver[T]) Receive() (T, error) { return r.v, nil } -func (*CompleteReceiver[T]) IsComplete() bool { - return true -} - func NewCompleteReceiver[T any](v T) *CompleteReceiver[T] { return &CompleteReceiver[T]{v, true} } type DecodeReceiver[T any] struct { - r IndexReader + r IndexReadCloser decode func(IndexReader) (T, error) } @@ -158,18 +124,10 @@ func (r *DecodeReceiver[T]) Receive() (T, error) { return r.decode(r.r) } -func (*DecodeReceiver[T]) IsComplete() bool { - return false -} - func (r *DecodeReceiver[T]) Close() error { - c, ok := r.r.(io.Closer) - if ok { - return c.Close() - } - return nil + return r.r.Close() } -func NewDecodeReceiver[T any](r IndexReader, decode func(IndexReader) (T, error)) *DecodeReceiver[T] { +func NewDecodeReceiver[T any](r IndexReadCloser, decode func(IndexReader) (T, error)) *DecodeReceiver[T] { return &DecodeReceiver[T]{r, decode} } diff --git a/tests/go/async.go b/tests/go/async.go index 8f98debb..ac42b2c1 100644 --- a/tests/go/async.go +++ b/tests/go/async.go @@ -5,6 +5,7 @@ package integration import ( "bytes" "context" + "io" "log/slog" wrpc "wrpc.io/go" @@ -12,13 +13,13 @@ import ( type AsyncHandler struct{} -func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (wrpc.ReadCompleter, wrpc.ReceiveCompleter[[][]string], error) { +func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (io.Reader, wrpc.Receiver[[][]string], error) { slog.DebugContext(ctx, "handling `with-streams`", "complete", complete) buf := bytes.NewBuffer([]byte("test")) str := wrpc.NewCompleteReceiver([][]string{{"foo", "bar"}, {"baz"}}) if complete { - return wrpc.NewCompleteReader(buf), str, nil + return buf, str, nil } else { - return wrpc.NewPendingByteReader(buf), wrpc.NewPendingReceiver(str), nil + return buf, str, nil } } diff --git a/tests/go/async_test.go b/tests/go/async_test.go index 37e1f342..b760afed 100644 --- a/tests/go/async_test.go +++ b/tests/go/async_test.go @@ -10,12 +10,12 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" "wrpc.io/tests/go/bindings/async_client/wrpc_test/integration/async" "wrpc.io/tests/go/bindings/async_server" "wrpc.io/tests/go/internal" - "github.com/nats-io/nats.go" ) func TestAsync(t *testing.T) { @@ -66,11 +66,10 @@ func TestAsync(t *testing.T) { t.Errorf("expected: `test`, got: %s", string(b)) return } - // TODO: Close - //if err := byteRx.Close(); err != nil { - // t.Errorf("failed to close byte reader: %s", err) - // return - //} + if err := byteRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close byte reader: %s", err) + return + } ss, err := stringListRx.Receive() if err != nil { @@ -87,11 +86,10 @@ func TestAsync(t *testing.T) { t.Errorf("ready list should have returned (nil, io.EOF), got: (%#v, %v)", ss, err) return } - // TODO: Close - //if err := stringListRx.Close(); err != nil { - // t.Errorf("failed to close string list receiver: %s", err) - // return - //} + if err := stringListRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close string list receiver: %s", err) + return + } } { @@ -110,11 +108,10 @@ func TestAsync(t *testing.T) { t.Errorf("expected: `test`, got: %s", string(b)) return } - // TODO: Close - //if err := byteRx.Close(); err != nil { - // t.Errorf("failed to close byte reader: %s", err) - // return - //} + if err := byteRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close byte reader: %s", err) + return + } ss, err := stringListRx.Receive() if err != nil { @@ -131,15 +128,17 @@ func TestAsync(t *testing.T) { t.Errorf("ready list should have returned (nil, io.EOF), got: (%#v, %v)", ss, err) return } - // TODO: Close - //if err := stringListRx.Close(); err != nil { - // t.Errorf("failed to close string list receiver: %s", err) - // return - //} + if err := stringListRx.(io.Closer).Close(); err != nil { + t.Errorf("failed to close string list receiver: %s", err) + return + } } if err = stop(); err != nil { t.Errorf("failed to stop serving `async-server` world: %s", err) return } + if nc.NumSubscriptions() != 0 { + t.Errorf("NATS subscriptions leaked: %d active after client stop", nc.NumSubscriptions()) + } } diff --git a/tests/go/cmd/sync-server-nats/main.go b/tests/go/cmd/sync-server-nats/main.go index b37291fc..313e3972 100644 --- a/tests/go/cmd/sync-server-nats/main.go +++ b/tests/go/cmd/sync-server-nats/main.go @@ -8,10 +8,10 @@ import ( "os/signal" "syscall" + "github.com/nats-io/nats.go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" "wrpc.io/tests/go/bindings/sync_server" - "github.com/nats-io/nats.go" ) func run(url string) error { diff --git a/tests/go/resources.go b/tests/go/resources.go index 43d0bf68..5f59caf4 100644 --- a/tests/go/resources.go +++ b/tests/go/resources.go @@ -7,9 +7,9 @@ import ( "fmt" "sync" + "github.com/google/uuid" wrpc "wrpc.io/go" "wrpc.io/tests/go/bindings/resources_server/exports/wrpc_test/integration/resources" - "github.com/google/uuid" ) type Foo struct { diff --git a/tests/go/resources_test.go b/tests/go/resources_test.go index ff4a8753..90567c38 100644 --- a/tests/go/resources_test.go +++ b/tests/go/resources_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" @@ -15,7 +16,6 @@ import ( "wrpc.io/tests/go/bindings/resources_client/wrpc_test/integration/resources" "wrpc.io/tests/go/bindings/resources_server" "wrpc.io/tests/go/internal" - "github.com/nats-io/nats.go" ) func TestResources(t *testing.T) { @@ -138,4 +138,7 @@ func TestResources(t *testing.T) { t.Errorf("failed to stop serving `resources-server` world: %s", err) return } + if nc.NumSubscriptions() != 0 { + t.Errorf("NATS subscriptions leaked: %d active after client stop", nc.NumSubscriptions()) + } } diff --git a/tests/go/sync_test.go b/tests/go/sync_test.go index 0a62b4a5..994ee683 100644 --- a/tests/go/sync_test.go +++ b/tests/go/sync_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" wrpc "wrpc.io/go" wrpcnats "wrpc.io/go/nats" integration "wrpc.io/tests/go" @@ -16,7 +17,6 @@ import ( "wrpc.io/tests/go/bindings/sync_client/wrpc_test/integration/sync" "wrpc.io/tests/go/bindings/sync_server" "wrpc.io/tests/go/internal" - "github.com/nats-io/nats.go" ) func TestSync(t *testing.T) { @@ -228,4 +228,7 @@ func TestSync(t *testing.T) { t.Errorf("failed to stop serving `sync-server` world: %s", err) return } + if nc.NumSubscriptions() != 0 { + t.Errorf("NATS subscriptions leaked: %d active after client stop", nc.NumSubscriptions()) + } } diff --git a/tests/go/types_test.go b/tests/go/types_test.go index 609ea59e..fdfffc15 100644 --- a/tests/go/types_test.go +++ b/tests/go/types_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - wrpc "wrpc.io/go" "wrpc.io/tests/go/bindings/types/wrpc_test/integration/get_types" ) @@ -16,7 +15,7 @@ type indexReader struct { *bytes.Buffer } -func (r *indexReader) Index(path ...uint32) (wrpc.IndexReader, error) { +func (r *indexReader) Index(path ...uint32) (wrpc.IndexReadCloser, error) { panic("not implemented") }