From 78fac19caa2644b468b27a5083edf839f7eea599 Mon Sep 17 00:00:00 2001 From: Wizard1209 <34334729+Wizard1209@users.noreply.github.com> Date: Mon, 13 Nov 2023 18:00:24 -0300 Subject: [PATCH] Add Prometheus metrics for subsquid cloud (#892) Co-authored-by: Vladimir Bobrikov --- CHANGELOG.md | 6 ++++ src/dipdup/config/evm_subsquid_events.py | 1 + src/dipdup/http.py | 7 +++++ .../indexes/evm_subsquid_events/index.py | 11 ++++++- src/dipdup/prometheus.py | 31 +++++++++++++++++++ 5 files changed, 55 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07b0bc0ed..f20266613 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning]. +## Unreleased + +### Added + +- evm.subsquid: Added metrics for Subsquid Cloud deploy. + ## [7.1.1] - 2023-11-07 ### Fixed diff --git a/src/dipdup/config/evm_subsquid_events.py b/src/dipdup/config/evm_subsquid_events.py index 55f2ac254..789740420 100644 --- a/src/dipdup/config/evm_subsquid_events.py +++ b/src/dipdup/config/evm_subsquid_events.py @@ -68,6 +68,7 @@ class SubsquidEventsIndexConfig(IndexConfig): handlers: tuple[SubsquidEventsHandlerConfig, ...] = field(default_factory=tuple) abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None node_only: bool = False + expose_metrics: bool = False first_level: int = 0 last_level: int = 0 diff --git a/src/dipdup/http.py b/src/dipdup/http.py index 7e7a7fa82..758c536b0 100644 --- a/src/dipdup/http.py +++ b/src/dipdup/http.py @@ -147,6 +147,9 @@ async def _retry_request( retry_count = 0 if env.TEST else self._config.retry_count retry_count_str = 'inf' if retry_count is sys.maxsize else str(retry_count) + if Metrics.enabled: + Metrics.set_http_errors_in_row(self._url, 0) + while True: self._logger.debug('HTTP request attempt %s/%s', attempt, retry_count_str) try: @@ -177,6 +180,10 @@ async def _retry_request( self._logger.warning('HTTP request attempt %s/%s failed: %s', attempt, retry_count_str, e) self._logger.info('Waiting %s seconds before retry', ratelimit_sleep or retry_sleep) + + if Metrics.enabled: + Metrics.set_http_errors_in_row(self._url, attempt) + await asyncio.sleep(ratelimit_sleep or retry_sleep) attempt += 1 if not ratelimit_sleep: diff --git a/src/dipdup/indexes/evm_subsquid_events/index.py b/src/dipdup/indexes/evm_subsquid_events/index.py index 79fa4bf6a..71c362668 100644 --- a/src/dipdup/indexes/evm_subsquid_events/index.py +++ b/src/dipdup/indexes/evm_subsquid_events/index.py @@ -21,6 +21,7 @@ from dipdup.models.evm_subsquid import SubsquidEventData from dipdup.models.evm_subsquid import SubsquidMessageType from dipdup.performance import metrics +from dipdup.prometheus import Metrics LEVEL_BATCH_TIMEOUT = 1 NODE_SYNC_LIMIT = 128 @@ -100,6 +101,8 @@ async def _process_queue(self) -> None: for message_level, level_logs in logs_by_level.items(): await self._process_level_events(tuple(level_logs), message_level) + if self._config.expose_metrics: + Metrics.set_sqd_processor_last_block(message_level) def get_sync_level(self) -> int: """Get level index needs to be synchronized to depending on its subscription status""" @@ -131,6 +134,8 @@ async def _synchronize(self, sync_level: int) -> None: return subsquid_sync_level = await self.datasource.get_head_level() + if self._config.expose_metrics: + Metrics.set_sqd_processor_chain_height(subsquid_sync_level) use_node = False if self.node_datasources: @@ -166,14 +171,18 @@ async def _synchronize(self, sync_level: int) -> None: raise FrameworkException(f'Block {level} not found') timestamp = int(block['timestamp'], 16) parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamp) for log in level_logs) - await self._process_level_events(parsed_level_logs, sync_level) + await self._process_level_events(parsed_level_logs, sync_level) + if self._config.expose_metrics: + Metrics.set_sqd_processor_last_block(level) else: sync_level = min(sync_level, subsquid_sync_level) fetcher = self._create_fetcher(first_level, sync_level) async for _level, events in fetcher.fetch_by_level(): await self._process_level_events(events, sync_level) + if self._config.expose_metrics: + Metrics.set_sqd_processor_last_block(_level) await self._exit_sync_state(sync_level) diff --git a/src/dipdup/prometheus.py b/src/dipdup/prometheus.py index 8356647a7..87121eed6 100644 --- a/src/dipdup/prometheus.py +++ b/src/dipdup/prometheus.py @@ -61,12 +61,29 @@ 'Number of http errors', ['url', 'status'], ) +_http_errors_in_row = Histogram( + 'dipdup_http_errors_in_row', + """The number of consecutive failed requests""" +) _callback_duration = Histogram( 'dipdup_callback_duration_seconds', 'Duration of callback execution', ['callback'], ) +_sqd_processor_last_block = Gauge( + 'sqd_processor_last_block', + 'The last processed block', +) +_sqd_processor_chain_height = Gauge( + 'sqd_processor_chain_height', + 'Current chain height as reported by the archive', +) +_sqd_processor_archive_http_errors_in_row = Histogram( + 'sqd_processor_archive_http_errors_in_row', + """The number of consecutive failed Archive requests""" +) + class Metrics: enabled = False @@ -133,3 +150,17 @@ def set_levels_to_sync(cls, index: str, levels: int) -> None: @classmethod def set_levels_to_realtime(cls, index: str, levels: int) -> None: _index_levels_to_realtime.labels(index=index).observe(levels) + + @classmethod + def set_sqd_processor_last_block(cls, last_block: int) -> None: + _sqd_processor_last_block.set(last_block) + + @classmethod + def set_sqd_processor_chain_height(cls, chain_height: int) -> None: + _sqd_processor_chain_height.set(chain_height) + + @classmethod + def set_http_errors_in_row(cls, url: str, errors_count: int) -> None: + _http_errors_in_row.observe(errors_count) + if 'subsquid' in url: + _sqd_processor_archive_http_errors_in_row.observe(errors_count)