Skip to content

Commit

Permalink
Refactor tezos.operations index tech debt
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Aug 5, 2024
1 parent 7e34379 commit edda84f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 80 deletions.
18 changes: 9 additions & 9 deletions src/dipdup/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,20 @@ async def _process_level_data(
return

started_at = time.time()
batch_handlers = (
MatchedHandler(
index=self,
level=batch_level,
config=handler_config,
args=data if isinstance(data, Iterable) else (data,),
)
for handler_config, data in matched_handlers
)
async with self._ctx.transactions.in_transaction(
level=batch_level,
sync_level=sync_level,
index=self.name,
):
batch_handlers = (
MatchedHandler(
index=self,
level=batch_level,
config=handler_config,
args=data if isinstance(data, Iterable) else (data,),
)
for handler_config, data in matched_handlers
)
await self._ctx.fire_handler(
name='batch',
index=self._config.name,
Expand Down
79 changes: 13 additions & 66 deletions src/dipdup/indexes/tezos_operations/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,33 +187,10 @@ async def get_filters(self) -> tuple[set[str], set[str], set[int]]:

return self._entrypoint_filter, self._address_filter, self._code_hash_filter

# FIXME: Use method from Index
async def _process_queue(self) -> None:
"""Process WebSocket queue"""
self._logger.debug('Processing %s realtime messages from queue', len(self._queue))

while self._queue:
message = self._queue.popleft()
if isinstance(message, RollbackMessage):
await self._rollback(message.from_level, message.to_level)
continue

messages_left = len(self._queue)
Metrics.set_levels_to_realtime(self._config.name, messages_left)

message_level = message[0].operations[0].level

if message_level <= self.state.level:
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
continue

await self._process_level_operations(message, message_level)

else:
Metrics.set_levels_to_realtime(self._config.name, 0)

async def _create_fetcher(
self, first_level: int, sync_level: int
self,
first_level: int,
sync_level: int,
) -> OperationsFetcher | OperationsUnfilteredFetcher:
if isinstance(self._config, TezosOperationsIndexConfig):
return await OperationsFetcher.create(
Expand Down Expand Up @@ -255,27 +232,17 @@ async def _synchronize(self, sync_level: int) -> None:
)
if operation_subgroups:
self._logger.debug('Processing operations of level %s', level)
await self._process_level_operations(operation_subgroups, sync_level)
await self._process_level_data(operation_subgroups, sync_level)

await self._exit_sync_state(sync_level)

# FIXME: Use method from Index
async def _process_level_operations(
def _match_level_data(
self,
operation_subgroups: tuple[OperationSubgroup, ...],
sync_level: int,
) -> None:
if not operation_subgroups:
return

batch_level = operation_subgroups[0].operations[0].level
index_level = self.state.level
if batch_level <= index_level:
raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')

self._logger.debug('Processing %s operation subgroups of level %s', len(operation_subgroups), batch_level)
handlers: Iterable[TezosOperationsHandlerConfig],
level_data: Iterable[OperationSubgroup],
) -> deque[Any]:
matched_handlers: deque[MatchedOperationsT] = deque()
for operation_subgroup in operation_subgroups:
for operation_subgroup in level_data:
metrics.objects_indexed += len(operation_subgroup.operations)
if isinstance(self._config, TezosOperationsUnfilteredIndexConfig):
subgroup_handlers = match_operation_unfiltered_subgroup(
Expand All @@ -285,37 +252,17 @@ async def _process_level_operations(
else:
subgroup_handlers = match_operation_subgroup(
self._ctx.package,
handlers=self._config.handlers,
handlers=handlers,
operation_subgroup=operation_subgroup,
alt=self._ctx.config.advanced.alt_operation_matcher,
)

if subgroup_handlers:
self._logger.debug(
'%s: `%s` handler matched!',
'%s: %s handlers matched!',
operation_subgroup.hash,
subgroup_handlers[0][1].callback,
len(subgroup_handlers),
)
matched_handlers += subgroup_handlers

Metrics.set_index_handlers_matched(len(matched_handlers))

# NOTE: We still need to bump index level but don't care if it will be done in existing transaction
if not matched_handlers:
await self._update_state(level=batch_level)
metrics.levels_nonempty += 1
return

async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name):
for _operation_subgroup, handler_config, args in matched_handlers:
await self._ctx.fire_handler(
name=handler_config.callback,
index=handler_config.parent.name,
args=args,
)
await self._update_state(level=batch_level)
metrics.levels_nonempty += 1

# FIXME: Use method from Index
def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]:
raise NotImplementedError
return matched_handlers
17 changes: 12 additions & 5 deletions src/dipdup/indexes/tezos_operations/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ class OperationSubgroup:
counter: int
operations: tuple[TezosOperationData, ...]

@property
def level(self) -> int:
return self.operations[0].level

def __len__(self) -> int:
return len(self.operations)


TezosOperationsHandlerArgumentU = (
TezosTransaction[Any, Any]
Expand All @@ -54,7 +61,7 @@ class OperationSubgroup:
| TezosOperationData
| None
)
MatchedOperationsT = tuple[OperationSubgroup, TezosOperationsHandlerConfigU, deque[TezosOperationsHandlerArgumentU]]
MatchedOperationsT = tuple[TezosOperationsHandlerConfigU, deque[TezosOperationsHandlerArgumentU]]


def prepare_operation_handler_args(
Expand Down Expand Up @@ -194,7 +201,7 @@ def match_operation_unfiltered_subgroup(

for operation in operation_subgroup.operations:
if TezosOperationType[operation.type] in index.types:
matched_handlers.append((operation_subgroup, index.handlers[0], deque([operation])))
matched_handlers.append((index.handlers[0], deque([operation])))

return matched_handlers

Expand Down Expand Up @@ -249,7 +256,7 @@ def match_operation_subgroup(
_logger.debug('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)

args = prepare_operation_handler_args(package, handler_config, matched_operations)
matched_handlers.append((operation_subgroup, handler_config, args))
matched_handlers.append((handler_config, args))

matched_operations.clear()
pattern_index = 0
Expand All @@ -258,7 +265,7 @@ def match_operation_subgroup(
_logger.debug('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)

args = prepare_operation_handler_args(package, handler_config, matched_operations)
matched_handlers.append((operation_subgroup, handler_config, args))
matched_handlers.append((handler_config, args))

if not (alt and len(matched_handlers) in (0, 1)):
return matched_handlers
Expand All @@ -267,7 +274,7 @@ def match_operation_subgroup(
index_list = list(range(len(matched_handlers)))
id_list = []
for handler in matched_handlers:
last_operation = handler[2][-1]
last_operation = handler[1][-1]
if isinstance(last_operation, TezosOperationData):
id_list.append(last_operation.id)
elif isinstance(last_operation, TezosOrigination):
Expand Down

0 comments on commit edda84f

Please sign in to comment.