Batch requesting logs from EVM node (#887)
Co-authored-by: Vladimir Bobrikov <[email protected]>
Co-authored-by: Lev Gorodetskiy <[email protected]>
3 people authored Nov 13, 2023
1 parent 78fac19 commit d1dfc13
Showing 2 changed files with 23 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic

### Added

- evm.node: Requesting logs for multiple blocks at once for EVM nodes to improve performance
- evm.subsquid: Added metrics for Subsquid Cloud deploy.

## [7.1.1] - 2023-11-07
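For context, the evm.node entry above replaces one eth_getLogs call per block with a single call per block range. A minimal sketch of the two request shapes, using made-up block numbers and plain JSON-RPC payloads (illustration only, not code from this commit):

# Hypothetical illustration: per-block vs. batched eth_getLogs payloads.
per_block_requests = [
    {'method': 'eth_getLogs', 'params': [{'fromBlock': hex(level), 'toBlock': hex(level)}]}
    for level in range(18_500_000, 18_500_100)  # 100 separate requests
]
batched_request = {
    'method': 'eth_getLogs',
    'params': [{'fromBlock': hex(18_500_000), 'toBlock': hex(18_500_099)}],  # one request for the same range
}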
31 changes: 22 additions & 9 deletions src/dipdup/indexes/evm_subsquid_events/index.py
@@ -25,6 +25,7 @@

LEVEL_BATCH_TIMEOUT = 1
NODE_SYNC_LIMIT = 128
NODE_BATCH_SIZE = 100


class SubsquidEventsIndex(
@@ -157,20 +158,32 @@ async def _synchronize(self, sync_level: int) -> None:
for handler in self._config.handlers:
typename = handler.contract.module_name
topics.add(self.topics[typename][handler.name])
# FIXME: This is terribly inefficient (but okay for the last mile); see advanced example in web3.py docs.
for level in range(first_level, sync_level + 1):

# Requesting blocks info by batch
windows = ((i, min(i + NODE_BATCH_SIZE, sync_level)) for i in range(first_level, sync_level + 1, NODE_BATCH_SIZE + 1))
for start_level, end_level in windows:
# NOTE: Get random one every time
# NOTE: Data for blocks start_level and end_level will be included
level_logs = await self.random_node.get_logs(
{
'fromBlock': hex(level),
'toBlock': hex(level),
'fromBlock': hex(start_level),
'toBlock': hex(end_level),
}
)
block = await self.random_node.get_block_by_level(level)
if block is None:
raise FrameworkException(f'Block {level} not found')
timestamp = int(block['timestamp'], 16)
parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamp) for log in level_logs)

# get timestamps for levels
timestamps = {}
for level in range(start_level, end_level + 1):
block = await self.random_node.get_block_by_level(level)
try:
timestamps[hex(level)] = int(block['timestamp'], 16)
except TypeError as e:
raise FrameworkException(f'Block {level} not found') from e

# match timestamps with logs
parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamps[log['blockNumber']])
for log in level_logs)
await self._process_level_events(parsed_level_logs, self.topics, sync_level)

await self._process_level_events(parsed_level_logs, sync_level)
if self._config.expose_metrics:
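To make the new flow easier to follow outside the diff, here is a self-contained sketch of the same idea: split the sync range into inclusive windows of at most NODE_BATCH_SIZE + 1 blocks, fetch logs for each window in one request, fetch block timestamps per level, and attach a timestamp to each log by its blockNumber. The node interface below (get_logs, get_block_by_level) is a hypothetical stub standing in for the node datasource used in the diff; only the windowing and matching logic mirrors the commit.

from typing import Any, Protocol

NODE_BATCH_SIZE = 100


class Node(Protocol):
    # Hypothetical client interface; method names mirror those used in the diff.
    async def get_logs(self, params: dict[str, Any]) -> list[dict[str, Any]]: ...
    async def get_block_by_level(self, level: int) -> dict[str, Any] | None: ...


async def sync_logs(node: Node, first_level: int, sync_level: int) -> None:
    # Inclusive windows of at most NODE_BATCH_SIZE + 1 blocks each.
    windows = (
        (i, min(i + NODE_BATCH_SIZE, sync_level))
        for i in range(first_level, sync_level + 1, NODE_BATCH_SIZE + 1)
    )
    for start_level, end_level in windows:
        # One eth_getLogs-style request per window instead of one per block.
        level_logs = await node.get_logs({'fromBlock': hex(start_level), 'toBlock': hex(end_level)})

        # Timestamps are keyed by hex block number so they can be joined with each log's blockNumber field.
        timestamps: dict[str, int] = {}
        for level in range(start_level, end_level + 1):
            block = await node.get_block_by_level(level)
            if block is None:
                raise RuntimeError(f'Block {level} not found')
            timestamps[hex(level)] = int(block['timestamp'], 16)

        # Attach the matching timestamp to each log before further processing.
        enriched = [{**log, 'timestamp': timestamps[log['blockNumber']]} for log in level_logs]
        print(f'{start_level}-{end_level}: {len(enriched)} logs')

As in the diff, only the log requests are batched; block timestamps are still fetched one level at a time, so the gain comes from the reduced number of log requests.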
