Skip to content

Commit

Permalink
Merge pull request #589 from hello-wn/feat-bus
Browse files Browse the repository at this point in the history
Feat bus: add health check method
  • Loading branch information
nicholasxuu authored Sep 26, 2024
2 parents 1e8217c + 20e4031 commit 2ceecd6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
19 changes: 19 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
# 1.2.3 (2024-9-26)

- 新增 busext 的 HealthCheck 方法

GRPC 和 OpenAPI 的 health check 检查时,可以添加检查 BusExt。

```go
func (s *myServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
if req.Service == "liveness" || req.Service == "readiness" {
if app.BusExt != nil {
if err := app.BusExt.HealthCheck(); err != nil {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
}
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_UNKNOWN}, nil
}
```
# 1.2.2 (2024-7-23)
- 对 otelsql 设置 `DisableErrSkip: true` 以忽略 ErrSkip
Expand Down
15 changes: 15 additions & 0 deletions extensions/busext/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
defaultPrefetch = 100
defaultPublishRetry = 3
defaultPushTimeout = "5s"
healthCheckRetryTimes = 10
healthCheckRetryInterval = 1 * time.Second
)

type customLoggerInterface interface {
Expand Down Expand Up @@ -499,3 +501,16 @@ func setDefaultConfig(v *viper.Viper) {
v.SetDefault("reinit_delay", defaultReinitDelay)
v.SetDefault("push_timeout", defaultPushTimeout)
}

func (b *BusExt) HealthCheck() error {
if b.mocked {
return nil
}
for i := 0; i < healthCheckRetryTimes; i++ {
if b.isReady {
return nil
}
time.Sleep(healthCheckRetryInterval)
}
return ErrNotReady
}
6 changes: 5 additions & 1 deletion extensions/busext/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func init() {
}

func TestPushConsume(t *testing.T) {
// health check
err := bus.HealthCheck()
assert.Nil(t, err)

// publish
routingKey := "gobay.buses.test"
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -99,7 +103,7 @@ func TestPushConsume(t *testing.T) {
},
)
// case-1: 超时后会结束本次 push 并返回 errTimeout error, 并且尝试重连
err := bus.Push("sbay-exchange", routingKey, *msg)
err = bus.Push("sbay-exchange", routingKey, *msg)
assert.NotNil(t, err)
assert.Equal(t, ErrTimeout, err)

Expand Down

0 comments on commit 2ceecd6

Please sign in to comment.