diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 125cb127..117fa3f2 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -8,7 +8,7 @@ from aleph_message.models import ItemHash, ItemType, MessageType from configmanager import Config from pydantic import ValidationError -from sqlalchemy import insert +from sqlalchemy.dialects.postgresql import insert from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.files import insert_content_file_pin, upsert_file @@ -241,14 +241,29 @@ async def add_pending_message( session.commit() return None + # Check if there are an already existing record + existing_message = ( + session.query(PendingMessageDb) + .filter_by( + sender=pending_message.sender, + item_hash=pending_message.item_hash, + signature=pending_message.signature, + ) + .one_or_none() + ) + if existing_message: + return existing_message + upsert_message_status_stmt = make_message_status_upsert_query( item_hash=pending_message.item_hash, new_status=MessageStatus.PENDING, reception_time=reception_time, where=MessageStatusDb.status == MessageStatus.REJECTED, ) - insert_pending_message_stmt = insert(PendingMessageDb).values( - pending_message.to_dict(exclude={"id"}) + insert_pending_message_stmt = ( + insert(PendingMessageDb) + .values(pending_message.to_dict(exclude={"id"})) + .on_conflict_do_nothing("uq_pending_message") ) try: @@ -256,23 +271,9 @@ async def add_pending_message( session.execute(insert_pending_message_stmt) session.commit() except sqlalchemy.exc.IntegrityError: - # Handle the unique constraint violation and log as debug to avoid multiple errors. - LOGGER.debug( - "Duplicate pending message detected. Fetching existing record." - ) - session.rollback() - # Fetch the existing record - existing_message = ( - session.query(PendingMessageDb) - .filter_by( - sender=pending_message.sender, - item_hash=pending_message.item_hash, - signature=pending_message.signature, - ) - .one_or_none() - ) - if existing_message: - return existing_message + # Handle the unique constraint violation. + LOGGER.warning("Duplicate pending message detected trying to save it.") + return None except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e: LOGGER.warning( diff --git a/tests/message_processing/test_process_pending_messages.py b/tests/message_processing/test_process_pending_messages.py index 8a0f3889..0cae91bb 100644 --- a/tests/message_processing/test_process_pending_messages.py +++ b/tests/message_processing/test_process_pending_messages.py @@ -12,7 +12,7 @@ @pytest.mark.asyncio -async def test_pending_message( +async def test_duplicated_pending_message( mocker, mock_config: Config, session_factory: DbSessionFactory,