diff --git a/src/aleph_client/asynchronous.py b/src/aleph_client/asynchronous.py index b287a3dd..e565f0ae 100644 --- a/src/aleph_client/asynchronous.py +++ b/src/aleph_client/asynchronous.py @@ -24,11 +24,12 @@ AggregateMessage, StoreMessage, ProgramMessage, - MessagesResponse, ) +from pydantic import ValidationError from aleph_client.types import Account, StorageEnum, GenericMessage from .exceptions import MessageNotFoundError, MultipleMessagesError +from .models import MessagesResponse from .utils import get_message_type_value logger = logging.getLogger(__name__) @@ -572,6 +573,8 @@ async def get_messages( end_date: Optional[Union[datetime, float]] = None, session: Optional[ClientSession] = None, api_server: str = settings.API_HOST, + ignore_invalid_messages: bool = True, + invalid_messages_log_level: Optional[int] = None, ) -> MessagesResponse: session = session or get_fallback_session() @@ -607,8 +610,37 @@ async def get_messages( async with session.get(f"{api_server}/api/v0/messages.json", params=params) as resp: resp.raise_for_status() - messages_json = await resp.json() - return MessagesResponse(**messages_json) + response_json = await resp.json() + messages_raw = response_json["messages"] + + # All messages may not be valid according to the latest specification in + # aleph-message. This allows the user to specify how errors should be handled. + messages: List[AlephMessage] = [] + for message_raw in messages_raw: + try: + message = Message(**message_raw) + messages.append(message) + except KeyError as e: + if not ignore_invalid_messages: + raise e + if invalid_messages_log_level: + logger.log( + level=invalid_messages_log_level, + msg=f"KeyError: Field '{e.args[0]}' not found", + ) + except ValidationError as e: + if not ignore_invalid_messages: + raise e + if invalid_messages_log_level: + logger.log(level=invalid_messages_log_level, msg=e) + + return MessagesResponse( + messages=messages, + pagination_page=response_json["pagination_page"], + pagination_total=response_json["pagination_total"], + pagination_per_page=response_json["pagination_per_page"], + pagination_item=response_json["pagination_item"], + ) async def get_message( diff --git a/src/aleph_client/models.py b/src/aleph_client/models.py new file mode 100644 index 00000000..f7dfcd6e --- /dev/null +++ b/src/aleph_client/models.py @@ -0,0 +1,14 @@ +from typing import List + +from aleph_message.models import AlephMessage +from pydantic import BaseModel + + +class MessagesResponse(BaseModel): + """Response from an Aleph node API on the path /api/v0/messages.json""" + + messages: List[AlephMessage] + pagination_page: int + pagination_total: int + pagination_per_page: int + pagination_item: str