From d19f7997e6cfa6e18b8874f3ec009f1e2d0b38db Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 15 Apr 2024 13:51:22 -0300 Subject: [PATCH] Allow using `evm.node` as index datasource (#997) --- CHANGELOG.md | 6 ++++ docs/7.references/2.config.md | 10 +++--- schema.json | 21 +++++++++++ src/dipdup/config/__init__.py | 15 +++++--- src/dipdup/config/evm_subsquid.py | 2 +- src/dipdup/config/evm_subsquid_events.py | 4 +-- src/dipdup/config/evm_subsquid_traces.py | 3 +- .../config/evm_subsquid_transactions.py | 4 +-- src/dipdup/context.py | 35 +++++++++++-------- src/dipdup/indexes/evm_subsquid.py | 32 +++++++++++------ .../indexes/evm_subsquid_events/index.py | 9 +++-- .../evm_subsquid_transactions/index.py | 8 ++++- tests/configs/demo_evm_events_node.yml | 3 +- tests/configs/demo_evm_transactions_node.yml | 3 +- 14 files changed, 107 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c30f493e..96f234781 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning]. +## [Unreleased] + +### Added + +- evm.subsquid: `evm.node` datasources can be used as index datasources. + ## [7.5.4] - 2024-04-09 ### Fixed diff --git a/docs/7.references/2.config.md b/docs/7.references/2.config.md index 34e1e9dcd..88711929e 100644 --- a/docs/7.references/2.config.md +++ b/docs/7.references/2.config.md @@ -219,12 +219,12 @@ description: "Config file reference"
Parameters:
@@ -258,7 +258,7 @@ description: "Config file reference"
Parameters:
@@ -293,12 +293,12 @@ description: "Config file reference"
Parameters:
diff --git a/schema.json b/schema.json index 31bfb1262..0db3c5edd 100644 --- a/schema.json +++ b/schema.json @@ -2003,6 +2003,13 @@ "$ref": "#/$defs/SubsquidDatasourceConfig" } ] + }, + { + "allOf": [ + { + "$ref": "#/$defs/EvmNodeDatasourceConfig" + } + ] } ] }, @@ -2092,6 +2099,13 @@ "$ref": "#/$defs/SubsquidDatasourceConfig" } ] + }, + { + "allOf": [ + { + "$ref": "#/$defs/EvmNodeDatasourceConfig" + } + ] } ] }, @@ -2228,6 +2242,13 @@ "$ref": "#/$defs/SubsquidDatasourceConfig" } ] + }, + { + "allOf": [ + { + "$ref": "#/$defs/EvmNodeDatasourceConfig" + } + ] } ] }, diff --git a/src/dipdup/config/__init__.py b/src/dipdup/config/__init__.py index ae90177cf..3571fff63 100644 --- a/src/dipdup/config/__init__.py +++ b/src/dipdup/config/__init__.py @@ -934,12 +934,16 @@ def _resolve_index_links(self, index_config: ResolvedIndexConfigU) -> None: """ handler_config: HandlerConfig - # NOTE: Each index must have a corresponding (currently) TzKT datasource + # NOTE: Each index must have a corresponding index datasource if isinstance(index_config.datasource, str): - if 'tzkt' in index_config.kind: - index_config.datasource = self.get_tzkt_datasource(index_config.datasource) - elif 'subsquid' in index_config.kind: - index_config.datasource = self.get_subsquid_datasource(index_config.datasource) + name = index_config.datasource + if index_config.kind.startswith('tezos.tzkt'): + index_config.datasource = self.get_tzkt_datasource(name) + elif index_config.kind.startswith('evm.subsquid'): + try: + index_config.datasource = self.get_subsquid_datasource(name) + except ConfigurationError: + index_config.datasource = self.get_evm_node_datasource(name) else: raise FrameworkException(f'Unknown datasource type for index `{index_config.name}`') @@ -1150,6 +1154,7 @@ def _patch_annotations(replace_table: dict[str, str]) -> None: _original_to_aliased = { 'TzktDatasourceConfig': 'str | TzktDatasourceConfig', 'SubsquidDatasourceConfig': 'str | SubsquidDatasourceConfig', + 'SubsquidDatasourceConfig | EvmNodeDatasourceConfig': 'str | SubsquidDatasourceConfig | EvmNodeDatasourceConfig', 'ContractConfig': 'str | ContractConfig', 'ContractConfig | None': 'str | ContractConfig | None', 'TezosContractConfig': 'str | TezosContractConfig', diff --git a/src/dipdup/config/evm_subsquid.py b/src/dipdup/config/evm_subsquid.py index 418468649..8478e3dd5 100644 --- a/src/dipdup/config/evm_subsquid.py +++ b/src/dipdup/config/evm_subsquid.py @@ -60,4 +60,4 @@ class SubsquidIndexConfig(IndexConfig, ABC): :param datasource: Subsquid datasource config """ - datasource: SubsquidDatasourceConfig + datasource: SubsquidDatasourceConfig | EvmNodeDatasourceConfig diff --git a/src/dipdup/config/evm_subsquid_events.py b/src/dipdup/config/evm_subsquid_events.py index 94a54e4a2..f8ecd2871 100644 --- a/src/dipdup/config/evm_subsquid_events.py +++ b/src/dipdup/config/evm_subsquid_events.py @@ -11,6 +11,7 @@ from dipdup.config import AbiDatasourceConfig from dipdup.config import HandlerConfig from dipdup.config.evm import EvmContractConfig +from dipdup.config.evm_node import EvmNodeDatasourceConfig from dipdup.config.evm_subsquid import SubsquidDatasourceConfig from dipdup.config.evm_subsquid import SubsquidIndexConfig from dipdup.models.evm_node import EvmNodeHeadSubscription @@ -60,13 +61,12 @@ class SubsquidEventsIndexConfig(SubsquidIndexConfig): :param datasource: Subsquid datasource :param handlers: Event handlers :param abi: One or more `evm.abi` datasource(s) for the same network - :param node_only: Don't use Subsquid Network API (dev only) :param first_level: Level to start indexing from :param last_level: Level to stop indexing and disable this index """ kind: Literal['evm.subsquid.events'] - datasource: SubsquidDatasourceConfig + datasource: SubsquidDatasourceConfig | EvmNodeDatasourceConfig handlers: tuple[SubsquidEventsHandlerConfig, ...] = field(default_factory=tuple) abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None node_only: bool = False diff --git a/src/dipdup/config/evm_subsquid_traces.py b/src/dipdup/config/evm_subsquid_traces.py index 2529cb9c1..1f59149f4 100644 --- a/src/dipdup/config/evm_subsquid_traces.py +++ b/src/dipdup/config/evm_subsquid_traces.py @@ -9,6 +9,7 @@ from dipdup.config import AbiDatasourceConfig from dipdup.config import HandlerConfig +from dipdup.config.evm_node import EvmNodeDatasourceConfig from dipdup.config.evm_subsquid import SubsquidDatasourceConfig from dipdup.config.evm_subsquid import SubsquidIndexConfig @@ -21,7 +22,7 @@ class SubsquidTracesHandlerConfig(HandlerConfig): ... class SubsquidTracesIndexConfig(SubsquidIndexConfig): kind: Literal['evm.subsquid.traces'] - datasource: SubsquidDatasourceConfig + datasource: SubsquidDatasourceConfig | EvmNodeDatasourceConfig handlers: tuple[SubsquidTracesHandlerConfig, ...] = field(default_factory=tuple) abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None node_only: bool = False diff --git a/src/dipdup/config/evm_subsquid_transactions.py b/src/dipdup/config/evm_subsquid_transactions.py index e8e9301e8..1108f7208 100644 --- a/src/dipdup/config/evm_subsquid_transactions.py +++ b/src/dipdup/config/evm_subsquid_transactions.py @@ -12,6 +12,7 @@ from dipdup.config import CodegenMixin from dipdup.config import HandlerConfig from dipdup.config.evm import EvmContractConfig +from dipdup.config.evm_node import EvmNodeDatasourceConfig from dipdup.config.evm_subsquid import SubsquidDatasourceConfig from dipdup.config.evm_subsquid import SubsquidIndexConfig from dipdup.models.evm_node import EvmNodeHeadSubscription @@ -76,14 +77,13 @@ class SubsquidTransactionsIndexConfig(SubsquidIndexConfig): :param datasource: Subsquid datasource config :param handlers: Transaction handlers :param abi: One or many ABI datasource(s) - :param node_only: Don't use Subsquid Network, only node RPC :param first_level: Level to start indexing from :param last_level: Level to stop indexing at """ kind: Literal['evm.subsquid.transactions'] - datasource: SubsquidDatasourceConfig + datasource: SubsquidDatasourceConfig | EvmNodeDatasourceConfig handlers: tuple[SubsquidTransactionsHandlerConfig, ...] = field(default_factory=tuple) abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None node_only: bool = False diff --git a/src/dipdup/context.py b/src/dipdup/context.py index f65a8bc3d..def76ac6d 100644 --- a/src/dipdup/context.py +++ b/src/dipdup/context.py @@ -24,6 +24,8 @@ from dipdup.config import HookConfig from dipdup.config import ResolvedIndexConfigU from dipdup.config.evm import EvmContractConfig +from dipdup.config.evm_node import EvmNodeDatasourceConfig +from dipdup.config.evm_subsquid import SubsquidDatasourceConfig from dipdup.config.evm_subsquid_events import SubsquidEventsIndexConfig from dipdup.config.evm_subsquid_traces import SubsquidTracesIndexConfig from dipdup.config.evm_subsquid_transactions import SubsquidTransactionsIndexConfig @@ -76,7 +78,6 @@ from collections.abc import Iterator from types import ModuleType - from dipdup.config.evm_node import EvmNodeDatasourceConfig from dipdup.package import DipDupPackage from dipdup.transactions import TransactionManager @@ -320,8 +321,7 @@ async def _spawn_index(self, name: str, state: Index | None = None) -> Any: ) datasource_name = index_config.datasource.name - datasource: TzktDatasource | SubsquidDatasource - node_configs: tuple[EvmNodeDatasourceConfig, ...] = () + datasource: TzktDatasource | SubsquidDatasource | EvmNodeDatasource if isinstance(index_config, TzktOperationsIndexConfig | TzktOperationsUnfilteredIndexConfig): datasource = self.get_tzkt_datasource(datasource_name) @@ -342,26 +342,33 @@ async def _spawn_index(self, name: str, state: Index | None = None) -> Any: datasource = self.get_tzkt_datasource(datasource_name) index = TzktEventsIndex(self, index_config, datasource) elif isinstance(index_config, SubsquidEventsIndexConfig): - datasource = self.get_subsquid_datasource(datasource_name) - node_field = index_config.datasource.node - if node_field: - node_configs = node_configs + node_field if isinstance(node_field, tuple) else (node_field,) + datasource_config = index_config.datasource + if isinstance(datasource_config, SubsquidDatasourceConfig): + datasource = self.get_subsquid_datasource(datasource_name) + elif isinstance(datasource_config, EvmNodeDatasourceConfig): + datasource = self.get_evm_node_datasource(datasource_name) + else: + raise NotImplementedError index = SubsquidEventsIndex(self, index_config, datasource) + for node_datasource in index.node_datasources: + node_datasource.add_index(index_config) elif isinstance(index_config, SubsquidTracesIndexConfig): raise NotImplementedError elif isinstance(index_config, SubsquidTransactionsIndexConfig): - datasource = self.get_subsquid_datasource(datasource_name) - node_field = index_config.datasource.node - if node_field: - node_configs = node_configs + node_field if isinstance(node_field, tuple) else (node_field,) + datasource_config = index_config.datasource + if isinstance(datasource_config, SubsquidDatasourceConfig): + datasource = self.get_subsquid_datasource(datasource_name) + elif isinstance(datasource_config, EvmNodeDatasourceConfig): + datasource = self.get_evm_node_datasource(datasource_name) + else: + raise NotImplementedError index = SubsquidTransactionsIndex(self, index_config, datasource) + for node_datasource in index.node_datasources: + node_datasource.add_index(index_config) else: raise NotImplementedError datasource.add_index(index_config) - for node_config in node_configs: - node_datasource = self.get_evm_node_datasource(node_config.name) - node_datasource.add_index(index_config) handlers = ( (index_config.handler_config,) diff --git a/src/dipdup/indexes/evm_subsquid.py b/src/dipdup/indexes/evm_subsquid.py index 6698b899e..65ad9063f 100644 --- a/src/dipdup/indexes/evm_subsquid.py +++ b/src/dipdup/indexes/evm_subsquid.py @@ -11,6 +11,7 @@ from dipdup.config import SubsquidIndexConfigU from dipdup.config.evm import EvmContractConfig from dipdup.config.evm_node import EvmNodeDatasourceConfig +from dipdup.config.evm_subsquid import SubsquidDatasourceConfig from dipdup.context import DipDupContext from dipdup.datasources import IndexDatasource from dipdup.datasources.evm_node import NODE_LAST_MILE @@ -27,7 +28,7 @@ SUBSQUID_READAHEAD_LIMIT = 10000 IndexConfigT = TypeVar('IndexConfigT', bound=SubsquidIndexConfigU) -DatasourceT = TypeVar('DatasourceT', bound=SubsquidDatasource) +DatasourceT = TypeVar('DatasourceT', bound=SubsquidDatasource | EvmNodeDatasource) _sighashes: dict[str, str] = {} @@ -62,14 +63,19 @@ def __init__( ) -> None: super().__init__(ctx, config, datasource) - node_field = self._config.datasource.node - if node_field is None: - node_field = () - elif isinstance(node_field, EvmNodeDatasourceConfig): - node_field = (node_field,) - self._node_datasources = tuple( - self._ctx.get_evm_node_datasource(node_config.name) for node_config in node_field - ) + if isinstance(datasource, SubsquidDatasource) and isinstance(config.datasource, SubsquidDatasourceConfig): + node_field = config.datasource.node + if node_field is None: + node_field = () + elif isinstance(node_field, EvmNodeDatasourceConfig): + node_field = (node_field,) + self._node_datasources = tuple( + self._ctx.get_evm_node_datasource(node_config.name) for node_config in node_field + ) + elif isinstance(datasource, EvmNodeDatasource) and isinstance(config.datasource, EvmNodeDatasourceConfig): + self._node_datasources = (datasource,) + else: + raise FrameworkException('Invalid datasource type') @abstractmethod async def _synchronize_subsquid(self, sync_level: int) -> None: ... @@ -133,8 +139,12 @@ async def _synchronize(self, sync_level: int) -> None: if levels_left <= 0: return - subsquid_sync_level = await self.datasource.get_head_level() - Metrics.set_sqd_processor_chain_height(subsquid_sync_level) + if isinstance(self.datasource, SubsquidDatasource): + subsquid_sync_level = await self.datasource.get_head_level() + Metrics.set_sqd_processor_chain_height(subsquid_sync_level) + else: + subsquid_sync_level = 0 + node_sync_level = await self._get_node_sync_level(subsquid_sync_level, index_level) # NOTE: Fetch last blocks from node if there are not enough realtime messages in queue diff --git a/src/dipdup/indexes/evm_subsquid_events/index.py b/src/dipdup/indexes/evm_subsquid_events/index.py index e6305951d..2d9e20e76 100644 --- a/src/dipdup/indexes/evm_subsquid_events/index.py +++ b/src/dipdup/indexes/evm_subsquid_events/index.py @@ -5,6 +5,7 @@ from dipdup.config.evm_subsquid_events import SubsquidEventsHandlerConfig from dipdup.config.evm_subsquid_events import SubsquidEventsIndexConfig from dipdup.context import DipDupContext +from dipdup.datasources.evm_node import EvmNodeDatasource from dipdup.datasources.evm_subsquid import SubsquidDatasource from dipdup.exceptions import ConfigInitializationException from dipdup.exceptions import FrameworkException @@ -20,17 +21,18 @@ from dipdup.prometheus import Metrics QueueItem = tuple[EvmNodeLogData, ...] | RollbackMessage +Datasource = SubsquidDatasource | EvmNodeDatasource class SubsquidEventsIndex( - SubsquidIndex[SubsquidEventsIndexConfig, QueueItem, SubsquidDatasource], + SubsquidIndex[SubsquidEventsIndexConfig, QueueItem, Datasource], message_type=SubsquidMessageType.logs, ): def __init__( self, ctx: DipDupContext, config: SubsquidEventsIndexConfig, - datasource: SubsquidDatasource, + datasource: Datasource, ) -> None: super().__init__(ctx, config, datasource) self._topics: dict[str, dict[str, str]] | None = None @@ -78,6 +80,9 @@ def _create_subsquid_fetcher(self, first_level: int, last_level: int) -> Subsqui ] topics.append((address, event_abi['topic0'])) + if not isinstance(self._datasource, SubsquidDatasource): + raise FrameworkException('Creating subsquid fetcher with non-subsquid datasource') + return SubsquidEventFetcher( datasource=self._datasource, first_level=first_level, diff --git a/src/dipdup/indexes/evm_subsquid_transactions/index.py b/src/dipdup/indexes/evm_subsquid_transactions/index.py index 708c2e39f..83459a592 100644 --- a/src/dipdup/indexes/evm_subsquid_transactions/index.py +++ b/src/dipdup/indexes/evm_subsquid_transactions/index.py @@ -3,8 +3,10 @@ from dipdup.config.evm_subsquid_transactions import SubsquidTransactionsHandlerConfig from dipdup.config.evm_subsquid_transactions import SubsquidTransactionsIndexConfig +from dipdup.datasources.evm_node import EvmNodeDatasource from dipdup.datasources.evm_subsquid import SubsquidDatasource from dipdup.exceptions import ConfigInitializationException +from dipdup.exceptions import FrameworkException from dipdup.indexes.evm_subsquid import SubsquidIndex from dipdup.indexes.evm_subsquid import get_sighash from dipdup.indexes.evm_subsquid_transactions.fetcher import EvmNodeTransactionFetcher @@ -18,10 +20,11 @@ from dipdup.prometheus import Metrics QueueItem = tuple[EvmNodeTransactionData, ...] | RollbackMessage +Datasource = SubsquidDatasource | EvmNodeDatasource class SubsquidTransactionsIndex( - SubsquidIndex[SubsquidTransactionsIndexConfig, QueueItem, SubsquidDatasource], + SubsquidIndex[SubsquidTransactionsIndexConfig, QueueItem, Datasource], message_type=SubsquidMessageType.transactions, ): def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: @@ -74,6 +77,9 @@ def _create_subsquid_fetcher(self, first_level: int, last_level: int) -> Subsqui raise NotImplementedError filters.append(query) + if not isinstance(self._datasource, SubsquidDatasource): + raise FrameworkException('Creating subsquid fetcher with non-subsquid datasource') + return SubsquidTransactionFetcher( datasource=self._datasource, first_level=first_level, diff --git a/tests/configs/demo_evm_events_node.yml b/tests/configs/demo_evm_events_node.yml index eef2c6afe..3aa75cdeb 100644 --- a/tests/configs/demo_evm_events_node.yml +++ b/tests/configs/demo_evm_events_node.yml @@ -26,11 +26,10 @@ contracts: indexes: eth_usdt_events: kind: evm.subsquid.events - datasource: subsquid + datasource: evm_node handlers: - callback: on_transfer contract: eth_usdt name: Transfer first_level: 18077421 last_level: 18077421 - node_only: true diff --git a/tests/configs/demo_evm_transactions_node.yml b/tests/configs/demo_evm_transactions_node.yml index 55fe06ec4..e8dffcceb 100644 --- a/tests/configs/demo_evm_transactions_node.yml +++ b/tests/configs/demo_evm_transactions_node.yml @@ -24,11 +24,10 @@ contracts: indexes: eth_usdt_transactions: kind: evm.subsquid.transactions - datasource: subsquid + datasource: evm_node handlers: - callback: on_transfer to: eth_usdt method: transfer first_level: 18077421 last_level: 18077421 - node_only: true