Skip to content

Commit

Permalink
#673 prometheus metrics in indexer (#689)
Browse files Browse the repository at this point in the history
Co-authored-by: rozhkovdmitrii <[email protected]>
  • Loading branch information
2 people authored and afalaleev committed Apr 13, 2022
1 parent f1f15bc commit 7b08462
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 51 deletions.
2 changes: 1 addition & 1 deletion proxy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from .proxy import entry_point
import os
from .indexer.indexer import run_indexer
from .indexer.indexer_app import run_indexer


if __name__ == '__main__':
Expand Down
12 changes: 12 additions & 0 deletions proxy/common_neon/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@


class NeonTxStatData:
def __init__(self, neon_tx_hash: str, neon_income: int, tx_type: str, is_canceled: bool) -> None:
self.neon_tx_hash = neon_tx_hash
self.neon_income = neon_income
self.tx_type = tx_type
self.is_canceled = is_canceled
self.instructions = []

def add_instruction(self, sol_tx_hash: str, sol_spent: int, steps: int, bpf: int) -> None:
self.instructions.append((sol_tx_hash, sol_spent, steps, bpf))
4 changes: 4 additions & 0 deletions proxy/common_neon/solana_interactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ def get_slots_behind(self) -> Optional[int]:
return int(slots_behind)
return None

def is_healthy(self) -> bool:
status = self._send_rpc_request('getHealth').get('result', 'bad')
return status == 'ok'

def get_signatures_for_address(self, before: Optional[str], limit: int, commitment='confirmed') -> []:
opts: Dict[str, Union[int, str]] = {}
if before is not None:
Expand Down
6 changes: 4 additions & 2 deletions proxy/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ services:
condition: service_completed_successfully
dbcreation:
condition: service_completed_successfully
expose:
- "8887"
networks:
- net
entrypoint: proxy/run-indexer.sh
Expand All @@ -192,9 +194,9 @@ services:
image: prom/prometheus
privileged: true
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- ./statistics_exporter/config/prometheus-config.yml:/etc/prometheus/prometheus-config.yml:ro
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--config.file=/etc/prometheus/prometheus-config.yml'
ports:
- 127.0.0.1:9009:9090
expose:
Expand Down
8 changes: 8 additions & 0 deletions proxy/indexer/base_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,11 @@ def decode_list(self, v):

def encode_list(self, v: []):
return None if (not v) or (len(v) == 0) else encode(v)

def is_connected(self) -> bool:
try:
cur = self._conn.cursor()
cur.execute('SELECT 1')
return True
except psycopg2.OperationalError:
return False
18 changes: 18 additions & 0 deletions proxy/indexer/i_inidexer_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from abc import ABC, abstractmethod

from ..common_neon.data import NeonTxStatData


class IIndexerUser(ABC):

@abstractmethod
def on_neon_tx_result(self, result: NeonTxStatData):
"""On Neon transaction result """

@abstractmethod
def on_solana_rpc_status(self, status):
"""On Solana status"""

@abstractmethod
def on_db_status(self, status):
"""On Neon database status"""
88 changes: 53 additions & 35 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import Optional
import copy
from typing import Iterator, Optional

import base58
import time
from logged_groups import logged_group, logging_context
from solana.system_program import SYS_PROGRAM_ID

from ..common_neon.data import NeonTxStatData
from ..indexer.i_inidexer_user import IIndexerUser
from ..indexer.accounts_db import NeonAccountInfo
from ..indexer.indexer_base import IndexerBase
from ..indexer.indexer_db import IndexerDB
Expand All @@ -16,7 +19,7 @@
from ..common_neon.solana_interactor import SolanaInteractor
from ..common_neon.solana_receipt_parser import SolReceiptParser

from ..environment import EVM_LOADER_ID, FINALIZED, CANCEL_TIMEOUT, SOLANA_URL
from ..environment import EVM_LOADER_ID, FINALIZED, CANCEL_TIMEOUT


@logged_group("neon.Indexer")
Expand Down Expand Up @@ -101,15 +104,19 @@ def get_account_list(self, start: int) -> [str]:
class BaseEvmObject:
def __init__(self):
self.used_ixs = []
self.ixs_cost = []
self.slot = 0

def mark_ix_used(self, ix_info: SolanaIxInfo):
self.used_ixs.append(ix_info.sign)
self.ixs_cost.append(ix_info.cost_info)
self.slot = max(self.slot, ix_info.sign.slot)

def move_ix_used(self, obj):
self.used_ixs += obj.used_ixs
self.ixs_cost += obj.ixs_cost
obj.used_ixs.clear()
obj.ixs_cost.clear()
self.slot = max(self.slot, obj.slot)


Expand All @@ -125,13 +132,12 @@ def __str__(self):
return str_fmt_object(self)


class NeonTxObject(BaseEvmObject):
class NeonTxResult(BaseEvmObject):
def __init__(self, storage_account: str, neon_tx: NeonTxInfo, neon_res: NeonTxResultInfo):
BaseEvmObject.__init__(self)
self.storage_account = storage_account
self.neon_tx = (neon_tx or NeonTxInfo())
self.neon_res = (neon_res or NeonTxResultInfo())
self.step_count = []
self.holder_account = ''
self.blocked_accounts = []
self.canceled = False
Expand Down Expand Up @@ -178,6 +184,7 @@ def __init__(self, db: IndexerDB, solana: SolanaInteractor):
self._holder_table = {}
self._tx_table = {}
self._done_tx_list = []
self.neon_tx_result = []
self._used_ixs = {}
self.ix = SolanaIxInfo(sign='', slot=-1, tx=None)

Expand Down Expand Up @@ -215,21 +222,21 @@ def add_holder(self, account: str) -> NeonHolderObject:
def del_holder(self, holder: NeonHolderObject):
self._holder_table.pop(holder.account, None)

def get_tx(self, storage_account: str) -> Optional[NeonTxObject]:
def get_tx(self, storage_account: str) -> Optional[NeonTxResult]:
return self._tx_table.get(storage_account)

def add_tx(self, storage_account: str, neon_tx=None, neon_res=None) -> NeonTxObject:
def add_tx(self, storage_account: str, neon_tx=None, neon_res=None) -> NeonTxResult:
if storage_account in self._tx_table:
self.debug(f'{self.ix} ATTENTION: the tx {storage_account} is already used!')

tx = NeonTxObject(storage_account=storage_account, neon_tx=neon_tx, neon_res=neon_res)
tx = NeonTxResult(storage_account=storage_account, neon_tx=neon_tx, neon_res=neon_res)
self._tx_table[storage_account] = tx
return tx

def del_tx(self, tx: NeonTxObject):
def del_tx(self, tx: NeonTxResult):
self._tx_table.pop(tx.storage_account, None)

def done_tx(self, tx: NeonTxObject):
def done_tx(self, tx: NeonTxResult):
"""
Continue waiting of ixs in the slot with the same neon tx,
because the parsing order can be other than the execution order.
Expand All @@ -248,6 +255,7 @@ def complete_done_txs(self):
if tx.neon_tx.is_valid() and tx.neon_res.is_valid():
with logging_context(neon_tx=tx.neon_tx.sign[:7]):
self._db.submit_transaction(tx.neon_tx, tx.neon_res, tx.used_ixs)
self.neon_tx_result.append(copy.deepcopy(tx))
self.del_tx(tx)
self._done_tx_list.clear()

Expand All @@ -267,6 +275,11 @@ def iter_txs(self):
def add_account_to_db(self, neon_account: NeonAccountInfo):
self._db.fill_account_info_by_indexer(neon_account)

def iter_neon_tx_results(self) -> Iterator[NeonTxResult]:
for tx in self.neon_tx_result:
yield tx
self.neon_tx_result.clear()


@logged_group("neon.Indexer")
class DummyIxDecoder:
Expand All @@ -281,7 +294,7 @@ def __str__(self):
def neon_addr_fmt(neon_tx: NeonTxInfo):
return f'Neon tx {neon_tx.sign}, Neon addr {neon_tx.addr}'

def _getadd_tx(self, storage_account, neon_tx=None, blocked_accounts=None) -> NeonTxObject:
def _getadd_tx(self, storage_account, neon_tx=None, blocked_accounts=None) -> NeonTxResult:
if blocked_accounts is None:
blocked_accounts = ['']
tx = self.state.get_tx(storage_account)
Expand Down Expand Up @@ -321,7 +334,7 @@ def _decoding_done(self, obj: BaseEvmObject, msg: str) -> bool:
"""
Assembling of the object has been successfully finished.
"""
if isinstance(obj, NeonTxObject):
if isinstance(obj, NeonTxResult):
self.state.mark_ix_used(obj)
self.state.done_tx(obj)
elif isinstance(obj, NeonHolderObject):
Expand All @@ -348,15 +361,15 @@ def _decoding_fail(self, obj: BaseEvmObject, reason: str) -> bool:
self.warning(f'{reason} - {obj}')
self.state.unmark_ix_used(obj)

if isinstance(obj, NeonTxObject):
if isinstance(obj, NeonTxResult):
self.state.del_tx(obj)
elif isinstance(obj, NeonHolderObject):
self.state.del_holder(obj)
else:
assert False, 'Unknown type of object'
return False

def _decode_tx(self, tx: NeonTxObject):
def _decode_tx(self, tx: NeonTxResult):
"""
If the transaction doesn't have results, then try to get results for the transaction.
If the transaction has received results, then call done for the transaction.
Expand All @@ -369,7 +382,7 @@ def _decode_tx(self, tx: NeonTxObject):
return self._decoding_done(tx, 'found Neon results')
return self._decoding_success(tx, 'mark ix used')

def _init_tx_from_holder(self, holder_account: str, storage_account: str, blocked_accounts: [str]) -> Optional[NeonTxObject]:
def _init_tx_from_holder(self, holder_account: str, storage_account: str, blocked_accounts: [str]) -> Optional[NeonTxResult]:
tx = self._getadd_tx(storage_account, blocked_accounts=blocked_accounts)
if tx.holder_account:
return tx
Expand Down Expand Up @@ -553,7 +566,7 @@ def execute(self) -> bool:
return self._decoding_skip(f'Neon tx rlp error "{neon_tx.error}"')

neon_res = NeonTxResultInfo(neon_tx.sign, self.ix.tx, self.ix.sign.idx)
tx = NeonTxObject('', neon_tx=neon_tx, neon_res=neon_res)
tx = NeonTxResult('', neon_tx=neon_tx, neon_res=neon_res)

return self._decoding_done(tx, 'call success')

Expand Down Expand Up @@ -583,7 +596,6 @@ def execute(self) -> bool:
return self._decoding_skip(f'Neon tx rlp error "{neon_tx.error}"')

tx = self._getadd_tx(storage_account, neon_tx=neon_tx, blocked_accounts=blocked_accounts)
tx.step_count.append(step_count)

self.ix.sign.set_steps(step_count)
return self._decode_tx(tx)
Expand Down Expand Up @@ -617,7 +629,6 @@ def execute(self) -> bool:
step_count = int.from_bytes(self.ix.ix_data[5:13], 'little')

tx = self._getadd_tx(storage_account, blocked_accounts=blocked_accounts)
tx.step_count.append(step_count)

self.ix.sign.set_steps(step_count)
return self._decode_tx(tx)
Expand Down Expand Up @@ -648,7 +659,6 @@ def execute(self) -> bool:
tx = self._init_tx_from_holder(holder_account, storage_account, blocked_accounts)
if not tx:
return self._decoding_skip(f'fail to init in storage {storage_account} from holder {holder_account}')
tx.step_count.append(step_count)

self.ix.sign.set_steps(step_count)
return self._decode_tx(tx)
Expand Down Expand Up @@ -711,7 +721,6 @@ def execute(self) -> bool:
tx = self._init_tx_from_holder(holder_account, storage_account, blocked_accounts)
if not tx:
return self._decoding_skip(f'fail to init the storage {storage_account} from the holder {holder_account}')
tx.step_count.append(step_count)

self.ix.sign.set_steps(step_count)
return self._decode_tx(tx)
Expand All @@ -738,7 +747,7 @@ def gather_blocks(self):

@logged_group("neon.Indexer")
class Indexer(IndexerBase):
def __init__(self, solana_url):
def __init__(self, solana_url, indexer_user: IIndexerUser):
self.debug(f'Finalized commitment: {FINALIZED}')
solana = SolanaInteractor(solana_url)
self.db = IndexerDB(solana)
Expand All @@ -749,6 +758,7 @@ def __init__(self, solana_url):
self.blocked_storages = {}
self.block_indexer = BlocksIndexer(db=self.db, solana=solana)
self.counted_logger = MetricsToLogBuff()
self._user = indexer_user

self.state = ReceiptsParserState(db=self.db, solana=solana)
self.ix_decoder_map = {
Expand Down Expand Up @@ -785,6 +795,7 @@ def process_functions(self):
self.process_receipts()
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
self.process_neon_tx_results()

def process_receipts(self):
tx_costs = []
Expand Down Expand Up @@ -812,7 +823,6 @@ def process_receipts(self):

tx_costs.append(ix_info.cost_info)


self.indexed_slot = last_block_slot
self.db.set_min_receipt_slot(self.state.find_min_used_slot(self.indexed_slot))

Expand Down Expand Up @@ -882,17 +892,25 @@ def unlock_accounts(self, tx) -> bool:
tx.canceled = True
return True


@logged_group("neon.Indexer")
def run_indexer(solana_url, *, logger):
logger.info(f"""Running indexer with params:
solana_url: {solana_url},
evm_loader_id: {EVM_LOADER_ID}""")

indexer = Indexer(solana_url)
indexer.run()


if __name__ == "__main__":
solana_url = SOLANA_URL
run_indexer(solana_url)
def process_neon_tx_results(self):
for neon_tx_result in self.state.iter_neon_tx_results():
neon_tx_hash = neon_tx_result.neon_tx.sign
neon_income = int(neon_tx_result.neon_res.gas_used, 0) * int(neon_tx_result.neon_tx.gas_price, 0)
if neon_tx_result.holder_account != '':
tx_type = 'holder'
elif neon_tx_result.storage_account != '':
tx_type = 'iterative'
else:
tx_type = 'single'
is_canceled = neon_tx_result.neon_res.status == '0x0'
neon_tx_stat_data = NeonTxStatData(neon_tx_hash, neon_income, tx_type, is_canceled)
for sign_info, cost_info in zip(neon_tx_result.used_ixs, neon_tx_result.ixs_cost):
sol_tx_hash = sign_info.sign
sol_spent = cost_info.sol_spent
steps = sign_info.steps
bpf = cost_info.bpf
neon_tx_stat_data.add_instruction(sol_tx_hash, sol_spent, steps, bpf)

self._user.on_neon_tx_result(neon_tx_stat_data)
self._user.on_db_status(self.db.status())
self._user.on_solana_rpc_status(self.solana.is_healthy())
40 changes: 40 additions & 0 deletions proxy/indexer/indexer_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from logged_groups import logged_group

from ..environment import EVM_LOADER_ID, SOLANA_URL
from ..statistics_exporter.prometheus_indexer_exporter import IndexerStatistics
from ..common_neon.data import NeonTxStatData
from .indexer import Indexer
from .i_inidexer_user import IIndexerUser


@logged_group("neon.Indexer")
class IndexerApp(IIndexerUser):

def __init__(self, solana_url: str):
self.neon_statistics = IndexerStatistics()
indexer = Indexer(solana_url, self)
indexer.run()

def on_neon_tx_result(self, tx_stat: NeonTxStatData):
self.neon_statistics.on_neon_tx_result(tx_stat)


def on_db_status(self, neon_db_status: bool):
self.neon_statistics.stat_commit_postgres_availability(neon_db_status)

def on_solana_rpc_status(self, solana_status: bool):
self.neon_statistics.stat_commit_solana_rpc_health(solana_status)


@logged_group("neon.Indexer")
def run_indexer(solana_url, *, logger):
logger.info(f"""Running indexer with params:
solana_url: {solana_url},
evm_loader_id: {EVM_LOADER_ID}""")

IndexerApp(solana_url)


if __name__ == "__main__":
solana_url = SOLANA_URL
run_indexer(solana_url)
Loading

0 comments on commit 7b08462

Please sign in to comment.