
Fixed a typo in index.py, code quality improvements #897

Merged · 3 commits · Nov 15, 2023
6 changes: 3 additions & 3 deletions pdm.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion requirements.dev.txt
@@ -95,7 +95,7 @@ rlp==3.0.0
ruamel-yaml==0.18.5
ruamel-yaml-clib==0.2.7
ruff==0.1.5
-sentry-sdk==1.34.0
+sentry-sdk==1.35.0
setuptools==68.2.2
six==1.16.0
sniffio==1.3.0
2 changes: 1 addition & 1 deletion requirements.txt
@@ -77,7 +77,7 @@ rfc3339-validator==0.1.4
rlp==3.0.0
ruamel-yaml==0.18.5
ruamel-yaml-clib==0.2.7
-sentry-sdk==1.34.0
+sentry-sdk==1.35.0
setuptools==68.2.2
six==1.16.0
sniffio==1.3.0
2 changes: 0 additions & 2 deletions src/dipdup/codegen/__init__.py
@@ -72,8 +72,6 @@ async def init(
        await self.generate_system_hooks()
        await self.generate_handlers()

-        # self._package.verify()
-
    @abstractmethod
    async def generate_abi(self) -> None:
        ...
2 changes: 1 addition & 1 deletion src/dipdup/datasources/evm_node.py
@@ -267,7 +267,7 @@ async def _on_message(self, message: Message) -> None:
            if subscription_id not in self._subscription_ids:
                raise FrameworkException(f'{self.name}: Unknown subscription ID: {subscription_id}')
            subscription = self._subscription_ids[subscription_id]
-            self._logger.info('Received a message from channel %s', subscription_id)
+            self._logger.debug('Received a message from channel %s', subscription_id)
            await self._handle_subscription(subscription, data['params']['result'])
        else:
            raise DatasourceError(f'Unknown method: {data["method"]}', self.name)
4 changes: 1 addition & 3 deletions src/dipdup/datasources/evm_subsquid.py
@@ -26,8 +26,6 @@
    'address': True,
    'data': True,
    'topics': True,
-    # 'blockNumber': True,
-    # 'blockHash': True,
}


@@ -68,7 +66,7 @@ async def iter_event_logs(
    ) -> AsyncIterator[tuple[SubsquidEventData, ...]]:
        current_level = first_level

-        # TODO: smarter query optimizator
+        # TODO: Smarter query optimizator
        topics_by_address = defaultdict(list)
        for address, topic in topics:
            topics_by_address[address].append(topic)
4 changes: 2 additions & 2 deletions src/dipdup/datasources/tezos_tzkt.py
@@ -1057,7 +1057,7 @@ def _get_request_params(
        offset: int | None = None,
        limit: int | None = None,
        select: tuple[str, ...] | None = None,
-        values: bool = False,  # return only list of chosen values instead of dict
+        values: bool = False,
        cursor: bool = False,
        sort: str | None = None,
        **kwargs: Any,
@@ -1074,8 +1074,8 @@
            params['offset.cr'] = offset
        else:
            params['offset'] = offset
+        # NOTE: If `values` is set request will return list of lists instead of list of dicts.
        if select:
-            # filter fields
            params['select.values' if values else 'select'] = ','.join(select)
        if sort:
            if sort.startswith('-'):
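For context, a rough sketch of how the `values` flag changes the TzKT response shape, per the NOTE above; the endpoint and field names here are illustrative, not taken from this PR:

# values=False: 'select' returns a list of objects with the chosen fields,
# e.g. GET /v1/operations/transactions?select=level,hash
# -> [{'level': 100, 'hash': 'oo...'}, {'level': 101, 'hash': 'op...'}]
params = {'select': ','.join(('level', 'hash'))}

# values=True: 'select.values' returns bare value lists instead,
# e.g. GET /v1/operations/transactions?select.values=level,hash
# -> [[100, 'oo...'], [101, 'op...']]
params = {'select.values': ','.join(('level', 'hash'))}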
2 changes: 1 addition & 1 deletion src/dipdup/dipdup.py
@@ -542,7 +542,7 @@ async def run(self) -> None:
        advanced = self._config.advanced
        tasks: set[Task[None]] = set()

-        # verify before start so obvious mistakes can be seen instantly
+        # NOTE: Verify package before indexing to ensure that all modules are importable
        self._ctx.package.verify()

        async with AsyncExitStack() as stack:
48 changes: 29 additions & 19 deletions src/dipdup/indexes/evm_subsquid_events/index.py
@@ -2,6 +2,7 @@
import random
import time
from collections import defaultdict
+from collections import deque
from typing import Any
from typing import cast

@@ -25,7 +26,7 @@

LEVEL_BATCH_TIMEOUT = 1
NODE_SYNC_LIMIT = 128
-NODE_BATCH_SIZE = 100
+NODE_BATCH_SIZE = 10


class SubsquidEventsIndex(
@@ -101,6 +102,7 @@ async def _process_queue(self) -> None:
                break

        for message_level, level_logs in logs_by_level.items():
+            self._logger.info('Processing %s event logs of level %s', len(level_logs), message_level)
            await self._process_level_events(tuple(level_logs), message_level)
            if self._config.expose_metrics:
                Metrics.set_sqd_processor_last_block(message_level)
@@ -159,35 +161,43 @@ async def _synchronize(self, sync_level: int) -> None:
                typename = handler.contract.module_name
                topics.add(self.topics[typename][handler.name])

-            # Requesting blocks info by batch
-            windows = ((i, min(i + NODE_BATCH_SIZE, sync_level)) for i in range(first_level, sync_level + 1, NODE_BATCH_SIZE + 1))
-            for start_level, end_level in windows:
-                # NOTE: Get random one every time
-                # NOTE: Data for blocks start_level and end_level will be included
+            # NOTE: Requesting logs by batches of NODE_BATCH_SIZE.
+            batch_first_level = first_level
+            while batch_first_level <= sync_level:
+                batch_last_level = min(batch_first_level + NODE_BATCH_SIZE, sync_level)
                level_logs = await self.random_node.get_logs(
                    {
-                        'fromBlock': hex(start_level),
-                        'toBlock': hex(end_level),
+                        'fromBlock': hex(batch_first_level),
+                        'toBlock': hex(batch_last_level),
                    }
                )

-                # get timestamps for levels
-                timestamps = {}
-                for level in range(start_level, end_level + 1):
+                # NOTE: We need block timestamps for each level, so fetch them separately and match with logs.
+                timestamps: dict[int, int] = {}
+                tasks: deque[asyncio.Task[None]] = deque()
+
+                async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:
                    block = await self.random_node.get_block_by_level(level)
-                    try:
-                        timestamps[hex(level)] = int(block['timestamp'], 16)
-                    except TypeError as e:
-                        raise FrameworkException(f'Block {level} not found') from e
+                    timestamps[level] = int(block['timestamp'], 16)
+
+                for level in range(batch_first_level, batch_last_level + 1):
+                    tasks.append(asyncio.create_task(_fetch_timestamp(level, timestamps)))
+
+                await asyncio.gather(*tasks)

-                # match timestamps with logs
-                parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamps[log['blockNumber']])
-                    for log in level_logs)
-                await self._process_level_events(parsed_level_logs, self.topics, sync_level)
+                parsed_level_logs = tuple(
+                    EvmNodeLogData.from_json(
+                        log,
+                        timestamps[int(log['blockNumber'], 16)],
+                    )
+                    for log in level_logs
+                )
+
+                await self._process_level_events(parsed_level_logs, sync_level)
                if self._config.expose_metrics:
                    Metrics.set_sqd_processor_last_block(level)
+
+                batch_first_level = batch_last_level + 1
        else:
            sync_level = min(sync_level, subsquid_sync_level)
            fetcher = self._create_fetcher(first_level, sync_level)
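To see the sync pattern introduced above in isolation, here is a minimal, self-contained sketch of the same logic: inclusive log batches of NODE_BATCH_SIZE, concurrent timestamp fetches per batch, and matching by block number. The `fetch_logs` and `fetch_block` callables are hypothetical stand-ins for the node datasource methods, not DipDup APIs:

import asyncio
from typing import Any, Awaitable, Callable

NODE_BATCH_SIZE = 10  # matches the constant lowered in this PR

async def sync_logs(
    fetch_logs: Callable[[dict[str, Any]], Awaitable[list[dict[str, Any]]]],
    fetch_block: Callable[[int], Awaitable[dict[str, Any]]],
    first_level: int,
    sync_level: int,
) -> None:
    batch_first_level = first_level
    while batch_first_level <= sync_level:
        # Both bounds of the logs request are inclusive
        batch_last_level = min(batch_first_level + NODE_BATCH_SIZE, sync_level)
        logs = await fetch_logs({'fromBlock': hex(batch_first_level), 'toBlock': hex(batch_last_level)})

        # Fetch block timestamps for the whole batch concurrently
        timestamps: dict[int, int] = {}

        async def fetch_timestamp(level: int) -> None:
            block = await fetch_block(level)
            timestamps[level] = int(block['timestamp'], 16)

        await asyncio.gather(*(fetch_timestamp(level) for level in range(batch_first_level, batch_last_level + 1)))

        # Match each log with its block timestamp; 'blockNumber' is a hex string
        for log in logs:
            timestamp = timestamps[int(log['blockNumber'], 16)]
            ...  # process (log, timestamp)

        batch_first_level = batch_last_level + 1

Fetching timestamps with `asyncio.gather` instead of awaiting them one by one keeps the per-batch latency close to a single round trip, which is why the smaller batch size remains affordable.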
5 changes: 1 addition & 4 deletions src/dipdup/models/tezos_tzkt.py
@@ -562,14 +562,11 @@ class TzktTokenBalanceData(HasLevel):
    transfers_count: int
    first_level: int
    first_time: datetime
-    # level is not defined in tzkt balances data, so it is
-    # Level of the block where the token balance was last changed.
+    # NOTE: Level of the block where the token balance has been changed for the last time.
    last_level: int
    last_time: datetime
-    # owner account
    account_address: str | None = None
    account_alias: str | None = None
-    # token object
    tzkt_token_id: int | None = None
    contract_address: str | None = None
    contract_alias: str | None = None
10 changes: 4 additions & 6 deletions src/dipdup/project.py
@@ -21,6 +21,7 @@
_logger = logging.getLogger(__name__)


+# NOTE: All templates are stored in src/dipdup/projects
TEMPLATES: dict[str, tuple[str, ...]] = {
    'evm': (
        'demo_evm_events',
@@ -147,8 +148,6 @@ def answers_from_terminal() -> Answers:
        options.append(_answers['template'])
        comments.append(_answers['description'])

-    # list of options can contain folder name of template or folder name of template with description
-    # all project templates are in src/dipdup/projects
    _, template = prompt_anyof(
        'Choose a project template:',
        options=tuple(options),
@@ -183,7 +182,6 @@
        value=answers['description'],
    )

-    # define author and license for new indexer
    answers['license'] = survey.routines.input(
        'Enter project license (DipDup itself is MIT-licensed.): ',
        value=answers['license'],
@@ -266,17 +264,17 @@ def render_base(
    include: set[str] | None = None,
) -> None:
    """Render base from template"""
-    # NOTE: Common base
+    # NOTE: Render common base
    _render_templates(
        answers=answers,
        path=Path('base'),
        force=force,
        include=include,
        exists=True,
    )

+    # NOTE: Don't forget to update replay.yaml with new values
    _render(
-        answers,
+        answers=answers,
        template_path=Path(__file__).parent / 'templates' / 'replay.yaml.j2',
        output_path=Path('configs') / 'replay.yaml',
        force=force,
10 changes: 5 additions & 5 deletions src/dipdup/prometheus.py
@@ -63,7 +63,7 @@
)
_http_errors_in_row = Histogram(
    'dipdup_http_errors_in_row',
-    """The number of consecutive failed requests"""
+    'Number of consecutive failed requests',
)
_callback_duration = Histogram(
    'dipdup_callback_duration_seconds',
@@ -73,15 +73,15 @@

_sqd_processor_last_block = Gauge(
    'sqd_processor_last_block',
-    'The last processed block',
+    'Level of the last processed block from Subsquid Archives',
)
_sqd_processor_chain_height = Gauge(
    'sqd_processor_chain_height',
-    'Current chain height as reported by the archive',
+    'Current chain height as reported by Subsquid Archives',
)
_sqd_processor_archive_http_errors_in_row = Histogram(
    'sqd_processor_archive_http_errors_in_row',
-    """The number of consecutive failed Archive requests"""
+    'Number of consecutive failed requests to Subsquid Archives',
)


@@ -154,7 +154,7 @@ def set_levels_to_realtime(cls, index: str, levels: int) -> None:
    @classmethod
    def set_sqd_processor_last_block(cls, last_block: int) -> None:
        _sqd_processor_last_block.set(last_block)

    @classmethod
    def set_sqd_processor_chain_height(cls, chain_height: int) -> None:
        _sqd_processor_chain_height.set(chain_height)