Skip to content

Commit

Permalink
feat(go-bindgen): rework async functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
rvolosatovs committed Aug 23, 2024
1 parent f4f273d commit e69425f
Show file tree
Hide file tree
Showing 20 changed files with 1,096 additions and 223 deletions.
238 changes: 169 additions & 69 deletions crates/wit-bindgen-go/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2792,87 +2792,115 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {
let wrpc = self.deps.wrpc();

self.print_docs_and_params(func);
if let FunctionKind::Constructor(id) = &func.kind {
self.push_str(" (r0__ ");
self.print_own(*id);

self.src.push_str(" (");
for (i, ty) in func.results.iter_types().enumerate() {
uwrite!(self.src, "r{i}__ ");
self.print_opt_ty(ty, true);
self.src.push_str(", ");
} else {
self.src.push_str(" (");
for (i, ty) in func.results.iter_types().enumerate() {
uwrite!(self.src, "r{i}__ ");
self.print_opt_ty(ty, true);
self.src.push_str(", ");
}
}
self.push_str("close__ func() error, err__ error) ");
uwrite!(
self.src,
r#"{{
if err__ = wrpc__.Invoke(ctx__, "{instance}", ""#
);
self.src.push_str(rpc_func_name(func));
uwriteln!(
self.src,
r#"", func(w__ {wrpc}.IndexWriteCloser, r__ {wrpc}.IndexReadCloser) error {{
close__ = r__.Close"#
);

let async_params = func.params.iter().any(|(_, ty)| {
let (paths, fut) = async_paths_ty(self.resolve, ty);
fut || !paths.is_empty()
});
if async_params {
self.push_str("writeErrs__ <-chan error, ");
}
self.push_str("err__ error) {");
if !func.params.is_empty() {
let bytes = self.deps.bytes();
uwriteln!(
uwrite!(
self.src,
r"var buf__ {bytes}.Buffer
writes__ := make(map[uint32]func({wrpc}.IndexWriter) error, {})",
func.params.len(),
r"
var buf__ {bytes}.Buffer",
);
if async_params {
uwrite!(
self.src,
r"
var writeCount__ uint32"
);
}
for (i, (name, ty)) in func.params.iter().enumerate() {
uwrite!(self.src, "write{i}__, err__ :=");
uwrite!(
self.src,
r"
write{i}__, err__ :="
);
self.print_write_ty(ty, &to_go_ident(name), "&buf__");
self.src.push_str("\nif err__ != nil {\n");
uwriteln!(
uwrite!(
self.src,
r#"return {fmt}.Errorf("failed to write `{name}` parameter: %w", err__)"#,
r#"
if err__ != nil {{
err__ = {fmt}.Errorf("failed to write `{name}` parameter: %w", err__)
return
}}"#,
);
self.src.push_str("}\n");
uwriteln!(
if async_params {
uwrite!(
self.src,
r"
if write{i}__ != nil {{
writeCount__++
}}"
);
}
}
if async_params {
uwrite!(
self.src,
r#"if write{i}__ != nil {{
writes__[{i}] = write{i}__
}}"#,
r"
writes__ := make(map[uint32]func({wrpc}.IndexWriter) error, uint(writeCount__))",
);
}
for (i, (name, _)) in func.params.iter().enumerate() {
uwrite!(
self.src,
r"
if write{i}__ != nil {{"
);
if async_params {
uwrite!(
self.src,
r"
writes__[{i}] = write{i}__",
);
} else {
uwrite!(
self.src,
r#"
err__ = {errors}.New("unexpected deferred write for synchronous `{name}` parameter")
return"#,
errors = self.deps.errors(),
);
}
uwrite!(
self.src,
r#"
}}"#,
);
}
self.push_str("_, err__ = w__.Write(buf__.Bytes())\n");
self.push_str("if err__ != nil {\n");
uwriteln!(
self.src,
r#"return {fmt}.Errorf("failed to write parameters: %w", err__)"#,
);
self.src.push_str("}\n");
} else {
self.push_str("_, err__ = w__.Write(nil)\n");
self.push_str("if err__ != nil {\n");
uwriteln!(
self.src,
r#"return {fmt}.Errorf("failed to write empty parameters: %w", err__)"#,
);
self.src.push_str("}\n");
}
for (i, ty) in func.results.iter_types().enumerate() {
uwrite!(self.src, "r{i}__, err__ = ");
self.print_read_ty(ty, "r__", &format!("[]uint32{{ {i} }}"));
self.push_str("\n");
uwriteln!(
self.src,
r#"if err__ != nil {{ return {fmt}.Errorf("failed to read result {i}: %w", err__) }}"#,
);
uwrite!(
self.src,
r#"
var w__ {wrpc}.IndexWriteCloser
var r__ {wrpc}.IndexReadCloser
w__, r__, err__ = wrpc__.Invoke(ctx__, "{instance}", ""#
);
self.src.push_str(rpc_func_name(func));
self.src.push_str("\", ");
if !func.params.is_empty() {
self.src.push_str("buf__.Bytes()");
} else {
self.src.push_str("nil");
}
self.src.push_str("return nil\n");
self.src.push_str("},");
self.src.push_str(",\n");
for (i, ty) in func.results.iter_types().enumerate() {
let (nested, fut) = async_paths_ty(self.resolve, ty);
for path in nested {
self.push_str(wrpc);
self.push_str(".NewSubscribePath().Index(");
uwrite!(self.src, "{i})");
uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i})");
for p in path {
if let Some(p) = p {
uwrite!(self.src, ".Index({p})");
Expand All @@ -2886,15 +2914,87 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {
uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i}), ");
}
}
self.src.push_str("); err__ != nil {\n");
uwriteln!(
let slog = self.deps.slog();
uwrite!(
self.src,
r#"err__ = {fmt}.Errorf("failed to invoke `{}`: %w", err__)
return
}}
r#"
)
if err__ != nil {{
err__ = {fmt}.Errorf("failed to invoke `{name}`: %w", err__)
return
}}
defer func() {{
if err := r__.Close(); err != nil {{
{slog}.ErrorContext(ctx__, "failed to close reader", "instance", "{instance}", "name", "{name}", "err", err)
}}
}}()"#,
name = func.name,
);
if async_params {
let sync = self.deps.sync();
uwrite!(
self.src,
r#"
if writeCount__ > 0 {{
writeErrCh__ := make(chan error, uint(writeCount__))
writeErrs__ = writeErrCh__
var wg__ {sync}.WaitGroup
for index, write := range writes__ {{
wg__.Add(1)
w, err := w__.Index(index)
if err != nil {{
if cErr := w__.Close(); cErr != nil {{
{slog}.DebugContext(ctx__, "failed to close outgoing stream", "instance", "{instance}", "name", "{}", "err", cErr)
}}
err__ = {fmt}.Errorf("failed to index writer at index `%v`: %w", index, err)
return
}}
write := write
go func() {{
defer wg__.Done()
if err := write(w); err != nil {{
writeErrCh__ <- err
}}
}}()
}}
go func() {{
wg__.Wait()
close(writeErrCh__)
}}()
}}"#,
func.name,
);
}
uwrite!(
self.src,
r#"
if cErr__ := w__.Close(); cErr__ != nil {{
{slog}.DebugContext(ctx__, "failed to close outgoing stream", "instance", "{instance}", "name", "{}", "err", cErr__)
}}"#,
func.name
func.name,
);

for (i, ty) in func.results.iter_types().enumerate() {
uwrite!(
self.src,
"
r{i}__, err__ = "
);
self.print_read_ty(ty, "r__", &format!("[]uint32{{ {i} }}"));
uwrite!(
self.src,
r#"
if err__ != nil {{
err__ = {fmt}.Errorf("failed to read result {i}: %w", err__)
return
}}"#,
);
}
uwriteln!(
self.src,
r#"
return
}}"#,
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,60 @@ import (
utf8 "unicode/utf8"
)

func Hello(ctx__ context.Context, wrpc__ wrpc.Invoker) (r0__ string, close__ func() error, err__ error) {
if err__ = wrpc__.Invoke(ctx__, "wrpc-examples:hello/handler", "hello", func(w__ wrpc.IndexWriteCloser, r__ wrpc.IndexReadCloser) error {
close__ = r__.Close
_, err__ = w__.Write(nil)
if err__ != nil {
return fmt.Errorf("failed to write empty parameters: %w", err__)
func Hello(ctx__ context.Context, wrpc__ wrpc.Invoker) (r0__ string, err__ error) {
var w__ wrpc.IndexWriteCloser
var r__ wrpc.IndexReadCloser
w__, r__, err__ = wrpc__.Invoke(ctx__, "wrpc-examples:hello/handler", "hello", nil)
if err__ != nil {
err__ = fmt.Errorf("failed to invoke `hello`: %w", err__)
return
}
defer func() {
if err := r__.Close(); err != nil {
slog.ErrorContext(ctx__, "failed to close reader", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", err)
}
r0__, err__ = func(r interface {
io.ByteReader
io.Reader
}) (string, error) {
var x uint32
var s uint8
for i := 0; i < 5; i++ {
slog.Debug("reading string length byte", "i", i)
b, err := r.ReadByte()
if err != nil {
if i > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return "", fmt.Errorf("failed to read string length byte: %w", err)
}()
if cErr__ := w__.Close(); cErr__ != nil {
slog.DebugContext(ctx__, "failed to close outgoing stream", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", cErr__)
}
r0__, err__ = func(r interface {
io.ByteReader
io.Reader
}) (string, error) {
var x uint32
var s uint8
for i := 0; i < 5; i++ {
slog.Debug("reading string length byte", "i", i)
b, err := r.ReadByte()
if err != nil {
if i > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
if s == 28 && b > 0x0f {
return "", errors.New("string length overflows a 32-bit integer")
return "", fmt.Errorf("failed to read string length byte: %w", err)
}
if s == 28 && b > 0x0f {
return "", errors.New("string length overflows a 32-bit integer")
}
if b < 0x80 {
x = x | uint32(b)<<s
buf := make([]byte, x)
slog.Debug("reading string bytes", "len", x)
_, err = r.Read(buf)
if err != nil {
return "", fmt.Errorf("failed to read string bytes: %w", err)
}
if b < 0x80 {
x = x | uint32(b)<<s
buf := make([]byte, x)
slog.Debug("reading string bytes", "len", x)
_, err = r.Read(buf)
if err != nil {
return "", fmt.Errorf("failed to read string bytes: %w", err)
}
if !utf8.Valid(buf) {
return string(buf), errors.New("string is not valid UTF-8")
}
return string(buf), nil
if !utf8.Valid(buf) {
return string(buf), errors.New("string is not valid UTF-8")
}
x |= uint32(b&0x7f) << s
s += 7
return string(buf), nil
}
return "", errors.New("string length overflows a 32-bit integer")
}(r__)
if err__ != nil {
return fmt.Errorf("failed to read result 0: %w", err__)
x |= uint32(b&0x7f) << s
s += 7
}
return nil
}); err__ != nil {
err__ = fmt.Errorf("failed to invoke `hello`: %w", err__)
return "", errors.New("string length overflows a 32-bit integer")
}(r__)
if err__ != nil {
err__ = fmt.Errorf("failed to read result 0: %w", err__)
return
}
return
Expand Down
5 changes: 1 addition & 4 deletions examples/go/hello-client/cmd/hello-client-nats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ func run() (err error) {

for _, prefix := range os.Args[1:] {
wrpc := wrpcnats.NewClient(nc, prefix)
greeting, cleanup, err := handler.Hello(context.Background(), wrpc)
greeting, err := handler.Hello(context.Background(), wrpc)
if err != nil {
return fmt.Errorf("failed to call `wrpc-examples:hello/handler.hello`: %w", err)
}
fmt.Printf("%s: %s\n", prefix, greeting)
if err := cleanup(); err != nil {
return fmt.Errorf("failed to shutdown `wrpc-examples:hello/handler.hello` invocation: %w", err)
}
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions examples/go/streams-client/bindings/client.wrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT!
// client package contains wRPC bindings for `client` world
package client
Loading

0 comments on commit e69425f

Please sign in to comment.