Skip to content

Commit

Permalink
fix: wait wg for unary invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 20, 2024
1 parent 9edb312 commit 827c6ff
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions thrift_streaming/thrift_tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"time"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/kitex-tests/common"
"github.com/cloudwego/kitex-tests/pkg/test"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo/echoservice"
"github.com/cloudwego/kitex/client/streamclient"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/kerrors"
Expand All @@ -39,6 +35,11 @@ import (
"github.com/cloudwego/kitex/pkg/stats"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/server"

"github.com/cloudwego/kitex-tests/common"
"github.com/cloudwego/kitex-tests/pkg/test"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo/echoservice"
)

/*
Expand Down Expand Up @@ -114,12 +115,16 @@ func (t *testTracer) Finish(ctx context.Context) {
}

func (tr *testTracer) finishCheck(t *testing.T, info string) {
tr.wg.Wait()
test.Assert(t, tr.finishCalled, tr)
tr.finishCalledCheck(t)
test.Assert(t, tr.sendSize == tr.finishSendSize, tr.sendSize, tr.finishSendSize, info)
test.Assert(t, tr.recvSize == tr.finishRecvSize, tr.recvSize, tr.finishRecvSize, info)
}

func (tr *testTracer) finishCalledCheck(t *testing.T) {
tr.wg.Wait()
test.Assert(t, tr.finishCalled, tr)
}

var _ streaming.Stream = (*wrapStream)(nil)

type wrapStream struct {
Expand Down Expand Up @@ -205,6 +210,8 @@ func TestTracerNormalEndOfStream(t *testing.T) {
// save for server
test.Assert(t, serverTracer.recvSize == 0 && serverTracer.sendSize == 0, serverTracer)
test.Assert(t, serverTracer.finishSendSize > 0 && serverTracer.finishRecvSize > 0, serverTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

t.Run("bidirectional api", func(t *testing.T) {
Expand Down Expand Up @@ -330,7 +337,8 @@ func TestTracerNormalEndOfStream(t *testing.T) {
test.Assert(t, clientTracer.sendCount == count, clientTracer)
test.Assert(t, clientTracer.recvCount == 1, clientTracer)
// regardless of whether wrapped stream implements WithDoFinish, it's done within client.stream.RecvMsg
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

t.Run("client streaming with wrapped stream with DoFinish", func(t *testing.T) {
Expand Down Expand Up @@ -365,8 +373,8 @@ func TestTracerNormalEndOfStream(t *testing.T) {
test.Assert(t, err == nil, err)
test.Assert(t, clientTracer.sendCount == count, clientTracer)
test.Assert(t, clientTracer.recvCount == 1, clientTracer)
// wrapped stream doesn't implement WithDoFinish, so the DoFinish won't be called
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})
}

Expand Down Expand Up @@ -460,8 +468,8 @@ func TestTracingServerReturnError(t *testing.T) {
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, serverTracer.sendCount == 0, serverTracer)
test.Assert(t, serverTracer.recvCount == 0, serverTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
test.Assert(t, serverTracer.finishCalled, serverTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

t.Run("server", func(t *testing.T) {
Expand Down Expand Up @@ -545,8 +553,8 @@ func TestTracingServerReturnBizError(t *testing.T) {
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, serverTracer.sendCount == 0, serverTracer)
test.Assert(t, serverTracer.recvCount == 0, serverTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
test.Assert(t, serverTracer.finishCalled, serverTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

// Waiting for fix for streaming apis with biz error
Expand Down Expand Up @@ -635,7 +643,7 @@ func TestTracingClientTimeout(t *testing.T) {
// recv/send event is not reported to tracer for unary api
test.Assert(t, clientTracer.sendCount == 0, clientTracer)
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCalledCheck(t)
})

t.Run("server", func(t *testing.T) {
Expand Down Expand Up @@ -702,7 +710,7 @@ func TestTracingServerStop(t *testing.T) {
// recv/send event is not reported to tracer for unary api
test.Assert(t, clientTracer.sendCount == 0, clientTracer)
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCalledCheck(t)
})

t.Run("server", func(t *testing.T) {
Expand Down

0 comments on commit 827c6ff

Please sign in to comment.