diff --git a/src/dipdup/datasources/substrate_node.py b/src/dipdup/datasources/substrate_node.py index 83183cc11..56ebc6bfa 100644 --- a/src/dipdup/datasources/substrate_node.py +++ b/src/dipdup/datasources/substrate_node.py @@ -4,6 +4,7 @@ from asyncio import Queue from collections.abc import Awaitable from collections.abc import Callable +from contextlib import suppress from copy import copy from dataclasses import dataclass from dataclasses import field @@ -203,7 +204,7 @@ async def _on_message(self, message: Message) -> None: # NOTE: Set None to identify possible subscriptions conflicts self._pending_subscription = None else: - raise Exception + raise FrameworkException('id in data, but no pending subscription') elif 'method' in data and data['method'].startswith('chain_'): subscription_id = data['params']['subscription'] if subscription_id not in self._subscription_ids: @@ -247,7 +248,12 @@ async def get_full_block(self, hash: str) -> dict[str, Any]: return await self._jsonrpc_request('chain_getBlock', [hash]) # type: ignore[no-any-return] async def get_events(self, block_hash: str) -> tuple[_SubstrateNodeEventResponse, ...]: - events = await self._interface.get_events(block_hash) + # FIXME: aiosubstrate bug, fix asap + while True: + with suppress(AttributeError): + events = await self._interface.get_events(block_hash) + break + await asyncio.sleep(0.1) result: list[_SubstrateNodeEventResponse] = [] for raw_event in events: diff --git a/src/dipdup/indexes/_subsquid.py b/src/dipdup/indexes/_subsquid.py index 9c93883e9..9c9210e3e 100644 --- a/src/dipdup/indexes/_subsquid.py +++ b/src/dipdup/indexes/_subsquid.py @@ -46,6 +46,8 @@ async def _get_node_sync_level( node = node or random.choice(self.node_datasources) node_sync_level = await node.get_head_level() + node._logger.info('current head is %s', node_sync_level) + subsquid_lag = abs(node_sync_level - subsquid_level) subsquid_available = subsquid_level - index_level self._logger.info('Subsquid is %s levels behind; %s available', subsquid_lag, subsquid_available) @@ -64,7 +66,9 @@ async def _synchronize(self, sync_level: int) -> None: return if self.subsquid_datasources: - subsquid_sync_level = await self.subsquid_datasources[0].get_head_level() + datasource = self.subsquid_datasources[0] + subsquid_sync_level = await datasource.get_head_level() + datasource._logger.info('current head is %s', subsquid_sync_level) metrics._sqd_processor_chain_height = subsquid_sync_level else: subsquid_sync_level = 0 @@ -74,15 +78,16 @@ async def _synchronize(self, sync_level: int) -> None: # NOTE: Fetch last blocks from node if there are not enough realtime messages in queue if node_sync_level: sync_level = min(sync_level, node_sync_level) - self._logger.debug('Using node datasource; sync level: %s', sync_level) + self._logger.info('Synchronizing with `node`: %s -> %s', index_level, sync_level) await self._synchronize_node(sync_level) else: sync_level = min(sync_level, subsquid_sync_level) + self._logger.info('Synchronizing with `subsquid`: %s -> %s', index_level, sync_level) await self._synchronize_subsquid(sync_level) if not self.node_datasources and not self._subsquid_started: self._subsquid_started = True - self._logger.info('No `evm.node` datasources available; polling Subsquid') + self._logger.info('No `node` datasources available; polling Subsquid') for datasource in self.subsquid_datasources: await datasource.start() diff --git a/src/dipdup/indexes/substrate_events/fetcher.py b/src/dipdup/indexes/substrate_events/fetcher.py index 04f791746..a56ae3710 100644 --- a/src/dipdup/indexes/substrate_events/fetcher.py +++ b/src/dipdup/indexes/substrate_events/fetcher.py @@ -1,6 +1,7 @@ import asyncio from collections.abc import AsyncIterator from collections.abc import Iterable +from contextlib import suppress from typing import Any from dipdup.datasources.substrate_node import SubstrateNodeDatasource @@ -159,14 +160,16 @@ async def _log_loop() -> None: ) while True: - if sum(queues[q].qsize() for q in queues) == 0 and all(t.done() for t in tasks): + if sum(queues[q].qsize() for q in queues) == 0 and all(t.done() for t in tasks[:-1]): break for t in tasks: - if t.done(): + if t.done() or t.cancelled(): await t - header, events = await queues['events'].get() - yield tuple(SubstrateEventData.from_node(event, header) for event in events) + with suppress(asyncio.TimeoutError): + while True: + header, events = await asyncio.wait_for(queues['events'].get(), timeout=1) + yield tuple(SubstrateEventData.from_node(event, header) for event in events) tasks[-1].cancel()