Skip to content

Commit

Permalink
Minor indexes refactoring (#1035)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jun 18, 2024
1 parent eddb58e commit ca54d16
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 67 deletions.
12 changes: 10 additions & 2 deletions src/dipdup/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,20 @@ def realtime(self) -> bool:

def get_sync_level(self) -> int:
    """Get level index needs to be synchronized to depending on its subscription status"""
    # Collect one sync level per (subscription, index datasource) pair; non-index
    # datasources are ignored since they don't track subscription levels.
    sync_levels = {
        datasource.get_sync_level(subscription)
        for subscription in self._config.get_subscriptions()
        for datasource in self._datasources
        if isinstance(datasource, IndexDatasource)
    }

    # No levels at all means config wasn't initialized yet.
    if not sync_levels:
        raise FrameworkException('Initialize config before starting `IndexDispatcher`')

    # `None` entries come from datasources that haven't received a level yet;
    # if nothing but `None` remains, sync levels were never set.
    sync_levels.discard(None)
    if not sync_levels:
        raise FrameworkException('Call `set_sync_level` before starting `IndexDispatcher`')

    # NOTE: Multiple sync levels means index with new subscriptions was added in runtime.
    # NOTE: Choose the highest level; outdated realtime messages will be dropped from the queue anyway.
    return max(cast(set[int], sync_levels))
Expand Down
21 changes: 0 additions & 21 deletions src/dipdup/indexes/evm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
from typing import TYPE_CHECKING
from typing import Generic
from typing import TypeVar
from typing import cast

from web3 import Web3

from dipdup.config import EvmIndexConfigU
from dipdup.config.evm import EvmContractConfig
from dipdup.datasources import IndexDatasource
from dipdup.datasources.evm_node import NODE_LAST_MILE
from dipdup.datasources.evm_node import EvmNodeDatasource
from dipdup.datasources.evm_subsquid import EvmSubsquidDatasource
from dipdup.exceptions import ConfigurationError
from dipdup.exceptions import FrameworkException
from dipdup.index import Index
from dipdup.index import IndexQueueItemT
from dipdup.models.subsquid import SubsquidMessageType
Expand Down Expand Up @@ -72,24 +69,6 @@ async def _synchronize_subsquid(self, sync_level: int) -> None: ...
@abstractmethod
async def _synchronize_node(self, sync_level: int) -> None: ...

def get_sync_level(self) -> int:
    """Get level index needs to be synchronized to depending on its subscription status"""
    # Gather sync levels from every index datasource for every subscription.
    levels = {
        source.get_sync_level(subscription)
        for subscription in self._config.get_subscriptions()
        for source in self._datasources
        if isinstance(source, IndexDatasource)
    }

    # Drop `None` placeholders first; an empty result afterwards means the
    # dispatcher was started before the config was initialized.
    levels.discard(None)
    if not levels:
        raise FrameworkException('Initialize config before starting `IndexDispatcher`')

    # NOTE: Multiple sync levels means index with new subscriptions was added in runtime.
    # NOTE: Choose the highest level; outdated realtime messages will be dropped from the queue anyway.
    return max(cast(set[int], levels))

async def _get_node_sync_level(
self,
subsquid_level: int,
Expand Down
21 changes: 0 additions & 21 deletions src/dipdup/indexes/starknet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
from typing import TYPE_CHECKING
from typing import Generic
from typing import TypeVar
from typing import cast

from dipdup.config import StarknetIndexConfigU
from dipdup.datasources import IndexDatasource
from dipdup.datasources.starknet_subsquid import StarknetSubsquidDatasource
from dipdup.exceptions import ConfigurationError
from dipdup.exceptions import FrameworkException
from dipdup.index import Index
from dipdup.index import IndexQueueItemT
from dipdup.models.subsquid import SubsquidMessageType
Expand Down Expand Up @@ -41,24 +38,6 @@ def __init__(
@abstractmethod
async def _synchronize_subsquid(self, sync_level: int) -> None: ...

def get_sync_level(self) -> int:
    """Get level index needs to be synchronized to depending on its subscription status"""
    levels: set[int | None] = set()
    for subscription in self._config.get_subscriptions():
        levels.update(
            source.get_sync_level(subscription)
            for source in self._datasources
            if isinstance(source, IndexDatasource)
        )

    # Strip `None` placeholders; nothing left means config was never initialized.
    levels.discard(None)
    if not levels:
        raise FrameworkException('Initialize config before starting `IndexDispatcher`')

    # NOTE: Multiple sync levels means index with new subscriptions was added in runtime.
    # NOTE: Choose the highest level; outdated realtime messages will be dropped from the queue anyway.
    return max(cast(set[int], levels))

async def _synchronize(self, sync_level: int) -> None:
"""Fetch event logs via Fetcher and pass to message callback"""
index_level = await self._enter_sync_state(sync_level)
Expand Down
10 changes: 5 additions & 5 deletions src/dipdup/indexes/starknet_events/fetcher.py
Original file line number Diff line number Diff line change
def __init__(
    self,
    datasources: tuple[StarknetSubsquidDatasource, ...],
    first_level: int,
    last_level: int,
    event_ids: set[str],
) -> None:
    """Event fetcher bound to a level range and a set of event identifiers.

    `event_ids` are Starknet event identifiers matched against `key0` of each event.
    """
    super().__init__(datasources, first_level, last_level)
    self._event_ids = event_ids

async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[StarknetEventData, ...]]]:
    """Iterate events from a datasource, regrouped into per-level batches."""
    # TODO: Probably add `from_address` filter there?
    # key0 contains the event identifier
    iterator = self.random_datasource.iter_events(
        first_level=self._first_level,
        last_level=self._last_level,
        filters=({'key0': list(self._event_ids)},),
    )
    async for level, events in readahead_by_level(iterator, limit=STARKNET_SUBSQUID_READAHEAD_LIMIT):
        yield level, events
35 changes: 17 additions & 18 deletions src/dipdup/indexes/starknet_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,18 @@ def __init__(
datasources: tuple[StarknetDatasource, ...],
) -> None:
super().__init__(ctx, config, datasources)
self._event_identifiers_dict: dict[str, dict[str, str]] | None = None
self._event_identifiers_list: list[str] | None = None
self._event_identifiers: dict[str, dict[str, str]] | None = None

@property
def event_identifiers(self) -> dict[str, dict[str, str]]:
    """Lazily built mapping: contract typename -> {event name: event identifier}.

    Identifiers are extracted from the converted Starknet ABI of each handler's contract.
    """
    if self._event_identifiers is None:
        mapping: dict[str, dict[str, str]] = {}
        for handler_config in self._config.handlers:
            typename = handler_config.contract.module_name
            events_abi = self._ctx.package.get_converted_starknet_abi(typename)['events']
            mapping[typename] = {name: abi['event_identifier'] for name, abi in events_abi.items()}
        self._event_identifiers = mapping

    return self._event_identifiers

async def _synchronize_subsquid(self, sync_level: int) -> None:
first_level = self.state.level + 1
Expand All @@ -67,7 +57,11 @@ async def _synchronize_subsquid(self, sync_level: int) -> None:
Metrics.set_sqd_processor_last_block(_level)

def _create_subsquid_fetcher(self, first_level: int, last_level: int) -> StarknetSubsquidEventFetcher:
event_ids = self.event_identifiers_list
event_ids = set()
for map_ in self.event_identifiers.values():
for identifier in map_.values():
event_ids.add(identifier)

return StarknetSubsquidEventFetcher(
datasources=self.subsquid_datasources,
first_level=first_level,
def _match_level_data(
    self,
    handlers: tuple[StarknetEventsHandlerConfig, ...],
    level_data: Iterable[StarknetEventData],
) -> deque[Any]:
    """Match one level's worth of events against handler configs."""
    matched = match_events(
        package=self._ctx.package,
        handlers=handlers,
        events=level_data,
        event_identifiers=self.event_identifiers,
    )
    return matched

async def _call_matched_handler(
self,
Expand Down

0 comments on commit ca54d16

Please sign in to comment.