From 8e9df9aa3d9da73bd65b414f96d53a9bd9b12711 Mon Sep 17 00:00:00 2001 From: Marina Sakai <118230951+Marina-Sakai@users.noreply.github.com> Date: Mon, 18 Nov 2024 13:20:42 +0800 Subject: [PATCH] feat(generic): support thrift streaming for json generic client (#1467) --- client/genericclient/client.go | 5 +- client/genericclient/stream.go | 6 +- go.mod | 4 +- go.sum | 10 +- internal/generic/generic_service.go | 199 +++ internal/mocks/thrift/gen.sh | 4 + internal/mocks/thrift/k-stream.go | 1487 +++++++++++++++++ internal/mocks/thrift/k-test.go | 2 +- internal/mocks/thrift/stream.go | 864 ++++++++++ internal/mocks/thrift/stream.thrift | 19 + internal/mocks/thrift/test.go | 2 +- pkg/generic/descriptor/descriptor.go | 13 +- pkg/generic/generic_service.go | 182 +- pkg/generic/grpcjson_test/generic_init.go | 424 +++++ pkg/generic/grpcjson_test/generic_test.go | 271 +++ pkg/generic/grpcjson_test/idl/api.thrift | 19 + ...{http_go116plus_amd64.go => http_amd64.go} | 4 +- pkg/generic/thrift/http_fallback.go | 4 +- pkg/generic/thrift/json.go | 65 +- ...{json_go116plus_amd64.go => json_amd64.go} | 31 +- pkg/generic/thrift/json_fallback.go | 4 +- pkg/generic/thrift/parse.go | 161 +- pkg/remote/codec/grpc/grpc.go | 32 +- pkg/remote/codec/thrift/thrift.go | 15 +- 24 files changed, 3532 insertions(+), 295 deletions(-) create mode 100644 internal/generic/generic_service.go create mode 100644 internal/mocks/thrift/k-stream.go create mode 100644 internal/mocks/thrift/stream.go create mode 100644 internal/mocks/thrift/stream.thrift create mode 100644 pkg/generic/grpcjson_test/generic_init.go create mode 100644 pkg/generic/grpcjson_test/generic_test.go create mode 100644 pkg/generic/grpcjson_test/idl/api.thrift rename pkg/generic/thrift/{http_go116plus_amd64.go => http_amd64.go} (98%) rename pkg/generic/thrift/{json_go116plus_amd64.go => json_amd64.go} (79%) diff --git a/client/genericclient/client.go b/client/genericclient/client.go index f633f78a64..4c74c92dfa 100644 --- a/client/genericclient/client.go +++ b/client/genericclient/client.go @@ -94,7 +94,8 @@ type genericServiceClient struct { func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error) { ctx = client.NewCtxWithCallOptions(ctx, callOptions) - _args := gc.svcInfo.MethodInfo(method).NewArgs().(*generic.Args) + mtInfo := gc.svcInfo.MethodInfo(method) + _args := mtInfo.NewArgs().(*generic.Args) _args.Method = method _args.Request = request @@ -106,7 +107,7 @@ func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, return nil, gc.kClient.Call(ctx, mt.Name, _args, nil) } - _result := gc.svcInfo.MethodInfo(method).NewResult().(*generic.Result) + _result := mtInfo.NewResult().(*generic.Result) if err = gc.kClient.Call(ctx, mt.Name, _args, _result); err != nil { return } diff --git a/client/genericclient/stream.go b/client/genericclient/stream.go index 6cdd54730d..e7dc05b796 100644 --- a/client/genericclient/stream.go +++ b/client/genericclient/stream.go @@ -32,6 +32,7 @@ import ( ) // NOTE: this is a temporary adjustment for ci check. remove it after fully completing the generic streaming support + var ( _ clientStreaming = nil _ serverStreaming = nil @@ -162,8 +163,9 @@ func newServerStreaming(ctx context.Context, genericCli Client, method string, r if err != nil { return nil, err } - ss := &serverStreamingClient{stream, gCli.svcInfo.MethodInfo(method)} - _args := gCli.svcInfo.MethodInfo(method).NewArgs().(*generic.Args) + mtInfo := gCli.svcInfo.MethodInfo(method) + ss := &serverStreamingClient{stream, mtInfo} + _args := mtInfo.NewArgs().(*generic.Args) _args.Method = method _args.Request = req if err = ss.Stream.SendMsg(_args); err != nil { diff --git a/go.mod b/go.mod index 2a65e10e97..4fb45a365c 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/bytedance/gopkg v0.1.1 github.com/bytedance/sonic v1.12.2 github.com/cloudwego/configmanager v0.2.2 - github.com/cloudwego/dynamicgo v0.4.4 + github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128 github.com/cloudwego/fastpb v0.0.5 github.com/cloudwego/frugal v0.2.0 github.com/cloudwego/gopkg v0.1.2 @@ -17,7 +17,6 @@ require ( github.com/golang/mock v1.6.0 github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 github.com/jhump/protoreflect v1.8.2 - github.com/json-iterator/go v1.1.12 github.com/tidwall/gjson v1.17.3 golang.org/x/net v0.24.0 golang.org/x/sync v0.8.0 @@ -38,7 +37,6 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/iancoleman/strcase v0.2.0 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect - github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 42e52a6bce..1ed51eee1e 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/ github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/configmanager v0.2.2 h1:sVrJB8gWYTlPV2OS3wcgJSO9F2/9Zbkmcm1Z7jempOU= github.com/cloudwego/configmanager v0.2.2/go.mod h1:ppiyU+5TPLonE8qMVi/pFQk2eL3Q4P7d4hbiNJn6jwI= -github.com/cloudwego/dynamicgo v0.4.4 h1:RuHhjy44Ajy2PLjrwOhI9EY874t9srhgwd/rkKTUKfQ= -github.com/cloudwego/dynamicgo v0.4.4/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY= +github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128 h1:StnQNfU+fb3PavTNydupCP/8Ges3/DDYRjL8HmLwOMI= +github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY= github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZU= github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk= github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8= @@ -71,7 +71,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= @@ -81,8 +80,6 @@ github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHL github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/jhump/protoreflect v1.8.2 h1:k2xE7wcUomeqwY0LDCYA16y4WWfyTcMx5mKhk0d4ua0= github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= @@ -93,8 +90,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 h1:uiS4zKYKJVj5F3ID+5iylfKPsEQmBEOucSD9Vgmn0i0= github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5/go.mod h1:I8AX+yW//L8Hshx6+a1m3bYkwXkpsVjA2795vP4f4oQ= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= @@ -107,7 +102,6 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/internal/generic/generic_service.go b/internal/generic/generic_service.go new file mode 100644 index 0000000000..aee1ac0d4c --- /dev/null +++ b/internal/generic/generic_service.go @@ -0,0 +1,199 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package generic + +import ( + "context" + "fmt" + + "github.com/cloudwego/gopkg/bufiox" + "github.com/cloudwego/gopkg/protocol/thrift/base" + + "github.com/cloudwego/kitex/pkg/generic/proto" + "github.com/cloudwego/kitex/pkg/generic/thrift" + codecProto "github.com/cloudwego/kitex/pkg/remote/codec/protobuf" +) + +// Args generic request +type Args struct { + Request interface{} + Method string + base *base.Base + inner interface{} +} + +var ( + _ codecProto.MessageWriterWithContext = (*Args)(nil) + _ codecProto.MessageReaderWithMethodWithContext = (*Args)(nil) +) + +func (g *Args) SetCodec(inner interface{}) { + g.inner = inner +} + +func (g *Args) GetOrSetBase() interface{} { + if g.base == nil { + g.base = base.NewBase() + } + return g.base +} + +// Write ... +func (g *Args) Write(ctx context.Context, method string, out bufiox.Writer) error { + if err, ok := g.inner.(error); ok { + return err + } + if w, ok := g.inner.(thrift.MessageWriter); ok { + return w.Write(ctx, out, g.Request, method, true, g.base) + } + return fmt.Errorf("unexpected Args writer type: %T", g.inner) +} + +func (g *Args) WritePb(ctx context.Context, method string) (interface{}, error) { + if err, ok := g.inner.(error); ok { + return nil, err + } + if w, ok := g.inner.(proto.MessageWriter); ok { + return w.Write(ctx, g.Request, method, true) + } + return nil, fmt.Errorf("unexpected Args writer type: %T", g.inner) +} + +// Read ... +func (g *Args) Read(ctx context.Context, method string, dataLen int, in bufiox.Reader) error { + if err, ok := g.inner.(error); ok { + return err + } + if rw, ok := g.inner.(thrift.MessageReader); ok { + g.Method = method + var err error + g.Request, err = rw.Read(ctx, method, false, dataLen, in) + return err + } + return fmt.Errorf("unexpected Args reader type: %T", g.inner) +} + +func (g *Args) ReadPb(ctx context.Context, method string, in []byte) error { + if err, ok := g.inner.(error); ok { + return err + } + if w, ok := g.inner.(proto.MessageReader); ok { + g.Method = method + var err error + g.Request, err = w.Read(ctx, method, false, in) + return err + } + return fmt.Errorf("unexpected Args reader type: %T", g.inner) +} + +// GetFirstArgument implements util.KitexArgs. +func (g *Args) GetFirstArgument() interface{} { + return g.Request +} + +// Result generic response +type Result struct { + Success interface{} + inner interface{} +} + +var ( + _ codecProto.MessageWriterWithContext = (*Result)(nil) + _ codecProto.MessageReaderWithMethodWithContext = (*Result)(nil) +) + +// SetCodec ... +func (r *Result) SetCodec(inner interface{}) { + r.inner = inner +} + +// Write ... +func (r *Result) Write(ctx context.Context, method string, out bufiox.Writer) error { + if err, ok := r.inner.(error); ok { + return err + } + if w, ok := r.inner.(thrift.MessageWriter); ok { + return w.Write(ctx, out, r.Success, method, false, nil) + } + return fmt.Errorf("unexpected Result writer type: %T", r.inner) +} + +func (r *Result) WritePb(ctx context.Context, method string) (interface{}, error) { + if err, ok := r.inner.(error); ok { + return nil, err + } + if w, ok := r.inner.(proto.MessageWriter); ok { + return w.Write(ctx, r.Success, method, false) + } + return nil, fmt.Errorf("unexpected Result writer type: %T", r.inner) +} + +// Read ... +func (r *Result) Read(ctx context.Context, method string, dataLen int, in bufiox.Reader) error { + if err, ok := r.inner.(error); ok { + return err + } + if w, ok := r.inner.(thrift.MessageReader); ok { + var err error + r.Success, err = w.Read(ctx, method, true, dataLen, in) + return err + } + return fmt.Errorf("unexpected Result reader type: %T", r.inner) +} + +func (r *Result) ReadPb(ctx context.Context, method string, in []byte) error { + if err, ok := r.inner.(error); ok { + return err + } + if w, ok := r.inner.(proto.MessageReader); ok { + var err error + r.Success, err = w.Read(ctx, method, true, in) + return err + } + return fmt.Errorf("unexpected Result reader type: %T", r.inner) +} + +// GetSuccess implements util.KitexResult. +func (r *Result) GetSuccess() interface{} { + if !r.IsSetSuccess() { + return nil + } + return r.Success +} + +// SetSuccess implements util.KitexResult. +func (r *Result) SetSuccess(x interface{}) { + r.Success = x +} + +// IsSetSuccess ... +func (r *Result) IsSetSuccess() bool { + return r.Success != nil +} + +// GetResult ... +func (r *Result) GetResult() interface{} { + return r.Success +} + +type ThriftWriter interface { + Write(ctx context.Context, method string, w bufiox.Writer) error +} + +type ThriftReader interface { + Read(ctx context.Context, method string, dataLen int, r bufiox.Reader) error +} diff --git a/internal/mocks/thrift/gen.sh b/internal/mocks/thrift/gen.sh index dd4a4915a0..7e3385f1c2 100755 --- a/internal/mocks/thrift/gen.sh +++ b/internal/mocks/thrift/gen.sh @@ -2,4 +2,8 @@ kitex -thrift no_default_serdes -module github.com/cloudwego/kitex -gen-path .. ./test.thrift +kitex -thrift no_default_serdes -module github.com/cloudwego/kitex -gen-path .. ./stream.thrift + rm -rf ./mock # not in use, rm it + +rm -rf ./testservice # not in use, rm it \ No newline at end of file diff --git a/internal/mocks/thrift/k-stream.go b/internal/mocks/thrift/k-stream.go new file mode 100644 index 0000000000..9c3fd71903 --- /dev/null +++ b/internal/mocks/thrift/k-stream.go @@ -0,0 +1,1487 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Code generated by Kitex v0.11.3. DO NOT EDIT. + +package thrift + +import ( + "bytes" + "fmt" + "reflect" + "strings" + + "github.com/cloudwego/gopkg/protocol/thrift" +) + +// unused protection +var ( + _ = fmt.Formatter(nil) + _ = (*bytes.Buffer)(nil) + _ = (*strings.Builder)(nil) + _ = reflect.Type(nil) + _ = thrift.STOP +) + +func (p *Request) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + var issetMessage bool = false + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + issetMessage = true + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + if !issetMessage { + fieldId = 1 + goto RequiredFieldNotSetError + } + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_Request[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +RequiredFieldNotSetError: + return offset, thrift.NewProtocolException(thrift.INVALID_DATA, fmt.Sprintf("required field %s is not set", fieldIDToName_Request[fieldId])) +} + +func (p *Request) FastReadField1(buf []byte) (int, error) { + offset := 0 + + var _field string + if v, l, err := thrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + _field = v + } + p.Message = _field + return offset, nil +} + +// for compatibility +func (p *Request) FastWrite(buf []byte) int { + return 0 +} + +func (p *Request) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *Request) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *Request) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRING, 1) + offset += thrift.Binary.WriteStringNocopy(buf[offset:], w, p.Message) + return offset +} + +func (p *Request) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += thrift.Binary.StringLengthNocopy(p.Message) + return l +} + +func (p *Response) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + var issetMessage bool = false + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + issetMessage = true + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + if !issetMessage { + fieldId = 1 + goto RequiredFieldNotSetError + } + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_Response[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +RequiredFieldNotSetError: + return offset, thrift.NewProtocolException(thrift.INVALID_DATA, fmt.Sprintf("required field %s is not set", fieldIDToName_Response[fieldId])) +} + +func (p *Response) FastReadField1(buf []byte) (int, error) { + offset := 0 + + var _field string + if v, l, err := thrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + _field = v + } + p.Message = _field + return offset, nil +} + +// for compatibility +func (p *Response) FastWrite(buf []byte) int { + return 0 +} + +func (p *Response) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *Response) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *Response) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRING, 1) + offset += thrift.Binary.WriteStringNocopy(buf[offset:], w, p.Message) + return offset +} + +func (p *Response) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += thrift.Binary.StringLengthNocopy(p.Message) + return l +} + +func (p *TestServiceEchoArgs) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoArgs[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoArgs) FastReadField1(buf []byte) (int, error) { + offset := 0 + _field := NewRequest() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Req = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoArgs) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoArgs) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoArgs) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 1) + offset += p.Req.FastWriteNocopy(buf[offset:], w) + return offset +} + +func (p *TestServiceEchoArgs) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += p.Req.BLength() + return l +} + +func (p *TestServiceEchoResult) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField0(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoResult[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoResult) FastReadField0(buf []byte) (int, error) { + offset := 0 + _field := NewResponse() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Success = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoResult) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField0(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoResult) BLength() int { + l := 0 + if p != nil { + l += p.field0Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoResult) fastWriteField0(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSuccess() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 0) + offset += p.Success.FastWriteNocopy(buf[offset:], w) + } + return offset +} + +func (p *TestServiceEchoResult) field0Length() int { + l := 0 + if p.IsSetSuccess() { + l += thrift.Binary.FieldBeginLength() + l += p.Success.BLength() + } + return l +} + +func (p *TestServiceEchoClientArgs) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoClientArgs[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoClientArgs) FastReadField1(buf []byte) (int, error) { + offset := 0 + _field := NewRequest() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Req = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoClientArgs) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoClientArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoClientArgs) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoClientArgs) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 1) + offset += p.Req.FastWriteNocopy(buf[offset:], w) + return offset +} + +func (p *TestServiceEchoClientArgs) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += p.Req.BLength() + return l +} + +func (p *TestServiceEchoClientResult) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField0(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoClientResult[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoClientResult) FastReadField0(buf []byte) (int, error) { + offset := 0 + _field := NewResponse() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Success = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoClientResult) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoClientResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField0(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoClientResult) BLength() int { + l := 0 + if p != nil { + l += p.field0Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoClientResult) fastWriteField0(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSuccess() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 0) + offset += p.Success.FastWriteNocopy(buf[offset:], w) + } + return offset +} + +func (p *TestServiceEchoClientResult) field0Length() int { + l := 0 + if p.IsSetSuccess() { + l += thrift.Binary.FieldBeginLength() + l += p.Success.BLength() + } + return l +} + +func (p *TestServiceEchoServerArgs) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoServerArgs[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoServerArgs) FastReadField1(buf []byte) (int, error) { + offset := 0 + _field := NewRequest() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Req = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoServerArgs) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoServerArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoServerArgs) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoServerArgs) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 1) + offset += p.Req.FastWriteNocopy(buf[offset:], w) + return offset +} + +func (p *TestServiceEchoServerArgs) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += p.Req.BLength() + return l +} + +func (p *TestServiceEchoServerResult) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField0(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoServerResult[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoServerResult) FastReadField0(buf []byte) (int, error) { + offset := 0 + _field := NewResponse() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Success = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoServerResult) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoServerResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField0(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoServerResult) BLength() int { + l := 0 + if p != nil { + l += p.field0Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoServerResult) fastWriteField0(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSuccess() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 0) + offset += p.Success.FastWriteNocopy(buf[offset:], w) + } + return offset +} + +func (p *TestServiceEchoServerResult) field0Length() int { + l := 0 + if p.IsSetSuccess() { + l += thrift.Binary.FieldBeginLength() + l += p.Success.BLength() + } + return l +} + +func (p *TestServiceEchoUnaryArgs) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoUnaryArgs[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoUnaryArgs) FastReadField1(buf []byte) (int, error) { + offset := 0 + _field := NewRequest() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Req = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoUnaryArgs) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoUnaryArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoUnaryArgs) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoUnaryArgs) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 1) + offset += p.Req.FastWriteNocopy(buf[offset:], w) + return offset +} + +func (p *TestServiceEchoUnaryArgs) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += p.Req.BLength() + return l +} + +func (p *TestServiceEchoUnaryResult) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField0(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoUnaryResult[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoUnaryResult) FastReadField0(buf []byte) (int, error) { + offset := 0 + _field := NewResponse() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Success = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoUnaryResult) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoUnaryResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField0(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoUnaryResult) BLength() int { + l := 0 + if p != nil { + l += p.field0Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoUnaryResult) fastWriteField0(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSuccess() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 0) + offset += p.Success.FastWriteNocopy(buf[offset:], w) + } + return offset +} + +func (p *TestServiceEchoUnaryResult) field0Length() int { + l := 0 + if p.IsSetSuccess() { + l += thrift.Binary.FieldBeginLength() + l += p.Success.BLength() + } + return l +} + +func (p *TestServiceEchoBizExceptionArgs) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoBizExceptionArgs[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoBizExceptionArgs) FastReadField1(buf []byte) (int, error) { + offset := 0 + _field := NewRequest() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Req = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoBizExceptionArgs) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoBizExceptionArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoBizExceptionArgs) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoBizExceptionArgs) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 1) + offset += p.Req.FastWriteNocopy(buf[offset:], w) + return offset +} + +func (p *TestServiceEchoBizExceptionArgs) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += p.Req.BLength() + return l +} + +func (p *TestServiceEchoBizExceptionResult) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField0(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoBizExceptionResult[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoBizExceptionResult) FastReadField0(buf []byte) (int, error) { + offset := 0 + _field := NewResponse() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Success = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoBizExceptionResult) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoBizExceptionResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField0(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoBizExceptionResult) BLength() int { + l := 0 + if p != nil { + l += p.field0Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoBizExceptionResult) fastWriteField0(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSuccess() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 0) + offset += p.Success.FastWriteNocopy(buf[offset:], w) + } + return offset +} + +func (p *TestServiceEchoBizExceptionResult) field0Length() int { + l := 0 + if p.IsSetSuccess() { + l += thrift.Binary.FieldBeginLength() + l += p.Success.BLength() + } + return l +} + +func (p *TestServiceEchoPingPongArgs) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField1(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoPingPongArgs[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoPingPongArgs) FastReadField1(buf []byte) (int, error) { + offset := 0 + _field := NewRequest() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Req = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoPingPongArgs) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoPingPongArgs) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField1(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoPingPongArgs) BLength() int { + l := 0 + if p != nil { + l += p.field1Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoPingPongArgs) fastWriteField1(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 1) + offset += p.Req.FastWriteNocopy(buf[offset:], w) + return offset +} + +func (p *TestServiceEchoPingPongArgs) field1Length() int { + l := 0 + l += thrift.Binary.FieldBeginLength() + l += p.Req.BLength() + return l +} + +func (p *TestServiceEchoPingPongResult) FastRead(buf []byte) (int, error) { + var err error + var offset int + var l int + var fieldTypeId thrift.TType + var fieldId int16 + for { + fieldTypeId, fieldId, l, err = thrift.Binary.ReadFieldBegin(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldBeginError + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + l, err = p.FastReadField0(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + default: + l, err = thrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + } + + return offset, nil +ReadFieldBeginError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err) +ReadFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_TestServiceEchoPingPongResult[fieldId]), err) +SkipFieldError: + return offset, thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err) +} + +func (p *TestServiceEchoPingPongResult) FastReadField0(buf []byte) (int, error) { + offset := 0 + _field := NewResponse() + if l, err := _field.FastRead(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + } + p.Success = _field + return offset, nil +} + +// for compatibility +func (p *TestServiceEchoPingPongResult) FastWrite(buf []byte) int { + return 0 +} + +func (p *TestServiceEchoPingPongResult) FastWriteNocopy(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p != nil { + offset += p.fastWriteField0(buf[offset:], w) + } + offset += thrift.Binary.WriteFieldStop(buf[offset:]) + return offset +} + +func (p *TestServiceEchoPingPongResult) BLength() int { + l := 0 + if p != nil { + l += p.field0Length() + } + l += thrift.Binary.FieldStopLength() + return l +} + +func (p *TestServiceEchoPingPongResult) fastWriteField0(buf []byte, w thrift.NocopyWriter) int { + offset := 0 + if p.IsSetSuccess() { + offset += thrift.Binary.WriteFieldBegin(buf[offset:], thrift.STRUCT, 0) + offset += p.Success.FastWriteNocopy(buf[offset:], w) + } + return offset +} + +func (p *TestServiceEchoPingPongResult) field0Length() int { + l := 0 + if p.IsSetSuccess() { + l += thrift.Binary.FieldBeginLength() + l += p.Success.BLength() + } + return l +} + +func (p *TestServiceEchoArgs) GetFirstArgument() interface{} { + return p.Req +} + +func (p *TestServiceEchoResult) GetResult() interface{} { + return p.Success +} + +func (p *TestServiceEchoClientArgs) GetFirstArgument() interface{} { + return p.Req +} + +func (p *TestServiceEchoClientResult) GetResult() interface{} { + return p.Success +} + +func (p *TestServiceEchoServerArgs) GetFirstArgument() interface{} { + return p.Req +} + +func (p *TestServiceEchoServerResult) GetResult() interface{} { + return p.Success +} + +func (p *TestServiceEchoUnaryArgs) GetFirstArgument() interface{} { + return p.Req +} + +func (p *TestServiceEchoUnaryResult) GetResult() interface{} { + return p.Success +} + +func (p *TestServiceEchoBizExceptionArgs) GetFirstArgument() interface{} { + return p.Req +} + +func (p *TestServiceEchoBizExceptionResult) GetResult() interface{} { + return p.Success +} + +func (p *TestServiceEchoPingPongArgs) GetFirstArgument() interface{} { + return p.Req +} + +func (p *TestServiceEchoPingPongResult) GetResult() interface{} { + return p.Success +} diff --git a/internal/mocks/thrift/k-test.go b/internal/mocks/thrift/k-test.go index b1b37ae2b9..6ae99c7b17 100644 --- a/internal/mocks/thrift/k-test.go +++ b/internal/mocks/thrift/k-test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -// Code generated by Kitex v0.11.0. DO NOT EDIT. +// Code generated by Kitex v0.11.3. DO NOT EDIT. package thrift diff --git a/internal/mocks/thrift/stream.go b/internal/mocks/thrift/stream.go new file mode 100644 index 0000000000..be64aa4159 --- /dev/null +++ b/internal/mocks/thrift/stream.go @@ -0,0 +1,864 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Code generated by thriftgo (0.3.15). DO NOT EDIT. + +package thrift + +import ( + "context" + "fmt" + "github.com/cloudwego/kitex/pkg/streaming" + "strings" +) + +type Request struct { + Message string `thrift:"message,1,required" frugal:"1,required,string" json:"message"` +} + +func NewRequest() *Request { + return &Request{} +} + +func (p *Request) InitDefault() { +} + +func (p *Request) GetMessage() (v string) { + return p.Message +} +func (p *Request) SetMessage(val string) { + p.Message = val +} + +func (p *Request) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("Request(%+v)", *p) +} + +func (p *Request) DeepEqual(ano *Request) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Message) { + return false + } + return true +} + +func (p *Request) Field1DeepEqual(src string) bool { + + if strings.Compare(p.Message, src) != 0 { + return false + } + return true +} + +var fieldIDToName_Request = map[int16]string{ + 1: "message", +} + +type Response struct { + Message string `thrift:"message,1,required" frugal:"1,required,string" json:"message"` +} + +func NewResponse() *Response { + return &Response{} +} + +func (p *Response) InitDefault() { +} + +func (p *Response) GetMessage() (v string) { + return p.Message +} +func (p *Response) SetMessage(val string) { + p.Message = val +} + +func (p *Response) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("Response(%+v)", *p) +} + +func (p *Response) DeepEqual(ano *Response) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Message) { + return false + } + return true +} + +func (p *Response) Field1DeepEqual(src string) bool { + + if strings.Compare(p.Message, src) != 0 { + return false + } + return true +} + +var fieldIDToName_Response = map[int16]string{ + 1: "message", +} + +type TestService interface { + Echo(stream TestService_EchoServer) (err error) + + EchoClient(stream TestService_EchoClientServer) (err error) + + EchoServer(req *Request, stream TestService_EchoServerServer) (err error) + + EchoUnary(ctx context.Context, req *Request) (r *Response, err error) + + EchoBizException(stream TestService_EchoBizExceptionServer) (err error) + + EchoPingPong(ctx context.Context, req *Request) (r *Response, err error) +} + +type TestServiceEchoArgs struct { + Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` +} + +func NewTestServiceEchoArgs() *TestServiceEchoArgs { + return &TestServiceEchoArgs{} +} + +func (p *TestServiceEchoArgs) InitDefault() { +} + +var TestServiceEchoArgs_Req_DEFAULT *Request + +func (p *TestServiceEchoArgs) GetReq() (v *Request) { + if !p.IsSetReq() { + return TestServiceEchoArgs_Req_DEFAULT + } + return p.Req +} +func (p *TestServiceEchoArgs) SetReq(val *Request) { + p.Req = val +} + +func (p *TestServiceEchoArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *TestServiceEchoArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoArgs(%+v)", *p) +} + +func (p *TestServiceEchoArgs) DeepEqual(ano *TestServiceEchoArgs) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Req) { + return false + } + return true +} + +func (p *TestServiceEchoArgs) Field1DeepEqual(src *Request) bool { + + if !p.Req.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoArgs = map[int16]string{ + 1: "req", +} + +type TestServiceEchoResult struct { + Success *Response `thrift:"success,0,optional" frugal:"0,optional,Response" json:"success,omitempty"` +} + +func NewTestServiceEchoResult() *TestServiceEchoResult { + return &TestServiceEchoResult{} +} + +func (p *TestServiceEchoResult) InitDefault() { +} + +var TestServiceEchoResult_Success_DEFAULT *Response + +func (p *TestServiceEchoResult) GetSuccess() (v *Response) { + if !p.IsSetSuccess() { + return TestServiceEchoResult_Success_DEFAULT + } + return p.Success +} +func (p *TestServiceEchoResult) SetSuccess(x interface{}) { + p.Success = x.(*Response) +} + +func (p *TestServiceEchoResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *TestServiceEchoResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoResult(%+v)", *p) +} + +func (p *TestServiceEchoResult) DeepEqual(ano *TestServiceEchoResult) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field0DeepEqual(ano.Success) { + return false + } + return true +} + +func (p *TestServiceEchoResult) Field0DeepEqual(src *Response) bool { + + if !p.Success.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoResult = map[int16]string{ + 0: "success", +} + +type TestService_EchoServer interface { + streaming.Stream + + Recv() (*Request, error) + + Send(*Response) error +} + +type TestServiceEchoClientArgs struct { + Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` +} + +func NewTestServiceEchoClientArgs() *TestServiceEchoClientArgs { + return &TestServiceEchoClientArgs{} +} + +func (p *TestServiceEchoClientArgs) InitDefault() { +} + +var TestServiceEchoClientArgs_Req_DEFAULT *Request + +func (p *TestServiceEchoClientArgs) GetReq() (v *Request) { + if !p.IsSetReq() { + return TestServiceEchoClientArgs_Req_DEFAULT + } + return p.Req +} +func (p *TestServiceEchoClientArgs) SetReq(val *Request) { + p.Req = val +} + +func (p *TestServiceEchoClientArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *TestServiceEchoClientArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoClientArgs(%+v)", *p) +} + +func (p *TestServiceEchoClientArgs) DeepEqual(ano *TestServiceEchoClientArgs) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Req) { + return false + } + return true +} + +func (p *TestServiceEchoClientArgs) Field1DeepEqual(src *Request) bool { + + if !p.Req.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoClientArgs = map[int16]string{ + 1: "req", +} + +type TestServiceEchoClientResult struct { + Success *Response `thrift:"success,0,optional" frugal:"0,optional,Response" json:"success,omitempty"` +} + +func NewTestServiceEchoClientResult() *TestServiceEchoClientResult { + return &TestServiceEchoClientResult{} +} + +func (p *TestServiceEchoClientResult) InitDefault() { +} + +var TestServiceEchoClientResult_Success_DEFAULT *Response + +func (p *TestServiceEchoClientResult) GetSuccess() (v *Response) { + if !p.IsSetSuccess() { + return TestServiceEchoClientResult_Success_DEFAULT + } + return p.Success +} +func (p *TestServiceEchoClientResult) SetSuccess(x interface{}) { + p.Success = x.(*Response) +} + +func (p *TestServiceEchoClientResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *TestServiceEchoClientResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoClientResult(%+v)", *p) +} + +func (p *TestServiceEchoClientResult) DeepEqual(ano *TestServiceEchoClientResult) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field0DeepEqual(ano.Success) { + return false + } + return true +} + +func (p *TestServiceEchoClientResult) Field0DeepEqual(src *Response) bool { + + if !p.Success.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoClientResult = map[int16]string{ + 0: "success", +} + +type TestService_EchoClientServer interface { + streaming.Stream + + Recv() (*Request, error) + + SendAndClose(*Response) error +} + +type TestServiceEchoServerArgs struct { + Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` +} + +func NewTestServiceEchoServerArgs() *TestServiceEchoServerArgs { + return &TestServiceEchoServerArgs{} +} + +func (p *TestServiceEchoServerArgs) InitDefault() { +} + +var TestServiceEchoServerArgs_Req_DEFAULT *Request + +func (p *TestServiceEchoServerArgs) GetReq() (v *Request) { + if !p.IsSetReq() { + return TestServiceEchoServerArgs_Req_DEFAULT + } + return p.Req +} +func (p *TestServiceEchoServerArgs) SetReq(val *Request) { + p.Req = val +} + +func (p *TestServiceEchoServerArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *TestServiceEchoServerArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoServerArgs(%+v)", *p) +} + +func (p *TestServiceEchoServerArgs) DeepEqual(ano *TestServiceEchoServerArgs) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Req) { + return false + } + return true +} + +func (p *TestServiceEchoServerArgs) Field1DeepEqual(src *Request) bool { + + if !p.Req.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoServerArgs = map[int16]string{ + 1: "req", +} + +type TestServiceEchoServerResult struct { + Success *Response `thrift:"success,0,optional" frugal:"0,optional,Response" json:"success,omitempty"` +} + +func NewTestServiceEchoServerResult() *TestServiceEchoServerResult { + return &TestServiceEchoServerResult{} +} + +func (p *TestServiceEchoServerResult) InitDefault() { +} + +var TestServiceEchoServerResult_Success_DEFAULT *Response + +func (p *TestServiceEchoServerResult) GetSuccess() (v *Response) { + if !p.IsSetSuccess() { + return TestServiceEchoServerResult_Success_DEFAULT + } + return p.Success +} +func (p *TestServiceEchoServerResult) SetSuccess(x interface{}) { + p.Success = x.(*Response) +} + +func (p *TestServiceEchoServerResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *TestServiceEchoServerResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoServerResult(%+v)", *p) +} + +func (p *TestServiceEchoServerResult) DeepEqual(ano *TestServiceEchoServerResult) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field0DeepEqual(ano.Success) { + return false + } + return true +} + +func (p *TestServiceEchoServerResult) Field0DeepEqual(src *Response) bool { + + if !p.Success.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoServerResult = map[int16]string{ + 0: "success", +} + +type TestService_EchoServerServer interface { + streaming.Stream + + Send(*Response) error +} + +type TestServiceEchoUnaryArgs struct { + Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` +} + +func NewTestServiceEchoUnaryArgs() *TestServiceEchoUnaryArgs { + return &TestServiceEchoUnaryArgs{} +} + +func (p *TestServiceEchoUnaryArgs) InitDefault() { +} + +var TestServiceEchoUnaryArgs_Req_DEFAULT *Request + +func (p *TestServiceEchoUnaryArgs) GetReq() (v *Request) { + if !p.IsSetReq() { + return TestServiceEchoUnaryArgs_Req_DEFAULT + } + return p.Req +} +func (p *TestServiceEchoUnaryArgs) SetReq(val *Request) { + p.Req = val +} + +func (p *TestServiceEchoUnaryArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *TestServiceEchoUnaryArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoUnaryArgs(%+v)", *p) +} + +func (p *TestServiceEchoUnaryArgs) DeepEqual(ano *TestServiceEchoUnaryArgs) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Req) { + return false + } + return true +} + +func (p *TestServiceEchoUnaryArgs) Field1DeepEqual(src *Request) bool { + + if !p.Req.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoUnaryArgs = map[int16]string{ + 1: "req", +} + +type TestServiceEchoUnaryResult struct { + Success *Response `thrift:"success,0,optional" frugal:"0,optional,Response" json:"success,omitempty"` +} + +func NewTestServiceEchoUnaryResult() *TestServiceEchoUnaryResult { + return &TestServiceEchoUnaryResult{} +} + +func (p *TestServiceEchoUnaryResult) InitDefault() { +} + +var TestServiceEchoUnaryResult_Success_DEFAULT *Response + +func (p *TestServiceEchoUnaryResult) GetSuccess() (v *Response) { + if !p.IsSetSuccess() { + return TestServiceEchoUnaryResult_Success_DEFAULT + } + return p.Success +} +func (p *TestServiceEchoUnaryResult) SetSuccess(x interface{}) { + p.Success = x.(*Response) +} + +func (p *TestServiceEchoUnaryResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *TestServiceEchoUnaryResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoUnaryResult(%+v)", *p) +} + +func (p *TestServiceEchoUnaryResult) DeepEqual(ano *TestServiceEchoUnaryResult) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field0DeepEqual(ano.Success) { + return false + } + return true +} + +func (p *TestServiceEchoUnaryResult) Field0DeepEqual(src *Response) bool { + + if !p.Success.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoUnaryResult = map[int16]string{ + 0: "success", +} + +type TestServiceEchoBizExceptionArgs struct { + Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` +} + +func NewTestServiceEchoBizExceptionArgs() *TestServiceEchoBizExceptionArgs { + return &TestServiceEchoBizExceptionArgs{} +} + +func (p *TestServiceEchoBizExceptionArgs) InitDefault() { +} + +var TestServiceEchoBizExceptionArgs_Req_DEFAULT *Request + +func (p *TestServiceEchoBizExceptionArgs) GetReq() (v *Request) { + if !p.IsSetReq() { + return TestServiceEchoBizExceptionArgs_Req_DEFAULT + } + return p.Req +} +func (p *TestServiceEchoBizExceptionArgs) SetReq(val *Request) { + p.Req = val +} + +func (p *TestServiceEchoBizExceptionArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *TestServiceEchoBizExceptionArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoBizExceptionArgs(%+v)", *p) +} + +func (p *TestServiceEchoBizExceptionArgs) DeepEqual(ano *TestServiceEchoBizExceptionArgs) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Req) { + return false + } + return true +} + +func (p *TestServiceEchoBizExceptionArgs) Field1DeepEqual(src *Request) bool { + + if !p.Req.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoBizExceptionArgs = map[int16]string{ + 1: "req", +} + +type TestServiceEchoBizExceptionResult struct { + Success *Response `thrift:"success,0,optional" frugal:"0,optional,Response" json:"success,omitempty"` +} + +func NewTestServiceEchoBizExceptionResult() *TestServiceEchoBizExceptionResult { + return &TestServiceEchoBizExceptionResult{} +} + +func (p *TestServiceEchoBizExceptionResult) InitDefault() { +} + +var TestServiceEchoBizExceptionResult_Success_DEFAULT *Response + +func (p *TestServiceEchoBizExceptionResult) GetSuccess() (v *Response) { + if !p.IsSetSuccess() { + return TestServiceEchoBizExceptionResult_Success_DEFAULT + } + return p.Success +} +func (p *TestServiceEchoBizExceptionResult) SetSuccess(x interface{}) { + p.Success = x.(*Response) +} + +func (p *TestServiceEchoBizExceptionResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *TestServiceEchoBizExceptionResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoBizExceptionResult(%+v)", *p) +} + +func (p *TestServiceEchoBizExceptionResult) DeepEqual(ano *TestServiceEchoBizExceptionResult) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field0DeepEqual(ano.Success) { + return false + } + return true +} + +func (p *TestServiceEchoBizExceptionResult) Field0DeepEqual(src *Response) bool { + + if !p.Success.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoBizExceptionResult = map[int16]string{ + 0: "success", +} + +type TestService_EchoBizExceptionServer interface { + streaming.Stream + + Recv() (*Request, error) + + SendAndClose(*Response) error +} + +type TestServiceEchoPingPongArgs struct { + Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` +} + +func NewTestServiceEchoPingPongArgs() *TestServiceEchoPingPongArgs { + return &TestServiceEchoPingPongArgs{} +} + +func (p *TestServiceEchoPingPongArgs) InitDefault() { +} + +var TestServiceEchoPingPongArgs_Req_DEFAULT *Request + +func (p *TestServiceEchoPingPongArgs) GetReq() (v *Request) { + if !p.IsSetReq() { + return TestServiceEchoPingPongArgs_Req_DEFAULT + } + return p.Req +} +func (p *TestServiceEchoPingPongArgs) SetReq(val *Request) { + p.Req = val +} + +func (p *TestServiceEchoPingPongArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *TestServiceEchoPingPongArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoPingPongArgs(%+v)", *p) +} + +func (p *TestServiceEchoPingPongArgs) DeepEqual(ano *TestServiceEchoPingPongArgs) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field1DeepEqual(ano.Req) { + return false + } + return true +} + +func (p *TestServiceEchoPingPongArgs) Field1DeepEqual(src *Request) bool { + + if !p.Req.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoPingPongArgs = map[int16]string{ + 1: "req", +} + +type TestServiceEchoPingPongResult struct { + Success *Response `thrift:"success,0,optional" frugal:"0,optional,Response" json:"success,omitempty"` +} + +func NewTestServiceEchoPingPongResult() *TestServiceEchoPingPongResult { + return &TestServiceEchoPingPongResult{} +} + +func (p *TestServiceEchoPingPongResult) InitDefault() { +} + +var TestServiceEchoPingPongResult_Success_DEFAULT *Response + +func (p *TestServiceEchoPingPongResult) GetSuccess() (v *Response) { + if !p.IsSetSuccess() { + return TestServiceEchoPingPongResult_Success_DEFAULT + } + return p.Success +} +func (p *TestServiceEchoPingPongResult) SetSuccess(x interface{}) { + p.Success = x.(*Response) +} + +func (p *TestServiceEchoPingPongResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *TestServiceEchoPingPongResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TestServiceEchoPingPongResult(%+v)", *p) +} + +func (p *TestServiceEchoPingPongResult) DeepEqual(ano *TestServiceEchoPingPongResult) bool { + if p == ano { + return true + } else if p == nil || ano == nil { + return false + } + if !p.Field0DeepEqual(ano.Success) { + return false + } + return true +} + +func (p *TestServiceEchoPingPongResult) Field0DeepEqual(src *Response) bool { + + if !p.Success.DeepEqual(src) { + return false + } + return true +} + +var fieldIDToName_TestServiceEchoPingPongResult = map[int16]string{ + 0: "success", +} diff --git a/internal/mocks/thrift/stream.thrift b/internal/mocks/thrift/stream.thrift new file mode 100644 index 0000000000..88461092c0 --- /dev/null +++ b/internal/mocks/thrift/stream.thrift @@ -0,0 +1,19 @@ +namespace go thrift + +struct Request { + 1: required string message, +} + +struct Response { + 1: required string message, +} + +service TestService { + Response Echo (1: Request req) (streaming.mode="bidirectional"), + Response EchoClient (1: Request req) (streaming.mode="client"), + Response EchoServer (1: Request req) (streaming.mode="server"), + Response EchoUnary (1: Request req) (streaming.mode="unary"), // not recommended + Response EchoBizException (1: Request req) (streaming.mode="client"), + + Response EchoPingPong (1: Request req), // KitexThrift, non-streaming +} \ No newline at end of file diff --git a/internal/mocks/thrift/test.go b/internal/mocks/thrift/test.go index ccc8a6f27b..f73d9f0293 100644 --- a/internal/mocks/thrift/test.go +++ b/internal/mocks/thrift/test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -// Code generated by thriftgo (0.3.14). DO NOT EDIT. +// Code generated by thriftgo (0.3.15). DO NOT EDIT. package thrift diff --git a/pkg/generic/descriptor/descriptor.go b/pkg/generic/descriptor/descriptor.go index 4ef2c27fb1..8df2bce30f 100644 --- a/pkg/generic/descriptor/descriptor.go +++ b/pkg/generic/descriptor/descriptor.go @@ -91,12 +91,13 @@ func (d *StructDescriptor) CheckRequired(rw map[int32]struct{}) error { // FunctionDescriptor idl function descriptor type FunctionDescriptor struct { - Name string - Oneway bool - Request *TypeDescriptor - Response *TypeDescriptor - HasRequestBase bool - StreamingMode serviceinfo.StreamingMode + Name string + Oneway bool + Request *TypeDescriptor + Response *TypeDescriptor + HasRequestBase bool + IsWithoutWrapping bool // true when it's a streaming method. this indicates whether the Request and Response are not wrapped in struct + StreamingMode serviceinfo.StreamingMode } // ServiceDescriptor idl service descriptor diff --git a/pkg/generic/generic_service.go b/pkg/generic/generic_service.go index d6a3489350..5bff8c6f1c 100644 --- a/pkg/generic/generic_service.go +++ b/pkg/generic/generic_service.go @@ -18,14 +18,8 @@ package generic import ( "context" - "fmt" - "github.com/cloudwego/gopkg/bufiox" - "github.com/cloudwego/gopkg/protocol/thrift/base" - - "github.com/cloudwego/kitex/pkg/generic/proto" - "github.com/cloudwego/kitex/pkg/generic/thrift" - codecProto "github.com/cloudwego/kitex/pkg/remote/codec/protobuf" + "github.com/cloudwego/kitex/internal/generic" "github.com/cloudwego/kitex/pkg/serviceinfo" ) @@ -82,8 +76,16 @@ func GetMethodInfo(messageReaderWriter interface{}, serviceName string) (methods methods = map[string]serviceinfo.MethodInfo{ serviceinfo.GenericMethod: serviceinfo.NewMethodInfo( callHandler, - func() interface{} { return &Args{inner: messageReaderWriter} }, - func() interface{} { return &Result{inner: messageReaderWriter} }, + func() interface{} { + args := &Args{} + args.SetCodec(messageReaderWriter) + return args + }, + func() interface{} { + result := &Result{} + result.SetCodec(messageReaderWriter) + return result + }, false, ), } @@ -116,166 +118,12 @@ type WithCodec interface { } // Args generic request -type Args struct { - Request interface{} - Method string - base *base.Base - inner interface{} -} - -var ( - _ codecProto.MessageWriterWithContext = (*Args)(nil) - _ codecProto.MessageReaderWithMethodWithContext = (*Args)(nil) - _ WithCodec = (*Args)(nil) -) - -// SetCodec ... -func (g *Args) SetCodec(inner interface{}) { - g.inner = inner -} - -func (g *Args) GetOrSetBase() interface{} { - if g.base == nil { - g.base = base.NewBase() - } - return g.base -} - -// Write ... -func (g *Args) Write(ctx context.Context, method string, out bufiox.Writer) error { - if err, ok := g.inner.(error); ok { - return err - } - if w, ok := g.inner.(thrift.MessageWriter); ok { - return w.Write(ctx, out, g.Request, method, true, g.base) - } - return fmt.Errorf("unexpected Args writer type: %T", g.inner) -} - -func (g *Args) WritePb(ctx context.Context, method string) (interface{}, error) { - if err, ok := g.inner.(error); ok { - return nil, err - } - if w, ok := g.inner.(proto.MessageWriter); ok { - return w.Write(ctx, g.Request, method, true) - } - return nil, fmt.Errorf("unexpected Args writer type: %T", g.inner) -} - -// Read ... -func (g *Args) Read(ctx context.Context, method string, dataLen int, in bufiox.Reader) error { - if err, ok := g.inner.(error); ok { - return err - } - if rw, ok := g.inner.(thrift.MessageReader); ok { - g.Method = method - var err error - g.Request, err = rw.Read(ctx, method, false, dataLen, in) - return err - } - return fmt.Errorf("unexpected Args reader type: %T", g.inner) -} - -func (g *Args) ReadPb(ctx context.Context, method string, in []byte) error { - if err, ok := g.inner.(error); ok { - return err - } - if w, ok := g.inner.(proto.MessageReader); ok { - g.Method = method - var err error - g.Request, err = w.Read(ctx, method, false, in) - return err - } - return fmt.Errorf("unexpected Args reader type: %T", g.inner) -} - -// GetFirstArgument implements util.KitexArgs. -func (g *Args) GetFirstArgument() interface{} { - return g.Request -} +type Args = generic.Args // Result generic response -type Result struct { - Success interface{} - inner interface{} -} +type Result = generic.Result var ( - _ codecProto.MessageWriterWithContext = (*Result)(nil) - _ codecProto.MessageReaderWithMethodWithContext = (*Result)(nil) - _ WithCodec = (*Result)(nil) + _ WithCodec = (*Args)(nil) + _ WithCodec = (*Result)(nil) ) - -// SetCodec ... -func (r *Result) SetCodec(inner interface{}) { - r.inner = inner -} - -// Write ... -func (r *Result) Write(ctx context.Context, method string, out bufiox.Writer) error { - if err, ok := r.inner.(error); ok { - return err - } - if w, ok := r.inner.(thrift.MessageWriter); ok { - return w.Write(ctx, out, r.Success, method, false, nil) - } - return fmt.Errorf("unexpected Result writer type: %T", r.inner) -} - -func (r *Result) WritePb(ctx context.Context, method string) (interface{}, error) { - if err, ok := r.inner.(error); ok { - return nil, err - } - if w, ok := r.inner.(proto.MessageWriter); ok { - return w.Write(ctx, r.Success, method, false) - } - return nil, fmt.Errorf("unexpected Result writer type: %T", r.inner) -} - -// Read ... -func (r *Result) Read(ctx context.Context, method string, dataLen int, in bufiox.Reader) error { - if err, ok := r.inner.(error); ok { - return err - } - if w, ok := r.inner.(thrift.MessageReader); ok { - var err error - r.Success, err = w.Read(ctx, method, true, dataLen, in) - return err - } - return fmt.Errorf("unexpected Result reader type: %T", r.inner) -} - -func (r *Result) ReadPb(ctx context.Context, method string, in []byte) error { - if err, ok := r.inner.(error); ok { - return err - } - if w, ok := r.inner.(proto.MessageReader); ok { - var err error - r.Success, err = w.Read(ctx, method, true, in) - return err - } - return fmt.Errorf("unexpected Result reader type: %T", r.inner) -} - -// GetSuccess implements util.KitexResult. -func (r *Result) GetSuccess() interface{} { - if !r.IsSetSuccess() { - return nil - } - return r.Success -} - -// SetSuccess implements util.KitexResult. -func (r *Result) SetSuccess(x interface{}) { - r.Success = x -} - -// IsSetSuccess ... -func (r *Result) IsSetSuccess() bool { - return r.Success != nil -} - -// GetResult ... -func (r *Result) GetResult() interface{} { - return r.Success -} diff --git a/pkg/generic/grpcjson_test/generic_init.go b/pkg/generic/grpcjson_test/generic_init.go new file mode 100644 index 0000000000..a897bc2128 --- /dev/null +++ b/pkg/generic/grpcjson_test/generic_init.go @@ -0,0 +1,424 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test + +/* +import ( + "context" + "errors" + "fmt" + "io" + "net" + "strings" + "sync" + "time" + + "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/client/genericclient" + kt "github.com/cloudwego/kitex/internal/mocks/thrift" + "github.com/cloudwego/kitex/pkg/generic" + "github.com/cloudwego/kitex/pkg/kerrors" + kitex "github.com/cloudwego/kitex/pkg/serviceinfo" + "github.com/cloudwego/kitex/pkg/streaming" + "github.com/cloudwego/kitex/server" + "github.com/cloudwego/kitex/transport" +) + +func newGenericStreamingClient(g generic.Generic, targetIPPort string) genericclient.Client { + cli, err := genericclient.NewStreamingClient("destService", g, + client.WithTransportProtocol(transport.GRPC), + client.WithHostPorts(targetIPPort), + ) + if err != nil { + panic(err) + } + return cli +} + +func newGenericClient(g generic.Generic, targetIPPort string) genericclient.Client { + cli, err := genericclient.NewClient("destService", g, + client.WithHostPorts(targetIPPort), + client.WithTransportProtocol(transport.TTHeader)) + if err != nil { + panic(err) + } + return cli +} + +var _ kt.TestService = &StreamingTestImpl{} + +type StreamingTestImpl struct{} + +func (s *StreamingTestImpl) Echo(stream kt.TestService_EchoServer) (err error) { + fmt.Println("Echo called") + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("panic: %v", p) + } + wg.Done() + }() + defer stream.Close() + for { + msg, recvErr := stream.Recv() + if recvErr == io.EOF { + return + } else if recvErr != nil { + err = recvErr + return + } + fmt.Printf("Echo: received message = %s\n", msg.Message) + time.Sleep(100 * time.Millisecond) + } + }() + + go func() { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("panic: %v", p) + } + wg.Done() + }() + resp := &kt.Response{} + for i := 0; i < 3; i++ { + resp.Message = fmt.Sprintf("%dth response", i) + if sendErr := stream.Send(resp); sendErr != nil { + err = sendErr + return + } + fmt.Printf("Echo: sent message = %s\n", resp) + time.Sleep(100 * time.Millisecond) + } + }() + wg.Wait() + return +} + +func (s *StreamingTestImpl) EchoClient(stream kt.TestService_EchoClientServer) (err error) { + fmt.Println("EchoClient called") + var msgs []string + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + return err + } + fmt.Printf("Recv: %s\n", req.Message) + msgs = append(msgs, req.Message) + time.Sleep(100 * time.Millisecond) + } + return stream.SendAndClose(&kt.Response{Message: "all message: " + strings.Join(msgs, ", ")}) +} + +func (s *StreamingTestImpl) EchoServer(req *kt.Request, stream kt.TestService_EchoServerServer) (err error) { + fmt.Println("EchoServer called") + resp := &kt.Response{} + for i := 0; i < 3; i++ { + resp.Message = fmt.Sprintf("%v -> %dth response", req.Message, i) + err := stream.Send(resp) + if err != nil { + return err + } + time.Sleep(100 * time.Millisecond) + } + return +} + +func (s *StreamingTestImpl) EchoUnary(ctx context.Context, req *kt.Request) (resp *kt.Response, err error) { + fmt.Println("EchoUnary called") + resp = &kt.Response{} + resp.Message = "hello " + req.Message + return +} + +func (s *StreamingTestImpl) EchoPingPong(ctx context.Context, req *kt.Request) (resp *kt.Response, err error) { + fmt.Println("EchoPingPong called") + resp = &kt.Response{} + resp.Message = "hello " + req.Message + return +} + +func (s *StreamingTestImpl) EchoBizException(stream kt.TestService_EchoBizExceptionServer) error { + fmt.Println("EchoBizException called") + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + return err + } + fmt.Printf("Recv: %s\n", req.Message) + time.Sleep(100 * time.Millisecond) + } + return kerrors.NewBizStatusError(int32(404), "not found") +} + +// normal server +func newMockServer(handler kt.TestService, addr net.Addr, opts ...server.Option) server.Server { + var options []server.Option + + opts = append(opts, server.WithServiceAddr(addr)) + options = append(options, opts...) + options = append(options, server.WithCompatibleMiddlewareForUnary()) + + svr := server.NewServer(options...) + if err := svr.RegisterService(serviceInfo(), handler); err != nil { + panic(err) + } + go func() { + err := svr.Run() + if err != nil { + panic(err) + } + }() + return svr +} + +var serviceMethods = map[string]kitex.MethodInfo{ + "Echo": kitex.NewMethodInfo( + echoHandler, + newTestServiceEchoArgs, + newTestServiceEchoResult, + false, + kitex.WithStreamingMode(kitex.StreamingBidirectional), + ), + "EchoClient": kitex.NewMethodInfo( + echoClientHandler, + newTestServiceEchoClientArgs, + newTestServiceEchoClientResult, + false, + kitex.WithStreamingMode(kitex.StreamingClient), + ), + "EchoServer": kitex.NewMethodInfo( + echoServerHandler, + newTestServiceEchoServerArgs, + newTestServiceEchoServerResult, + false, + kitex.WithStreamingMode(kitex.StreamingServer), + ), + "EchoUnary": kitex.NewMethodInfo( + echoUnaryHandler, + newTestServiceEchoUnaryArgs, + newTestServiceEchoUnaryResult, + false, + kitex.WithStreamingMode(kitex.StreamingUnary), + ), + "EchoBizException": kitex.NewMethodInfo( + echoBizExceptionHandler, + newTestServiceEchoBizExceptionArgs, + newTestServiceEchoBizExceptionResult, + false, + kitex.WithStreamingMode(kitex.StreamingClient), + ), + "EchoPingPong": kitex.NewMethodInfo( + echoPingPongHandler, + newTestServiceEchoPingPongArgs, + newTestServiceEchoPingPongResult, + false, + kitex.WithStreamingMode(kitex.StreamingNone), + ), +} + +func serviceInfo() *kitex.ServiceInfo { + serviceName := "TestService" + handlerType := (*kt.TestService)(nil) + methods := map[string]kitex.MethodInfo{} + for name, m := range serviceMethods { + methods[name] = m + } + extra := map[string]interface{}{ + "PackageName": "thrift", + } + extra["streaming"] = true + + svcInfo := &kitex.ServiceInfo{ + ServiceName: serviceName, + HandlerType: handlerType, + Methods: methods, + PayloadCodec: kitex.Thrift, + KiteXGenVersion: "v0.11.0", + Extra: extra, + } + return svcInfo +} + +func echoHandler(ctx context.Context, handler, arg, result interface{}) error { + st, ok := arg.(*streaming.Args) + if !ok { + return errors.New("TestService.Echo is a thrift streaming method, please call with Kitex StreamClient") + } + stream := &testServiceEchoServer{st.Stream} + return handler.(kt.TestService).Echo(stream) +} + +type testServiceEchoServer struct { + streaming.Stream +} + +func (x *testServiceEchoServer) Send(m *kt.Response) error { + return x.Stream.SendMsg(m) +} + +func (x *testServiceEchoServer) Recv() (*kt.Request, error) { + m := new(kt.Request) + return m, x.Stream.RecvMsg(m) +} + +func newTestServiceEchoArgs() interface{} { + return kt.NewTestServiceEchoArgs() +} + +func newTestServiceEchoResult() interface{} { + return kt.NewTestServiceEchoResult() +} + +func echoClientHandler(ctx context.Context, handler, arg, result interface{}) error { + st, ok := arg.(*streaming.Args) + if !ok { + return errors.New("TestService.EchoClient is a thrift streaming method, please call with Kitex StreamClient") + } + stream := &testServiceEchoClientServer{st.Stream} + return handler.(kt.TestService).EchoClient(stream) +} + +type testServiceEchoClientServer struct { + streaming.Stream +} + +func (x *testServiceEchoClientServer) SendAndClose(m *kt.Response) error { + return x.Stream.SendMsg(m) +} + +func (x *testServiceEchoClientServer) Recv() (*kt.Request, error) { + m := new(kt.Request) + return m, x.Stream.RecvMsg(m) +} + +func newTestServiceEchoClientArgs() interface{} { + return kt.NewTestServiceEchoClientArgs() +} + +func newTestServiceEchoClientResult() interface{} { + return kt.NewTestServiceEchoClientResult() +} + +func echoServerHandler(ctx context.Context, handler, arg, result interface{}) error { + st, ok := arg.(*streaming.Args) + if !ok { + return errors.New("TestService.EchoServer is a thrift streaming method, please call with Kitex StreamClient") + } + stream := &testServiceEchoServerServer{st.Stream} + req := new(kt.Request) + if err := st.Stream.RecvMsg(req); err != nil { + return err + } + return handler.(kt.TestService).EchoServer(req, stream) +} + +type testServiceEchoServerServer struct { + streaming.Stream +} + +func (x *testServiceEchoServerServer) Send(m *kt.Response) error { + return x.Stream.SendMsg(m) +} + +func newTestServiceEchoServerArgs() interface{} { + return kt.NewTestServiceEchoServerArgs() +} + +func newTestServiceEchoServerResult() interface{} { + return kt.NewTestServiceEchoServerResult() +} + +func echoUnaryHandler(ctx context.Context, handler, arg, result interface{}) error { + if streaming.GetStream(ctx) == nil { + return errors.New("TestService.EchoUnary is a thrift streaming unary method, please call with Kitex StreamClient or remove the annotation streaming.mode") + } + realArg := arg.(*kt.TestServiceEchoUnaryArgs) + realResult := result.(*kt.TestServiceEchoUnaryResult) + success, err := handler.(kt.TestService).EchoUnary(ctx, realArg.Req) + if err != nil { + return err + } + realResult.Success = success + return nil +} + +func newTestServiceEchoUnaryArgs() interface{} { + return kt.NewTestServiceEchoUnaryArgs() +} + +func newTestServiceEchoUnaryResult() interface{} { + return kt.NewTestServiceEchoUnaryResult() +} + +func echoBizExceptionHandler(ctx context.Context, handler, arg, result interface{}) error { + st, ok := arg.(*streaming.Args) + if !ok { + return errors.New("TestService.EchoBizException is a thrift streaming method, please call with Kitex StreamClient") + } + stream := &testServiceEchoBizExceptionServer{st.Stream} + return handler.(kt.TestService).EchoBizException(stream) +} + +type testServiceEchoBizExceptionServer struct { + streaming.Stream +} + +func (x *testServiceEchoBizExceptionServer) SendAndClose(m *kt.Response) error { + return x.Stream.SendMsg(m) +} + +func (x *testServiceEchoBizExceptionServer) Recv() (*kt.Request, error) { + m := new(kt.Request) + return m, x.Stream.RecvMsg(m) +} + +func newTestServiceEchoBizExceptionArgs() interface{} { + return kt.NewTestServiceEchoBizExceptionArgs() +} + +func newTestServiceEchoBizExceptionResult() interface{} { + return kt.NewTestServiceEchoBizExceptionResult() +} + +func echoPingPongHandler(ctx context.Context, handler, arg, result interface{}) error { + realArg := arg.(*kt.TestServiceEchoPingPongArgs) + realResult := result.(*kt.TestServiceEchoPingPongResult) + success, err := handler.(kt.TestService).EchoPingPong(ctx, realArg.Req) + if err != nil { + return err + } + realResult.Success = success + return nil +} + +func newTestServiceEchoPingPongArgs() interface{} { + return kt.NewTestServiceEchoPingPongArgs() +} + +func newTestServiceEchoPingPongResult() interface{} { + return kt.NewTestServiceEchoPingPongResult() +} +*/ diff --git a/pkg/generic/grpcjson_test/generic_test.go b/pkg/generic/grpcjson_test/generic_test.go new file mode 100644 index 0000000000..9d3b4fe7de --- /dev/null +++ b/pkg/generic/grpcjson_test/generic_test.go @@ -0,0 +1,271 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test + +/* +import ( + "context" + "fmt" + "io" + "net" + "reflect" + "strings" + "sync" + "testing" + "time" + + "github.com/tidwall/gjson" + + "github.com/cloudwego/kitex/client/genericclient" + kt "github.com/cloudwego/kitex/internal/mocks/thrift" + "github.com/cloudwego/kitex/internal/test" + "github.com/cloudwego/kitex/pkg/generic" + "github.com/cloudwego/kitex/pkg/kerrors" + "github.com/cloudwego/kitex/server" +) + +var idl = "./idl/api.thrift" + +func TestClientStreaming(t *testing.T) { + ctx := context.Background() + addr := test.GetLocalAddress() + + svr := initMockTestServer(new(StreamingTestImpl), addr) + defer svr.Stop() + + cli := initStreamingClient(t, addr, idl, false) + testClientStreaming(t, ctx, cli) + + // with dynamicgo + cli = initStreamingClient(t, addr, idl, true) + testClientStreaming(t, ctx, cli) +} + +func TestServerStreaming(t *testing.T) { + ctx := context.Background() + addr := test.GetLocalAddress() + + svr := initMockTestServer(new(StreamingTestImpl), addr) + defer svr.Stop() + + cli := initStreamingClient(t, addr, idl, false) + testServerStreaming(t, ctx, cli) + + // with dynamicgo + cli = initStreamingClient(t, addr, idl, true) + testServerStreaming(t, ctx, cli) +} + +func TestBidirectionalStreaming(t *testing.T) { + ctx := context.Background() + addr := test.GetLocalAddress() + + svr := initMockTestServer(new(StreamingTestImpl), addr) + defer svr.Stop() + + cli := initStreamingClient(t, addr, idl, false) + testBidirectionalStreaming(t, ctx, cli) + + // with dynamicgo + cli = initStreamingClient(t, addr, idl, true) + testBidirectionalStreaming(t, ctx, cli) +} + +func TestUnary(t *testing.T) { + ctx := context.Background() + addr := test.GetLocalAddress() + + svr := initMockTestServer(new(StreamingTestImpl), addr) + defer svr.Stop() + + cli := initStreamingClient(t, addr, idl, false) + testUnary(t, ctx, cli) + + // with dynamicgo + cli = initStreamingClient(t, addr, idl, true) + testUnary(t, ctx, cli) +} + +func TestBizException(t *testing.T) { + ctx := context.Background() + addr := test.GetLocalAddress() + + svr := initMockTestServer(new(StreamingTestImpl), addr) + defer svr.Stop() + + cli := initStreamingClient(t, addr, idl, false) + streamCli, err := genericclient.NewClientStreaming(ctx, cli, "EchoBizException") + test.Assert(t, err == nil) + for i := 0; i < 3; i++ { + req := fmt.Sprintf(`{"message": "grpc client streaming generic %dth request"}`, i) + err = streamCli.Send(req) + test.Assert(t, err == nil, err) + time.Sleep(100 * time.Millisecond) + } + resp, err := streamCli.CloseAndRecv() + test.Assert(t, err != nil) + bizStatusErr, ok := kerrors.FromBizStatusError(err) + test.Assert(t, ok) + test.Assert(t, bizStatusErr.BizStatusCode() == 404) + test.Assert(t, bizStatusErr.BizMessage() == "not found") + test.Assert(t, resp == nil) +} + +func TestNoneStreaming(t *testing.T) { + ctx := context.Background() + addr := test.GetLocalAddress() + + svr := initMockTestServer(new(StreamingTestImpl), addr) + defer svr.Stop() + + cli := initGenericClient(t, addr, idl, false) + testNoneStreaming(t, ctx, cli) + + // with dynamicgo + cli = initGenericClient(t, addr, idl, true) + testNoneStreaming(t, ctx, cli) +} + +func initStreamingClient(t *testing.T, addr, idl string, enableDynamicgo bool) genericclient.Client { + g, err := getJsonThriftGeneric(idl, enableDynamicgo) + test.Assert(t, err == nil) + return newGenericStreamingClient(g, addr) +} + +func initGenericClient(t *testing.T, addr, idl string, enableDynamicgo bool) genericclient.Client { + g, err := getJsonThriftGeneric(idl, enableDynamicgo) + test.Assert(t, err == nil) + return newGenericClient(g, addr) +} + +func getJsonThriftGeneric(idl string, enableDynamicgo bool) (generic.Generic, error) { + var p generic.DescriptorProvider + var err error + if enableDynamicgo { + p, err = generic.NewThriftFileProviderWithDynamicGo(idl) + if err != nil { + return nil, err + } + } else { + p, err = generic.NewThriftFileProvider(idl) + if err != nil { + return nil, err + } + } + g, err := generic.JSONThriftGeneric(p) + if err != nil { + return nil, err + } + return g, nil +} + +func initMockTestServer(handler kt.TestService, address string) server.Server { + addr, _ := net.ResolveTCPAddr("tcp", address) + return newMockServer(handler, addr) +} + +func testClientStreaming(t *testing.T, ctx context.Context, cli genericclient.Client) { + streamCli, err := genericclient.NewClientStreaming(ctx, cli, "EchoClient") + test.Assert(t, err == nil) + for i := 0; i < 3; i++ { + req := fmt.Sprintf(`{"message": "grpc client streaming generic %dth request"}`, i) + err = streamCli.Send(req) + test.Assert(t, err == nil, err) + time.Sleep(100 * time.Millisecond) + } + resp, err := streamCli.CloseAndRecv() + test.Assert(t, err == nil, err) + strResp, ok := resp.(string) + test.Assert(t, ok) + fmt.Printf("clientStreaming message received: %v\n", strResp) + test.Assert(t, reflect.DeepEqual(gjson.Get(strResp, "message").String(), + "all message: grpc client streaming generic 0th request, grpc client streaming generic 1th request, grpc client streaming generic 2th request")) +} + +func testServerStreaming(t *testing.T, ctx context.Context, cli genericclient.Client) { + streamCli, err := genericclient.NewServerStreaming(ctx, cli, "EchoServer", `{"message": "grpc server streaming generic request"}`) + test.Assert(t, err == nil, err) + for { + resp, err := streamCli.Recv() + if err != nil { + test.Assert(t, err == io.EOF, err) + fmt.Println("serverStreaming message receive done") + break + } else { + strResp, ok := resp.(string) + test.Assert(t, ok) + fmt.Printf("serverStreaming message received: %s\n", strResp) + test.Assert(t, strings.Contains(gjson.Get(strResp, "message").String(), "grpc server streaming generic request ->")) + } + time.Sleep(100 * time.Millisecond) + } +} + +func testBidirectionalStreaming(t *testing.T, ctx context.Context, cli genericclient.Client) { + streamCli, err := genericclient.NewBidirectionalStreaming(ctx, cli, "Echo") + test.Assert(t, err == nil) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + defer streamCli.Close() + for i := 0; i < 3; i++ { + req := fmt.Sprintf(`{"message": "grpc bidirectional streaming generic %dth request"}`, i) + err = streamCli.Send(req) + test.Assert(t, err == nil) + fmt.Printf("Echo send: req = %s\n", req) + } + }() + + go func() { + defer wg.Done() + for { + resp, err := streamCli.Recv() + if err != nil { + test.Assert(t, err == io.EOF, err) + fmt.Println("bidirectionalStreaming message receive done") + break + } else { + strResp, ok := resp.(string) + test.Assert(t, ok) + fmt.Printf("bidirectionalStreaming message received: %s\n", strResp) + test.Assert(t, strings.Contains(gjson.Get(strResp, "message").String(), "th response")) + } + time.Sleep(100 * time.Millisecond) + } + }() + wg.Wait() +} + +func testUnary(t *testing.T, ctx context.Context, cli genericclient.Client) { + resp, err := cli.GenericCall(ctx, "EchoUnary", `{"message": "unary request"}`) + test.Assert(t, err == nil, err) + strResp, ok := resp.(string) + test.Assert(t, ok) + test.Assert(t, reflect.DeepEqual(gjson.Get(strResp, "message").String(), "hello unary request")) +} + +func testNoneStreaming(t *testing.T, ctx context.Context, cli genericclient.Client) { + resp, err := cli.GenericCall(ctx, "EchoPingPong", `{"message": "ping pong request"}`) + test.Assert(t, err == nil, err) + strResp, ok := resp.(string) + test.Assert(t, ok) + test.Assert(t, reflect.DeepEqual(gjson.Get(strResp, "message").String(), "hello ping pong request")) +} +*/ diff --git a/pkg/generic/grpcjson_test/idl/api.thrift b/pkg/generic/grpcjson_test/idl/api.thrift new file mode 100644 index 0000000000..88461092c0 --- /dev/null +++ b/pkg/generic/grpcjson_test/idl/api.thrift @@ -0,0 +1,19 @@ +namespace go thrift + +struct Request { + 1: required string message, +} + +struct Response { + 1: required string message, +} + +service TestService { + Response Echo (1: Request req) (streaming.mode="bidirectional"), + Response EchoClient (1: Request req) (streaming.mode="client"), + Response EchoServer (1: Request req) (streaming.mode="server"), + Response EchoUnary (1: Request req) (streaming.mode="unary"), // not recommended + Response EchoBizException (1: Request req) (streaming.mode="client"), + + Response EchoPingPong (1: Request req), // KitexThrift, non-streaming +} \ No newline at end of file diff --git a/pkg/generic/thrift/http_go116plus_amd64.go b/pkg/generic/thrift/http_amd64.go similarity index 98% rename from pkg/generic/thrift/http_go116plus_amd64.go rename to pkg/generic/thrift/http_amd64.go index 94a04edfe2..2911419e3d 100644 --- a/pkg/generic/thrift/http_go116plus_amd64.go +++ b/pkg/generic/thrift/http_amd64.go @@ -1,5 +1,5 @@ -//go:build amd64 && go1.16 -// +build amd64,go1.16 +//go:build amd64 +// +build amd64 /* * Copyright 2023 CloudWeGo Authors diff --git a/pkg/generic/thrift/http_fallback.go b/pkg/generic/thrift/http_fallback.go index b6ea961a03..b182b31630 100644 --- a/pkg/generic/thrift/http_fallback.go +++ b/pkg/generic/thrift/http_fallback.go @@ -1,5 +1,5 @@ -//go:build !amd64 || !go1.16 -// +build !amd64 !go1.16 +//go:build !amd64 +// +build !amd64 /* * Copyright 2023 CloudWeGo Authors diff --git a/pkg/generic/thrift/json.go b/pkg/generic/thrift/json.go index 4ba24e3896..c23af3c6bf 100644 --- a/pkg/generic/thrift/json.go +++ b/pkg/generic/thrift/json.go @@ -22,13 +22,13 @@ import ( "strconv" "github.com/bytedance/gopkg/lang/dirtmake" + "github.com/bytedance/sonic" "github.com/cloudwego/dynamicgo/conv" "github.com/cloudwego/dynamicgo/conv/t2j" dthrift "github.com/cloudwego/dynamicgo/thrift" "github.com/cloudwego/gopkg/bufiox" "github.com/cloudwego/gopkg/protocol/thrift" "github.com/cloudwego/gopkg/protocol/thrift/base" - jsoniter "github.com/json-iterator/go" "github.com/tidwall/gjson" "github.com/cloudwego/kitex/pkg/generic/descriptor" @@ -36,6 +36,8 @@ import ( "github.com/cloudwego/kitex/pkg/utils" ) +var jsonSonic = sonic.Config{EscapeHTML: true}.Froze() + type JSONReaderWriter struct { *ReadJSON *WriteJSON @@ -124,10 +126,13 @@ func (m *WriteJSON) originalWrite(ctx context.Context, out bufiox.Writer, msg in Index: 0, } } - if err = wrapJSONWriter(ctx, &body, bw, typeDsc, &writerOption{requestBase: requestBase, binaryWithBase64: m.base64Binary}); err != nil { - return err + + opt := &writerOption{requestBase: requestBase, binaryWithBase64: m.base64Binary} + if fnDsc.IsWithoutWrapping { + return jsonWriter(ctx, &body, typeDsc, opt, bw) } - return err + + return wrapJSONWriter(ctx, &body, bw, typeDsc, &writerOption{requestBase: requestBase, binaryWithBase64: m.base64Binary}) } // NewReadJSON build ReadJSON according to ServiceDescriptor @@ -180,15 +185,15 @@ func (m *ReadJSON) Read(ctx context.Context, method string, isClient bool, dataL } var resp interface{} + var err error if tyDsc.Struct().Fields()[0].Type().Type() == dthrift.VOID { - if err := in.Skip(voidWholeLen); err != nil { + if err = in.Skip(voidWholeLen); err != nil { return nil, err } resp = descriptor.Void{} } else { transBuff := dirtmake.Bytes(dataLen, dataLen) - _, err := in.ReadBinary(transBuff) - if err != nil { + if _, err = in.ReadBinary(transBuff); err != nil { return nil, err } @@ -196,17 +201,19 @@ func (m *ReadJSON) Read(ctx context.Context, method string, isClient bool, dataL buf := dirtmake.Bytes(0, len(transBuff)*2) // thrift []byte to json []byte var t2jBinaryConv t2j.BinaryConv - if isClient { + if isClient && !fnDsc.IsWithoutWrapping() { t2jBinaryConv = t2j.NewBinaryConv(m.convOptsWithException) } else { t2jBinaryConv = t2j.NewBinaryConv(m.convOpts) } - if err := t2jBinaryConv.DoInto(ctx, tyDsc, transBuff, &buf); err != nil { + if err = t2jBinaryConv.DoInto(ctx, tyDsc, transBuff, &buf); err != nil { return nil, err } - buf = removePrefixAndSuffix(buf) + if !fnDsc.IsWithoutWrapping() { + buf = removePrefixAndSuffix(buf) + } resp = utils.SliceByteToString(buf) - if tyDsc.Struct().Fields()[0].Type().Type() == dthrift.STRING { + if !fnDsc.IsWithoutWrapping() && tyDsc.Struct().Fields()[0].Type().Type() == dthrift.STRING { strresp := resp.(string) resp, err = strconv.Unquote(strresp) if err != nil { @@ -223,13 +230,18 @@ func (m *ReadJSON) originalRead(ctx context.Context, method string, isClient boo if err != nil { return nil, err } - fDsc := fnDsc.Response + tyDsc := fnDsc.Response if !isClient { - fDsc = fnDsc.Request + tyDsc = fnDsc.Request } br := thrift.NewBufferReader(in) defer br.Recycle() - resp, err := skipStructReader(ctx, br, fDsc, &readerOption{forJSON: true, throwException: true, binaryWithBase64: m.binaryWithBase64}) + + if fnDsc.IsWithoutWrapping { + return structReader(ctx, tyDsc, &readerOption{forJSON: true, binaryWithBase64: m.binaryWithBase64}, br) + } + + resp, err := skipStructReader(ctx, br, tyDsc, &readerOption{forJSON: true, throwException: true, binaryWithBase64: m.binaryWithBase64}) if err != nil { return nil, err } @@ -245,7 +257,7 @@ func (m *ReadJSON) originalRead(ctx context.Context, method string, isClient boo } // resp is map - respNode, err := jsoniter.Marshal(resp) + respNode, err := jsonSonic.Marshal(resp) if err != nil { return nil, perrors.NewProtocolErrorWithType(perrors.InvalidData, fmt.Sprintf("response marshal failed. err:%#v", err)) } @@ -260,3 +272,26 @@ func removePrefixAndSuffix(buf []byte) []byte { } return buf } + +func jsonWriter(ctx context.Context, body *gjson.Result, typeDsc *descriptor.TypeDescriptor, opt *writerOption, bw *thrift.BufferWriter) error { + val, writer, err := nextJSONWriter(body, typeDsc, opt) + if err != nil { + return fmt.Errorf("nextWriter of field[%s] error %w", typeDsc.Name, err) + } + if err = writer(ctx, val, bw, typeDsc, opt); err != nil { + return fmt.Errorf("writer of field[%s] error %w", typeDsc.Name, err) + } + return nil +} + +func structReader(ctx context.Context, typeDesc *descriptor.TypeDescriptor, opt *readerOption, br *thrift.BufferReader) (v interface{}, err error) { + resp, err := readStruct(ctx, br, typeDesc, opt) + if err != nil { + return nil, err + } + respNode, err := jsonSonic.Marshal(resp) + if err != nil { + return nil, perrors.NewProtocolErrorWithType(perrors.InvalidData, fmt.Sprintf("streaming response marshal failed. err:%#v", err)) + } + return string(respNode), nil +} diff --git a/pkg/generic/thrift/json_go116plus_amd64.go b/pkg/generic/thrift/json_amd64.go similarity index 79% rename from pkg/generic/thrift/json_go116plus_amd64.go rename to pkg/generic/thrift/json_amd64.go index 42192a863c..0e434af567 100644 --- a/pkg/generic/thrift/json_go116plus_amd64.go +++ b/pkg/generic/thrift/json_amd64.go @@ -1,5 +1,5 @@ -//go:build amd64 && go1.16 -// +build amd64,go1.16 +//go:build amd64 +// +build amd64 /* * Copyright 2023 CloudWeGo Authors @@ -70,7 +70,7 @@ func (m *WriteJSON) Write(ctx context.Context, out bufiox.Writer, msg interface{ // msg is void or nil if _, ok := msg.(descriptor.Void); ok || msg == nil { - return m.writeFields(ctx, out, dynamicgoTypeDsc, nil, nil, isClient) + return writeFields(ctx, out, dynamicgoTypeDsc, nil, nil, isClient) } // msg is string @@ -80,12 +80,16 @@ func (m *WriteJSON) Write(ctx context.Context, out bufiox.Writer, msg interface{ } transBuff := utils.StringToSliceByte(s) - return m.writeFields(ctx, out, dynamicgoTypeDsc, &cv, transBuff, isClient) + if fnDsc.IsWithoutWrapping() { + return writeUnwrappedFields(ctx, out, dynamicgoTypeDsc, &cv, transBuff) + } else { + return writeFields(ctx, out, dynamicgoTypeDsc, &cv, transBuff, isClient) + } } type MsgType int -func (m *WriteJSON) writeFields(ctx context.Context, out bufiox.Writer, dynamicgoTypeDsc *dthrift.TypeDescriptor, cv *j2t.BinaryConv, transBuff []byte, isClient bool) error { +func writeFields(ctx context.Context, out bufiox.Writer, dynamicgoTypeDsc *dthrift.TypeDescriptor, cv *j2t.BinaryConv, transBuff []byte, isClient bool) error { dbuf := mcache.Malloc(len(transBuff))[0:0] defer mcache.Free(dbuf) @@ -124,3 +128,20 @@ func (m *WriteJSON) writeFields(ctx context.Context, out bufiox.Writer, dynamicg } return bw.WriteFieldStop() } + +func writeUnwrappedFields(ctx context.Context, out bufiox.Writer, dynamicgoTypeDsc *dthrift.TypeDescriptor, cv *j2t.BinaryConv, transBuff []byte) error { + dbuf := mcache.Malloc(len(transBuff))[0:0] + defer mcache.Free(dbuf) + + if err := cv.DoInto(ctx, dynamicgoTypeDsc, transBuff, &dbuf); err != nil { + return err + } + + if wb, err := out.Malloc(len(dbuf)); err != nil { + return err + } else { + copy(wb, dbuf) + } + dbuf = dbuf[:0] + return nil +} diff --git a/pkg/generic/thrift/json_fallback.go b/pkg/generic/thrift/json_fallback.go index 5bc204f2c9..da214400dd 100644 --- a/pkg/generic/thrift/json_fallback.go +++ b/pkg/generic/thrift/json_fallback.go @@ -1,5 +1,5 @@ -//go:build !amd64 || !go1.16 -// +build !amd64 !go1.16 +//go:build !amd64 +// +build !amd64 /* * Copyright 2023 CloudWeGo Authors diff --git a/pkg/generic/thrift/parse.go b/pkg/generic/thrift/parse.go index 332667a997..989ddb7a28 100644 --- a/pkg/generic/thrift/parse.go +++ b/pkg/generic/thrift/parse.go @@ -153,20 +153,68 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv mode := streamingMode(st) // only support single argument field := fn.Arguments[0] - req := &descriptor.TypeDescriptor{ - Type: descriptor.STRUCT, - Struct: &descriptor.StructDescriptor{ - FieldsByID: map[int32]*descriptor.FieldDescriptor{}, - FieldsByName: map[string]*descriptor.FieldDescriptor{}, - }, - } - var reqType *descriptor.TypeDescriptor - reqType, err = parseType(field.Type, tree, structsCache, initRecursionDepth, opts) + isStream := mode != serviceinfo.StreamingNone && mode != serviceinfo.StreamingUnary + + req, hasRequestBase, err := parseRequest(isStream, field, tree, structsCache, opts) if err != nil { return err } - hasRequestBase := false + resp, err := parseResponse(isStream, fn, tree, structsCache, opts) + if err != nil { + return err + } + + fnDsc := &descriptor.FunctionDescriptor{ + Name: fn.Name, + Oneway: fn.Oneway, + Request: req, + Response: resp, + HasRequestBase: hasRequestBase, + IsWithoutWrapping: isStream, + StreamingMode: mode, + } + defer func() { + if ret := recover(); ret != nil { + klog.Errorf("KITEX: router handle failed, err=%v\nstack=%s", ret, string(debug.Stack())) + err = fmt.Errorf("router handle failed, err=%v", ret) + } + }() + for _, ann := range fn.Annotations { + for _, v := range ann.GetValues() { + if handle, ok := descriptor.FindAnnotation(ann.GetKey(), v); ok { + if nr, ok := handle.(descriptor.NewRoute); ok { + sDsc.Router.Handle(nr(v, fnDsc)) + break + } + } + } + } + sDsc.Functions[fn.Name] = fnDsc + return nil +} + +func streamingMode(st *streaming.Streaming) serviceinfo.StreamingMode { + if st.BidirectionalStreaming { + return serviceinfo.StreamingBidirectional + } + if st.ClientStreaming { + return serviceinfo.StreamingClient + } + if st.ServerStreaming { + return serviceinfo.StreamingServer + } + if st.Unary { + return serviceinfo.StreamingUnary + } + return serviceinfo.StreamingNone +} + +func parseRequest(isStream bool, field *parser.Field, tree *parser.Thrift, structsCache map[string]*descriptor.TypeDescriptor, opts *parseOptions) (req *descriptor.TypeDescriptor, hasRequestBase bool, err error) { + reqType, err := parseType(field.Type, tree, structsCache, initRecursionDepth, opts) + if err != nil { + return nil, hasRequestBase, err + } if reqType.Type == descriptor.STRUCT { for _, f := range reqType.Struct.FieldsByName { if f.Type.IsRequestBase { @@ -175,34 +223,56 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv } } } + + if isStream { + return reqType, hasRequestBase, nil + } + + // wrap with a struct + wrappedTyDsc := &descriptor.TypeDescriptor{ + Type: descriptor.STRUCT, + Struct: &descriptor.StructDescriptor{ + FieldsByID: map[int32]*descriptor.FieldDescriptor{}, + FieldsByName: map[string]*descriptor.FieldDescriptor{}, + }, + } reqField := &descriptor.FieldDescriptor{ Name: field.Name, ID: field.ID, Type: reqType, GoTagOpt: opts.goTag, } - req.Struct.FieldsByID[field.ID] = reqField - req.Struct.FieldsByName[field.Name] = reqField - // parse response filed - resp := &descriptor.TypeDescriptor{ + wrappedTyDsc.Struct.FieldsByID[field.ID] = reqField + wrappedTyDsc.Struct.FieldsByName[field.Name] = reqField + + return wrappedTyDsc, hasRequestBase, nil +} + +func parseResponse(isStream bool, fn *parser.Function, tree *parser.Thrift, structsCache map[string]*descriptor.TypeDescriptor, opts *parseOptions) (*descriptor.TypeDescriptor, error) { + respType, err := parseType(fn.FunctionType, tree, structsCache, initRecursionDepth, opts) + if err != nil { + return nil, err + } + + if isStream { + return respType, nil + } + + // wrap with a struct + wrappedResp := &descriptor.TypeDescriptor{ Type: descriptor.STRUCT, Struct: &descriptor.StructDescriptor{ FieldsByID: map[int32]*descriptor.FieldDescriptor{}, FieldsByName: map[string]*descriptor.FieldDescriptor{}, }, } - var respType *descriptor.TypeDescriptor - respType, err = parseType(fn.FunctionType, tree, structsCache, initRecursionDepth, opts) - if err != nil { - return err - } respField := &descriptor.FieldDescriptor{ Type: respType, GoTagOpt: opts.goTag, } // response has no name or id - resp.Struct.FieldsByID[0] = respField - resp.Struct.FieldsByName[""] = respField + wrappedResp.Struct.FieldsByID[0] = respField + wrappedResp.Struct.FieldsByName[""] = respField if len(fn.Throws) > 0 { // only support single exception @@ -210,7 +280,7 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv var exceptionType *descriptor.TypeDescriptor exceptionType, err = parseType(field.Type, tree, structsCache, initRecursionDepth, opts) if err != nil { - return err + return nil, err } exceptionField := &descriptor.FieldDescriptor{ Name: field.Name, @@ -219,51 +289,10 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv Type: exceptionType, GoTagOpt: opts.goTag, } - resp.Struct.FieldsByID[field.ID] = exceptionField - resp.Struct.FieldsByName[field.Name] = exceptionField - } - fnDsc := &descriptor.FunctionDescriptor{ - Name: fn.Name, - Oneway: fn.Oneway, - Request: req, - Response: resp, - HasRequestBase: hasRequestBase, - StreamingMode: mode, - } - defer func() { - if ret := recover(); ret != nil { - klog.Errorf("KITEX: router handle failed, err=%v\nstack=%s", ret, string(debug.Stack())) - err = fmt.Errorf("router handle failed, err=%v", ret) - } - }() - for _, ann := range fn.Annotations { - for _, v := range ann.GetValues() { - if handle, ok := descriptor.FindAnnotation(ann.GetKey(), v); ok { - if nr, ok := handle.(descriptor.NewRoute); ok { - sDsc.Router.Handle(nr(v, fnDsc)) - break - } - } - } + wrappedResp.Struct.FieldsByID[field.ID] = exceptionField + wrappedResp.Struct.FieldsByName[field.Name] = exceptionField } - sDsc.Functions[fn.Name] = fnDsc - return nil -} - -func streamingMode(st *streaming.Streaming) serviceinfo.StreamingMode { - if st.BidirectionalStreaming { - return serviceinfo.StreamingBidirectional - } - if st.ClientStreaming { - return serviceinfo.StreamingClient - } - if st.ServerStreaming { - return serviceinfo.StreamingServer - } - if st.Unary { - return serviceinfo.StreamingUnary - } - return serviceinfo.StreamingNone + return wrappedResp, nil } // reuse builtin types diff --git a/pkg/remote/codec/grpc/grpc.go b/pkg/remote/codec/grpc/grpc.go index 5cbc94fc2a..ff78eac60b 100644 --- a/pkg/remote/codec/grpc/grpc.go +++ b/pkg/remote/codec/grpc/grpc.go @@ -23,8 +23,10 @@ import ( "fmt" "github.com/cloudwego/fastpb" + "github.com/cloudwego/gopkg/bufiox" "google.golang.org/protobuf/proto" + "github.com/cloudwego/kitex/internal/generic" "github.com/cloudwego/kitex/internal/utils/safemcache" "github.com/cloudwego/kitex/pkg/remote" "github.com/cloudwego/kitex/pkg/remote/codec/perrors" @@ -100,7 +102,24 @@ func (c *grpcCodec) Encode(ctx context.Context, message remote.Message, out remo switch message.ProtocolInfo().CodecType { case serviceinfo.Thrift: - payload, err = thrift.MarshalThriftData(ctx, c.ThriftCodec, message.Data()) + switch msg := message.Data().(type) { + case generic.ThriftWriter: + methodName := message.RPCInfo().Invocation().MethodName() + if methodName == "" { + return errors.New("empty methodName in grpc generic streaming Encode") + } + var bs []byte + bw := bufiox.NewBytesWriter(&bs) + if err = msg.Write(ctx, methodName, bw); err != nil { + return perrors.NewProtocolErrorWithErrMsg(err, fmt.Sprintf("generic thrift streaming marshal, Write failed: %s", err.Error())) + } + if err = bw.Flush(); err != nil { + return perrors.NewProtocolErrorWithErrMsg(err, fmt.Sprintf("generic thrift streaming marshal, Flush failed: %s", err.Error())) + } + payload = bs + default: + payload, err = thrift.MarshalThriftData(ctx, c.ThriftCodec, message.Data()) + } case serviceinfo.Protobuf: switch t := message.Data().(type) { case fastpb.Writer: @@ -187,7 +206,16 @@ func (c *grpcCodec) Decode(ctx context.Context, message remote.Message, in remot data := message.Data() switch message.ProtocolInfo().CodecType { case serviceinfo.Thrift: - return thrift.UnmarshalThriftData(ctx, c.ThriftCodec, "", d, message.Data()) + switch t := data.(type) { + case generic.ThriftReader: + methodName := message.RPCInfo().Invocation().MethodName() + if methodName == "" { + return errors.New("empty methodName in grpc Decode") + } + return t.Read(ctx, methodName, len(d), remote.NewReaderBuffer(d)) + default: + return thrift.UnmarshalThriftData(ctx, c.ThriftCodec, "", d, message.Data()) + } case serviceinfo.Protobuf: if t, ok := data.(fastpb.Reader); ok { if len(d) == 0 { diff --git a/pkg/remote/codec/thrift/thrift.go b/pkg/remote/codec/thrift/thrift.go index 5bc7ccd8be..96d9633705 100644 --- a/pkg/remote/codec/thrift/thrift.go +++ b/pkg/remote/codec/thrift/thrift.go @@ -24,6 +24,7 @@ import ( "github.com/cloudwego/gopkg/bufiox" "github.com/cloudwego/gopkg/protocol/thrift" + "github.com/cloudwego/kitex/internal/generic" "github.com/cloudwego/kitex/pkg/remote" "github.com/cloudwego/kitex/pkg/remote/codec" "github.com/cloudwego/kitex/pkg/remote/codec/perrors" @@ -144,7 +145,7 @@ func (c thriftCodec) Marshal(ctx context.Context, message remote.Message, out re } // generic call - if msg, ok := data.(genericWriter); ok { + if msg, ok := data.(generic.ThriftWriter); ok { return encodeGenericThrift(out, ctx, methodName, msgType, seqID, msg) } @@ -166,7 +167,7 @@ func (c thriftCodec) Marshal(ctx context.Context, message remote.Message, out re return errEncodeMismatchMsgType } -func encodeGenericThrift(out bufiox.Writer, ctx context.Context, method string, msgType remote.MessageType, seqID int32, msg genericWriter) error { +func encodeGenericThrift(out bufiox.Writer, ctx context.Context, method string, msgType remote.MessageType, seqID int32, msg generic.ThriftWriter) error { binaryWriter := thrift.NewBufferWriter(out) if err := binaryWriter.WriteMessageBegin(method, thrift.TMessageType(msgType), seqID); err != nil { return perrors.NewProtocolErrorWithErrMsg(err, fmt.Sprintf("thrift marshal, Write failed: %s", err.Error())) @@ -211,7 +212,7 @@ func (c thriftCodec) Unmarshal(ctx context.Context, message remote.Message, in r ri := message.RPCInfo() rpcinfo.Record(ctx, ri, stats.WaitReadStart, nil) - if msg, ok := data.(genericReader); ok { + if msg, ok := data.(generic.ThriftReader); ok { err = msg.Read(ctx, methodName, dataLen, in) if err != nil { err = remote.NewTransError(remote.ProtocolError, err) @@ -248,11 +249,3 @@ func validateMessageBeforeDecode(message remote.Message, seqID int32, methodName func (c thriftCodec) Name() string { return serviceinfo.Thrift.String() } - -type genericWriter interface { // used by pkg/generic - Write(ctx context.Context, method string, w bufiox.Writer) error -} - -type genericReader interface { // used by pkg/generic - Read(ctx context.Context, method string, dataLen int, r bufiox.Reader) error -}