diff --git a/Dockerfile b/Dockerfile index 13c9e2b86..9086d880e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,7 +42,7 @@ COPY . /opt ARG PROXY_REVISION ARG LOG_CFG=log_cfg.json RUN (cp -f /opt/${LOG_CFG} /opt/log_cfg.json || true) -RUN sed -i 's/NEON_PROXY_REVISION_TO_BE_REPLACED/'"$PROXY_REVISION"'/g' /opt/proxy/neon_rpc_api_model/neon_rpc_api_model.py +RUN sed -i 's/NEON_PROXY_REVISION_TO_BE_REPLACED/'"$PROXY_REVISION"'/g' /opt/proxy/neon_rpc_api_model/neon_rcp_api_worker.py COPY ./proxy/solana-py.patch /opt RUN cd /usr/local/lib/python3.8/dist-packages/ && patch -p0 Any: @@ -25,7 +25,7 @@ def encode_pickable(object) -> bytes: @logged_group("neon.MemPool") class PickableDataServer(ABC): - def __init__(self, *, user: PickableDataServerUser): + def __init__(self, *, user: IPickableDataServerUser): self._user = user asyncio.get_event_loop().create_task(self.run_server()) @@ -63,7 +63,7 @@ async def _recv_pickable_data(self, reader: StreamReader): class AddrPickableDataSrv(PickableDataServer): - def __init__(self, *, user: PickableDataServerUser, address: Tuple[str, int]): + def __init__(self, *, user: IPickableDataServerUser, address: Tuple[str, int]): self._address = address PickableDataServer.__init__(self, user=user) @@ -75,7 +75,7 @@ async def run_server(self): class PipePickableDataSrv(PickableDataServer): - def __init__(self, *, user: PickableDataServerUser, srv_sock: socket.socket): + def __init__(self, *, user: IPickableDataServerUser, srv_sock: socket.socket): self._srv_sock = srv_sock PickableDataServer.__init__(self, user=user) diff --git a/proxy/mempool/executor_mng.py b/proxy/mempool/executor_mng.py index 91bac4898..4d2a5252b 100644 --- a/proxy/mempool/executor_mng.py +++ b/proxy/mempool/executor_mng.py @@ -13,7 +13,7 @@ from .mempool_executor import MPExecutor -class MpExecutorClient(PipePickableDataClient): +class MPExecutorClient(PipePickableDataClient): def __init__(self, client_sock: socket.socket): PipePickableDataClient.__init__(self, client_sock=client_sock) @@ -30,7 +30,7 @@ class MPExecutorMng(IMPExecutor): @dataclasses.dataclass class ExecutorInfo: executor: MPExecutor - client: MpExecutorClient + client: MPExecutorClient id: int def __init__(self, executor_count: int, config: IConfig): @@ -57,7 +57,7 @@ def is_available(self) -> bool: def _has_available(self) -> bool: return len(self._available_executor_pool) > 0 - def _get_executor(self) -> Tuple[int, MpExecutorClient]: + def _get_executor(self) -> Tuple[int, MPExecutorClient]: executor_id = self._available_executor_pool.pop() self.debug(f"Acquire executor: {executor_id}") self._busy_executor_pool.add(executor_id) @@ -81,7 +81,7 @@ def release_resource(self, resource_id: int): def _create_executor(executor_id: int, config: IConfig) -> ExecutorInfo: client_sock, srv_sock = socket.socketpair() executor = MPExecutor(executor_id, srv_sock, config) - client = MpExecutorClient(client_sock) + client = MPExecutorClient(client_sock) return MPExecutorMng.ExecutorInfo(executor=executor, client=client, id=executor_id) def __del__(self): diff --git a/proxy/mempool/mempool.py b/proxy/mempool/mempool.py index 9986d262a..1e245d2b6 100644 --- a/proxy/mempool/mempool.py +++ b/proxy/mempool/mempool.py @@ -3,7 +3,7 @@ from logged_groups import logged_group import bisect -from .mempool_api import MPRequest, MPResultCode, MPResult, IMPExecutor, MPRequestType, \ +from .mempool_api import MPRequest, MPResultCode, MPTxResult, IMPExecutor, MPRequestType, \ MPTxRequest, MPPendingTxCountReq @@ -16,7 +16,6 @@ class MemPool: def __init__(self, executor: IMPExecutor): self._req_queue = [] - self._lock = asyncio.Lock() self._req_queue_cond = asyncio.Condition() self._processing_tasks: List[Tuple[int, asyncio.Task, MPRequest]] = [] # signer -> pending_tx_counter @@ -29,12 +28,12 @@ def __init__(self, executor: IMPExecutor): async def enqueue_mp_request(self, mp_request: MPRequest): if mp_request.type == MPRequestType.SendTransaction: tx_request: MPTxRequest = mp_request - return await self.on_send_tx_request(tx_request) + return await self._on_send_tx_request(tx_request) elif mp_request.type == MPRequestType.GetTrxCount: pending_count_req: MPPendingTxCountReq = mp_request return self.get_pending_trx_count(pending_count_req.sender) - async def on_send_tx_request(self, mp_request: MPTxRequest): + async def _on_send_tx_request(self, mp_request: MPTxRequest): await self.enqueue_mp_transaction(mp_request) sender = "0x" + mp_request.neon_tx.sender() self._inc_pending_tx_counter(sender) @@ -89,33 +88,33 @@ async def check_processing_tasks(self): self._executor.release_resource(resource_id) continue - mp_result: MPResult = task.result() - assert isinstance(mp_result, MPResult) - assert mp_result.code != MPResultCode.Dummy - await self._process_mp_result(resource_id, mp_result, mp_request) + mp_tx_result: MPTxResult = task.result() + assert isinstance(mp_tx_result, MPTxResult) + assert mp_tx_result.code != MPResultCode.Dummy + await self._process_mp_result(resource_id, mp_tx_result, mp_request) self._processing_tasks = not_finished_tasks await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC) - async def _process_mp_result(self, resource_id: int, mp_result: MPResult, mp_request: MPTxRequest): + async def _process_mp_result(self, resource_id: int, mp_tx_result: MPTxResult, mp_request: MPTxRequest): tx_hash = "0x" + mp_request.neon_tx.hash_signed().hex() log_ctx = {"context": {"req_id": mp_request.req_id}} - if mp_result.code == MPResultCode.Done: + if mp_tx_result.code == MPResultCode.Done: self.debug(f"Neon tx: {tx_hash} - processed on executor: {resource_id} - done", extra=log_ctx) self._on_request_done(mp_request) self._executor.release_resource(resource_id) await self._kick_tx_queue() return - self.warning(f"Failed to process tx: {tx_hash} - on executor: {resource_id}, status: {mp_result} - reschedule", extra=log_ctx) - if mp_result.code == MPResultCode.BlockedAccount: + self.warning(f"Failed to process tx: {tx_hash} - on executor: {resource_id}, status: {mp_tx_result} - reschedule", extra=log_ctx) + if mp_tx_result.code == MPResultCode.BlockedAccount: self._executor.release_resource(resource_id) await self.enqueue_mp_request(mp_request) await self._kick_tx_queue() - elif mp_result.code == MPResultCode.NoLiquidity: + elif mp_tx_result.code == MPResultCode.NoLiquidity: self._executor.on_no_liquidity(resource_id) await self.enqueue_mp_request(mp_request) await self._kick_tx_queue() - elif mp_result.code == MPResultCode.Unspecified: + elif mp_tx_result.code == MPResultCode.Unspecified: self._executor.release_resource(resource_id) self._on_request_dropped_away(mp_request) await self._kick_tx_queue() diff --git a/proxy/mempool/mempool_api.py b/proxy/mempool/mempool_api.py index bc5dc3226..46274b016 100644 --- a/proxy/mempool/mempool_api.py +++ b/proxy/mempool/mempool_api.py @@ -73,6 +73,6 @@ class MPResultCode(IntEnum): @dataclass -class MPResult: +class MPTxResult: code: MPResultCode data: Any diff --git a/proxy/mempool/mempool_executor.py b/proxy/mempool/mempool_executor.py index 127ee4b02..91a86a33f 100644 --- a/proxy/mempool/mempool_executor.py +++ b/proxy/mempool/mempool_executor.py @@ -6,17 +6,17 @@ from ..common_neon.solana_interactor import SolanaInteractor from ..common_neon.config import IConfig -from ..common_neon.utils import PipePickableDataSrv, PickableDataServerUser, Any +from ..common_neon.utils import PipePickableDataSrv, IPickableDataServerUser, Any from ..common_neon.config import Config from ..memdb.memdb import MemDB from .transaction_sender import NeonTxSender from .operator_resource_list import OperatorResourceList -from .mempool_api import MPRequest, MPResult, MPResultCode +from .mempool_api import MPRequest, MPTxResult, MPResultCode @logged_group("neon.MemPool") -class MPExecutor(mp.Process, PickableDataServerUser): +class MPExecutor(mp.Process, IPickableDataServerUser): def __init__(self, executor_id: int, srv_sock: socket.socket, config: IConfig): self.info(f"Initialize mempool_executor: {executor_id}") @@ -44,8 +44,8 @@ def execute_neon_tx(self, mempool_request: MPRequest): self.execute_neon_tx_impl(mempool_request) except Exception as err: self.error(f"Failed to execute neon_tx: {err}") - return MPResult(MPResultCode.Unspecified, None) - return MPResult(MPResultCode.Done, None) + return MPTxResult(MPResultCode.Unspecified, None) + return MPTxResult(MPResultCode.Done, None) def execute_neon_tx_impl(self, mempool_tx_cfg: MPRequest): neon_tx = mempool_tx_cfg.neon_tx diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index 8478cfd9f..35e6c1131 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -3,7 +3,7 @@ from multiprocessing import Process from typing import Any -from ..common_neon.utils.pickable_data_server import AddrPickableDataSrv, PickableDataServerUser +from ..common_neon.utils.pickable_data_server import AddrPickableDataSrv, IPickableDataServerUser from ..common_neon.config import IConfig from .mempool import MemPool @@ -11,7 +11,7 @@ @logged_group("neon.MemPool") -class MPService(PickableDataServerUser): +class MPService(IPickableDataServerUser): MP_SERVICE_PORT = 9091 MP_SERVICE_HOST = "0.0.0.0" diff --git a/proxy/neon_rpc_api_model/__init__.py b/proxy/neon_rpc_api_model/__init__.py index 16b7421f3..059398e7d 100644 --- a/proxy/neon_rpc_api_model/__init__.py +++ b/proxy/neon_rpc_api_model/__init__.py @@ -1 +1 @@ -from . neon_rpc_api_model import NeonRpcApiModel, NEON_PROXY_PKG_VERSION, NEON_PROXY_REVISION +from . neon_rcp_api_worker import NeonRpcApiWorker, NEON_PROXY_PKG_VERSION, NEON_PROXY_REVISION diff --git a/proxy/neon_rpc_api_model/neon_rpc_api_model.py b/proxy/neon_rpc_api_model/neon_rcp_api_worker.py similarity index 99% rename from proxy/neon_rpc_api_model/neon_rpc_api_model.py rename to proxy/neon_rpc_api_model/neon_rcp_api_worker.py index 85ff17d6c..d408cb0e1 100644 --- a/proxy/neon_rpc_api_model/neon_rpc_api_model.py +++ b/proxy/neon_rpc_api_model/neon_rcp_api_worker.py @@ -42,7 +42,7 @@ def default(self, obj): @logged_group("neon.Proxy") -class NeonRpcApiModel: +class NeonRpcApiWorker: proxy_id_glob = multiprocessing.Value('i', 0) def __init__(self): diff --git a/proxy/neon_rpc_api_model/transaction_validator.py b/proxy/neon_rpc_api_model/transaction_validator.py index 90cc3228d..c4833efc6 100644 --- a/proxy/neon_rpc_api_model/transaction_validator.py +++ b/proxy/neon_rpc_api_model/transaction_validator.py @@ -80,11 +80,11 @@ def _prevalidate_tx(self): self._prevalidate_tx_chain_id() self._prevalidate_tx_size() self._prevalidate_sender_balance() + self._prevalidate_underpriced_tx_without_chainid() def _prevalidate_emulator(self, emulator_json: dict): self._prevalidate_gas_usage(emulator_json) self._prevalidate_account_sizes(emulator_json) - self._prevalidate_underpriced_tx_without_chainid() def extract_ethereum_error(self, e: Exception): receipt_parser = SolReceiptParser(e) diff --git a/proxy/plugin/neon_rpc_api_plugin.py b/proxy/plugin/neon_rpc_api_plugin.py index c7b3083b7..31d88306c 100644 --- a/proxy/plugin/neon_rpc_api_plugin.py +++ b/proxy/plugin/neon_rpc_api_plugin.py @@ -25,7 +25,7 @@ from ..common_neon.solana_receipt_parser import SolTxError from ..common_neon.errors import EthereumError from ..environment import ENABLE_PRIVATE_API -from ..neon_rpc_api_model import NeonRpcApiModel +from ..neon_rpc_api_model import NeonRpcApiWorker from ..statistics_exporter.prometheus_proxy_exporter import PrometheusExporter modelInstanceLock = threading.Lock() @@ -54,7 +54,7 @@ def getModel(cls): global modelInstance with modelInstanceLock: if modelInstance is None: - modelInstance = NeonRpcApiModel() + modelInstance = NeonRpcApiWorker() return modelInstance def routes(self) -> List[Tuple[int, str]]: diff --git a/proxy/testing/test_neon_rpc_api.py b/proxy/testing/test_neon_rpc_api.py index ee4342af7..ebc9c5dc3 100644 --- a/proxy/testing/test_neon_rpc_api.py +++ b/proxy/testing/test_neon_rpc_api.py @@ -1,13 +1,13 @@ import unittest from logged_groups import logged_group -from ..neon_rpc_api_model import NeonRpcApiModel +from ..neon_rpc_api_model import NeonRpcApiWorker @logged_group("neon.TestCases") class SolanaContractTests(unittest.TestCase): def setUp(self): - self.model = NeonRpcApiModel() + self.model = NeonRpcApiWorker() self.owner = '0xc1566af4699928fdf9be097ca3dc47ece39f8f8e' self.token1 = '0x49a449cd7fd8fbcf34d103d98f2c05245020e35b'