feat: add Send type hint parameter to udf when a Stream is defined (#202)
marcosschroh authored Aug 14, 2024
1 parent 67e6165 commit 98dd6df
Showing 12 changed files with 782 additions and 128 deletions.
44 changes: 30 additions & 14 deletions docs/stream.md
@@ -12,39 +12,55 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
members:
-

## Dependency Injection and typing
## Dependency Injection

The old way to iterate over a stream is with the `async for _ in stream` loop. The iterable approach works, but in most cases end users are interested only in the `ConsumerRecord` and not in the `stream`; for this reason it is now possible to remove the `loop`, and every time a new event is in the stream the `coroutine` function defined by the end user will be `awaited`. If the `stream` is also needed, for example because `manual` commit is enabled, then you can also add the `stream` as an argument in the coroutine.
The old way to iterate over a stream is with the `async for _ in stream` loop. The iterable approach works, but in most cases end users are interested only in the `ConsumerRecord`;
for this reason it is possible to remove the `async for` loop using proper `typing hints`. The available `typing hints` are:

=== "Use only the ConsumerRecord"
- `ConsumerRecord`: The `aiokafka` ConsumerRecord that is received every time a new event arrives in the `Stream`
- `Stream`: The `Stream` object subscribed to the topic/s. Useful when `manual` commit is enabled or when other `Stream` operations are needed
- `Send`: Coroutine to produce events. The same as `stream_engine.send(...)`

If you use `type hints`, then every time a new event is in the stream the `coroutine` function defined by the end user will be `awaited` with the specified types.

=== "ConsumerRecord"
```python
@stream_engine.stream(topic, name="my-stream")
@stream_engine.stream(topic)
async def my_stream(cr: ConsumerRecord):
    save_to_db(cr.value)
    print(cr.value)
```

=== "Use ConsumerRecord and Stream"
=== "ConsumerRecord and Stream"
```python
@stream_engine.stream(topic, name="my-stream", enable_auto_commit=False)
@stream_engine.stream(topic, enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream):
    save_to_db(cr.value)
    print(cr.value)
    await stream.commit()
```

=== "ConsumerRecord, Stream and Send"
```python
@stream_engine.stream(topic, enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream, send: Send):
    print(cr.value)
    await stream.commit()
    await send("sink-to-elastic-topic", value=cr.value)
```

=== "Old fashion"
```python
@stream_engine.stream(topic, name="my-stream")
@stream_engine.stream(topic)
async def consume(stream):  # you can specify the type but it will be the same result
    async for cr in stream:
        save_to_db(cr.value)
        print(cr.value)
        # you can do something with the stream as well!!
```

!!! note
    Proper typing is required in order to remove the `async for in` loop. The argument order is also important; this might change in the future.
!!! note
    The type arguments can be in any order. This might change in the future.

!!! note
    It is still possible to use the `async for in` loop, but it might be removed in the future.
!!! warning
    It is still possible to use the `async for in` loop, but it might be removed in the future. Migrate to the typing approach.
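The reason argument order does not matter is that arguments are bound by annotation, not by position. A minimal standalone sketch of the idea (illustrative only — the stub classes and the `bind_by_annotation` helper below are made up for this sketch, not the kstreams implementation):

```python
import asyncio
import inspect

# Illustrative stand-ins for the real kstreams types.
class ConsumerRecord: ...
class Stream: ...

async def my_stream(stream: Stream, cr: ConsumerRecord):
    # Note the swapped order: binding is by annotation, not position.
    return (cr, stream)

def bind_by_annotation(func, available):
    # Inspect each parameter's type hint and pick the matching object.
    params = inspect.signature(func).parameters.values()
    return [available[param.annotation] for param in params]

cr, stream = ConsumerRecord(), Stream()
args = bind_by_annotation(my_stream, {ConsumerRecord: cr, Stream: stream})
result = asyncio.run(my_stream(*args))
assert result == (cr, stream)  # correct objects despite the swapped order
```

Because the lookup is driven purely by the annotations, adding or reordering parameters in the user coroutine requires no change to the caller.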

## Creating a Stream instance

500 changes: 500 additions & 0 deletions examples/fastapi-sse/poetry.lock

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion examples/recommended-worker-app/README.md
@@ -1,6 +1,6 @@
# Recommended usage

This example shows the recommended way to organize your application.
This example shows the recommended way to organize your application and how to prevent circular imports when producing events.

## Requirements

@@ -17,6 +17,7 @@ poetry install
1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From `kstreams` project root, you can use `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open; enter messages to send. The command is:

```bash
./scripts/cluster/events/send "local--hello-world"
```
@@ -28,6 +29,14 @@ showing bytes: b'foo'
showing bytes: b'bar'
```

4. After the `stream` receives an event, a new event is produced to the topic `local--kstreams` using the `send` argument. To check the event, execute:

```bash
./scripts/cluster/events/read "local--kstreams"
```

Then, you should see something like: `Event confirmed. b'foo'`

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
4 changes: 2 additions & 2 deletions examples/recommended-worker-app/poetry.lock

Some generated files are not rendered by default.

@@ -1,10 +1,17 @@
import logging

from kstreams import ConsumerRecord, stream
from kstreams import ConsumerRecord, Send, stream

logger = logging.getLogger(__name__)


@stream("local--hello-world", group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
async def consume(cr: ConsumerRecord, send: Send) -> None:
    logger.info(f"showing bytes: {cr.value}")
    value = f"Event confirmed. {cr.value}"

    await send(
        "local--kstreams",
        value=value.encode(),
        key="1",
    )
4 changes: 2 additions & 2 deletions examples/simple-example/poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions kstreams/__init__.py
@@ -11,6 +11,7 @@
from .streams import Stream, stream
from .structs import TopicPartitionOffset
from .test_utils import TestStreamClient
from .types import Send

__all__ = [
    "Consumer",
@@ -22,6 +23,7 @@
    "MetricsRebalanceListener",
    "ManualCommitRebalanceListener",
    "RebalanceListener",
    "Send",
    "Stream",
    "stream",
    "ConsumerRecord",
15 changes: 13 additions & 2 deletions kstreams/engine.py
@@ -11,6 +11,7 @@
from .clients import Consumer, Producer
from .exceptions import DuplicateStreamException, EngineNotStartedException
from .middleware import ExceptionMiddleware, Middleware
from .middleware.udf_middleware import UdfHandler
from .prometheus.monitor import PrometheusMonitor
from .rebalance_listener import MetricsRebalanceListener, RebalanceListener
from .serializers import Deserializer, Serializer
@@ -357,10 +358,20 @@ def add_stream(self, stream: Stream) -> None:
        stream.rebalance_listener.stream = stream  # type: ignore
        stream.rebalance_listener.engine = self  # type: ignore

        stream.udf_handler = UdfHandler(
            next_call=stream.func,
            send=self.send,
            stream=stream,
        )

        # NOTE: When `no typing` support is deprecated this check can
        # be removed
        if stream.udf_handler.type != UDFType.NO_TYPING:
            stream.func = self.build_stream_middleware_stack(stream)
            stream.func = self.build_stream_middleware_stack(stream=stream)

    def build_stream_middleware_stack(self, *, stream: Stream) -> NextMiddlewareCall:
        assert stream.udf_handler, "UdfHandler can not be None"

    def build_stream_middleware_stack(self, stream: Stream) -> NextMiddlewareCall:
        stream.middlewares = [Middleware(ExceptionMiddleware)] + stream.middlewares

        next_call = stream.udf_handler
59 changes: 59 additions & 0 deletions kstreams/middleware/udf_middleware.py
@@ -0,0 +1,59 @@
import inspect
import sys
import typing

from kstreams import ConsumerRecord, types
from kstreams.streams import Stream
from kstreams.streams_utils import UDFType, setup_type

from .middleware import BaseMiddleware

if sys.version_info < (3, 10):

    async def anext(async_gen: typing.AsyncGenerator):
        return await async_gen.__anext__()


class UdfHandler(BaseMiddleware):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        signature = inspect.signature(self.next_call)
        self.params = list(signature.parameters.values())
        self.type: UDFType = setup_type(self.params)

    def bind_udf_params(self, cr: ConsumerRecord) -> typing.List:
        # NOTE: When `no typing` support is deprecated then this can
        # be more efficient as the CR will always be there.
        ANNOTATIONS_TO_PARAMS = {
            ConsumerRecord: cr,
            Stream: self.stream,
            types.Send: self.send,
        }

        return [ANNOTATIONS_TO_PARAMS[param.annotation] for param in self.params]

    async def __call__(self, cr: ConsumerRecord) -> typing.Any:
        """
        Call the coroutine `async def my_function(...)` defined by the end user
        in a proper way according to its parameters. The `handler` is the
        coroutine defined by the user.

        Use cases:
            1. UDFType.CR_ONLY_TYPING: Only ConsumerRecord with typing

                @stream_engine.stream(topic, name="my-stream")
                async def consume(cr: ConsumerRecord):
                    ...

            2. UDFType.ALL_TYPING: ConsumerRecord and Stream with typing.
                The order is important as they are arguments and not kwargs

                @stream_engine.stream(topic, name="my-stream")
                async def consume(cr: ConsumerRecord, stream: Stream):
                    ...
        """
        params = self.bind_udf_params(cr)

        if inspect.isasyncgenfunction(self.next_call):
            return await anext(self.next_call(*params))
        return await self.next_call(*params)
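The dispatch at the end of `__call__` can be exercised in isolation. A minimal sketch (the `gen_udf`, `coro_udf`, and `call_udf` names are illustrative, not part of kstreams): async-generator UDFs are advanced one step via `__anext__`, while plain coroutines are simply awaited:

```python
import asyncio
import inspect

async def gen_udf(value: str):
    # An async-generator UDF: it yields a value instead of returning one.
    yield value.upper()

async def coro_udf(value: str):
    # A plain coroutine UDF.
    return value.lower()

async def call_udf(func, *args):
    # Mirrors the branch in UdfHandler.__call__: advance async
    # generators one step, await plain coroutines directly.
    if inspect.isasyncgenfunction(func):
        return await func(*args).__anext__()
    return await func(*args)

print(asyncio.run(call_udf(gen_udf, "hello")))   # HELLO
print(asyncio.run(call_udf(coro_udf, "WORLD")))  # world
```

The `inspect.isasyncgenfunction` check is what lets both styles of user function share one calling convention; the `anext` shim above exists only because the `anext` builtin arrived in Python 3.10.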