Skip to content

Commit

Permalink
Properly close PubSub Client
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasBergmeier6176 committed Oct 25, 2024
1 parent 5b508db commit 54afb8d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 1 deletion.
4 changes: 3 additions & 1 deletion internal/impl/gcp/output_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ func (out *pubsubOutput) Close(_ context.Context) error {
out.clientCancel()
}

return nil
err := out.client.Close()
out.client = nil
return err
}

func (out *pubsubOutput) writeMessage(ctx context.Context, cachedTopics map[string]pubsubTopic, msg *service.Message) (publishResult, error) {
Expand Down
4 changes: 4 additions & 0 deletions internal/impl/gcp/output_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestPubSubOutput(t *testing.T) {

client.On("Topic", "test_foo").Return(fooTopic).Once()
client.On("Topic", "test_bar").Return(barTopic).Once()
client.On("Close").Return(nil).Once()

fooMsgA := service.NewMessage([]byte("foo_a"))
fooResA := &mockPublishResult{}
Expand Down Expand Up @@ -114,6 +115,7 @@ func TestPubSubOutput_MessageAttr(t *testing.T) {
fooTopic.On("Publish", "foo", mock.AnythingOfType("*pubsub.Message")).Return(fooMsgA).Once()

client.On("Topic", "test").Return(fooTopic).Once()
client.On("Close").Return(nil).Once()

out, err := newPubSubOutput(conf)
require.NoError(t, err, "failed to create output")
Expand Down Expand Up @@ -169,6 +171,7 @@ func TestPubSubOutput_MissingTopic(t *testing.T) {

client.On("Topic", "test_foo").Return(fooTopic).Once()
client.On("Topic", "test_bar").Return(barTopic).Once()
client.On("Close").Return(nil).Once()

out, err := newPubSubOutput(conf)
require.NoError(t, err, "failed to create output")
Expand Down Expand Up @@ -240,6 +243,7 @@ func TestPubSubOutput_PublishErrors(t *testing.T) {

client.On("Topic", "test_foo").Return(fooTopic).Once()
client.On("Topic", "test_bar").Return(barTopic).Once()
client.On("Close").Return(nil).Once()

fooMsgA := service.NewMessage([]byte("foo_a"))
fooResA := &mockPublishResult{}
Expand Down
9 changes: 9 additions & 0 deletions internal/impl/gcp/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ import (
"cloud.google.com/go/pubsub"
)

var (
_ pubsubClient = (*airGappedPubsubClient)(nil)
)

type pubsubClient interface {
Topic(id string, settings *pubsub.PublishSettings) pubsubTopic
Close() error
}

type pubsubTopic interface {
Expand All @@ -39,6 +44,10 @@ type airGappedPubsubClient struct {
c *pubsub.Client
}

func (ac *airGappedPubsubClient) Close() error {
return ac.c.Close()
}

func (ac *airGappedPubsubClient) Topic(id string, settings *pubsub.PublishSettings) pubsubTopic {
t := ac.c.Topic(id)
t.PublishSettings = *settings
Expand Down
6 changes: 6 additions & 0 deletions internal/impl/gcp/pubsub_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ type mockPubSubClient struct {

var _ pubsubClient = &mockPubSubClient{}

func (c *mockPubSubClient) Close() error {
args := c.Called()

return args.Error(0)
}

func (c *mockPubSubClient) Topic(id string, settings *pubsub.PublishSettings) pubsubTopic {
args := c.Called(id)

Expand Down

0 comments on commit 54afb8d

Please sign in to comment.