Skip to content

Commit

Permalink
Add Prometheus metrics for subsquid cloud (#892)
Browse files Browse the repository at this point in the history
Co-authored-by: Vladimir Bobrikov <[email protected]>
  • Loading branch information
Wizard1209 and Vladimir Bobrikov authored Nov 13, 2023
1 parent 8f3687a commit 78fac19
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning].

## Unreleased

### Added

- evm.subsquid: Added Prometheus metrics for Subsquid Cloud deployments.

## [7.1.1] - 2023-11-07

### Fixed
Expand Down
1 change: 1 addition & 0 deletions src/dipdup/config/evm_subsquid_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SubsquidEventsIndexConfig(IndexConfig):
handlers: tuple[SubsquidEventsHandlerConfig, ...] = field(default_factory=tuple)
abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None
node_only: bool = False
expose_metrics: bool = False

first_level: int = 0
last_level: int = 0
Expand Down
7 changes: 7 additions & 0 deletions src/dipdup/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ async def _retry_request(
retry_count = 0 if env.TEST else self._config.retry_count
retry_count_str = 'inf' if retry_count is sys.maxsize else str(retry_count)

if Metrics.enabled:
Metrics.set_http_errors_in_row(self._url, 0)

while True:
self._logger.debug('HTTP request attempt %s/%s', attempt, retry_count_str)
try:
Expand Down Expand Up @@ -177,6 +180,10 @@ async def _retry_request(

self._logger.warning('HTTP request attempt %s/%s failed: %s', attempt, retry_count_str, e)
self._logger.info('Waiting %s seconds before retry', ratelimit_sleep or retry_sleep)

if Metrics.enabled:
Metrics.set_http_errors_in_row(self._url, attempt)

await asyncio.sleep(ratelimit_sleep or retry_sleep)
attempt += 1
if not ratelimit_sleep:
Expand Down
11 changes: 10 additions & 1 deletion src/dipdup/indexes/evm_subsquid_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dipdup.models.evm_subsquid import SubsquidEventData
from dipdup.models.evm_subsquid import SubsquidMessageType
from dipdup.performance import metrics
from dipdup.prometheus import Metrics

LEVEL_BATCH_TIMEOUT = 1
NODE_SYNC_LIMIT = 128
Expand Down Expand Up @@ -100,6 +101,8 @@ async def _process_queue(self) -> None:

for message_level, level_logs in logs_by_level.items():
await self._process_level_events(tuple(level_logs), message_level)
if self._config.expose_metrics:
Metrics.set_sqd_processor_last_block(message_level)

def get_sync_level(self) -> int:
"""Get level index needs to be synchronized to depending on its subscription status"""
Expand Down Expand Up @@ -131,6 +134,8 @@ async def _synchronize(self, sync_level: int) -> None:
return

subsquid_sync_level = await self.datasource.get_head_level()
if self._config.expose_metrics:
Metrics.set_sqd_processor_chain_height(subsquid_sync_level)

use_node = False
if self.node_datasources:
Expand Down Expand Up @@ -166,14 +171,18 @@ async def _synchronize(self, sync_level: int) -> None:
raise FrameworkException(f'Block {level} not found')
timestamp = int(block['timestamp'], 16)
parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamp) for log in level_logs)
await self._process_level_events(parsed_level_logs, sync_level)

await self._process_level_events(parsed_level_logs, sync_level)
if self._config.expose_metrics:
Metrics.set_sqd_processor_last_block(level)
else:
sync_level = min(sync_level, subsquid_sync_level)
fetcher = self._create_fetcher(first_level, sync_level)

async for _level, events in fetcher.fetch_by_level():
await self._process_level_events(events, sync_level)
if self._config.expose_metrics:
Metrics.set_sqd_processor_last_block(_level)

await self._exit_sync_state(sync_level)

Expand Down
31 changes: 31 additions & 0 deletions src/dipdup/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,29 @@
'Number of http errors',
['url', 'status'],
)
# Histogram rather than a plain counter: the streak length is observed on
# every failed attempt, so the distribution shows how long outages last.
_http_errors_in_row = Histogram(
    'dipdup_http_errors_in_row',
    'The number of consecutive failed requests',
)
_callback_duration = Histogram(
    'dipdup_callback_duration_seconds',
    'Duration of callback execution',
    ['callback'],
)

# Metrics below intentionally use the `sqd_processor_*` naming scheme
# (no `dipdup_` prefix) so existing Subsquid Cloud dashboards and alerts
# can monitor a DipDup indexer without changes.
_sqd_processor_last_block = Gauge(
    'sqd_processor_last_block',
    'The last processed block',
)
_sqd_processor_chain_height = Gauge(
    'sqd_processor_chain_height',
    'Current chain height as reported by the archive',
)
_sqd_processor_archive_http_errors_in_row = Histogram(
    'sqd_processor_archive_http_errors_in_row',
    'The number of consecutive failed Archive requests',
)


class Metrics:
enabled = False
Expand Down Expand Up @@ -133,3 +150,17 @@ def set_levels_to_sync(cls, index: str, levels: int) -> None:
@classmethod
def set_levels_to_realtime(cls, index: str, levels: int) -> None:
    """Record how many levels *index* still has to process before realtime."""
    per_index_histogram = _index_levels_to_realtime.labels(index=index)
    per_index_histogram.observe(levels)

@classmethod
def set_sqd_processor_last_block(cls, last_block: int) -> None:
    """Publish the level of the last processed block.

    Uses the Subsquid-compatible `sqd_processor_last_block` gauge.
    """
    _sqd_processor_last_block.set(last_block)

@classmethod
def set_sqd_processor_chain_height(cls, chain_height: int) -> None:
    """Publish the current chain height reported by the datasource.

    Uses the Subsquid-compatible `sqd_processor_chain_height` gauge.
    """
    _sqd_processor_chain_height.set(chain_height)

@classmethod
def set_http_errors_in_row(cls, url: str, errors_count: int) -> None:
    """Record the current streak of consecutive failed requests to *url*.

    When the URL looks like a Subsquid endpoint (contains 'subsquid'),
    the streak is also recorded under the Subsquid-compatible metric name.
    """
    _http_errors_in_row.observe(errors_count)
    if 'subsquid' not in url:
        return
    _sqd_processor_archive_http_errors_in_row.observe(errors_count)

0 comments on commit 78fac19

Please sign in to comment.