From 82d69cac913be3ef0636cf2253c475c5dd8e746b Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Sat, 11 Jan 2025 21:32:51 +0000 Subject: [PATCH] fix: Stop using api_core default timeouts in publish since they are broken --- google/cloud/pubsub_v1/types.py | 8 +++++++- .../pubsub_v1/publisher/test_publisher_client.py | 13 ++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 7e94a7250..11ca6008a 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -35,6 +35,7 @@ from google.protobuf import timestamp_pb2 from google.api_core.protobuf_helpers import get_messages +from google.api_core.timeout import ExponentialTimeout from google.pubsub_v1.types import pubsub as pubsub_gapic_types @@ -191,7 +192,12 @@ class PublisherOptions(NamedTuple): "an instance of :class:`google.api_core.retry.Retry`." ) - timeout: "OptionalTimeout" = gapic_v1.method.DEFAULT # use api_core default + # Use ExponentialTimeout instead of api_core default because the default + # value results in retries with zero deadline. + # Refer https://github.com/googleapis/python-api-core/issues/654 + timeout: "OptionalTimeout" = ExponentialTimeout( + initial=5, maximum=60, multiplier=1.3, deadline=600 + ) ( "Timeout settings for message publishing by the client. It should be " "compatible with :class:`~.pubsub_v1.types.TimeoutType`." diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 55198b590..fa08f4f1d 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -19,6 +19,7 @@ import sys import grpc +import math # special case python < 3.8 if sys.version_info.major == 3 and sys.version_info.minor < 8: @@ -35,6 +36,7 @@ from google.api_core import gapic_v1 from google.api_core import retry as retries from google.api_core.gapic_v1.client_info import METRICS_METADATA_KEY +from google.api_core.timeout import ExponentialTimeout from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types @@ -652,6 +654,8 @@ def test_publish_new_batch_needed(creds): future = client.publish(topic, b"foo", bar=b"baz") assert future is mock.sentinel.future + call_args = batch_class.call_args + # Check the mocks. batch_class.assert_called_once_with( client=mock.ANY, @@ -660,8 +664,15 @@ def test_publish_new_batch_needed(creds): batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, - commit_timeout=gapic_v1.method.DEFAULT, + commit_timeout=mock.ANY, ) + commit_timeout_arg = call_args[1]["commit_timeout"] + assert isinstance(commit_timeout_arg, ExponentialTimeout) + assert math.isclose(commit_timeout_arg._initial, 5) is True + assert math.isclose(commit_timeout_arg._maximum, 60) is True + assert math.isclose(commit_timeout_arg._multiplier, 1.3) is True + assert math.isclose(commit_timeout_arg._deadline, 600) is True + message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) wrapper = PublishMessageWrapper(message=message_pb) batch1.publish.assert_called_once_with(wrapper)