Skip to content

Commit

Permalink
pydantic fixes (#83)
Browse files · Browse the repository at this point in the history
* pydantic fixes

* mypy

* mypy

* mypy

* .

* model construct

* revert
  • Loading branch information
gitcarbs authored Sep 17, 2024
1 parent f594d69 commit 10e88fd
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 196 deletions.
2 changes: 1 addition & 1 deletion examples/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class UserLog(BaseModel):
_is_log_model = True
user: Optional[str]
user: Optional[str] = None


async def test_log() -> None:
Expand Down
8 changes: 4 additions & 4 deletions kafkaesk/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class Application(Router):
Application configuration
"""

_producer: Optional[aiokafka.AIOKafkaProducer]
_producer: Optional[aiokafka.AIOKafkaProducer] = None

def __init__(
self,
Expand Down Expand Up @@ -535,9 +535,9 @@ async def __aenter__(self) -> "Application":

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
exc_type: Optional[Type[BaseException]] = None,
exc: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
logger.info("Stopping application...", exc_info=exc)
await self.finalize()
Expand Down
10 changes: 5 additions & 5 deletions kafkaesk/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(
*,
pattern: typing.Optional[str] = None,
topics: typing.Optional[typing.List[str]] = None,
timeout_seconds: float = None,
timeout_seconds: float = 0.0,
concurrency: int = None,
):
self.consumer_id = consumer_id
Expand Down Expand Up @@ -143,7 +143,7 @@ async def inner(record: aiokafka.ConsumerRecord, span: opentracing.Span) -> None

class BatchConsumer(aiokafka.ConsumerRebalanceListener):
_subscription: Subscription
_close: typing.Optional[asyncio.Future]
_close: typing.Optional[asyncio.Future] = None
_consumer: aiokafka.AIOKafkaConsumer
_offsets: typing.Dict[aiokafka.TopicPartition, int]
_message_handler: typing.Callable
Expand Down Expand Up @@ -252,12 +252,12 @@ async def _consumer_factory(self) -> aiokafka.AIOKafkaConsumer:

if "*" in self.pattern:
pattern = fnmatch.translate(topic_id)
consumer.subscribe(pattern=pattern, listener=self)
consumer.subscribe(pattern=pattern, listener=self) # type: ignore
else:
consumer.subscribe(topics=[topic_id], listener=self)
consumer.subscribe(topics=[topic_id], listener=self) # type: ignore
elif self.topics:
topics = [self._app.topic_mng.get_topic_id(topic) for topic in self.topics]
consumer.subscribe(topics=topics, listener=self)
consumer.subscribe(topics=topics, listener=self) # type: ignore
else:
raise ValueError("Either `topics` or `pattern` should be defined")

Expand Down
8 changes: 4 additions & 4 deletions kafkaesk/ext/logging/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def __init__(
exc_info: Union[
Tuple[type, BaseException, Optional[TracebackType]], Tuple[None, None, None], None
],
func: Optional[str],
sinfo: Optional[str],
func: Optional[str] = None,
sinfo: Optional[str] = None,
pydantic_data: Optional[List[pydantic.BaseModel]] = None,
):
super().__init__(name, level, fn, lno, msg, args, exc_info, func, sinfo)
Expand All @@ -41,8 +41,8 @@ def factory(
exc_info: Union[
Tuple[type, BaseException, Optional[TracebackType]], Tuple[None, None, None], None
],
func: Optional[str],
sinfo: Optional[str],
func: Optional[str] = None,
sinfo: Optional[str] = None,
) -> PydanticLogRecord:
pydantic_data: List[pydantic.BaseModel] = []

Expand Down
6 changes: 3 additions & 3 deletions kafkaesk/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@


class KafkaTopicManager:
_admin_client: Optional[kafka.admin.client.KafkaAdminClient]
_client: Optional[kafka.KafkaClient]
_kafka_api_version: Optional[Tuple[int, ...]]
_admin_client: Optional[kafka.admin.client.KafkaAdminClient] = None
_client: Optional[kafka.KafkaClient] = None
_kafka_api_version: Optional[Tuple[int, ...]] = None

def __init__(
self,
Expand Down
6 changes: 3 additions & 3 deletions kafkaesk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ def __enter__(self) -> None:

def __exit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
exc_traceback: Optional[traceback.StackSummary],
exc_type: Optional[Type[Exception]] = None,
exc_value: Optional[Exception] = None,
exc_traceback: Optional[traceback.StackSummary] = None,
) -> None:
error = NOERROR
if self.histogram is not None:
Expand Down
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ check_untyped_defs = True
disallow_untyped_calls = True
disallow_untyped_defs = True
disallow_untyped_decorators = True
disable_error_code=empty-body,assignment,unused-coroutine,var-annotated,arg-type

[mypy-aiohttp_client]
ignore_missing_imports = True
Expand Down
316 changes: 157 additions & 159 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[tool.poetry]
name = "kafkaesk"
version = "0.8.4"
version = "0.8.5"
description = "Easy publish and subscribe to events with python and Kafka."
authors = ["vangheem <[email protected]>", "pfreixes <[email protected]>"]
classifiers = [
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Framework :: AsyncIO",
"License :: OSI Approved :: BSD License",
"Topic :: System :: Distributed Computing"
Expand All @@ -29,7 +30,7 @@ async-timeout = ">=3.0.1"
pytest = "^7.4.0"
pytest-docker-fixtures = "^1.3.17"
pytest-asyncio = "^0.21.0"
mypy = "0.910"
mypy = "1.0.0"
flake8 = "^6.0.0"
isort = "^5.12.0"

Expand Down
9 changes: 6 additions & 3 deletions stubs/aiokafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ class AIOKafkaConsumer:
_client: AIOKafkaClient
_coordinator: GroupCoordinator
_subscription: Subscription
_group_id: Optional[str]
_group_id: Optional[str] = None

def __init__(
self,
bootstrap_servers: List[str],
loop: AbstractEventLoop,
group_id: Optional[str],
group_id: Optional[str] = None,
api_version: str = "auto",
**kwargs: Any,
):
Expand All @@ -95,7 +95,10 @@ async def getone(self, *partitions: Optional[List[TopicPartition]]) -> ConsumerR
...

async def subscribe(
self, topics: Optional[List[str]] = None, pattern: Optional[str] = None, listener: Optional["ConsumerRebalanceListener"] = None
self,
topics: Optional[List[str]] = None,
pattern: Optional[str] = None,
listener: Optional["ConsumerRebalanceListener"] = None,
) -> None:
...

Expand Down
2 changes: 1 addition & 1 deletion stubs/kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class ConsumerRecord:
topic: str
value: bytes
key: bytes
headers: Optional[List[Tuple[str, bytes]]]
headers: Optional[List[Tuple[str, bytes]]] = None


class TopicPartition:
Expand Down
11 changes: 2 additions & 9 deletions tests/acceptance/ext/logging/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def logger():

@pytest_asyncio.fixture(scope="function")
def stream_handler(logger):

stream = io.StringIO()
handler = PydanticStreamHandler(stream=stream)
logger.addHandler(handler)
Expand Down Expand Up @@ -76,7 +75,6 @@ async def consume(data: PydanticLogModel):

class TestPydanticStreamHandler:
async def test_stream_handler(self, stream_handler, logger):

logger.info("Test Message %s", "extra")

message = stream_handler.getvalue()
Expand All @@ -86,7 +84,7 @@ async def test_stream_handler(self, stream_handler, logger):
async def test_stream_handler_with_log_model(self, stream_handler, logger):
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

logger.info("Test Message %s", "extra", LogModel(foo="bar"))

Expand All @@ -112,7 +110,6 @@ class LogModel(pydantic.BaseModel):

class TestPydanticKafkaeskHandler:
async def test_kafka_handler(self, app, kafakesk_handler, logger, log_consumer):

async with app:
logger.info("Test Message %s", "extra")
await app.flush()
Expand All @@ -124,7 +121,7 @@ async def test_kafka_handler(self, app, kafakesk_handler, logger, log_consumer):
async def test_kafka_handler_with_log_model(self, app, kafakesk_handler, logger, log_consumer):
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

async with app:
logger.info("Test Message %s", "extra", LogModel(foo="bar"))
Expand Down Expand Up @@ -184,7 +181,6 @@ def test_emit_drops_message_on_runtime_error_start(self):
class TestKafkaeskQueue:
@pytest_asyncio.fixture(scope="function")
async def queue(self, request, app):

max_queue = 10000
for marker in request.node.iter_markers("with_max_queue"):
max_queue = marker.args[0]
Expand Down Expand Up @@ -214,7 +210,6 @@ async def consume(data: PydanticLogModel):
assert len(consumed) == 1

async def test_queue_flush(self, app, queue, log_consumer):

async with app:
queue.start()
for i in range(10):
Expand All @@ -228,7 +223,6 @@ async def test_queue_flush(self, app, queue, log_consumer):
assert len(log_consumer) == 10

async def test_queue_flush_on_close(self, app, queue, log_consumer):

async with app:
queue.start()
await asyncio.sleep(0.1)
Expand All @@ -245,7 +239,6 @@ async def test_queue_flush_on_close(self, app, queue, log_consumer):

@pytest.mark.with_max_queue(1)
async def test_queue_max_size(self, app, queue):

queue.start()
queue.put_nowait("log.test", PydanticLogModel())

Expand Down
4 changes: 2 additions & 2 deletions tests/acceptance/ext/logging/test_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def test_factory_return_type() -> None:
async def test_factory_adds_pydantic_models() -> None:
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

record = factory(
name="logger.test",
Expand Down Expand Up @@ -66,7 +66,7 @@ async def test_factory_formats_msg() -> None:
async def test_factory_formats_msg_and_adds_pydantic_model() -> None:
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

record = factory(
name="logger.test",
Expand Down

0 comments on commit 10e88fd

Please sign in to comment.