Commit 5bc35e2

Fix: warning on failure to close node cache properly
Problem: a warning occurs in the tests because the Redis client object is
not cleaned up properly.

Solution: make the NodeCache class an asynchronous context manager and
make it clean up after itself.
odesenfans committed Nov 2, 2023 (parent 7cc7086, commit 5bc35e2)
Showing 7 changed files with 199 additions and 162 deletions.
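The change to the NodeCache class itself is in one of the two files not rendered on this page. As a minimal sketch of what the commit message describes, and not the commit's actual code, an asynchronous-context-manager NodeCache could look like the following, assuming the class wraps a redis.asyncio client and that a close() method mirrors the open() call visible in the diffs below:

# A minimal sketch, not the actual implementation from this commit: NodeCache
# as an asynchronous context manager, assuming it wraps a redis.asyncio client.
from typing import Optional

import redis.asyncio as aioredis


class NodeCache:
    def __init__(self, redis_host: str, redis_port: int):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self._redis_client: Optional[aioredis.Redis] = None

    async def open(self):
        # Create the Redis client; connections are opened lazily on first use.
        self._redis_client = aioredis.Redis(host=self.redis_host, port=self.redis_port)

    async def close(self):
        # Release the connection pool. Skipping this step is what produces
        # the "unclosed client" warning mentioned in the commit message.
        if self._redis_client is not None:
            await self._redis_client.close()
            self._redis_client = None

    async def __aenter__(self) -> "NodeCache":
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

With this protocol, a caller writes `async with NodeCache(...) as node_cache:` and cleanup runs even on error paths, which is also what silences the test warning; a pytest-asyncio fixture, for instance, can simply wrap its `yield` in the same `async with` block.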
3 changes: 3 additions & 0 deletions src/aleph/api_entrypoint.py
@@ -45,6 +45,9 @@ async def configure_aiohttp_app(
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
     )
+    # TODO: find a way to close the node cache when exiting the API process, not closing it causes
+    # a warning.
+    await node_cache.open()
 
     ipfs_client = make_ipfs_client(config)
     ipfs_service = IpfsService(ipfs_client=ipfs_client)
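The TODO is left open because configure_aiohttp_app returns long before the API process exits, so there is no natural scope for an `async with` here. One possible follow-up, sketched as an assumption rather than as part of this commit, is aiohttp's cleanup_ctx hook, where the code before `yield` runs at startup and the code after it at shutdown (`web.Application`, the `config` object, and a NodeCache.close() method are taken from context):

# Hypothetical follow-up, not in this commit: close the node cache when the
# aiohttp application shuts down.
async def node_cache_ctx(app: web.Application):
    node_cache = NodeCache(
        redis_host=config.redis.host.value, redis_port=config.redis.port.value
    )
    await node_cache.open()
    app["node_cache"] = node_cache
    yield  # the application runs while this generator is suspended
    await node_cache.close()  # assumed counterpart of open()


app.cleanup_ctx.append(node_cache_ctx)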
101 changes: 51 additions & 50 deletions src/aleph/commands.py
@@ -60,9 +60,6 @@ async def init_node_cache(config: Config) -> NodeCache:
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
     )
-
-    # Reset the cache
-    await node_cache.reset()
     return node_cache

@@ -130,63 +127,67 @@ async def main(args: List[str]) -> None:
     setup_logging(args.loglevel)
 
     node_cache = await init_node_cache(config)
-    ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    chain_data_service = ChainDataService(
-        session_factory=session_factory,
-        storage_service=storage_service,
-    )
-    pending_tx_publisher = await PendingTxPublisher.new(config=config)
-    chain_connector = ChainConnector(
-        session_factory=session_factory,
-        pending_tx_publisher=pending_tx_publisher,
-        chain_data_service=chain_data_service,
-    )
-
-    set_start_method("spawn")
-
-    tasks: List[Coroutine] = []
-
-    if not args.no_jobs:
-        LOGGER.debug("Creating jobs")
-        tasks += start_jobs(
-            config=config,
-            session_factory=session_factory,
-            ipfs_service=ipfs_service,
-            use_processes=True,
-        )
-
-    LOGGER.debug("Initializing p2p")
-    p2p_client, p2p_tasks = await p2p.init_p2p(
-        config=config,
-        session_factory=session_factory,
-        service_name="network-monitor",
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    tasks += p2p_tasks
-    LOGGER.debug("Initialized p2p")
-
-    LOGGER.debug("Initializing listeners")
-    tasks += listener_tasks(
-        config=config,
-        session_factory=session_factory,
-        node_cache=node_cache,
-        p2p_client=p2p_client,
-    )
-    tasks.append(chain_connector.chain_event_loop(config))
-    LOGGER.debug("Initialized listeners")
-
-    LOGGER.debug("Initializing cache tasks")
-    tasks.append(refresh_cache_materialized_views(session_factory))
-    LOGGER.debug("Initialized cache tasks")
-
-    LOGGER.debug("Running event loop")
-    await asyncio.gather(*tasks)
+    async with node_cache:
+        # Reset the cache
+        await node_cache.reset()
+
+        ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        chain_data_service = ChainDataService(
+            session_factory=session_factory,
+            storage_service=storage_service,
+        )
+        pending_tx_publisher = await PendingTxPublisher.new(config=config)
+        chain_connector = ChainConnector(
+            session_factory=session_factory,
+            pending_tx_publisher=pending_tx_publisher,
+            chain_data_service=chain_data_service,
+        )
+
+        set_start_method("spawn")
+
+        tasks: List[Coroutine] = []
+
+        if not args.no_jobs:
+            LOGGER.debug("Creating jobs")
+            tasks += start_jobs(
+                config=config,
+                session_factory=session_factory,
+                ipfs_service=ipfs_service,
+                use_processes=True,
+            )
+
+        LOGGER.debug("Initializing p2p")
+        p2p_client, p2p_tasks = await p2p.init_p2p(
+            config=config,
+            session_factory=session_factory,
+            service_name="network-monitor",
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        tasks += p2p_tasks
+        LOGGER.debug("Initialized p2p")
+
+        LOGGER.debug("Initializing listeners")
+        tasks += listener_tasks(
+            config=config,
+            session_factory=session_factory,
+            node_cache=node_cache,
+            p2p_client=p2p_client,
+        )
+        tasks.append(chain_connector.chain_event_loop(config))
+        LOGGER.debug("Initialized listeners")
+
+        LOGGER.debug("Initializing cache tasks")
+        tasks.append(refresh_cache_materialized_views(session_factory))
+        LOGGER.debug("Initialized cache tasks")
+
+        LOGGER.debug("Running event loop")
+        await asyncio.gather(*tasks)
 
 
 def run():
73 changes: 37 additions & 36 deletions src/aleph/jobs/fetch_pending_messages.py
@@ -162,46 +162,47 @@ async def fetch_messages_task(config: Config):
     engine = make_engine(config=config, application_name="aleph-fetch")
     session_factory = make_session_factory(engine)
 
-    node_cache = NodeCache(
+    async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    )
-    ipfs_client = make_ipfs_client(config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    signature_verifier = SignatureVerifier()
-    message_handler = MessageHandler(
-        signature_verifier=signature_verifier,
-        storage_service=storage_service,
-        config=config,
-    )
-    fetcher = PendingMessageFetcher(
-        session_factory=session_factory,
-        message_handler=message_handler,
-        max_retries=config.aleph.jobs.pending_messages.max_retries.value,
-    )
-
-    while True:
-        with session_factory() as session:
-            try:
-                fetch_pipeline = fetcher.make_pipeline(
-                    config=config, node_cache=node_cache
-                )
-                async for fetched_messages in fetch_pipeline:
-                    for fetched_message in fetched_messages:
-                        LOGGER.info(
-                            "Successfully fetched %s", fetched_message.item_hash
-                        )
-
-            except Exception:
-                LOGGER.exception("Error in pending messages job")
-                session.rollback()
-
-        LOGGER.debug("Waiting 1 second(s) for new pending messages...")
-        await asyncio.sleep(1)
+    ) as node_cache:
+
+        ipfs_client = make_ipfs_client(config)
+        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        signature_verifier = SignatureVerifier()
+        message_handler = MessageHandler(
+            signature_verifier=signature_verifier,
+            storage_service=storage_service,
+            config=config,
+        )
+        fetcher = PendingMessageFetcher(
+            session_factory=session_factory,
+            message_handler=message_handler,
+            max_retries=config.aleph.jobs.pending_messages.max_retries.value,
+        )
+
+        while True:
+            with session_factory() as session:
+                try:
+                    fetch_pipeline = fetcher.make_pipeline(
+                        config=config, node_cache=node_cache
+                    )
+                    async for fetched_messages in fetch_pipeline:
+                        for fetched_message in fetched_messages:
+                            LOGGER.info(
+                                "Successfully fetched %s", fetched_message.item_hash
+                            )
+
+                except Exception:
+                    LOGGER.exception("Error in pending messages job")
+                    session.rollback()
+
+            LOGGER.debug("Waiting 1 second(s) for new pending messages...")
+            await asyncio.sleep(1)
 
 
 def fetch_pending_messages_subprocess(config_values: Dict):
75 changes: 38 additions & 37 deletions src/aleph/jobs/process_pending_messages.py
@@ -142,47 +142,48 @@ async def fetch_and_process_messages_task(config: Config):
     engine = make_engine(config=config, application_name="aleph-process")
     session_factory = make_session_factory(engine)
 
-    node_cache = NodeCache(
+    async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    )
-    ipfs_client = make_ipfs_client(config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    signature_verifier = SignatureVerifier()
-    message_handler = MessageHandler(
-        signature_verifier=signature_verifier,
-        storage_service=storage_service,
-        config=config,
-    )
-    pending_message_processor = await PendingMessageProcessor.new(
-        session_factory=session_factory,
-        message_handler=message_handler,
-        max_retries=config.aleph.jobs.pending_messages.max_retries.value,
-        mq_host=config.p2p.mq_host.value,
-        mq_port=config.rabbitmq.port.value,
-        mq_username=config.rabbitmq.username.value,
-        mq_password=config.rabbitmq.password.value,
-        message_exchange_name=config.rabbitmq.message_exchange.value,
-    )
-
-    while True:
-        with session_factory() as session:
-            try:
-                message_processing_pipeline = pending_message_processor.make_pipeline()
-                async for processing_results in message_processing_pipeline:
-                    for result in processing_results:
-                        LOGGER.info("Successfully processed %s", result.item_hash)
-
-            except Exception:
-                LOGGER.exception("Error in pending messages job")
-                session.rollback()
-
-        LOGGER.info("Waiting 1 second(s) for new pending messages...")
-        await asyncio.sleep(1)
+    ) as node_cache:
+
+        ipfs_client = make_ipfs_client(config)
+        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        signature_verifier = SignatureVerifier()
+        message_handler = MessageHandler(
+            signature_verifier=signature_verifier,
+            storage_service=storage_service,
+            config=config,
+        )
+        pending_message_processor = await PendingMessageProcessor.new(
+            session_factory=session_factory,
+            message_handler=message_handler,
+            max_retries=config.aleph.jobs.pending_messages.max_retries.value,
+            mq_host=config.p2p.mq_host.value,
+            mq_port=config.rabbitmq.port.value,
+            mq_username=config.rabbitmq.username.value,
+            mq_password=config.rabbitmq.password.value,
+            message_exchange_name=config.rabbitmq.message_exchange.value,
+        )
+
+        while True:
+            with session_factory() as session:
+                try:
+                    message_processing_pipeline = pending_message_processor.make_pipeline()
+                    async for processing_results in message_processing_pipeline:
+                        for result in processing_results:
+                            LOGGER.info("Successfully processed %s", result.item_hash)
+
+                except Exception:
+                    LOGGER.exception("Error in pending messages job")
+                    session.rollback()
+
+            LOGGER.info("Waiting 1 second(s) for new pending messages...")
+            await asyncio.sleep(1)
 
 
 def pending_messages_subprocess(config_values: Dict):
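At this point the NodeCache/IPFS/storage wiring repeats almost verbatim in each job entry point. If that duplication ever becomes a burden, one option, sketched here only as a possibility and not something this commit does, is a contextlib.asynccontextmanager helper that owns the cache's lifetime and yields the shared services:

# Hypothetical helper, not in this commit: factor out the wiring shared by the
# job entry points. The class and factory names are those imported above.
from contextlib import asynccontextmanager


@asynccontextmanager
async def shared_services(config):
    async with NodeCache(
        redis_host=config.redis.host.value, redis_port=config.redis.port.value
    ) as node_cache:
        ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
        storage_service = StorageService(
            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
            ipfs_service=ipfs_service,
            node_cache=node_cache,
        )
        # The cache is closed automatically when the caller's block exits.
        yield node_cache, ipfs_service, storage_service

Each task body would then start with `async with shared_services(config) as (node_cache, ipfs_service, storage_service):` and keep only its job-specific setup.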
71 changes: 36 additions & 35 deletions src/aleph/jobs/process_pending_txs.py
@@ -140,44 +140,45 @@ async def handle_txs_task(config: Config):
 
     pending_tx_queue = await make_pending_tx_queue(config=config)
 
-    node_cache = NodeCache(
+    async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    )
-    ipfs_client = make_ipfs_client(config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    message_publisher = MessagePublisher(
-        session_factory=session_factory,
-        storage_service=storage_service,
-        config=config,
-    )
-    chain_data_service = ChainDataService(
-        session_factory=session_factory, storage_service=storage_service
-    )
-    pending_tx_processor = PendingTxProcessor(
-        session_factory=session_factory,
-        message_publisher=message_publisher,
-        chain_data_service=chain_data_service,
-        pending_tx_queue=pending_tx_queue,
-    )
-
-    async with pending_tx_processor:
-        while True:
-            try:
-                await pending_tx_processor.process_pending_txs(
-                    max_concurrent_tasks=max_concurrent_tasks
-                )
-            except Exception:
-                LOGGER.exception("Error in pending txs job")
-
-            try:
-                await asyncio.wait_for(pending_tx_processor.ready(), 5)
-            except TimeoutError:
-                pass
+    ) as node_cache:
+
+        ipfs_client = make_ipfs_client(config)
+        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        message_publisher = MessagePublisher(
+            session_factory=session_factory,
+            storage_service=storage_service,
+            config=config,
+        )
+        chain_data_service = ChainDataService(
+            session_factory=session_factory, storage_service=storage_service
+        )
+        pending_tx_processor = PendingTxProcessor(
+            session_factory=session_factory,
+            message_publisher=message_publisher,
+            chain_data_service=chain_data_service,
+            pending_tx_queue=pending_tx_queue,
+        )
+
+        async with pending_tx_processor:
+            while True:
+                try:
+                    await pending_tx_processor.process_pending_txs(
+                        max_concurrent_tasks=max_concurrent_tasks
+                    )
+                except Exception:
+                    LOGGER.exception("Error in pending txs job")
+
+                try:
+                    await asyncio.wait_for(pending_tx_processor.ready(), 5)
+                except TimeoutError:
+                    pass
 
 
 def pending_txs_subprocess(config_values: Dict):
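handle_txs_task now nests two async context managers, the node cache and the pending-tx processor. Two levels are perfectly readable; if more managed resources accumulate, contextlib.AsyncExitStack is the standard way to keep the indentation flat. A sketch under the same assumptions as above:

# Hypothetical alternative, not in this commit: enter several async context
# managers at one indentation level with AsyncExitStack.
from contextlib import AsyncExitStack


async def handle_txs_task_flat(config):
    async with AsyncExitStack() as stack:
        node_cache = await stack.enter_async_context(
            NodeCache(
                redis_host=config.redis.host.value,
                redis_port=config.redis.port.value,
            )
        )
        # ...build the services and pending_tx_processor as above, then:
        # await stack.enter_async_context(pending_tx_processor)
        # followed by the processing loop.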
(Diffs for the two remaining changed files are not rendered here.)
