Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#748 High memory usage in indexer on gathering txs from Solana and parsing txs #751

Merged
merged 11 commits into from
Apr 17, 2022
25 changes: 21 additions & 4 deletions proxy/common_neon/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import Dict, Any
from typing import Dict, Any, Optional

import json
import base58
Expand All @@ -12,11 +12,17 @@
from ..eth_proto import Trx as EthTx


def str_fmt_object(obj) -> str:
    """Render *obj* as ``'<TypeName>: <json>'`` for log output.

    The JSON part is produced by ``json.dumps`` with sorted keys; dictionary
    keys that are not basic JSON key types are skipped. Values that json
    cannot serialize natively are converted by ``lookup`` below.
    """
    def lookup(o) -> Optional[Dict]:
        # Prefer an explicit compact summary when the object provides one,
        # otherwise fall back to its attribute dict; objects with neither
        # are emitted as JSON null.
        if hasattr(o, '__str_dict__'):
            return o.__str_dict__()
        if hasattr(o, '__dict__'):
            return o.__dict__
        return None

    # type(obj).__name__ gives the bare class name for every type, including
    # builtins whose repr contains no module prefix to slice off.
    name = type(obj).__name__
    members = json.dumps(obj, skipkeys=True, default=lookup, sort_keys=True)
    return f'{name}: {members}'


Expand All @@ -33,6 +39,17 @@ def __init__(self, slot: int, is_finalized=False, hash=None, parent_hash=None, t
def __str__(self) -> str:
return str_fmt_object(self)

def __str_dict__(self) -> Dict:
    """Return a compact summary of this object for string formatting.

    Scalar fields are copied as-is; ``signs`` is reported only by its length.
    """
    scalar_fields = ('slot', 'is_finalized', 'is_fake', 'hash', 'parent_hash', 'time')
    summary = {field: getattr(self, field) for field in scalar_fields}
    summary['len(signs)'] = len(self.signs)
    return summary

def __getstate__(self) -> Dict:
return self.__dict__

Expand Down
3 changes: 3 additions & 0 deletions proxy/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
# Clamped: the effective value is never below 10.
CONFIRM_TIMEOUT = max(int(os.environ.get("CONFIRM_TIMEOUT", 10)), 10)
# Size of the thread pool the indexer uses when fetching transaction receipts.
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
# Number of queued signatures at which the indexer flushes a batch of receipt requests.
INDEXER_POLL_COUNT = int(os.environ.get("INDEXER_POLL_COUNT", "1000"))
# NOTE(review): this is a str when read from the environment and the int 0 otherwise;
# consumers have to cope with both types.
START_SLOT = os.environ.get('START_SLOT', 0)
# Row LIMIT applied by the receipts storage get_txs query when choosing the slot window.
INDEXER_RECEIPTS_COUNT_LIMIT = int(os.environ.get("INDEXER_RECEIPTS_COUNT_LIMIT", "1000"))
FINALIZED = os.environ.get('FINALIZED', 'finalized')
CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))
# The indexer compares this against a difference of slot numbers, so the unit is slots
# (216000 slots is about one day assuming ~0.4 s per slot — TODO confirm slot time).
HOLDER_TIMEOUT = int(os.environ.get("HOLDER_TIMEOUT", "216000"))  # 1 day by default
ACCOUNT_PERMISSION_UPDATE_INT = int(os.environ.get("ACCOUNT_PERMISSION_UPDATE_INT", 60 * 5))
# Clamped: the effective value is never below 2.
PERM_ACCOUNT_LIMIT = max(int(os.environ.get("PERM_ACCOUNT_LIMIT", 2)), 2)
OPERATOR_FEE = Decimal(os.environ.get("OPERATOR_FEE", "0.1"))
Expand Down
88 changes: 71 additions & 17 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import copy
from typing import Iterator, Optional
from typing import Iterator, Optional, Dict

import base58
import time
Expand All @@ -19,7 +19,7 @@
from ..common_neon.solana_interactor import SolanaInteractor
from ..common_neon.solana_receipt_parser import SolReceiptParser

from ..environment import EVM_LOADER_ID, FINALIZED, CANCEL_TIMEOUT
from ..environment import EVM_LOADER_ID, FINALIZED, CANCEL_TIMEOUT, HOLDER_TIMEOUT


@logged_group("neon.Indexer")
Expand Down Expand Up @@ -128,9 +128,20 @@ def __init__(self, account: str):
self.count_written = 0
self.max_written = 0

def __str__(self):
def __str__(self) -> str:
return str_fmt_object(self)

def __str_dict__(self) -> Dict:
    """Return a compact summary of this holder for string formatting.

    Collections (``used_ixs``, ``ixs_cost``, ``data``) are reported only by
    their length so the rendered string stays small; ``account`` is
    stringified and the remaining fields are copied as-is.
    """
    return {
        'len(used_ixs)': len(self.used_ixs),
        'len(ixs_cost)': len(self.ixs_cost),
        'slot': self.slot,
        'account': str(self.account),
        # Key named like the other length entries (no "self." prefix).
        'len(data)': len(self.data),
        'count_written': self.count_written,
        'max_written': self.max_written,
    }


class NeonTxResult(BaseEvmObject):
def __init__(self, storage_account: str, neon_tx: NeonTxInfo, neon_res: NeonTxResultInfo):
Expand All @@ -146,6 +157,19 @@ def __init__(self, storage_account: str, neon_tx: NeonTxInfo, neon_res: NeonTxRe
def __str__(self):
return str_fmt_object(self)

def __str_dict__(self) -> Dict:
    """Return a compact summary of this transaction for string formatting.

    ``used_ixs``, ``ixs_cost`` and ``blocked_accounts`` are reported only by
    their length, both account fields are stringified, and ``neon_tx`` /
    ``neon_res`` are passed through as objects.
    """
    summary = {}
    summary['len(used_ixs)'] = len(self.used_ixs)
    summary['len(ixs_cost)'] = len(self.ixs_cost)
    summary['slot'] = self.slot
    summary['storage_account'] = str(self.storage_account)
    summary['holder_account'] = str(self.holder_account)
    summary['neon_tx'] = self.neon_tx
    summary['neon_res'] = self.neon_res
    summary['len(blocked_accounts)'] = len(self.blocked_accounts)
    summary['canceled'] = self.canceled
    summary['done'] = self.done
    return summary

@logged_group("neon.Indexer")
class ReceiptsParserState:
Expand Down Expand Up @@ -184,6 +208,7 @@ def __init__(self, db: IndexerDB, solana: SolanaInteractor):
self._holder_table = {}
self._tx_table = {}
self._done_tx_list = []
self._done_holder_list = []
self.neon_tx_result = []
self._used_ixs = {}
self.ix = SolanaIxInfo(sign='', slot=-1, tx=None)
Expand All @@ -203,7 +228,7 @@ def unmark_ix_used(self, obj: BaseEvmObject):
if self._used_ixs[ix] == 0:
del self._used_ixs[ix]

def find_min_used_slot(self, min_slot) -> int:
    """Return the lowest slot among *min_slot* and all tracked used instructions."""
    candidate_slots = (min_slot, *(used_ix.slot for used_ix in self._used_ixs))
    return min(candidate_slots)
Expand Down Expand Up @@ -246,7 +271,10 @@ def done_tx(self, tx: NeonTxResult):
tx.done = True
self._done_tx_list.append(tx)

def complete_done_txs(self):
def done_holder(self, holder: NeonHolderObject):
    """Queue *holder* for removal.

    The holder is only appended to the pending list here; the actual
    clean-up (unmarking its instructions and deleting it) happens when
    complete_done_objects() drains that list.
    """
    self._done_holder_list.append(holder)

def complete_done_objects(self, min_used_slot: int):
"""
Slot is done, store all done neon txs into the DB.
"""
Expand All @@ -259,19 +287,29 @@ def complete_done_txs(self):
self.del_tx(tx)
self._done_tx_list.clear()

for holder in self._done_holder_list:
self.unmark_ix_used(holder)
self.del_holder(holder)
self._done_holder_list.clear()

holders = len(self._holder_table)
transactions = len(self._tx_table)
used_ixs = len(self._used_ixs)
if holders > 0 or transactions > 0 or used_ixs > 0:
if ((holders > 0) or (transactions > 0) or (used_ixs > 0)) and (min_used_slot != 0):
self.debug('Receipt state stats: ' +
f'holders {holders}, ' +
f'transactions {transactions}, ' +
f'used ixs {used_ixs}')
f'holders {holders}, ' +
f'transactions {transactions}, ' +
f'used ixs {used_ixs}, ' +
f'min_used_slot {min_used_slot}')

def iter_txs(self) -> Iterator[NeonTxResult]:
    """Yield every transaction currently held in the transaction table."""
    yield from self._tx_table.values()

def iter_holders(self) -> Iterator[NeonHolderObject]:
    """Yield every holder currently held in the holder table."""
    yield from self._holder_table.values()

def add_account_to_db(self, neon_account: NeonAccountInfo):
self._db.fill_account_info_by_indexer(neon_account)

Expand Down Expand Up @@ -759,6 +797,7 @@ def __init__(self, solana_url, indexer_user: IIndexerUser):
last_known_slot = self.db.get_min_receipt_slot()
IndexerBase.__init__(self, solana, last_known_slot)
self.indexed_slot = self.last_slot
self.min_used_slot = 0
self.canceller = Canceller(solana)
self.blocked_storages = {}
self.block_indexer = BlocksIndexer(db=self.db, solana=solana)
Expand Down Expand Up @@ -817,7 +856,7 @@ def process_receipts(self):
break

if max_slot != slot:
self.state.complete_done_txs()
self.state.complete_done_objects(self.min_used_slot)
max_slot = max(max_slot, slot)

ix_info = SolanaIxInfo(sign=sign, slot=slot, tx=tx)
Expand All @@ -830,8 +869,10 @@ def process_receipts(self):

tx_costs.append(ix_info.cost_info)

self.indexed_slot = last_block_slot
self.db.set_min_receipt_slot(self.state.find_min_used_slot(self.indexed_slot))
if max_slot:
self.indexed_slot = max_slot + 1
self.min_used_slot = self.state.find_min_used_slot(self.indexed_slot)
self.db.set_min_receipt_slot(self.state.find_min_used_slot(self.indexed_slot))

# cancel transactions with long inactive time
for tx in self.state.iter_txs():
Expand All @@ -840,15 +881,28 @@ def process_receipts(self):
tx.neon_res.slot = self.indexed_slot
self.state.done_tx(tx)

# remove old holders with long inactive time
for holder in self.state.iter_holders():
if abs(holder.slot - self.current_slot) > HOLDER_TIMEOUT:
self.state.done_holder(holder)

# after last instruction and slot
self.state.complete_done_txs()
self.db.add_tx_costs(tx_costs)
self.state.complete_done_objects(self.min_used_slot)

if max_slot:
self.db.add_tx_costs(tx_costs)

process_receipts_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
self.counted_logger.print(
self.debug,
list_params={"process_receipts_ms": process_receipts_ms, "processed_slots": self.current_slot - self.indexed_slot},
latest_params={"transaction_receipts.len": self.transaction_receipts.size(), "indexed_slot": self.indexed_slot}
list_params={
"process_receipts_ms": process_receipts_ms,
"processed_slots": self.current_slot - self.indexed_slot
},
latest_params={
"transaction_receipts.len": self.transaction_receipts.size(),
"indexed_slot": self.indexed_slot
}
)

def unlock_accounts(self, tx) -> bool:
Expand Down
19 changes: 14 additions & 5 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group
from typing import Optional
from typing import List, Optional, Tuple

from .trx_receipts_storage import TxReceiptsStorage
from .utils import MetricsToLogBuff
from ..common_neon.solana_interactor import SolanaInteractor
from ..indexer.sql_dict import SQLDict

from ..environment import RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
from ..environment import INDEXER_POLL_COUNT, RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
from ..environment import HISTORY_START, PARALLEL_REQUESTS, FINALIZED, EVM_LOADER_ID


Expand Down Expand Up @@ -138,10 +138,14 @@ def gather_unknown_transactions(self):

if not self.transaction_receipts.contains(slot, sol_sign):
poll_txs.append((sol_sign, slot, tx_idx))

if len(poll_txs) >= INDEXER_POLL_COUNT:
self._get_txs(poll_txs)

tx_idx += 1

pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self._get_tx_receipts, poll_txs)
if len(poll_txs) > 0:
self._get_txs(poll_txs)

self.current_slot = current_slot
self.counter_ = 0
Expand All @@ -162,7 +166,12 @@ def _get_signatures(self, before: Optional[str], limit: int) -> []:
self.warning(f'Fail to get signatures: {error}')
return result

def _get_tx_receipts(self, param):
def _get_txs(self, poll_txs: List[Tuple[str, int, int]]) -> None:
    """Fetch receipts for all queued transactions, then empty the queue.

    :param poll_txs: pending ``(signature, slot, tx_idx)`` entries; each one
        is handed to ``_get_tx_receipts`` on a pool of ``PARALLEL_REQUESTS``
        threads. The list is cleared in place once every entry is processed.
    """
    if not poll_txs:
        # Nothing queued: avoid spinning up worker threads for no work.
        return

    # The pool is created per batch, so it must be released per batch too;
    # otherwise each call leaves its worker threads behind. map() blocks
    # until all entries are done, so terminating on exit loses no work.
    with ThreadPool(PARALLEL_REQUESTS) as pool:
        pool.map(self._get_tx_receipts, poll_txs)
    poll_txs.clear()

def _get_tx_receipts(self, param: Tuple[str, int, int]) -> None:
# tx = None
retry = RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION

Expand Down
28 changes: 26 additions & 2 deletions proxy/indexer/trx_receipts_storage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from proxy.environment import INDEXER_RECEIPTS_COUNT_LIMIT
from proxy.indexer.pg_common import encode, decode
from proxy.indexer.base_db import BaseDB

Expand Down Expand Up @@ -44,8 +45,31 @@ def contains(self, slot, signature):

def get_txs(self, start_slot=0):
with self._conn.cursor() as cur:
cur.execute(f'SELECT slot, signature, tx FROM {self._table_name}' +
f' WHERE slot >= {start_slot} ORDER BY slot ASC, tx_idx DESC')
cur.execute(f'SELECT MIN(slot) FROM {self._table_name} WHERE slot > %s', (start_slot,))
min_slot_row = cur.fetchone()
min_slot = (min_slot_row[0] if min_slot_row and min_slot_row[0] else 0)

cur.execute(f'''
SELECT MAX(t.slot) FROM (
SELECT slot FROM {self._table_name}
WHERE slot > %s
ORDER BY slot
LIMIT {INDEXER_RECEIPTS_COUNT_LIMIT}
) AS t
''',
(start_slot,))
limit_slot_row = cur.fetchone()
limit_slot = (limit_slot_row[0] if limit_slot_row and limit_slot_row[0] else 0)

stop_slot = max(min_slot, limit_slot, start_slot + 1)

cur.execute(f'''
SELECT slot, signature, tx FROM {self._table_name}
WHERE slot >= %s AND slot <= %s
ORDER BY slot ASC, tx_idx DESC
''',
(start_slot, stop_slot,))
rows = cur.fetchall()

for row in rows:
yield int(row[0]), row[1], decode(row[2])