Skip to content

Commit

Permalink
fix: refactor client classes for safer type checking (#552)
Browse files Browse the repository at this point in the history
* Directly subclass generated PublisherClient

* Directly subclass generated SubscriberClient

* Remove unneded GAPIC helper

* Remove pytype

There are no real advantages over mypy, but at the same time several
downsides such as being slow, producing more false positives, etc.

* Re-enable mypy_samples nox session

* Convert a comment to docstring in publisher client

* Add api property back, but deprecated

* Assure that mypy_samples is not commented out

* Remove redundant type hint casts

* Disable mypy_samples session until blockers resolved
  • Loading branch information
plamut authored Jan 19, 2022
1 parent 7fd7694 commit 7f705be
Show file tree
Hide file tree
Showing 19 changed files with 180 additions and 325 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pip-log.txt
.nox
.cache
.pytest_cache
.pytype


# Mac
Expand Down
74 changes: 0 additions & 74 deletions google/cloud/pubsub_v1/_gapic.py

This file was deleted.

2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def _commit(self) -> None:
batch_transport_succeeded = True
try:
# Performs retries for errors defined by the retry configuration.
response = self._client.api.publish(
response = self._client._gapic_publish(
topic=self._topic,
messages=self._messages,
retry=self._commit_retry,
Expand Down
47 changes: 33 additions & 14 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import time
import typing
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
import warnings

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
Expand All @@ -49,15 +49,11 @@
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher import _batch
from google.pubsub_v1.services.publisher.client import OptionalRetry
from google.pubsub_v1.types import pubsub as pubsub_types


_LOGGER = logging.getLogger(__name__)

_DENYLISTED_METHODS = (
"publish",
"from_service_account_file",
"from_service_account_json",
)

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

Expand All @@ -66,8 +62,7 @@
]


@_gapic.add_methods(publisher_client.PublisherClient, denylist=_DENYLISTED_METHODS)
class Client(object):
class Client(publisher_client.PublisherClient):
"""A publisher client for Google Cloud Pub/Sub.
This creates an object that is capable of publishing messages.
Expand Down Expand Up @@ -146,8 +141,8 @@ def __init__(

# Add the metrics headers, and instantiate the underlying GAPIC
# client.
self.api = publisher_client.PublisherClient(**kwargs)
self._target = self.api._transport._host
super().__init__(**kwargs)
self._target = self._transport._host
self._batch_class = thread.Batch
self.batch_settings = types.BatchSettings(*batch_settings)

Expand All @@ -164,7 +159,7 @@ def __init__(
self._flow_controller = FlowController(self.publisher_options.flow_control)

@classmethod
def from_service_account_file(
def from_service_account_file( # type: ignore[override]
cls,
filename: str,
batch_settings: Union[types.BatchSettings, Sequence] = (),
Expand All @@ -188,7 +183,7 @@ def from_service_account_file(
kwargs["credentials"] = credentials
return cls(batch_settings, **kwargs)

from_service_account_json = from_service_account_file
from_service_account_json = from_service_account_file # type: ignore[assignment]

@property
def target(self) -> str:
Expand All @@ -199,6 +194,26 @@ def target(self) -> str:
"""
return self._target

@property
def api(self):
"""The underlying gapic API client.
.. versionchanged:: 2.10.0
Instead of a GAPIC ``PublisherClient`` client instance, this property is a
proxy object to it with the same interface.
.. deprecated:: 2.10.0
Use the GAPIC methods and properties on the client instance directly
instead of through the :attr:`api` attribute.
"""
msg = (
'The "api" property only exists for backward compatibility, access its '
'attributes directly thorugh the client instance (e.g. "client.foo" '
'instead of "client.api.foo").'
)
warnings.warn(msg, category=DeprecationWarning)
return super()

def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
""" Get an existing sequencer or create a new one given the (topic,
ordering_key) pair.
Expand Down Expand Up @@ -252,7 +267,11 @@ def resume_publish(self, topic: str, ordering_key: str) -> None:
else:
sequencer.unpause()

def publish(
def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
"""Call the GAPIC public API directly."""
return super().publish(*args, **kwargs)

def publish( # type: ignore[override]
self,
topic: str,
data: bytes,
Expand Down Expand Up @@ -382,7 +401,7 @@ def on_publish_done(future):
if self._enable_message_ordering:
if retry is gapic_v1.method.DEFAULT:
# use the default retry for the publish GRPC method as a base
transport = self.api._transport
transport = self._transport
base_retry = transport._wrapped_methods[transport.publish]._retry
retry = base_retry.with_deadline(2.0 ** 32)
else:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/flow_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
_LOGGER = logging.getLogger(__name__)


MessageType = Type[types.PubsubMessage] # type: ignore # pytype: disable=module-attr
MessageType = Type[types.PubsubMessage] # type: ignore


class _QuantityReservation:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def message_count(self) -> int:
return len(self._leased_messages)

@property
def ack_ids(self) -> KeysView[str]: # pytype: disable=invalid-annotation
def ack_ids(self) -> KeysView[str]:
"""The ack IDs of all leased messages."""
return self._leased_messages.keys()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def open(
self._get_initial_request, stream_ack_deadline_seconds
)
self._rpc = bidi.ResumableBidiRpc(
start_rpc=self._client.api.streaming_pull,
start_rpc=self._client.streaming_pull,
initial_request=get_initial_request,
should_recover=self._should_recover,
should_terminate=self._should_terminate,
Expand All @@ -548,14 +548,11 @@ def open(

# Create references to threads
assert self._scheduler is not None
# pytype: disable=wrong-arg-types
# (pytype incorrectly complains about "self" not being the right argument type)
scheduler_queue = self._scheduler.queue
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
self._leaser = leaser.Leaser(self)
self._heartbeater = heartbeater.Heartbeater(self)
# pytype: enable=wrong-arg-types

# Start the thread to pass the requests.
self._dispatcher.start()
Expand Down
48 changes: 29 additions & 19 deletions google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import pkg_resources
import typing
from typing import cast, Any, Callable, Optional, Sequence, Union
import warnings

from google.auth.credentials import AnonymousCredentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
Expand All @@ -42,15 +42,8 @@
# a PIP package.
__version__ = "0.0"

_DENYLISTED_METHODS = (
"publish",
"from_service_account_file",
"from_service_account_json",
)


@_gapic.add_methods(subscriber_client.SubscriberClient, denylist=_DENYLISTED_METHODS)
class Client(object):
class Client(subscriber_client.SubscriberClient):
"""A subscriber client for Google Cloud Pub/Sub.
This creates an object that is capable of subscribing to messages.
Expand Down Expand Up @@ -91,12 +84,14 @@ def __init__(self, **kwargs: Any):
kwargs["credentials"] = AnonymousCredentials()

# Instantiate the underlying GAPIC client.
self._api = subscriber_client.SubscriberClient(**kwargs)
self._target = self._api._transport._host
super().__init__(**kwargs)
self._target = self._transport._host
self._closed = False

@classmethod
def from_service_account_file(cls, filename: str, **kwargs: Any) -> "Client":
def from_service_account_file( # type: ignore[override]
cls, filename: str, **kwargs: Any
) -> "Client":
"""Creates an instance of this client using the provided credentials
file.
Expand All @@ -112,7 +107,7 @@ def from_service_account_file(cls, filename: str, **kwargs: Any) -> "Client":
kwargs["credentials"] = credentials
return cls(**kwargs)

from_service_account_json = from_service_account_file
from_service_account_json = from_service_account_file # type: ignore[assignment]

@property
def target(self) -> str:
Expand All @@ -123,11 +118,6 @@ def target(self) -> str:
"""
return self._target

@property
def api(self) -> subscriber_client.SubscriberClient:
"""The underlying gapic API client."""
return self._api

@property
def closed(self) -> bool:
"""Return whether the client has been closed and cannot be used anymore.
Expand All @@ -136,6 +126,26 @@ def closed(self) -> bool:
"""
return self._closed

@property
def api(self):
"""The underlying gapic API client.
.. versionchanged:: 2.10.0
Instead of a GAPIC ``SubscriberClient`` client instance, this property is a
proxy object to it with the same interface.
.. deprecated:: 2.10.0
Use the GAPIC methods and properties on the client instance directly
instead of through the :attr:`api` attribute.
"""
msg = (
'The "api" property only exists for backward compatibility, access its '
'attributes directly thorugh the client instance (e.g. "client.foo" '
'instead of "client.api.foo").'
)
warnings.warn(msg, category=DeprecationWarning)
return super()

def subscribe(
self,
subscription: str,
Expand Down Expand Up @@ -266,7 +276,7 @@ def close(self) -> None:
This method is idempotent.
"""
transport = cast("SubscriberGrpcTransport", self.api._transport)
transport = cast("SubscriberGrpcTransport", self._transport)
transport.grpc_channel.close()
self._closed = True

Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Message(object):
The time that this message was originally published.
"""

def __init__( # pytype: disable=module-attr
def __init__(
self,
message: "types.PubsubMessage._meta._pb", # type: ignore
ack_id: str,
Expand Down
2 changes: 0 additions & 2 deletions google/cloud/pubsub_v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,11 @@ class PublisherOptions(NamedTuple):
"an instance of :class:`google.api_core.retry.Retry`."
)

# pytype: disable=invalid-annotation
timeout: "OptionalTimeout" = gapic_v1.method.DEFAULT # use api_core default
(
"Timeout settings for message publishing by the client. It should be "
"compatible with :class:`~.pubsub_v1.types.TimeoutType`."
)
# pytype: enable=invalid-annotation


# Define the type class and default values for flow control settings.
Expand Down
Loading

0 comments on commit 7f705be

Please sign in to comment.