Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mypy: check untyped defs #396

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions nats/aio/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ def __init__(
self._pending_queue: asyncio.Queue[Msg] = asyncio.Queue(
maxsize=pending_msgs_limit
)
self._pending_size = 0
self._wait_for_msgs_task = None
self._message_iterator = None
self._pending_size: int = 0
self._wait_for_msgs_task: asyncio.Task | None = None
orsinium marked this conversation as resolved.
Show resolved Hide resolved
self._message_iterator: _SubscriptionMessageIterator | None = None

# For JetStream enabled subscriptions.
self._jsi: JetStreamContext._JSI | None = None
Expand Down Expand Up @@ -182,8 +182,10 @@ def _start(self, error_cb):
Creates the resources for the subscription to start processing messages.
"""
if self._cb:
if not asyncio.iscoroutinefunction(self._cb) and \
not (hasattr(self._cb, "func") and asyncio.iscoroutinefunction(self._cb.func)):
# TODO(@orsinium): What's this check? Does `self._cb` ever have `func`?
wrapped = getattr(self._cb, "func", None)
if (not asyncio.iscoroutinefunction(self._cb)
and not asyncio.iscoroutinefunction(wrapped)):
raise errors.Error(
"nats: must use coroutine for subscriptions"
)
Expand Down Expand Up @@ -302,7 +304,8 @@ async def _wait_for_msgs(self, error_cb) -> None:
self._pending_queue.task_done()

# Apply auto unsubscribe checks after having processed last msg.
if self._max_msgs > 0 and self._received >= self._max_msgs and self._pending_queue.empty:
if self._max_msgs > 0 and self._received >= self._max_msgs and self._pending_queue.empty(
):
orsinium marked this conversation as resolved.
Show resolved Hide resolved
self._stop_processing()

except asyncio.CancelledError:
Expand Down
22 changes: 15 additions & 7 deletions nats/js/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

if TYPE_CHECKING:
from nats.js import JetStreamContext
from nats.aio.msg import Msg

KV_OP = "KV-Operation"
KV_DEL = "DEL"
Expand Down Expand Up @@ -73,7 +74,7 @@ class Entry:
value: bytes | None
revision: int | None
delta: int | None
created: int | None
created: datetime.datetime | None
operation: str | None

@dataclass(frozen=True)
Expand Down Expand Up @@ -294,12 +295,17 @@ async def status(self) -> BucketStatus:
return KeyValue.BucketStatus(stream_info=info, bucket=self._name)

class KeyWatcher:
_js: JetStreamContext
_updates: asyncio.Queue[KeyValue.Entry | None]
_sub: JetStreamContext.PushSubscription | None
_pending: int | None
_init_done: bool

def __init__(self, js):
def __init__(self, js: JetStreamContext):
self._js = js
self._updates = asyncio.Queue(maxsize=256)
self._sub = None
self._pending: int | None = None
self._pending = None

# init done means that the nil marker has been sent,
# once this is sent it won't be sent anymore.
Expand All @@ -309,7 +315,8 @@ async def stop(self):
"""
stop will stop this watcher.
"""
await self._sub.unsubscribe()
if self._sub is not None:
await self._sub.unsubscribe()

async def updates(self, timeout=5):
"""
Expand Down Expand Up @@ -394,10 +401,10 @@ async def watch(
there are no pending updates.
"""
subject = f"{self._pre}{keys}"
watcher = KeyValue.KeyWatcher(self)
watcher = KeyValue.KeyWatcher(self._js)
init_setup: asyncio.Future[bool] = asyncio.Future()

async def watch_updates(msg):
async def watch_updates(msg: Msg) -> None:
if not init_setup.done():
await asyncio.wait_for(init_setup, timeout=self._js._timeout)

Expand All @@ -418,7 +425,8 @@ async def watch_updates(msg):
bucket=self._name,
key=msg.subject[len(self._pre):],
value=msg.data,
revision=meta.sequence.stream,
revision=meta.sequence.stream
if meta.sequence is not None else None,
delta=meta.num_pending,
created=meta.timestamp,
operation=op,
Expand Down