Skip to content

Commit

Permalink
feat(generic): support thrift streaming for json generic client (#1467)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marina-Sakai authored Nov 18, 2024
1 parent 85107b7 commit 8e9df9a
Show file tree
Hide file tree
Showing 24 changed files with 3,532 additions and 295 deletions.
5 changes: 3 additions & 2 deletions client/genericclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ type genericServiceClient struct {

func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error) {
ctx = client.NewCtxWithCallOptions(ctx, callOptions)
_args := gc.svcInfo.MethodInfo(method).NewArgs().(*generic.Args)
mtInfo := gc.svcInfo.MethodInfo(method)
_args := mtInfo.NewArgs().(*generic.Args)
_args.Method = method
_args.Request = request

Expand All @@ -106,7 +107,7 @@ func (gc *genericServiceClient) GenericCall(ctx context.Context, method string,
return nil, gc.kClient.Call(ctx, mt.Name, _args, nil)
}

_result := gc.svcInfo.MethodInfo(method).NewResult().(*generic.Result)
_result := mtInfo.NewResult().(*generic.Result)
if err = gc.kClient.Call(ctx, mt.Name, _args, _result); err != nil {
return
}
Expand Down
6 changes: 4 additions & 2 deletions client/genericclient/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

// NOTE: this is a temporary adjustment for ci check. remove it after fully completing the generic streaming support

var (
_ clientStreaming = nil
_ serverStreaming = nil
Expand Down Expand Up @@ -162,8 +163,9 @@ func newServerStreaming(ctx context.Context, genericCli Client, method string, r
if err != nil {
return nil, err
}
ss := &serverStreamingClient{stream, gCli.svcInfo.MethodInfo(method)}
_args := gCli.svcInfo.MethodInfo(method).NewArgs().(*generic.Args)
mtInfo := gCli.svcInfo.MethodInfo(method)
ss := &serverStreamingClient{stream, mtInfo}
_args := mtInfo.NewArgs().(*generic.Args)
_args.Method = method
_args.Request = req
if err = ss.Stream.SendMsg(_args); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/bytedance/gopkg v0.1.1
github.com/bytedance/sonic v1.12.2
github.com/cloudwego/configmanager v0.2.2
github.com/cloudwego/dynamicgo v0.4.4
github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128
github.com/cloudwego/fastpb v0.0.5
github.com/cloudwego/frugal v0.2.0
github.com/cloudwego/gopkg v0.1.2
Expand All @@ -17,7 +17,6 @@ require (
github.com/golang/mock v1.6.0
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8
github.com/jhump/protoreflect v1.8.2
github.com/json-iterator/go v1.1.12
github.com/tidwall/gjson v1.17.3
golang.org/x/net v0.24.0
golang.org/x/sync v0.8.0
Expand All @@ -38,7 +37,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/configmanager v0.2.2 h1:sVrJB8gWYTlPV2OS3wcgJSO9F2/9Zbkmcm1Z7jempOU=
github.com/cloudwego/configmanager v0.2.2/go.mod h1:ppiyU+5TPLonE8qMVi/pFQk2eL3Q4P7d4hbiNJn6jwI=
github.com/cloudwego/dynamicgo v0.4.4 h1:RuHhjy44Ajy2PLjrwOhI9EY874t9srhgwd/rkKTUKfQ=
github.com/cloudwego/dynamicgo v0.4.4/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY=
github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128 h1:StnQNfU+fb3PavTNydupCP/8Ges3/DDYRjL8HmLwOMI=
github.com/cloudwego/dynamicgo v0.4.6-0.20241115162834-0e99bc39b128/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY=
github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZU=
github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk=
github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8=
Expand Down Expand Up @@ -71,7 +71,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
Expand All @@ -81,8 +80,6 @@ github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHL
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/jhump/protoreflect v1.8.2 h1:k2xE7wcUomeqwY0LDCYA16y4WWfyTcMx5mKhk0d4ua0=
github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
Expand All @@ -93,8 +90,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 h1:uiS4zKYKJVj5F3ID+5iylfKPsEQmBEOucSD9Vgmn0i0=
github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5/go.mod h1:I8AX+yW//L8Hshx6+a1m3bYkwXkpsVjA2795vP4f4oQ=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
Expand All @@ -107,7 +102,6 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
199 changes: 199 additions & 0 deletions internal/generic/generic_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package generic

import (
"context"
"fmt"

"github.com/cloudwego/gopkg/bufiox"
"github.com/cloudwego/gopkg/protocol/thrift/base"

"github.com/cloudwego/kitex/pkg/generic/proto"
"github.com/cloudwego/kitex/pkg/generic/thrift"
codecProto "github.com/cloudwego/kitex/pkg/remote/codec/protobuf"
)

// Args generic request
type Args struct {
Request interface{}
Method string
base *base.Base
inner interface{}
}

var (
_ codecProto.MessageWriterWithContext = (*Args)(nil)
_ codecProto.MessageReaderWithMethodWithContext = (*Args)(nil)
)

func (g *Args) SetCodec(inner interface{}) {
g.inner = inner
}

func (g *Args) GetOrSetBase() interface{} {
if g.base == nil {
g.base = base.NewBase()
}
return g.base
}

// Write ...
func (g *Args) Write(ctx context.Context, method string, out bufiox.Writer) error {
if err, ok := g.inner.(error); ok {
return err
}
if w, ok := g.inner.(thrift.MessageWriter); ok {
return w.Write(ctx, out, g.Request, method, true, g.base)
}
return fmt.Errorf("unexpected Args writer type: %T", g.inner)
}

func (g *Args) WritePb(ctx context.Context, method string) (interface{}, error) {
if err, ok := g.inner.(error); ok {
return nil, err
}
if w, ok := g.inner.(proto.MessageWriter); ok {
return w.Write(ctx, g.Request, method, true)
}
return nil, fmt.Errorf("unexpected Args writer type: %T", g.inner)
}

// Read ...
func (g *Args) Read(ctx context.Context, method string, dataLen int, in bufiox.Reader) error {
if err, ok := g.inner.(error); ok {
return err
}
if rw, ok := g.inner.(thrift.MessageReader); ok {
g.Method = method
var err error
g.Request, err = rw.Read(ctx, method, false, dataLen, in)
return err
}
return fmt.Errorf("unexpected Args reader type: %T", g.inner)
}

func (g *Args) ReadPb(ctx context.Context, method string, in []byte) error {
if err, ok := g.inner.(error); ok {
return err
}
if w, ok := g.inner.(proto.MessageReader); ok {
g.Method = method
var err error
g.Request, err = w.Read(ctx, method, false, in)
return err
}
return fmt.Errorf("unexpected Args reader type: %T", g.inner)
}

// GetFirstArgument implements util.KitexArgs.
func (g *Args) GetFirstArgument() interface{} {
return g.Request
}

// Result generic response
type Result struct {
Success interface{}
inner interface{}
}

var (
_ codecProto.MessageWriterWithContext = (*Result)(nil)
_ codecProto.MessageReaderWithMethodWithContext = (*Result)(nil)
)

// SetCodec ...
func (r *Result) SetCodec(inner interface{}) {
r.inner = inner
}

// Write ...
func (r *Result) Write(ctx context.Context, method string, out bufiox.Writer) error {
if err, ok := r.inner.(error); ok {
return err
}
if w, ok := r.inner.(thrift.MessageWriter); ok {
return w.Write(ctx, out, r.Success, method, false, nil)
}
return fmt.Errorf("unexpected Result writer type: %T", r.inner)
}

func (r *Result) WritePb(ctx context.Context, method string) (interface{}, error) {
if err, ok := r.inner.(error); ok {
return nil, err
}
if w, ok := r.inner.(proto.MessageWriter); ok {
return w.Write(ctx, r.Success, method, false)
}
return nil, fmt.Errorf("unexpected Result writer type: %T", r.inner)
}

// Read ...
func (r *Result) Read(ctx context.Context, method string, dataLen int, in bufiox.Reader) error {
if err, ok := r.inner.(error); ok {
return err
}
if w, ok := r.inner.(thrift.MessageReader); ok {
var err error
r.Success, err = w.Read(ctx, method, true, dataLen, in)
return err
}
return fmt.Errorf("unexpected Result reader type: %T", r.inner)
}

func (r *Result) ReadPb(ctx context.Context, method string, in []byte) error {
if err, ok := r.inner.(error); ok {
return err
}
if w, ok := r.inner.(proto.MessageReader); ok {
var err error
r.Success, err = w.Read(ctx, method, true, in)
return err
}
return fmt.Errorf("unexpected Result reader type: %T", r.inner)
}

// GetSuccess implements util.KitexResult.
func (r *Result) GetSuccess() interface{} {
if !r.IsSetSuccess() {
return nil
}
return r.Success
}

// SetSuccess implements util.KitexResult.
func (r *Result) SetSuccess(x interface{}) {
r.Success = x
}

// IsSetSuccess ...
func (r *Result) IsSetSuccess() bool {
return r.Success != nil
}

// GetResult ...
func (r *Result) GetResult() interface{} {
return r.Success
}

type ThriftWriter interface {
Write(ctx context.Context, method string, w bufiox.Writer) error
}

type ThriftReader interface {
Read(ctx context.Context, method string, dataLen int, r bufiox.Reader) error
}
4 changes: 4 additions & 0 deletions internal/mocks/thrift/gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@

kitex -thrift no_default_serdes -module github.com/cloudwego/kitex -gen-path .. ./test.thrift

kitex -thrift no_default_serdes -module github.com/cloudwego/kitex -gen-path .. ./stream.thrift

rm -rf ./mock # not in use, rm it

rm -rf ./testservice # not in use, rm it
Loading

0 comments on commit 8e9df9a

Please sign in to comment.