diff --git a/internal/impl/gcp/output_pubsub.go b/internal/impl/gcp/output_pubsub.go index 0457ff474..8a4a4d8a4 100644 --- a/internal/impl/gcp/output_pubsub.go +++ b/internal/impl/gcp/output_pubsub.go @@ -300,7 +300,9 @@ func (out *pubsubOutput) Close(_ context.Context) error { out.clientCancel() } - return nil + err := out.client.Close() + out.client = nil + return err } func (out *pubsubOutput) writeMessage(ctx context.Context, cachedTopics map[string]pubsubTopic, msg *service.Message) (publishResult, error) { diff --git a/internal/impl/gcp/output_pubsub_test.go b/internal/impl/gcp/output_pubsub_test.go index bd53f5aef..c31b98234 100644 --- a/internal/impl/gcp/output_pubsub_test.go +++ b/internal/impl/gcp/output_pubsub_test.go @@ -49,6 +49,7 @@ func TestPubSubOutput(t *testing.T) { client.On("Topic", "test_foo").Return(fooTopic).Once() client.On("Topic", "test_bar").Return(barTopic).Once() + client.On("Close").Return(nil).Once() fooMsgA := service.NewMessage([]byte("foo_a")) fooResA := &mockPublishResult{} @@ -114,6 +115,7 @@ func TestPubSubOutput_MessageAttr(t *testing.T) { fooTopic.On("Publish", "foo", mock.AnythingOfType("*pubsub.Message")).Return(fooMsgA).Once() client.On("Topic", "test").Return(fooTopic).Once() + client.On("Close").Return(nil).Once() out, err := newPubSubOutput(conf) require.NoError(t, err, "failed to create output") @@ -169,6 +171,7 @@ func TestPubSubOutput_MissingTopic(t *testing.T) { client.On("Topic", "test_foo").Return(fooTopic).Once() client.On("Topic", "test_bar").Return(barTopic).Once() + client.On("Close").Return(nil).Once() out, err := newPubSubOutput(conf) require.NoError(t, err, "failed to create output") @@ -240,6 +243,7 @@ func TestPubSubOutput_PublishErrors(t *testing.T) { client.On("Topic", "test_foo").Return(fooTopic).Once() client.On("Topic", "test_bar").Return(barTopic).Once() + client.On("Close").Return(nil).Once() fooMsgA := service.NewMessage([]byte("foo_a")) fooResA := &mockPublishResult{} diff --git a/internal/impl/gcp/pubsub.go b/internal/impl/gcp/pubsub.go index 1cd39a46a..f32b3044c 100644 --- a/internal/impl/gcp/pubsub.go +++ b/internal/impl/gcp/pubsub.go @@ -20,8 +20,13 @@ import ( "cloud.google.com/go/pubsub" ) +var ( + _ pubsubClient = (*airGappedPubsubClient)(nil) +) + type pubsubClient interface { Topic(id string, settings *pubsub.PublishSettings) pubsubTopic + Close() error } type pubsubTopic interface { @@ -39,6 +44,10 @@ type airGappedPubsubClient struct { c *pubsub.Client } +func (ac *airGappedPubsubClient) Close() error { + return ac.c.Close() +} + func (ac *airGappedPubsubClient) Topic(id string, settings *pubsub.PublishSettings) pubsubTopic { t := ac.c.Topic(id) t.PublishSettings = *settings diff --git a/internal/impl/gcp/pubsub_mock_test.go b/internal/impl/gcp/pubsub_mock_test.go index 1efa8dd03..97045c952 100644 --- a/internal/impl/gcp/pubsub_mock_test.go +++ b/internal/impl/gcp/pubsub_mock_test.go @@ -27,6 +27,12 @@ type mockPubSubClient struct { var _ pubsubClient = &mockPubSubClient{} +func (c *mockPubSubClient) Close() error { + args := c.Called() + + return args.Error(0) +} + func (c *mockPubSubClient) Topic(id string, settings *pubsub.PublishSettings) pubsubTopic { args := c.Called(id)