Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Stop using api_core default timeouts in publish since they are broken #1326

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mukund-ananthu
Copy link
Contributor

@mukund-ananthu mukund-ananthu commented Jan 11, 2025

Change in behavior

60s initial deadline for RPCs reduced to 5s. Other timeout parameters changed changed:

ExponentialTimeout(
        initial=5, maximum=60, multiplier=1.3, deadline=600
    )

Change

Only non-test / non sample(i.e functional) usage found for PublisherOptions.timeout is here:

if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in
timeout = self.publisher_options.timeout

This change in usage is intended.

Call stack:

timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,

invokes one of the sequencers(ordered or unordered)

future = sequencer.publish(
wrapper=wrapper, retry=retry, timeout=timeout
)

which then create a batch with this timeout

return self._client._batch_class(
client=self._client,
topic=self._topic,
settings=self._client.batch_settings,
batch_done_callback=self._batch_done_callback,
commit_when_full=False,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

which uses it when committing the batch / publishing the messages:

response = self._client._gapic_publish(
topic=self._topic,
messages=[wrapper.message for wrapper in self._message_wrappers],
retry=self._commit_retry,
timeout=self._commit_timeout,
)

and invokes gapic_publish:

def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
"""Call the GAPIC public API directly."""
return super().publish(*args, **kwargs)

which then uses it in the RPC call:

response = rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

that is obtained by wrapping the transport base publish call:

rpc = self._transport._wrapped_methods[self._transport.publish]

The wrapper uses the timeout when invoking the GapicCallable:

https://github.com/googleapis/python-api-core/blob/b1fae31c8c71ed65ad22a415dab2b54720c3d4ba/google/api_core/gapic_v1/method.py#L245-L252

which currently overrides float and int timeouts to use TimeToDeadlineTimeout:

https://github.com/googleapis/python-api-core/blob/b1fae31c8c71ed65ad22a415dab2b54720c3d4ba/google/api_core/gapic_v1/method.py#L112-L113

which is incorrect(why its incorrect is mentioned in the attached bug)

Fixes #1325 🦕

@mukund-ananthu mukund-ananthu requested review from a team as code owners January 11, 2025 21:35
@mukund-ananthu mukund-ananthu self-assigned this Jan 11, 2025
@product-auto-label product-auto-label bot added size: xs Pull request size is extra small. api: pubsub Issues related to the googleapis/python-pubsub API. labels Jan 11, 2025
@product-auto-label product-auto-label bot added size: s Pull request size is small. and removed size: xs Pull request size is extra small. labels Jan 11, 2025
@mukund-ananthu mukund-ananthu force-pushed the timeout branch 2 times, most recently from ad458aa to c527c2e Compare January 11, 2025 22:10
@mukund-ananthu
Copy link
Contributor Author

mukund-ananthu commented Jan 11, 2025

@beltran PTAL

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. size: s Pull request size is small.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Python api_core has a broken timeout logic. Put in an interim fix before they get to fixing it.
1 participant