Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Any invalid message in a response crashed the entire response #109

Merged
merged 1 commit into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src/aleph_client/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
AggregateMessage,
StoreMessage,
ProgramMessage,
MessagesResponse,
)
from pydantic import ValidationError

from aleph_client.types import Account, StorageEnum, GenericMessage
from .exceptions import MessageNotFoundError, MultipleMessagesError
from .models import MessagesResponse
from .utils import get_message_type_value

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -576,6 +577,8 @@ async def get_messages(
end_date: Optional[Union[datetime, float]] = None,
session: Optional[ClientSession] = None,
api_server: str = settings.API_HOST,
ignore_invalid_messages: bool = True,
invalid_messages_log_level: int = logging.NOTSET,
) -> MessagesResponse:
session = session or get_fallback_session()

Expand Down Expand Up @@ -611,8 +614,36 @@ async def get_messages(

async with session.get(f"{api_server}/api/v0/messages.json", params=params) as resp:
resp.raise_for_status()
messages_json = await resp.json()
return MessagesResponse(**messages_json)
response_json = await resp.json()
messages_raw = response_json["messages"]

# All messages may not be valid according to the latest specification in
# aleph-message. This allows the user to specify how errors should be handled.
messages: List[AlephMessage] = []
for message_raw in messages_raw:
try:
message = Message(**message_raw)
messages.append(message)
except KeyError as e:
if not ignore_invalid_messages:
raise e
logger.log(
level=invalid_messages_log_level,
msg=f"KeyError: Field '{e.args[0]}' not found",
)
except ValidationError as e:
if not ignore_invalid_messages:
raise e
if invalid_messages_log_level:
logger.log(level=invalid_messages_log_level, msg=e)

return MessagesResponse(
messages=messages,
pagination_page=response_json["pagination_page"],
pagination_total=response_json["pagination_total"],
pagination_per_page=response_json["pagination_per_page"],
pagination_item=response_json["pagination_item"],
)


async def get_message(
Expand Down
14 changes: 14 additions & 0 deletions src/aleph_client/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import List

from aleph_message.models import AlephMessage
from pydantic import BaseModel


class MessagesResponse(BaseModel):
"""Response from an Aleph node API on the path /api/v0/messages.json"""

messages: List[AlephMessage]
pagination_page: int
pagination_total: int
pagination_per_page: int
pagination_item: str