From ad458aad7f96f34beb846e2a6bb883942f5b7eb3 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Sat, 11 Jan 2025 21:32:51 +0000 Subject: [PATCH] fix: Stop using api_core default timeouts in publish since they are broken --- google/cloud/pubsub_v1/types.py | 8 +++++++- .../pubsub_v1/publisher/test_publisher_client.py | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 7e94a7250..11ca6008a 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -35,6 +35,7 @@ from google.protobuf import timestamp_pb2 from google.api_core.protobuf_helpers import get_messages +from google.api_core.timeout import ExponentialTimeout from google.pubsub_v1.types import pubsub as pubsub_gapic_types @@ -191,7 +192,12 @@ class PublisherOptions(NamedTuple): "an instance of :class:`google.api_core.retry.Retry`." ) - timeout: "OptionalTimeout" = gapic_v1.method.DEFAULT # use api_core default + # Use ExponentialTimeout instead of api_core default because the default + # value results in retries with zero deadline. + # Refer https://github.com/googleapis/python-api-core/issues/654 + timeout: "OptionalTimeout" = ExponentialTimeout( + initial=5, maximum=60, multiplier=1.3, deadline=600 + ) ( "Timeout settings for message publishing by the client. It should be " "compatible with :class:`~.pubsub_v1.types.TimeoutType`." diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 55198b590..2905714c1 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -35,6 +35,7 @@ from google.api_core import gapic_v1 from google.api_core import retry as retries from google.api_core.gapic_v1.client_info import METRICS_METADATA_KEY +from google.api_core.timeout import ExponentialTimeout from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types @@ -646,12 +647,15 @@ def test_publish_new_batch_needed(creds): # Actually mock the batch class now. batch_class = mock.Mock(spec=(), return_value=batch2) + client._set_batch_class(batch_class) # Publish a message. future = client.publish(topic, b"foo", bar=b"baz") assert future is mock.sentinel.future + call_args = batch_class.call_args + # Check the mocks. batch_class.assert_called_once_with( client=mock.ANY, @@ -660,8 +664,17 @@ def test_publish_new_batch_needed(creds): batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, - commit_timeout=gapic_v1.method.DEFAULT, + commit_timeout=mock.ANY, ) + commit_timeout_arg = call_args[1]["commit_timeout"] + assert isinstance(commit_timeout_arg, ExponentialTimeout) + assert commit_timeout_arg.initial == 5 + assert commit_timeout_arg.maximum == 60 + assert commit_timeout_arg.multiplier == 1.3 + assert commit_timeout_arg.deadline == 600 + # ExponentialTimeout( + # initial=5, maximum=60, multiplier=1.3, deadline=600 + # ), message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) wrapper = PublishMessageWrapper(message=message_pb) batch1.publish.assert_called_once_with(wrapper)