Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#764 Decrease the storage usage for Solana receipts #769

Merged
merged 13 commits into from
May 20, 2022
3 changes: 1 addition & 2 deletions proxy/airdropper/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,13 @@ def process_functions(self):
"""
Overrides IndexerBase.process_functions
"""
IndexerBase.process_functions(self)
self.debug("Process receipts")
self.process_receipts()
self.process_scheduled_trxs()

def process_receipts(self):
max_slot = 0
for slot, _, trx in self.transaction_receipts.get_txs(self.latest_processed_slot):
for slot, _, trx in self.get_tx_receipts():
max_slot = max(max_slot, slot)
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
Expand Down
17 changes: 3 additions & 14 deletions proxy/db/scheme.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,7 @@
neon_income BIGINT
);

CREATE TABLE IF NOT EXISTS solana_transaction_receipts (
slot BIGINT,
tx_idx INT,
signature VARCHAR(88),
tx BYTEA,
PRIMARY KEY (slot, signature)
);

CREATE TABLE IF NOT EXISTS test_storage (
slot BIGINT,
tx_idx INT,
signature VARCHAR(88),
tx BYTEA,
PRIMARY KEY (slot, signature)
CREATE TABLE IF NOT EXISTS solana_transaction_signatures (
slot BIGINT UNIQUE,
signature VARCHAR(88)
);
4 changes: 1 addition & 3 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ def __init__(self, solana_url, indexer_user: IIndexerUser):

def process_functions(self):
self.block_indexer.gather_blocks()
IndexerBase.process_functions(self)
self.process_receipts()
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
Expand All @@ -915,7 +914,7 @@ def process_receipts(self):
max_slot = 1
while max_slot > 0:
max_slot = 0
for slot, sign, tx in self.transaction_receipts.get_txs(self.indexed_slot, last_block_slot):
for slot, sign, tx in self.get_tx_receipts(last_block_slot):
max_slot = max(max_slot, slot)

ix_info = SolanaIxInfo(sign=sign, slot=slot, tx=tx)
Expand Down Expand Up @@ -955,7 +954,6 @@ def process_receipts(self):
"processed slots": self.indexed_slot - start_indexed_slot
},
latest_params={
"transaction receipts len": self.transaction_receipts.size(),
"indexed slot": self.indexed_slot,
"min used slot": self.min_used_slot
}
Expand Down
134 changes: 68 additions & 66 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group
from typing import List, Optional, Tuple
from typing import Dict, List, Optional, Union

from .trx_receipts_storage import TxReceiptsStorage
from .solana_signatures_db import SolanaSignatures
from .utils import MetricsToLogBuff
from ..common_neon.solana_interactor import SolanaInteractor
from ..indexer.sql_dict import SQLDict
Expand All @@ -20,18 +20,19 @@ def __init__(self,
solana: SolanaInteractor,
last_slot: int):
self.solana = solana
self.transaction_receipts = TxReceiptsStorage('solana_transaction_receipts')
self.solana_signatures = SolanaSignatures()
self.last_slot = self._init_last_slot('receipt', last_slot)
self.current_slot = 0
self.counter_ = 0
self.count_log = MetricsToLogBuff()
self._constants = SQLDict(tablename="constants")
self._maximum_tx = self._get_maximum_tx()
self._tx_receipts = {}

def _get_maximum_tx(self) -> str:
if "maximum_tx" in self._constants:
return self._constants["maximum_tx"]
return ""
return HISTORY_START

def _set_maximum_tx(self, tx: str):
self._maximum_tx = tx
Expand Down Expand Up @@ -90,109 +91,109 @@ def run(self):
def process_functions(self):
self.gather_unknown_transactions()

def gather_unknown_transactions(self):
start_time = time.time()
def get_tx_receipts(self, stop_slot=None):
signatures = self.gather_unknown_transactions()
self.debug(f'{len(signatures)}')

poll_txs = []
tx_list = []
for signature, _ in reversed(signatures):
if signature not in self._tx_receipts:
tx_list.append(signature)
if len(tx_list) >= 20:
poll_txs.append(tx_list)
tx_list = []
if len(tx_list) > 0:
poll_txs.append(tx_list)
self._get_txs(poll_txs)

minimal_tx = None
maximum_tx = None
maximum_slot = None
continue_flag = True
current_slot = self.solana.get_slot(commitment=FINALIZED)["result"]
tx_per_request = 20
for signature, _ in reversed(signatures):
if signature not in self._tx_receipts:
self.error(f'{signature} receipt not found')
continue

tx = self._tx_receipts[signature]
slot = tx['slot']
if stop_slot and slot > stop_slot:
break
afalaleev marked this conversation as resolved.
Show resolved Hide resolved
yield (slot, signature, tx)

self._set_maximum_tx(signature)
self.solana_signatures.remove_signature(signature)
del self._tx_receipts[signature]

def gather_unknown_transactions(self):
minimal_tx = self.solana_signatures.get_minimal_tx()
continue_flag = True
counter = 0
gathered_signatures = 0
tx_list = []
while continue_flag:
results = self._get_signatures(minimal_tx, 1000)
results = self._get_signatures(minimal_tx, self._maximum_tx, INDEXER_POLL_COUNT)
len_results = len(results)
if len_results == 0:
break

minimal_tx = results[-1]["signature"]
if maximum_tx is None:
tx = results[0]
maximum_tx = tx["signature"]
maximum_slot = tx["slot"]

gathered_signatures += len_results
counter += 1

tx_idx = 0
prev_slot = 0

for tx in results:
sol_sign = tx["signature"]
slot = tx["slot"]

if slot != prev_slot:
tx_idx = 0
prev_slot = slot

if slot < self.last_slot:
if sol_sign == self._maximum_tx:
continue_flag = False
break

if sol_sign in [HISTORY_START, self._maximum_tx]:
if slot < self.last_slot:
continue_flag = False
break

tx_list.append((sol_sign, slot, tx_idx))
if len(tx_list) >= tx_per_request:
poll_txs.append(tx_list)
if len(tx_list) >= INDEXER_POLL_COUNT:
self.solana_signatures.add_signature(tx_list[0][0], tx_list[0][1])
afalaleev marked this conversation as resolved.
Show resolved Hide resolved
tx_list = []
if len(poll_txs) >= INDEXER_POLL_COUNT / tx_per_request:
self._get_txs(poll_txs)

tx_idx += 1
tx_list.append((sol_sign, slot))

if len(tx_list) > 0:
poll_txs.append(tx_list)
if len(poll_txs) > 0:
self._get_txs(poll_txs)

self.current_slot = current_slot
self.counter_ = 0
self._set_maximum_tx(maximum_tx)
return tx_list

get_history_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
self.count_log.print(
self.debug,
list_params={"get_history_ms": get_history_ms, "gathered_signatures": gathered_signatures, "counter": counter},
latest_params={"maximum_tx": maximum_tx, "maximum_slot": maximum_slot}
)

def _get_signatures(self, before: Optional[str], limit: int) -> []:
response = self.solana.get_signatures_for_address(before, limit, FINALIZED)
def _get_signatures(self, before: Optional[str], until: Optional[str], limit: int) -> List[Dict[str, Union[int, str]]]:
opts: Dict[str, Union[int, str]] = {}
if before is not None:
opts["before"] = before
if until is not None:
opts["until"] = until
afalaleev marked this conversation as resolved.
Show resolved Hide resolved
opts["limit"] = limit
opts["commitment"] = FINALIZED
response = self.solana._send_rpc_request("getSignaturesForAddress", EVM_LOADER_ID, opts)
error = response.get('error')
result = response.get('result', [])
if error:
self.warning(f'Fail to get signatures: {error}')
return result

def _get_txs(self, poll_txs: List[List[Tuple[str, int, int]]]) -> None:
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self._get_tx_receipts, poll_txs)
poll_txs.clear()

def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None:
sign_list = []
filtered_list = []
for sol_sign, slot, tx_idx in full_list:
if not self.transaction_receipts.contains(slot, sol_sign):
sign_list.append(sol_sign)
filtered_list.append((sol_sign, slot, tx_idx))
def _get_txs(self, poll_txs: List[List[str]]) -> None:
if len(poll_txs) > 1:
pool = ThreadPool(min(PARALLEL_REQUESTS, len(poll_txs)))
pool.map(self._get_tx_receipts, poll_txs)
poll_txs.clear()
else:
if len(poll_txs) > 0:
self._get_tx_receipts(poll_txs[0])

def _get_tx_receipts(self, sign_list: List[str]) -> None:
if len(sign_list) == 0:
return

retry = RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
while retry > 0:
try:
tx_list = self.solana.get_multiple_receipts(sign_list)
for tx_info, tx in zip(filtered_list, tx_list):
sol_sign, slot, tx_idx = tx_info
self._add_tx(sol_sign, tx, slot, tx_idx)
for sol_sign, tx in zip(sign_list, tx_list):
self._add_tx(sol_sign, tx)
retry = 0
except Exception as err:
retry -= 1
Expand All @@ -206,16 +207,17 @@ def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None:
if self.counter_ % 100 == 0:
self.debug(f"Acquired {self.counter_} receipts")

def _add_tx(self, sol_sign, tx, slot, tx_idx):
def _add_tx(self, sol_sign, tx):
if tx is not None:
add = False
msg = tx['transaction']['message']
slot = tx['slot']
for instruction in msg['instructions']:
if msg["accountKeys"][instruction["programIdIndex"]] == EVM_LOADER_ID:
add = True
if add:
self.debug(f'{(slot, tx_idx, sol_sign)}')
self.transaction_receipts.add_tx(slot, tx_idx, sol_sign, tx)
self.debug(f'{(slot, sol_sign)}')
self._tx_receipts[sol_sign] = tx
else:
self.debug(f"trx is None {sol_sign}")

26 changes: 26 additions & 0 deletions proxy/indexer/solana_signatures_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from ..indexer.base_db import BaseDB


class SolanaSignatures(BaseDB):
def __init__(self):
BaseDB.__init__(self, 'solana_transaction_signatures')

def add_signature(self, signature, slot):
with self._conn.cursor() as cursor:
cursor.execute(f'''
INSERT INTO solana_transaction_signatures
(slot, signature)
VALUES(%s, %s) ON CONFLICT DO NOTHING''',
(slot, signature))

def remove_signature(self, signature):
with self._conn.cursor() as cursor:
cursor.execute(f'DELETE FROM solana_transaction_signatures WHERE signature = %s', (signature,))

def get_minimal_tx(self):
with self._conn.cursor() as cursor:
cursor.execute(f'SELECT slot, signature FROM solana_transaction_signatures ORDER BY slot LIMIT 1')
row = cursor.fetchone()
if row is not None:
return row[1]
return None
77 changes: 0 additions & 77 deletions proxy/indexer/trx_receipts_storage.py

This file was deleted.

Loading