Skip to content

Commit

Permalink
savepoint
Browse files Browse the repository at this point in the history
  • Loading branch information
odesenfans committed Oct 31, 2023
1 parent befa366 commit e04e38a
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 90 deletions.
7 changes: 7 additions & 0 deletions src/aleph/db/accessors/pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def get_pending_messages(
return session.execute(select_stmt).scalars()


def get_pending_message(session: DbSession, pending_message_id: int) -> Optional[PendingMessageDb]:
    """
    Looks up a single pending message by its database ID.

    :param session: DB session used to execute the query.
    :param pending_message_id: Value compared against `PendingMessageDb.id`.
    :return: The matching pending message, or None if no row matches.
    """
    id_matches = PendingMessageDb.id == pending_message_id
    result = session.execute(select(PendingMessageDb).where(id_matches))
    return result.scalar_one_or_none()


def count_pending_messages(session: DbSession, chain: Optional[Chain] = None) -> int:
"""
Counts pending messages.
Expand Down
15 changes: 14 additions & 1 deletion src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from typing import Optional, Dict, Any, Mapping

import aio_pika.abc
import psycopg2
import sqlalchemy.exc
from aleph_message.models import MessageType, ItemType
Expand Down Expand Up @@ -175,12 +176,22 @@ def __init__(
session_factory: DbSessionFactory,
storage_service: StorageService,
config: Config,
pending_message_exchange: aio_pika.abc.AbstractExchange,

):
super().__init__(
storage_service=storage_service,
config=config,
)
self.session_factory = session_factory
self.pending_message_exchange = pending_message_exchange

async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None:
    """
    Publishes a pending message on the pending message exchange.

    The body of the MQ message is the pending message ID, encoded as UTF-8 text.
    The routing key is `process.<item_hash>` when the pending message is flagged
    as fetched and `fetch.<item_hash>` otherwise.

    :param pending_message: The pending message to announce on the exchange.
    """
    body = f"{pending_message.id}".encode("utf-8")

    # The routing key prefix tells consumers which step the message is waiting for.
    if pending_message.fetched:
        action = "process"
    else:
        action = "fetch"

    await self.pending_message_exchange.publish(
        aio_pika.Message(body=body),
        routing_key=f"{action}.{pending_message.item_hash}",
    )

async def add_pending_message(
self,
Expand Down Expand Up @@ -241,7 +252,6 @@ async def add_pending_message(
session.execute(upsert_message_status_stmt)
session.execute(insert_pending_message_stmt)
session.commit()
return pending_message

except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e:
LOGGER.warning(
Expand All @@ -259,6 +269,9 @@ async def add_pending_message(
session.commit()
return None

await self._publish_pending_message(pending_message)
return pending_message


class MessageHandler(BaseMessageHandler):
"""
Expand Down
123 changes: 65 additions & 58 deletions src/aleph/jobs/fetch_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,30 @@
AsyncIterator,
Sequence,
NewType,
Optional,
)

import aio_pika.abc
from aleph_message.models import ItemHash
from configmanager import Config
from setproctitle import setproctitle

from ..chains.signature_verifier import SignatureVerifier
from aleph.db.accessors.pending_messages import (
make_pending_message_fetched_statement,
get_next_pending_messages,
get_pending_message,
)
from aleph.db.connection import make_engine, make_session_factory
from aleph.db.models import PendingMessageDb, MessageDb
from aleph.db.models import MessageDb
from aleph.handlers.message_handler import MessageHandler
from aleph.services.ipfs import IpfsService
from aleph.services.ipfs.common import make_ipfs_client
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
from aleph.storage import StorageService
from aleph.toolkit.logging import setup_logging
from aleph.toolkit.monitoring import setup_sentry
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSessionFactory
from .job_utils import prepare_loop, MessageJob
from .job_utils import prepare_loop, MessageJob, make_pending_message_queue
from ..chains.signature_verifier import SignatureVerifier
from ..services.cache.node_cache import NodeCache

LOGGER = getLogger(__name__)
Expand All @@ -47,15 +49,23 @@ def __init__(
session_factory: DbSessionFactory,
message_handler: MessageHandler,
max_retries: int,
pending_message_queue: aio_pika.abc.AbstractQueue,
):
super().__init__(
session_factory=session_factory,
message_handler=message_handler,
max_retries=max_retries,
)
self.pending_message_queue = pending_message_queue

async def fetch_pending_message(self, pending_message: PendingMessageDb):
async def fetch_pending_message(
self, pending_message_id: int
) -> Optional[MessageDb]:
with self.session_factory() as session:
pending_message = get_pending_message(
session=session, pending_message_id=pending_message_id
)

try:
message = await self.message_handler.verify_and_fetch(
session=session, pending_message=pending_message
Expand All @@ -76,6 +86,7 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb):
exception=e,
)
session.commit()
return None

async def fetch_pending_messages(
self, config: Config, node_cache: NodeCache, loop: bool = True
Expand All @@ -87,61 +98,55 @@ async def fetch_pending_messages(
await node_cache.set(retry_messages_cache_key, 0)
max_concurrent_tasks = config.aleph.jobs.pending_messages.max_concurrency.value
fetch_tasks: Set[asyncio.Task] = set()
task_message_dict: Dict[asyncio.Task, PendingMessageDb] = {}
task_message_dict: Dict[asyncio.Task, ItemHash] = {}
messages_being_fetched: Set[str] = set()
fetched_messages: List[MessageDb] = []

while True:
with self.session_factory() as session:
if fetch_tasks:
finished_tasks, fetch_tasks = await asyncio.wait(
fetch_tasks, return_when=asyncio.FIRST_COMPLETED
)
for finished_task in finished_tasks:
pending_message = task_message_dict.pop(finished_task)
messages_being_fetched.remove(pending_message.item_hash)
await node_cache.decr(retry_messages_cache_key)

if len(fetch_tasks) < max_concurrent_tasks:
pending_messages = get_next_pending_messages(
session=session,
current_time=utc_now(),
limit=max_concurrent_tasks - len(fetch_tasks),
offset=len(fetch_tasks),
exclude_item_hashes=messages_being_fetched,
fetched=False,
)
if fetch_tasks:
finished_tasks, fetch_tasks = await asyncio.wait(
fetch_tasks, return_when=asyncio.FIRST_COMPLETED
)
for finished_task in finished_tasks:
pending_message_hash = task_message_dict.pop(finished_task)
messages_being_fetched.remove(pending_message_hash)
await node_cache.decr(retry_messages_cache_key)

if len(fetch_tasks) < max_concurrent_tasks:
for i in range(0, max_concurrent_tasks - len(fetch_tasks)):
message = await self.pending_message_queue.get(fail=False)
if message is None:
break

for pending_message in pending_messages:
# Avoid processing the same message twice at the same time.
if pending_message.item_hash in messages_being_fetched:
async with message.process(requeue=True, ignore_processed=True):
pending_message_hash = ItemHash(
message.routing_key.split(".")[1]
)
# Avoid fetching the same message twice at the same time.
if pending_message_hash in messages_being_fetched:
await message.reject(requeue=True)
continue

# Check if the message is already processing
messages_being_fetched.add(pending_message.item_hash)

messages_being_fetched.add(pending_message_hash)
await node_cache.incr(retry_messages_cache_key)

pending_message_id = int(message.body.decode("utf-8"))
message_task = asyncio.create_task(
self.fetch_pending_message(
pending_message=pending_message,
pending_message_id=pending_message_id,
)
)
fetch_tasks.add(message_task)
task_message_dict[message_task] = pending_message
task_message_dict[message_task] = pending_message_hash

if fetched_messages:
yield fetched_messages
fetched_messages = []
if fetched_messages:
yield fetched_messages
fetched_messages = []

if not PendingMessageDb.count(session):
# If not in loop mode, stop if there are no more pending messages
if not loop:
break
# If we are done, wait a few seconds until retrying
if not fetch_tasks:
LOGGER.info("waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)
# If not in loop mode, stop if there are no more pending messages
if not loop:
if not messages_being_fetched:
break

def make_pipeline(
self,
Expand Down Expand Up @@ -178,27 +183,29 @@ async def fetch_messages_task(config: Config):
storage_service=storage_service,
config=config,
)
pending_message_queue = await make_pending_message_queue(
config=config, routing_key="fetch.*"
)
fetcher = PendingMessageFetcher(
session_factory=session_factory,
message_handler=message_handler,
max_retries=config.aleph.jobs.pending_messages.max_retries.value,
pending_message_queue=pending_message_queue,
)

while True:
with session_factory() as session:
try:
fetch_pipeline = fetcher.make_pipeline(
config=config, node_cache=node_cache
)
async for fetched_messages in fetch_pipeline:
for fetched_message in fetched_messages:
LOGGER.info(
"Successfully fetched %s", fetched_message.item_hash
)
try:
fetch_pipeline = fetcher.make_pipeline(
config=config, node_cache=node_cache
)
async for fetched_messages in fetch_pipeline:
for fetched_message in fetched_messages:
LOGGER.info(
"Successfully fetched %s", fetched_message.item_hash
)

except Exception:
LOGGER.exception("Error in pending messages job")
session.rollback()
except Exception:
LOGGER.exception("Unexpected error in pending messages fetch job")

LOGGER.debug("Waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)
Expand Down
Loading

0 comments on commit e04e38a

Please sign in to comment.