diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 7274fd096..eadca2688 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -9,15 +9,13 @@ steps: - label: ":terraform: build infrastructure" key: "create_infrastructure" if: &is_fts_enabled | - (build.pull_request.base_branch == "develop" && !build.pull_request.draft) || + (build.pull_request.base_branch == "712-mempool" && !build.pull_request.draft) || (build.source == "trigger_job" && build.env("NEON_EVM_FULL_TEST_SUITE") == "true") agents: queue: "testing" command: - ".buildkite/steps/full_test_suite/terraform-build.sh" - - wait - - label: ":cop::skin-tone-2: deploy check" command: ".buildkite/steps/deploy-test.sh" timeout: 90 @@ -33,6 +31,8 @@ steps: - "indexer.log" - "deploy_contracts.log" - "proxy_program_loader.log" +# env: +# UNITTEST_TESTPATH: proxy.testing.test_indexer_work.CancelTest.test_02_get_code_from_indexer - label: ":coverage: full test suite (FTS)" key: "full_test_suite" @@ -55,6 +55,7 @@ steps: - allure-reports.tar.gz - fts_${BUILDKITE_BUILD_NUMBER}.log - "./logs/*" + depends_on: "create_infrastructure" - wait diff --git a/.buildkite/steps/deploy-test.sh b/.buildkite/steps/deploy-test.sh index e65f820d6..e29f4601e 100755 --- a/.buildkite/steps/deploy-test.sh +++ b/.buildkite/steps/deploy-test.sh @@ -1,6 +1,7 @@ #!/bin/bash set -euo pipefail + wait-for-proxy() { PROXY_URL="$1" @@ -106,6 +107,7 @@ docker run --rm -ti --network=container:proxy \ -e POSTGRES_USER=neon-proxy \ -e POSTGRES_PASSWORD=neon-proxy-pass \ -e POSTGRES_HOST=postgres \ + -e UNITTEST_TESTPATH=${UNITTEST_TESTPATH:=} \ --entrypoint ./proxy/deploy-test.sh \ ${EXTRA_ARGS:-} \ $PROXY_IMAGE diff --git a/Dockerfile b/Dockerfile index 797e18341..dc77491ac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,7 @@ COPY . 
/opt ARG PROXY_REVISION ARG LOG_CFG=log_cfg.json RUN (cp -f /opt/${LOG_CFG} /opt/log_cfg.json || true) -RUN sed -i 's/NEON_PROXY_REVISION_TO_BE_REPLACED/'"$PROXY_REVISION"'/g' /opt/proxy/neon_rpc_api_model/neon_rpc_api_model.py +RUN sed -i 's/NEON_PROXY_REVISION_TO_BE_REPLACED/'"$PROXY_REVISION"'/g' /opt/proxy/neon_rpc_api_model/neon_rcp_api_worker.py COPY ./proxy/solana-py.patch /opt RUN cd /usr/local/lib/python3.8/dist-packages/ && patch -p0 Optional[str]: + """Gets the predefinded solana url""" + + @abstractmethod + def get_evm_count(self) -> Optional[int]: + """Gets the evm count""" + + +class Config(IConfig): + + def get_solana_url(self) -> Optional[str]: + return os.environ.get("SOLANA_URL", "http://localhost:8899") + + def get_evm_count(self) -> Optional[int]: + return int(os.environ.get("EVM_STEP_COUNT", 750)) diff --git a/proxy/common_neon/data.py b/proxy/common_neon/data.py index 08bfddfcc..6bcc8c8c8 100644 --- a/proxy/common_neon/data.py +++ b/proxy/common_neon/data.py @@ -1,3 +1,15 @@ +from __future__ import annotations +from dataclasses import dataclass +from typing import Dict, Any + + +@dataclass +class NeonTxExecCfg: + is_underpriced_tx_without_chainid: bool + steps_executed: int + + +NeonEmulatingResult = Dict[str, Any] class NeonTxStatData: diff --git a/proxy/common_neon/emulator_interactor.py b/proxy/common_neon/emulator_interactor.py index bc8f64d05..7945d6016 100644 --- a/proxy/common_neon/emulator_interactor.py +++ b/proxy/common_neon/emulator_interactor.py @@ -9,7 +9,7 @@ from .environment_utils import neon_cli from .errors import EthereumError -from .types import NeonEmulatingResult +from .data import NeonEmulatingResult @logged_group("neon.Proxy") diff --git a/proxy/common_neon/types.py b/proxy/common_neon/types.py deleted file mode 100644 index 265191f8a..000000000 --- a/proxy/common_neon/types.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import Dict, Any - - -class 
Result: - def __init__(self, reason: str = None): - self._reason = reason - - def __bool__(self) -> bool: - return self._reason is None - - def __str__(self) -> str: - return self._reason if self._reason is not None else "" - - -@dataclass -class NeonTxPrecheckResult: - is_underpriced_tx_without_chainid: bool - emulating_result: NeonEmulatingResult - - -NeonEmulatingResult = Dict[str, Any] diff --git a/proxy/common_neon/utils/__init__.py b/proxy/common_neon/utils/__init__.py index 320db8b9f..3361c8947 100644 --- a/proxy/common_neon/utils/__init__.py +++ b/proxy/common_neon/utils/__init__.py @@ -1,2 +1,5 @@ from .utils import * +from .pickable_data_server import AddrPickableDataSrv, PipePickableDataSrv, IPickableDataServerUser, \ + AddrPickableDataClient, PipePickableDataClient + diff --git a/proxy/common_neon/utils/pickable_data_server.py b/proxy/common_neon/utils/pickable_data_server.py new file mode 100644 index 000000000..b8efc2d56 --- /dev/null +++ b/proxy/common_neon/utils/pickable_data_server.py @@ -0,0 +1,199 @@ +from typing import Any, Tuple +from abc import ABC, abstractmethod + +import asyncio +from asyncio import StreamReader, StreamWriter +import socket +import pickle +import struct +from logged_groups import logged_group + + +class IPickableDataServerUser(ABC): + + @abstractmethod + async def on_data_received(self, data: Any) -> Any: + """Gets neon_tx_data from the neon rpc api service worker""" + + +@logged_group("neon.Network") +class PickableDataServer(ABC): + + def __init__(self, *, user: IPickableDataServerUser): + self._user = user + asyncio.get_event_loop().create_task(self.run_server()) + + @abstractmethod + async def run_server(self): + assert False + + async def handle_client(self, reader: StreamReader, writer: StreamWriter): + while True: + try: + self.debug("Got incoming connection. 
Waiting for pickable data") + data = await self._recv_pickable_data(reader) + result = await self._user.on_data_received(data) + self.debug(f"Encode pickable result_data: {result}") + result_data = encode_pickable(result, self) + self.debug(f"Send result_data: {len(result_data)}, bytes: {result_data.hex()}") + writer.write(result_data) + await writer.drain() + except ConnectionResetError as err: + self.warning(f"Connection reset error: {err}") + break + except asyncio.exceptions.IncompleteReadError as err: + self.error(f"Incomplete read error: {err}") + break + except Exception as err: + self.error(f"Failed to receive data err: {err}") + break + + async def _recv_pickable_data(self, reader: StreamReader): + len_packed: bytes = await read_data_async(self, reader, 4) + payload_len = struct.unpack("!I", len_packed)[0] + self.debug(f"Got payload len_packed: {len_packed.hex()}, that is: {payload_len}") + payload = await read_data_async(self, reader, payload_len) + data = pickle.loads(payload) + self.debug(f"Loaded pickable of type: {type(data)}") + return data + + +@logged_group("neon.MemPool") +class AddrPickableDataSrv(PickableDataServer): + + def __init__(self, *, user: IPickableDataServerUser, address: Tuple[str, int]): + self._address = address + PickableDataServer.__init__(self, user=user) + + async def run_server(self): + host, port = self._address + self.info(f"Listen port: {port} on: {host}") + await asyncio.start_server(self.handle_client, host, port) + + +@logged_group("neon.Network") +class PipePickableDataSrv(PickableDataServer): + + def __init__(self, *, user: IPickableDataServerUser, srv_sock: socket.socket): + self._srv_sock = srv_sock + PickableDataServer.__init__(self, user=user) + + async def run_server(self): + reader, writer = await asyncio.streams.open_connection(sock=self._srv_sock) + await self.handle_client(reader, writer) + + +class PickableDataClient: + + def __init__(self): + self._client_sock: socket.socket = None + self._reader: 
StreamReader = None + self._writer: StreamWriter = None + + def _set_client_sock(self, client_sock: socket.socket): + self._client_sock = client_sock + + async def async_init(self): + self.info("Async init pickable data client") + reader, writer = await asyncio.open_connection(sock=self._client_sock) + self._reader = reader + self._writer = writer + + def send_data(self, pickable_object: Any): + try: + payload: bytes = encode_pickable(pickable_object, self) + self.debug(f"Send object of type: {type(pickable_object)}, payload: {len(payload)}, bytes: 0x{payload[:15].hex()}") + self._client_sock.sendall(payload) + except BaseException as err: + self.error(f"Failed to send client data: {err}") + raise + try: + self.debug(f"Waiting for answer") + len_packed: bytes = read_data_sync(self, self._client_sock, 4) + data_len = struct.unpack("!I", len_packed)[0] + self.debug(f"Got len_packed bytes: {len_packed.hex()}, that is: {data_len} - bytes to receive") + + data = read_data_sync(self, self._client_sock, data_len) + self.debug(f"Got data: {len(data)}. Load pickled object") + result = pickle.loads(data) + self.debug(f"Got result: {result}") + return result + except BaseException as err: + self.error(f"Failed to receive answer data: {err}") + raise + + async def send_data_async(self, pickable_object): + + try: + self.debug(f"Send pickable_object of type: {type(pickable_object)}") + payload = encode_pickable(pickable_object, self) + self.debug(f"Payload: {len(payload)}, bytes: {payload[:15].hex()}") + self._writer.write(payload) + await self._writer.drain() + + except BaseException as err: + self.error(f"Failed to send client data: {err}") + raise + + try: + self.debug(f"Waiting for answer") + len_packed: bytes = await read_data_async(self, self._reader, 4) + data_len = struct.unpack("!I", len_packed)[0] + data = await read_data_async(self, self._reader, data_len) + self.debug(f"Got data: {len(data)}. 
Load pickled object") + result = pickle.loads(data) + self.debug(f"Got result: {result}") + return result + + except BaseException as err: + self.error(f"Failed to receive answer data: {err}") + raise + + +@logged_group("neon.Network") +class PipePickableDataClient(PickableDataClient): + + def __init__(self, client_sock: socket.socket): + PickableDataClient.__init__(self) + self._set_client_sock(client_sock=client_sock) + + +@logged_group("neon.Network") +class AddrPickableDataClient(PickableDataClient): + + def __init__(self, addr: Tuple[str, int]): + PickableDataClient.__init__(self) + host, port = addr + client_sock = socket.create_connection((host, port)) + self._set_client_sock(client_sock=client_sock) + + +def encode_pickable(object, logger) -> bytes: + data = pickle.dumps(object) + len_data = struct.pack("!I", len(data)) + logger.debug(f"Len data: {len(len_data)} - bytes, data: {len(data)} - bytes") + return len_data + data + + +async def read_data_async(self, reader: StreamReader, data_len: int) -> bytes: + data = b'' + while len(data) < data_len: + to_be_read = data_len - len(data) + self.debug(f"Reading data: {to_be_read} of: {data_len} - bytes") + chunk = await reader.read(to_be_read) + if not chunk: + raise EOFError(f"Failed to read chunk of data: {data_len}") + self.debug(f"Got chunk of data: {len(chunk)}") + data += chunk + return data + + +def read_data_sync(self, socket: socket.socket, data_len) -> bytes: + data = b'' + while len(data) < data_len: + to_be_read = data_len - len(data) + self.debug(f"Reading data: {to_be_read} of: {data_len} - bytes") + chunk: bytes = socket.recv(to_be_read) + self.debug(f"Got chunk of data: {len(chunk)}") + data += chunk + return data diff --git a/proxy/deploy-test.sh b/proxy/deploy-test.sh index 94ce21517..b0597ab1a 100755 --- a/proxy/deploy-test.sh +++ b/proxy/deploy-test.sh @@ -22,8 +22,11 @@ set ${TESTNAME:=*} export ETH_TOKEN_MINT=$NEON_TOKEN_MINT export TEST_PROGRAM=$(solana address -k 
/spl/bin/proxy_program-keypair.json) -# python3 -m unittest discover -v -p "test_${TESTNAME}.py" -find . -name "test_${TESTNAME}.py" -printf "%f\n" | sort | parallel --halt now,fail=1 --jobs 4 python3 -m unittest discover -v -p {} +if [[ -z "${UNITTEST_TESTPATH}" ]]; then + find . -name "test_*.py" -printf "%f\n" | sort | parallel --halt now,fail=1 --jobs 4 python3 -m unittest discover -v -p {} +else + python3 -m unittest ${UNITTEST_TESTPATH} +fi echo "Deploy test success" exit 0 diff --git a/proxy/memdb/transactions_db.py b/proxy/memdb/transactions_db.py index cb220314f..e43bc54f5 100644 --- a/proxy/memdb/transactions_db.py +++ b/proxy/memdb/transactions_db.py @@ -83,6 +83,8 @@ def _has_topics(src_topics, dst_topics): return False result_list = [] + indexed_logs = self._db.get_logs(from_block, to_block, addresses, topics, block_hash) + with self._tx_slot.get_lock(): for data in self._tx_by_neon_sign.values(): tx = pickle.loads(data) @@ -97,9 +99,10 @@ def _has_topics(src_topics, dst_topics): continue if len(topics) and (not _has_topics(topics, log['topics'])): continue + if log in indexed_logs: + continue result_list.append(log) - - return result_list + self._db.get_logs(from_block, to_block, addresses, topics, block_hash) + return indexed_logs + result_list def get_sol_sign_list_by_neon_sign(self, neon_sign: str, is_pended_tx: bool, before_slot: int) -> [str]: if not is_pended_tx: diff --git a/proxy/mempool/__init__.py b/proxy/mempool/__init__.py new file mode 100644 index 000000000..84d5d1624 --- /dev/null +++ b/proxy/mempool/__init__.py @@ -0,0 +1,7 @@ +from .mempool_client import MemPoolClient +from .mempool_service import MPService +from .mempool import MemPool +from .mempool_api import * + +MP_SERVICE_PORT = MPService.MP_SERVICE_PORT +MP_SERVICE_HOST = MPService.MP_SERVICE_HOST diff --git a/proxy/mempool/executor_mng.py b/proxy/mempool/executor_mng.py new file mode 100644 index 000000000..3e258bc8e --- /dev/null +++ b/proxy/mempool/executor_mng.py @@ -0,0 
+1,101 @@ +import asyncio +import dataclasses +import socket +from abc import ABC, abstractmethod +from collections import deque +from typing import List, Tuple, Deque, Set +from logged_groups import logged_group + +from ..common_neon.config import IConfig +from ..common_neon.utils import PipePickableDataClient + +from .mempool_api import MPRequest, IMPExecutor +from .mempool_executor import MPExecutor + + +class MPExecutorClient(PipePickableDataClient): + + def __init__(self, client_sock: socket.socket): + PipePickableDataClient.__init__(self, client_sock=client_sock) + + +class IMPExecutorMngUser(ABC): + + @abstractmethod + def on_resource_released(self, resource_id: int): + assert False + + +@logged_group("neon.MemPool") +class MPExecutorMng(IMPExecutor): + + BRING_BACK_EXECUTOR_TIMEOUT_SEC = 1800 + + @dataclasses.dataclass + class ExecutorInfo: + executor: MPExecutor + client: MPExecutorClient + id: int + + def __init__(self, user: IMPExecutorMngUser, executor_count: int, config: IConfig): + self.info(f"Initialize executor mng with executor_count: {executor_count}") + self._available_executor_pool: Deque[int] = deque() + self._busy_executor_pool: Set[int] = set() + self._executors: List[MPExecutorMng.ExecutorInfo] = list() + self._user = user + for i in range(executor_count): + executor_info = MPExecutorMng._create_executor(i, config) + self._executors.append(executor_info) + self._available_executor_pool.appendleft(i) + executor_info.executor.start() + + async def async_init(self): + for ex_info in self._executors: + await ex_info.client.async_init() + + def submit_mp_request(self, mp_reqeust: MPRequest) -> Tuple[int, asyncio.Task]: + executor_id, executor = self._get_executor() + tx_hash = "0x" + mp_reqeust.neon_tx.hash_signed().hex() + self.debug(f"Tx: {tx_hash} - scheduled on executor: {executor_id}") + task = asyncio.get_event_loop().create_task(executor.send_data_async(mp_reqeust)) + return executor_id, task + + def is_available(self) -> bool: + return 
self._has_available() + + def _has_available(self) -> bool: + return len(self._available_executor_pool) > 0 + + def _get_executor(self) -> Tuple[int, MPExecutorClient]: + executor_id = self._available_executor_pool.pop() + self.debug(f"Acquire executor: {executor_id}") + self._busy_executor_pool.add(executor_id) + executor_info = self._executors[executor_id] + return executor_id, executor_info.client + + def on_no_liquidity(self, resource_id: int): + self.debug(f"No liquidity, executor: {resource_id} - will be unblocked in: {MPExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC} sec") + asyncio.get_event_loop().create_task(self._release_executor_later(resource_id)) + + async def _release_executor_later(self, executor_id: int): + await asyncio.sleep(MPExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC) + self.release_resource(executor_id) + + def release_resource(self, resource_id: int): + self.debug(f"Release executor: {resource_id}") + self._busy_executor_pool.remove(resource_id) + self._available_executor_pool.appendleft(resource_id) + self._user.on_resource_released(resource_id) + + @staticmethod + def _create_executor(executor_id: int, config: IConfig) -> ExecutorInfo: + client_sock, srv_sock = socket.socketpair() + executor = MPExecutor(executor_id, srv_sock, config) + client = MPExecutorClient(client_sock) + return MPExecutorMng.ExecutorInfo(executor=executor, client=client, id=executor_id) + + def __del__(self): + for executor_info in self._executors: + executor_info.executor.kill() + self._busy_executor_pool.clear() + self._available_executor_pool.clear() diff --git a/proxy/mempool/mempool.py b/proxy/mempool/mempool.py new file mode 100644 index 000000000..31a9d672e --- /dev/null +++ b/proxy/mempool/mempool.py @@ -0,0 +1,134 @@ +import asyncio +from typing import List, Tuple + +from logged_groups import logged_group + +from .mempool_api import MPRequest, MPResultCode, MPTxResult, IMPExecutor, MPRequestType, MPTxRequest,\ + MPPendingTxCountReq +from .mempool_schedule 
import MPTxSchedule + + +@logged_group("neon.MemPool") +class MemPool: + + CHECK_TASK_TIMEOUT_SEC = 0.01 + MP_CAPACITY = 4096 + + def __init__(self, executor: IMPExecutor, capacity: int = MP_CAPACITY): + self._tx_schedule = MPTxSchedule(capacity) + self._schedule_cond = asyncio.Condition() + self._processing_tasks: List[Tuple[int, asyncio.Task, MPRequest]] = [] + self._process_tx_results_task = asyncio.get_event_loop().create_task(self.check_processing_tasks()) + self._process_tx_queue_task = asyncio.get_event_loop().create_task(self.process_tx_schedule()) + + self._executor = executor + + async def enqueue_mp_request(self, mp_request: MPRequest): + if mp_request.type == MPRequestType.SendTransaction: + tx_request: MPTxRequest = mp_request + return await self._schedule_mp_tx_request(tx_request) + elif mp_request.type == MPRequestType.GetTrxCount: + pending_count_req: MPPendingTxCountReq = mp_request + return self.get_pending_trx_count(pending_count_req.sender) + + async def _schedule_mp_tx_request(self, mp_request: MPTxRequest): + log_ctx = {"context": {"req_id": mp_request.req_id}} + try: + self._tx_schedule.add_mp_tx_request(mp_request) + count = self.get_pending_trx_count(mp_request.sender_address) + self.debug(f"Got and scheduled mp_tx_request: {mp_request.log_str}, pending in pool: {count}", extra=log_ctx) + except Exception as err: + self.error(f"Failed to schedule mp_tx_request: {mp_request.log_str}. 
Error: {err}", extra=log_ctx) + finally: + await self._kick_tx_schedule() + + def get_pending_trx_count(self, sender_addr: str) -> int: + return self._tx_schedule.get_pending_trx_count(sender_addr) + + async def process_tx_schedule(self): + while True: + async with self._schedule_cond: + await self._schedule_cond.wait() + self.debug(f"Schedule processing got awake, condition: {self._schedule_cond.__repr__()}") + while self._executor.is_available(): + mp_request: MPTxRequest = self._tx_schedule.acquire_tx_for_execution() + if mp_request is None: + break + + try: + log_ctx = {"context": {"req_id": mp_request.req_id}} + self.debug(f"Got mp_tx_request from schedule: {mp_request.log_str}, left senders in schedule: {len(self._tx_schedule._sender_tx_pools)}", extra=log_ctx) + self.submit_request_to_executor(mp_request) + except Exception as err: + self.error(f"Failed enqueue to execute mp_tx_request: {mp_request.log_str}. Error: {err}") + + def submit_request_to_executor(self, mp_tx_request: MPRequest): + resource_id, task = self._executor.submit_mp_request(mp_tx_request) + self._processing_tasks.append((resource_id, task, mp_tx_request)) + + async def check_processing_tasks(self): + while True: + not_finished_tasks = [] + for resource_id, task, mp_request in self._processing_tasks: + if not task.done(): + not_finished_tasks.append((resource_id, task, mp_request)) + continue + exception = task.exception() + if exception is not None: + log_ctx = {"context": {"req_id": mp_request.req_id}} + self.error(f"Exception during processing request: {exception} - tx will be dropped away", extra=log_ctx) + self._drop_request_away(mp_request) + self._executor.release_resource(resource_id) + continue + + mp_tx_result: MPTxResult = task.result() + assert isinstance(mp_tx_result, MPTxResult), f"Got unexpected result: {mp_tx_result}" + await self._process_mp_result(resource_id, mp_tx_result, mp_request) + + self._processing_tasks = not_finished_tasks + await 
asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC) + + async def _process_mp_result(self, resource_id: int, mp_tx_result: MPTxResult, mp_request: MPTxRequest): + try: + log_fn = self.warning if mp_tx_result.code != MPResultCode.Done else self.debug + log_ctx = {"context": {"req_id": mp_request.req_id}} + log_fn(f"On mp tx result: {mp_tx_result} - of: {mp_request.log_str}", extra=log_ctx) + + if mp_tx_result.code == MPResultCode.BlockedAccount: + self._executor.release_resource(resource_id) + await self.enqueue_mp_request(mp_request) + elif mp_tx_result.code == MPResultCode.NoLiquidity: + self._executor.on_no_liquidity(resource_id) + await self.enqueue_mp_request(mp_request) + elif mp_tx_result.code == MPResultCode.Unspecified: + self._executor.release_resource(resource_id) + self._drop_request_away(mp_request) + elif mp_tx_result.code == MPResultCode.Done: + self._on_request_done(mp_request) + self._executor.release_resource(resource_id) + except Exception as err: + self.error(f"Exception during the result processing: {err}", extra=log_ctx) + finally: + await self._kick_tx_schedule() + + def _on_request_done(self, tx_request: MPTxRequest): + sender = tx_request.sender_address + self._tx_schedule.done(sender, tx_request.nonce) + + count = self.get_pending_trx_count(sender) + log_ctx = {"context": {"req_id": tx_request.req_id}} + self.debug(f"Reqeust done, pending tx count: {count}", extra=log_ctx) + + def _drop_request_away(self, tx_request: MPTxRequest): + self._tx_schedule.drop_request_away(tx_request) + count = self.get_pending_trx_count(tx_request.sender_address) + log_ctx = {"context": {"req_id": tx_request.req_id}} + self.debug(f"Reqeust: {tx_request.log_str} dropped away, pending tx count: {count}", extra=log_ctx) + + async def _kick_tx_schedule(self): + async with self._schedule_cond: + # self.debug(f"Kick the schedule, condition: {self._schedule_cond.__repr__()}") + self._schedule_cond.notify() + + def on_resource_got_available(self, resource_id: int): + 
asyncio.get_event_loop().create_task(self._kick_tx_schedule()) diff --git a/proxy/mempool/mempool_api.py b/proxy/mempool/mempool_api.py new file mode 100644 index 000000000..a94ea4269 --- /dev/null +++ b/proxy/mempool/mempool_api.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Any, Tuple +from abc import ABC, abstractmethod +from asyncio import Task + +from ..common_neon.eth_proto import Trx as NeonTx +from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult + + +class IMPExecutor(ABC): + + @abstractmethod + def submit_mp_request(self, mp_reqeust: MPRequest) -> Tuple[int, Task]: + pass + + @abstractmethod + def is_available(self) -> bool: + pass + + # TODO: drop it away + @abstractmethod + def on_no_liquidity(self, resource_id: int): + pass + + @abstractmethod + def release_resource(self, resource_id: int): + pass + + +class MPRequestType(IntEnum): + SendTransaction = 0, + GetTrxCount = 1, + Dummy = -1 + + +@dataclass(order=True) +class MPRequest: + req_id: int = field(compare=False) + type: MPRequestType = field(compare=False, default=MPRequestType.Dummy) + + +@dataclass(eq=True, order=True) +class MPTxRequest(MPRequest): + nonce: int = field(compare=True, default=None) + signature: str = field(compare=False, default=None) + neon_tx: NeonTx = field(compare=False, default=None) + neon_tx_exec_cfg: NeonTxExecCfg = field(compare=False, default=None) + emulating_result: NeonEmulatingResult = field(compare=False, default=None) + sender_address: str = field(compare=False, default=None) + gas_price: int = field(compare=False, default=None) + + def __post_init__(self): + self.gas_price = self.neon_tx.gasPrice + self.nonce = self.neon_tx.nonce + self.sender_address = "0x" + self.neon_tx.sender() + self.type = MPRequestType.SendTransaction + hash = "0x" + self.neon_tx.hash_signed().hex() + self.log_str = f"MPTxRequest(hash={hash[:10]}..., 
sender_address=0x{self.sender_address[:10]}..., nonce={self.nonce}, gas_price={self.gas_price})" + + +@dataclass +class MPPendingTxCountReq(MPRequest): + + sender: str = None + + def __post_init__(self): + self.type = MPRequestType.GetTrxCount + + +class MPResultCode(IntEnum): + Done = 0 + BlockedAccount = 1, + SolanaUnavailable = 2, + NoLiquidity = 3, + Unspecified = 4, + Dummy = -1 + + +@dataclass +class MPTxResult: + code: MPResultCode + data: Any diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py new file mode 100644 index 000000000..9f7ab1d2b --- /dev/null +++ b/proxy/mempool/mempool_client.py @@ -0,0 +1,76 @@ +from __future__ import annotations +import threading +from typing import Callable +from logged_groups import logged_group + +from .mempool_api import MPTxRequest, MPPendingTxCountReq + +from ..common_neon.eth_proto import Trx as NeonTx +from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult +from ..common_neon.utils import AddrPickableDataClient + + +def _guard_conn(method: Callable) -> Callable: + def wrapper(self, *args, **kwargs): + with self._mp_conn_lock: + return method(self, *args, **kwargs) + + return wrapper + + +def _reconnecting(method: Callable) -> Callable: + def wrapper(self, *args, **kwargs): + try: + return method(self, *args, **kwargs) + except (InterruptedError, Exception) as err: + self.error(f"Failed to transfer data, unexpected err: {err}") + self._reconnect_mp() + raise + return wrapper + + +@logged_group("neon.Proxy") +class MemPoolClient: + + RECONNECT_MP_TIME_SEC = 5 + + def __init__(self, host: str, port: int): + self.debug("Init MemPoolClient") + self._mp_conn_lock = threading.Lock() + self._address = (host, port) + self._is_connecting = threading.Event() + self._connect_mp() + + def _reconnect_mp(self): + if self._is_connecting.is_set(): + return + self._is_connecting.set() + self.debug(f"Reconnecting MemPool in: {MemPoolClient.RECONNECT_MP_TIME_SEC} sec.") + 
threading.Timer(MemPoolClient.RECONNECT_MP_TIME_SEC, self._connect_mp).start() + + @_guard_conn + def _connect_mp(self): + try: + self.debug(f"Connect MemPool: {self._address}") + self._pickable_data_client = AddrPickableDataClient(self._address) + except Exception as err: + self.error(f"Failed to connect MemPool: {self._address}, error: {err}") + self._is_connecting.clear() + self._reconnect_mp() + finally: + self._is_connecting.clear() + + @_guard_conn + @_reconnecting + def send_raw_transaction(self, req_id: int, signature: str, neon_tx: NeonTx, neon_tx_exec_cfg: NeonTxExecCfg, + emulating_result: NeonEmulatingResult): + + mempool_tx_request = MPTxRequest(req_id=req_id, signature=signature, neon_tx=neon_tx, + neon_tx_exec_cfg=neon_tx_exec_cfg, emulating_result=emulating_result) + return self._pickable_data_client.send_data(mempool_tx_request) + + @_guard_conn + @_reconnecting + def get_pending_tx_count(self, req_id: int, sender: str): + mempool_pending_tx_count_req = MPPendingTxCountReq(req_id=req_id, sender=sender) + return self._pickable_data_client.send_data(mempool_pending_tx_count_req) diff --git a/proxy/mempool/mempool_executor.py b/proxy/mempool/mempool_executor.py new file mode 100644 index 000000000..91a86a33f --- /dev/null +++ b/proxy/mempool/mempool_executor.py @@ -0,0 +1,65 @@ +import asyncio +import multiprocessing as mp +import socket + +from logged_groups import logged_group, logging_context + +from ..common_neon.solana_interactor import SolanaInteractor +from ..common_neon.config import IConfig +from ..common_neon.utils import PipePickableDataSrv, IPickableDataServerUser, Any +from ..common_neon.config import Config +from ..memdb.memdb import MemDB + +from .transaction_sender import NeonTxSender +from .operator_resource_list import OperatorResourceList +from .mempool_api import MPRequest, MPTxResult, MPResultCode + + +@logged_group("neon.MemPool") +class MPExecutor(mp.Process, IPickableDataServerUser): + + def __init__(self, executor_id: int, 
srv_sock: socket.socket, config: IConfig): + self.info(f"Initialize mempool_executor: {executor_id}") + self._id = executor_id + self._srv_sock = srv_sock + self._config = config + self.info(f"Config: {self._config}") + self._event_loop: asyncio.BaseEventLoop + self._solana: SolanaInteractor + self._db: MemDB + self._pickable_data_srv = None + mp.Process.__init__(self) + + def _init_in_proc(self): + self.info(f"Config: {self._config}") + self._event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._event_loop) + self._pickable_data_srv = PipePickableDataSrv(user=self, srv_sock=self._srv_sock) + self._solana = SolanaInteractor(self._config.get_solana_url()) + self._db = MemDB(self._solana) + + def execute_neon_tx(self, mempool_request: MPRequest): + with logging_context(req_id=mempool_request.req_id, exectr=self._id): + try: + self.execute_neon_tx_impl(mempool_request) + except Exception as err: + self.error(f"Failed to execute neon_tx: {err}") + return MPTxResult(MPResultCode.Unspecified, None) + return MPTxResult(MPResultCode.Done, None) + + def execute_neon_tx_impl(self, mempool_tx_cfg: MPRequest): + neon_tx = mempool_tx_cfg.neon_tx + neon_tx_cfg = mempool_tx_cfg.neon_tx_exec_cfg + emulating_result = mempool_tx_cfg.emulating_result + emv_step_count = self._config.get_evm_count() + tx_sender = NeonTxSender(self._db, self._solana, neon_tx, steps=emv_step_count) + with OperatorResourceList(tx_sender): + tx_sender.execute(neon_tx_cfg, emulating_result) + + async def on_data_received(self, data: Any) -> Any: + return self.execute_neon_tx(data) + + def run(self) -> None: + self._config = Config() + self._init_in_proc() + self._event_loop.run_forever() diff --git a/proxy/mempool/mempool_schedule.py b/proxy/mempool/mempool_schedule.py new file mode 100644 index 000000000..c766fa926 --- /dev/null +++ b/proxy/mempool/mempool_schedule.py @@ -0,0 +1,189 @@ +import bisect +from typing import List, Optional, Tuple + +from logged_groups import logged_group + +from 
.mempool_api import MPTxRequest + + +@logged_group("neon.MemPool") +class MPSenderTxPool: + def __init__(self, sender_address: str = None): + self.sender_address = sender_address + self._txs: List[MPTxRequest] = [] + self._processing_tx: Optional[MPTxRequest] = None + + def __eq__(self, other): + return self.first_tx_gas_price() == other.first_tx_gas_price() + + def __lt__(self, other): + return self.first_tx_gas_price() > other.first_tx_gas_price() + + def add_tx(self, mp_tx_request: MPTxRequest): + + index = bisect.bisect_left(self._txs, mp_tx_request) + if self._processing_tx is not None and mp_tx_request.nonce == self._processing_tx.nonce: + self.warn(f"Failed to replace processing tx: {self._processing_tx.log_str} with: {mp_tx_request.log_str}") + return + + found: MPTxRequest = self._txs[index] if index < len(self._txs) else None + if found is not None and found.nonce == mp_tx_request.nonce: + self.debug(f"Nonce are equal: {found.nonce}, found: {found.log_str}, new: {mp_tx_request.log_str}") + if found.gas_price < mp_tx_request.gas_price: + self._txs[index] = mp_tx_request + return + self._txs.insert(index, mp_tx_request) + self.debug(f"New mp_tx_request: {mp_tx_request.log_str} - inserted at: {index}") + + def get_tx(self): + return None if self.is_empty() else self._txs[0] + + def acquire_tx(self): + if self.is_processing(): + return None + self._processing_tx = self.get_tx() + return self._processing_tx + + def on_processed(self, tx: MPTxRequest): + assert tx == self._processing_tx, f"tx: {tx.log_str} != processing_tx: {self._processing_tx.log_str}" + self._processing_tx = None + self._txs.remove(tx) + + def len(self) -> int: + return len(self._txs) + + def first_tx_gas_price(self): + tx = self.get_tx() + return tx.gas_price if tx is not None else 0 + + def on_tx_done(self, nonce: int): + if self._processing_tx is None: + self.error(f"Failed to finish tx with nonce: {nonce}, processing tx is None") + return + if self._processing_tx.nonce != nonce: + 
self.error(f"Failed to finish tx, processing tx has different nonce: {self._processing_tx.nonce} than: {nonce}") + return + self._txs.remove(self._processing_tx) + self.debug(f"On tx done: {self._processing_tx.log_str} - removed. The: {self.len()} txs are left") + self._processing_tx = None + + def is_empty(self) -> bool: + return self.len() == 0 + + def is_processing(self) -> bool: + return self._processing_tx is not None + + def drop_last_request(self): + if self.is_empty(): + self.erorr("Failed to drop last request from empty sender tx pool") + return + if self._processing_tx is self._txs[-1]: + self.warning(f"Failed to drop last request away: {self._processing_tx.log_str} - processing") + return + self.debug(f"Remove last mp_tx_request from sender: {self.sender_address} - {self._txs[-1].log_str}") + self._txs = self._txs[:-1] + + def drop_request_away(self, mp_tx_request: MPTxRequest): + self.debug(f"Remove mp_tx_request: {mp_tx_request.log_str}") + nonce = mp_tx_request.nonce + if self._processing_tx is not None and self._processing_tx.nonce == nonce: + self._processing_tx = None + index = bisect.bisect_left(self._txs, mp_tx_request) + if self._txs[index].nonce != nonce: + self.error(f"Failed to drop reqeust away for: {self.sender_address}, not request with nonce: {nonce}") + return + self._txs = self._txs[index:] + self.debug(f"Removed mp_tx_request from sender: {self.sender_address} - {mp_tx_request.log_str}") + + +@logged_group("neon.MemPool") +class MPTxSchedule: + + def __init__(self, capacity: int) -> None: + self._capacity = capacity + self._sender_tx_pools: List[MPSenderTxPool] = [] + + def _pop_sender_txs(self, sender_address: str) -> Optional[MPSenderTxPool]: + for i, sender_tx_pool in enumerate(self._sender_tx_pools): + if sender_tx_pool.sender_address != sender_address: + continue + return self._sender_tx_pools.pop(i) + return None + + def _get_sender_txs(self, sender_address: str) -> Tuple[Optional[MPSenderTxPool], int]: + for i, sender in 
enumerate(self._sender_tx_pools): + if sender.sender_address != sender_address: + continue + return sender, i + return None, -1 + + def add_mp_tx_request(self, mp_tx_request: MPTxRequest): + self.debug(f"Add mp_tx_request: {mp_tx_request.log_str}") + sender_txs = self._pop_sender_or_create(mp_tx_request.sender_address) + self.debug(f"Got collection for sender: {mp_tx_request.sender_address}, there are already txs: {sender_txs.len()}") + sender_txs.add_tx(mp_tx_request) + bisect.insort_left(self._sender_tx_pools, sender_txs) + + self._check_oversized_and_reduce() + + def get_mp_tx_count(self): + count = 0 + for sender_txs in self._sender_tx_pools: + count += sender_txs.len() + return count + + def _check_oversized_and_reduce(self): + count = self.get_mp_tx_count() + tx_to_remove = count - self._capacity + sender_to_remove = [] + for sender in self._sender_tx_pools[::-1]: + if tx_to_remove <= 0: + break + sender.drop_last_request() + tx_to_remove -= 1 + if sender.len() == 1 and sender.is_processing(): + continue + if sender.is_empty(): + sender_to_remove.append(sender) + for sender in sender_to_remove: + self._sender_tx_pools.remove(sender) + + def _pop_sender_or_create(self, sender_address: str) -> MPSenderTxPool: + sender = self._pop_sender_txs(sender_address) + return MPSenderTxPool(sender_address=sender_address) if sender is None else sender + + def acquire_tx_for_execution(self) -> Optional[MPTxRequest]: + + if len(self._sender_tx_pools) == 0: + return None + + tx: Optional[MPTxRequest] = None + for sender_txs in self._sender_tx_pools: + if sender_txs.is_processing(): + continue + tx = sender_txs.acquire_tx() + break + + return tx + + def done(self, sender_addr: str, nonce: int): + sender = self._pop_sender_txs(sender_addr) + if sender is None: + self.error(f"Failed to make tx done, address: {sender_addr}, nonce: {nonce} - sender not found") + return + sender.on_tx_done(nonce) + if not sender.is_empty(): + bisect.insort_left(self._sender_tx_pools, sender) + + 
def get_pending_trx_count(self, sender_addr: str) -> int: + sender, _ = self._get_sender_txs(sender_addr) + return 0 if sender is None else sender.len() + + def drop_request_away(self, mp_tx_reqeust: MPTxRequest): + sender, i = self._get_sender_txs(mp_tx_reqeust.sender_address) + if sender is None: + self.warning(f"Failed drop request, no sender by sender_address: {mp_tx_reqeust.sender_address}") + return + sender.drop_request_away(mp_tx_reqeust) + if sender.len() == 0: + self.sender_tx_pools.pop(i) diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py new file mode 100644 index 000000000..896cd5ec3 --- /dev/null +++ b/proxy/mempool/mempool_service.py @@ -0,0 +1,44 @@ +from logged_groups import logged_group +import asyncio +from multiprocessing import Process +from typing import Any + +from ..common_neon.utils.pickable_data_server import AddrPickableDataSrv, IPickableDataServerUser +from ..common_neon.config import IConfig + +from .mempool import MemPool +from .executor_mng import MPExecutorMng, IMPExecutorMngUser + + +@logged_group("neon.MemPool") +class MPService(IPickableDataServerUser, IMPExecutorMngUser): + + MP_SERVICE_PORT = 9091 + MP_SERVICE_HOST = "0.0.0.0" + EXECUTOR_COUNT = 8 + + def __init__(self, config: IConfig): + self.event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.event_loop) + self._mempool_server = None + self._mempool = None + self._mp_executor_mng = None + self._process = Process(target=self.run) + self._config = config + + def start(self): + self.info("Run until complete") + self._process.start() + + async def on_data_received(self, data: Any) -> Any: + return await self._mempool.enqueue_mp_request(data) + + def run(self): + self._mempool_server = AddrPickableDataSrv(user=self, address=(self.MP_SERVICE_HOST, self.MP_SERVICE_PORT)) + self._mp_executor_mng = MPExecutorMng(self, self.EXECUTOR_COUNT, self._config) + self._mempool = MemPool(self._mp_executor_mng) + 
self.event_loop.run_until_complete(self._mp_executor_mng.async_init()) + self.event_loop.run_forever() + + def on_resource_released(self, resource_id: int): + self._mempool.on_resource_got_available(resource_id) diff --git a/proxy/neon_rpc_api_model/neon_tx_stages.py b/proxy/mempool/neon_tx_stages.py similarity index 97% rename from proxy/neon_rpc_api_model/neon_tx_stages.py rename to proxy/mempool/neon_tx_stages.py index 7f9dfa368..7d1b56207 100644 --- a/proxy/neon_rpc_api_model/neon_tx_stages.py +++ b/proxy/mempool/neon_tx_stages.py @@ -29,7 +29,7 @@ def build(self): pass -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCancelTxStage(NeonTxStage, abc.ABC): NAME = 'cancelWithNonce' @@ -78,7 +78,7 @@ def _create_account_with_seed(self): return self.s.builder.create_account_with_seed_instruction(self.sol_account, self._seed, self.balance, self.size) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreateAccountTxStage(NeonTxStage): NAME = 'createNeonAccount' @@ -98,7 +98,7 @@ def build(self): self.tx.add(self._create_account()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreateERC20TxStage(NeonTxStage, abc.ABC): NAME = 'createERC20Account' @@ -124,7 +124,7 @@ def build(self): self.tx.add(self._create_erc20_account()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreateContractTxStage(NeonCreateAccountWithSeedStage, abc.ABC): NAME = 'createNeonContract' @@ -150,7 +150,7 @@ def build(self): self.tx.add(self._create_account()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonResizeContractTxStage(NeonCreateAccountWithSeedStage, abc.ABC): NAME = 'resizeNeonContract' diff --git a/proxy/neon_rpc_api_model/operator_resource_list.py b/proxy/mempool/operator_resource_list.py similarity index 97% rename from proxy/neon_rpc_api_model/operator_resource_list.py rename to proxy/mempool/operator_resource_list.py index 4eb3b7c20..8334938b6 100644 --- 
a/proxy/neon_rpc_api_model/operator_resource_list.py +++ b/proxy/mempool/operator_resource_list.py @@ -23,9 +23,7 @@ from ..common_neon.environment_data import EVM_LOADER_ID, PERM_ACCOUNT_LIMIT, RECHECK_RESOURCE_LIST_INTERVAL, \ MIN_OPERATOR_BALANCE_TO_WARN, MIN_OPERATOR_BALANCE_TO_ERR -## TODO: DIP corruption, get rid of back dependency -# from .transaction_sender import NeonTxSender -from .neon_tx_stages import NeonCancelTxStage, NeonCreateAccountTxStage, NeonCreateAccountWithSeedStage +from ..mempool.neon_tx_stages import NeonCancelTxStage, NeonCreateAccountTxStage, NeonCreateAccountWithSeedStage class OperatorResourceInfo: @@ -44,7 +42,7 @@ def secret_key(self) -> bytes: return self.signer.secret_key() -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class OperatorResourceList: # These variables are global for class, they will be initialized one time _manager = mp.Manager() @@ -284,7 +282,7 @@ def free_resource_info(self): self._free_resource_list.append(resource.idx) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreatePermAccount(NeonCreateAccountWithSeedStage, abc.ABC): NAME = 'createPermAccount' diff --git a/proxy/neon_rpc_api_model/transaction_sender.py b/proxy/mempool/transaction_sender.py similarity index 93% rename from proxy/neon_rpc_api_model/transaction_sender.py rename to proxy/mempool/transaction_sender.py index e3df2499f..055cfc12f 100644 --- a/proxy/neon_rpc_api_model/transaction_sender.py +++ b/proxy/mempool/transaction_sender.py @@ -5,15 +5,14 @@ import time from logged_groups import logged_group -from typing import Dict, Optional, Any +from typing import Dict, Optional from solana.transaction import AccountMeta, Transaction, PublicKey from solana.blockhash import Blockhash -from .neon_tx_stages import NeonCreateAccountTxStage, NeonCreateERC20TxStage, NeonCreateContractTxStage, \ - NeonResizeContractTxStage +from ..mempool.neon_tx_stages import NeonCreateAccountTxStage, NeonCreateERC20TxStage, 
NeonCreateContractTxStage, \ + NeonResizeContractTxStage -from .operator_resource_list import OperatorResourceInfo from ..common_neon.compute_budget import TransactionWithComputeBudget from ..common_neon.neon_instruction import NeonInstruction as NeonIxBuilder from ..common_neon.solana_interactor import SolanaInteractor @@ -22,14 +21,16 @@ from ..common_neon.eth_proto import Trx as EthTx from ..common_neon.utils import NeonTxResultInfo, NeonTxInfo from ..common_neon.errors import EthereumError -from ..common_neon.types import NeonTxPrecheckResult, NeonEmulatingResult +from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult from ..common_neon.environment_data import RETRY_ON_FAIL from ..common_neon.elf_params import ElfParams from ..memdb.memdb import MemDB, NeonPendingTxInfo from ..common_neon.utils import get_holder_msg +from .operator_resource_list import OperatorResourceInfo + -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonTxSender: def __init__(self, db: MemDB, solana: SolanaInteractor, eth_tx: EthTx, steps: int): self._db = db @@ -61,10 +62,10 @@ def __init__(self, db: MemDB, solana: SolanaInteractor, eth_tx: EthTx, steps: in self._create_account_list = [] self._eth_meta_dict: Dict[str, AccountMeta] = dict() - def execute(self, precheck_result: NeonTxPrecheckResult) -> NeonTxResultInfo: + def execute(self, exec_cfg: NeonTxExecCfg, emulating_result: NeonEmulatingResult) -> NeonTxResultInfo: self._validate_pend_tx() - self._prepare_execution(precheck_result.emulating_result) - return self._execute(precheck_result) + self._prepare_execution(emulating_result) + return self._execute(exec_cfg) def set_resource(self, resource: Optional[OperatorResourceInfo]): self.resource = resource @@ -82,11 +83,11 @@ def _validate_pend_tx(self): self._pending_tx = NeonPendingTxInfo(neon_sign=self.neon_sign, operator=operator, slot=0) self._pend_tx_into_db(self.solana.get_recent_blockslot()) - def _execute(self, precheck_result: 
NeonTxPrecheckResult): + def _execute(self, exec_cfg: NeonTxExecCfg): for Strategy in [SimpleNeonTxStrategy, IterativeNeonTxStrategy, HolderNeonTxStrategy, NoChainIdNeonTxStrategy]: try: - strategy = Strategy(precheck_result, self) + strategy = Strategy(exec_cfg, self) if not strategy.is_valid: self.debug(f'Skip strategy {Strategy.NAME}: {strategy.error}') continue @@ -204,12 +205,12 @@ def done_account_tx_list(self, skip_create_accounts=False): self.create_account_tx.instructions.clear() -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class BaseNeonTxStrategy(metaclass=abc.ABCMeta): NAME = 'UNKNOWN STRATEGY' - def __init__(self, precheck_result: NeonTxPrecheckResult, neon_tx_sender: NeonTxSender): - self._precheck_result = precheck_result + def __init__(self, exec_cfg: NeonTxExecCfg, neon_tx_sender: NeonTxSender): + self._neon_tx_exec_cfg = exec_cfg self.is_valid = False self.error = None self.s = neon_tx_sender @@ -251,14 +252,14 @@ def _validate_txsize(self) -> bool: raise def _validate_gas_limit(self): - if not self._precheck_result.is_underpriced_tx_without_chainid: + if not self._neon_tx_exec_cfg.is_underpriced_tx_without_chainid: return True self.error = "Underpriced transaction without chain-id" return False -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class SimpleNeonTxSender(SolTxListSender): def __init__(self, strategy: BaseNeonTxStrategy, *args, **kwargs): SolTxListSender.__init__(self, *args, **kwargs) @@ -281,7 +282,7 @@ def _on_post_send(self): raise RuntimeError('Run out of attempts to execute transaction') -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class SimpleNeonTxStrategy(BaseNeonTxStrategy, abc.ABC): NAME = 'CallFromRawEthereumTX' IS_SIMPLE = True @@ -302,7 +303,7 @@ def _validate(self) -> bool: return self._validate_txsize() def _validate_steps(self) -> bool: - steps_emulated = self._precheck_result.emulating_result["steps_executed"] + steps_emulated = self._neon_tx_exec_cfg.steps_executed if 
steps_emulated > self.steps: self.error = 'Too big number of EVM steps' return False @@ -328,7 +329,7 @@ def execute(self) -> (NeonTxResultInfo, [str]): return tx_sender.neon_res, tx_sender.success_sign_list -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class IterativeNeonTxSender(SimpleNeonTxSender): def __init__(self, *args, **kwargs): SimpleNeonTxSender.__init__(self, *args, **kwargs) @@ -442,7 +443,7 @@ def _on_post_send(self): self._tx_list.append(self._strategy.build_tx()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class IterativeNeonTxStrategy(BaseNeonTxStrategy, abc.ABC): NAME = 'PartialCallOrContinueFromRawEthereumTX' IS_SIMPLE = False @@ -457,7 +458,7 @@ def _validate(self) -> bool: self._validate_gas_limit()) def _validate_evm_steps(self): - if self._precheck_result.emulating_result["steps_executed"] > (self.s.steps * 25): + if self._neon_tx_exec_cfg.steps_executed > (self.s.steps * 25): self.error = 'Big number of EVM steps' return False return True @@ -479,7 +480,7 @@ def execute(self) -> (NeonTxResultInfo, [str]): SolTxListSender(self.s, tx_list, self._preparation_txs_name).send(signer) self.s.done_account_tx_list() - steps_emulated = self._precheck_result.emulating_result["steps_executed"] + steps_emulated = self._neon_tx_exec_cfg.steps_executed cnt = math.ceil(steps_emulated / self.steps) cnt = math.ceil(steps_emulated / (self.steps - cnt)) if steps_emulated > 200: @@ -491,7 +492,7 @@ def execute(self) -> (NeonTxResultInfo, [str]): return tx_sender.neon_res, tx_sender.success_sign_list -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class HolderNeonTxStrategy(IterativeNeonTxStrategy, abc.ABC): NAME = 'ExecuteTrxFromAccountDataIterativeOrContinue' @@ -531,7 +532,7 @@ def _build_preparation_tx_list(self) -> [TransactionWithComputeBudget]: return tx_list -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NoChainIdNeonTxStrategy(HolderNeonTxStrategy, abc.ABC): NAME = 
'ExecuteTrxFromAccountDataIterativeOrContinueNoChainId' @@ -539,7 +540,7 @@ def __init__(self, *args, **kwargs): HolderNeonTxStrategy.__init__(self, *args, **kwargs) def _validate(self) -> bool: - if not self._precheck_result.is_underpriced_tx_without_chainid: + if not self._neon_tx_exec_cfg.is_underpriced_tx_without_chainid: self.error = 'Normal transaction' return False diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py index 48b832af9..32f29530f 100644 --- a/proxy/neon_proxy_app.py +++ b/proxy/neon_proxy_app.py @@ -1,10 +1,17 @@ from .proxy import entry_point +from .mempool.mempool_service import MPService from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer +from .common_neon.config import Config class NeonProxyApp: + def __init__(self): + self._config = Config() + self._mempool_service = MPService(self._config) + def start(self): + self._mempool_service.start() PrometheusProxyServer() entry_point() diff --git a/proxy/neon_rpc_api_model/__init__.py b/proxy/neon_rpc_api_model/__init__.py index 16b7421f3..7462b84f6 100644 --- a/proxy/neon_rpc_api_model/__init__.py +++ b/proxy/neon_rpc_api_model/__init__.py @@ -1 +1 @@ -from . 
neon_rpc_api_model import NeonRpcApiModel, NEON_PROXY_PKG_VERSION, NEON_PROXY_REVISION +from .neon_rcp_api_worker import NeonRpcApiWorker, NEON_PROXY_PKG_VERSION, NEON_PROXY_REVISION diff --git a/proxy/neon_rpc_api_model/neon_rpc_api_model.py b/proxy/neon_rpc_api_model/neon_rcp_api_worker.py similarity index 91% rename from proxy/neon_rpc_api_model/neon_rpc_api_model.py rename to proxy/neon_rpc_api_model/neon_rcp_api_worker.py index 4c6044bc1..b4e4de513 100644 --- a/proxy/neon_rpc_api_model/neon_rpc_api_model.py +++ b/proxy/neon_rpc_api_model/neon_rcp_api_worker.py @@ -1,33 +1,32 @@ import json import multiprocessing import traceback - import eth_utils -from typing import Optional, Union + +from typing import Optional, Union, Tuple import sha3 -from logged_groups import logged_group +from logged_groups import logged_group, LogMng from web3.auto import w3 from ..common_neon.address import EthereumAddress -from ..common_neon.emulator_interactor import call_emulated, call_trx_emulated +from ..common_neon.emulator_interactor import call_emulated from ..common_neon.errors import EthereumError, InvalidParamError, PendingTxError from ..common_neon.estimate import GasEstimate from ..common_neon.eth_proto import Trx as EthTrx from ..common_neon.keys_storage import KeyStorage from ..common_neon.solana_interactor import SolanaInteractor from ..common_neon.utils import SolanaBlockInfo -from ..common_neon.types import NeonTxPrecheckResult, NeonEmulatingResult +from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult +from ..common_neon.gas_price_calculator import GasPriceCalculator from ..common_neon.elf_params import ElfParams from ..common_neon.environment_utils import neon_cli from ..common_neon.environment_data import SOLANA_URL, PP_SOLANA_URL, EVM_STEP_COUNT, USE_EARLIEST_BLOCK_IF_0_PASSED, \ PYTH_MAPPING_ACCOUNT from ..memdb.memdb import MemDB -from ..common_neon.gas_price_calculator import GasPriceCalculator from ..statistics_exporter.proxy_metrics_interface 
import StatisticsExporter +from ..mempool import MemPoolClient, MP_SERVICE_HOST, MP_SERVICE_PORT -from .transaction_sender import NeonTxSender -from .operator_resource_list import OperatorResourceList from .transaction_validator import NeonTxValidator NEON_PROXY_PKG_VERSION = '0.7.21-dev' @@ -44,13 +43,14 @@ def default(self, obj): @logged_group("neon.Proxy") -class NeonRpcApiModel: +class NeonRpcApiWorker: proxy_id_glob = multiprocessing.Value('i', 0) def __init__(self): self._solana = SolanaInteractor(SOLANA_URL) self._db = MemDB(self._solana) self._stat_exporter: Optional[StatisticsExporter] = None + self._mempool_client = MemPoolClient(MP_SERVICE_HOST, MP_SERVICE_PORT) if PP_SOLANA_URL == SOLANA_URL: self.gas_price_calculator = GasPriceCalculator(self._solana, PYTH_MAPPING_ACCOUNT) @@ -215,8 +215,8 @@ def eth_getBalance(self, account: str, tag: str) -> str: return hex(0) return hex(neon_account_info.balance) - except (Exception,): - # self.debug(f"eth_getBalance: Can't get account info: {err}") + except (Exception,) as err: + self.debug(f"eth_getBalance: Can't get account info: {err}") return hex(0) def eth_getLogs(self, obj): @@ -384,11 +384,21 @@ def eth_call(self, obj: dict, tag: str) -> str: def eth_getTransactionCount(self, account: str, tag: str) -> str: self._validate_block_tag(tag) - account = self._normalize_account(account) + account = self._normalize_account(account).lower() try: + self.debug(f"Get transaction count. 
Account: {account}, tag: {tag}") neon_account_info = self._solana.get_neon_account_info(account) - return hex(neon_account_info.trx_count) + + pending_trx_count = 0 + if tag == "pending": + req_id = LogMng.get_logging_context().get("req_id") + pending_trx_count = self._mempool_client.get_pending_tx_count(req_id=req_id, sender=account) + self.debug(f"Pending tx count for: {account} - is: {pending_trx_count}") + + trx_count = neon_account_info.trx_count + pending_trx_count + + return hex(trx_count) except (Exception,): # self.debug(f"eth_getTransactionCount: Can't get account info: {err}") return hex(0) @@ -479,33 +489,35 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str: self._stat_tx_begin() try: - neon_tx_precheck_result = self.precheck(trx) - - tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT) - with OperatorResourceList(tx_sender): - tx_sender.execute(neon_tx_precheck_result) + neon_tx_cfg, emulating_result = self.precheck(trx) self._stat_tx_success() + req_id = LogMng.get_logging_context().get("req_id") + + self._mempool_client.send_raw_transaction(req_id=req_id, + signature=eth_signature, + neon_tx=trx, + neon_tx_exec_cfg=neon_tx_cfg, + emulating_result=emulating_result) return eth_signature - except PendingTxError as err: + except PendingTxError: self._stat_tx_failed() - self.debug(f'{err}') + self.error(f'Failed to process eth_sendRawTransaction, PendingTxError') return eth_signature - except EthereumError: + except EthereumError as err: + self.error(f'Failed to process eth_sendRawTransaction, EthereumError: {err}') self._stat_tx_failed() raise - except Exception: + except Exception as err: + self.error(f"Failed to process eth_sendRawTransaction, Error: {err}") self._stat_tx_failed() raise - def precheck(self, neon_trx: EthTrx) -> NeonTxPrecheckResult: - + def precheck(self, neon_trx: EthTrx) -> Tuple[NeonTxExecCfg, NeonEmulatingResult]: min_gas_price = self.gas_price_calculator.get_min_gas_price() neon_validator = 
NeonTxValidator(self._solana, neon_trx, min_gas_price) - precheck_result = neon_validator.precheck() - - return precheck_result + return neon_validator.precheck() def _stat_tx_begin(self): self._stat_exporter.stat_commit_tx_begin() diff --git a/proxy/neon_rpc_api_model/transaction_validator.py b/proxy/neon_rpc_api_model/transaction_validator.py index 8c99048b5..d85be1b23 100644 --- a/proxy/neon_rpc_api_model/transaction_validator.py +++ b/proxy/neon_rpc_api_model/transaction_validator.py @@ -1,5 +1,5 @@ from __future__ import annotations - +from typing import Tuple from logged_groups import logged_group from ..common_neon.eth_proto import Trx as EthTx @@ -9,12 +9,12 @@ from ..common_neon.solana_receipt_parser import SolReceiptParser from ..common_neon.solana_interactor import SolanaInteractor from ..common_neon.estimate import GasEstimate +from ..common_neon.emulator_interactor import call_trx_emulated from ..common_neon.elf_params import ElfParams from ..common_neon.environment_data import ACCOUNT_PERMISSION_UPDATE_INT, ALLOW_UNDERPRICED_TX_WITHOUT_CHAINID -from ..common_neon.emulator_interactor import call_trx_emulated -from ..common_neon.types import NeonTxPrecheckResult, NeonEmulatingResult +from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult @logged_group("neon.Proxy") @@ -58,16 +58,16 @@ def is_underpriced_tx_without_chainid(self) -> bool: return False return (self._tx.gasPrice < self._min_gas_price) or (self._tx.gasLimit < self._estimated_gas) - def precheck(self) -> NeonTxPrecheckResult: + def precheck(self) -> Tuple[NeonTxExecCfg, NeonEmulatingResult]: try: self._prevalidate_tx() emulating_result: NeonEmulatingResult = call_trx_emulated(self._tx) self._prevalidate_emulator(emulating_result) is_underpriced_tx_without_chainid = self.is_underpriced_tx_without_chainid() - precheck_result = NeonTxPrecheckResult(emulating_result=emulating_result, - is_underpriced_tx_without_chainid=is_underpriced_tx_without_chainid) - return precheck_result + 
neon_tx_exec_cfg = NeonTxExecCfg(steps_executed=emulating_result["steps_executed"], + is_underpriced_tx_without_chainid=is_underpriced_tx_without_chainid) + return neon_tx_exec_cfg, emulating_result except Exception as e: self.extract_ethereum_error(e) @@ -75,16 +75,15 @@ def precheck(self) -> NeonTxPrecheckResult: def _prevalidate_tx(self): self._prevalidate_whitelist() - self._prevalidate_tx_nonce() self._prevalidate_tx_gas() self._prevalidate_tx_chain_id() self._prevalidate_tx_size() self._prevalidate_sender_balance() + self._prevalidate_underpriced_tx_without_chainid() def _prevalidate_emulator(self, emulator_json: dict): self._prevalidate_gas_usage(emulator_json) self._prevalidate_account_sizes(emulator_json) - self._prevalidate_underpriced_tx_without_chainid() def extract_ethereum_error(self, e: Exception): receipt_parser = SolReceiptParser(e) @@ -107,6 +106,7 @@ def _prevalidate_tx_gas(self): raise EthereumError(message='gas uint64 overflow') if (self._tx_gas_limit * self._tx.gasPrice) > (self.MAX_U256 - 1): raise EthereumError(message='max fee per gas higher than 2^256-1') + if self._tx.gasPrice >= self._min_gas_price: return @@ -123,17 +123,6 @@ def _prevalidate_tx_size(self): if len(self._tx.callData) > (128 * 1024 - 1024): raise EthereumError(message='transaction size is too big') - def _prevalidate_tx_nonce(self): - if not self._neon_account_info: - return - - tx_nonce = int(self._tx.nonce) - if self.MAX_U64 not in (self._neon_account_info.trx_count, tx_nonce): - if tx_nonce == self._neon_account_info.trx_count: - return - - self._raise_nonce_error(self._neon_account_info.trx_count, tx_nonce) - def _prevalidate_sender_eoa(self): if not self._neon_account_info: return @@ -196,7 +185,6 @@ def _prevalidate_account_sizes(emulator_json: dict): raise EthereumError(f"contract {account_desc['address']} " + f"requests a size increase to more than 9.5Mb") - def _raise_nonce_error(self, account_tx_count: int, tx_nonce: int): if self.MAX_U64 in (account_tx_count, 
tx_nonce): message = 'nonce has max value' diff --git a/proxy/plugin/neon_rpc_api_plugin.py b/proxy/plugin/neon_rpc_api_plugin.py index 76201a11f..f753e0ebe 100644 --- a/proxy/plugin/neon_rpc_api_plugin.py +++ b/proxy/plugin/neon_rpc_api_plugin.py @@ -25,7 +25,7 @@ from ..common_neon.solana_receipt_parser import SolTxError from ..common_neon.errors import EthereumError from ..common_neon.environment_data import ENABLE_PRIVATE_API -from ..neon_rpc_api_model import NeonRpcApiModel +from ..neon_rpc_api_model import NeonRpcApiWorker from ..statistics_exporter.prometheus_proxy_exporter import PrometheusExporter modelInstanceLock = threading.Lock() @@ -54,7 +54,7 @@ def getModel(cls): global modelInstance with modelInstanceLock: if modelInstance is None: - modelInstance = NeonRpcApiModel() + modelInstance = NeonRpcApiWorker() return modelInstance def routes(self) -> List[Tuple[int, str]]: diff --git a/proxy/testing/test_eth_getLogs.py b/proxy/testing/test_eth_getLogs.py index 12795d8d2..a92e212d7 100644 --- a/proxy/testing/test_eth_getLogs.py +++ b/proxy/testing/test_eth_getLogs.py @@ -46,6 +46,7 @@ class Test_eth_getLogs(unittest.TestCase): + @classmethod def setUpClass(cls): print("\n\n") @@ -102,10 +103,9 @@ def commit_transactions(self): self.commit_two_event_trx(self, 5, 6) self.commit_no_event_trx(self, 7, 8) self.commit_no_event_trx(self, 9, 0) - pass def commit_one_event_trx(self, x, y) -> None: - print("\ncommit_one_event_trx") + print(f"\ncommit_one_event_trx. x: {x}, y: {y}") right_nonce = proxy.eth.get_transaction_count(proxy.eth.default_account) trx_store = self.storage_contract.functions.addReturnEvent(x, y).buildTransaction({'nonce': right_nonce}) trx_store_signed = proxy.eth.account.sign_transaction(trx_store, eth_account.key) @@ -120,7 +120,7 @@ def commit_one_event_trx(self, x, y) -> None: self.topics.append(topic.hex()) def commit_two_event_trx(self, x, y) -> None: - print("\ncommit_two_event_trx") + print(f"\ncommit_two_event_trx. 
x: {x}, y: {y}") right_nonce = proxy.eth.get_transaction_count(proxy.eth.default_account) trx_store = self.storage_contract.functions.addReturnEventTwice(x, y).buildTransaction({'nonce': right_nonce}) trx_store_signed = proxy.eth.account.sign_transaction(trx_store, eth_account.key) @@ -146,7 +146,6 @@ def commit_no_event_trx(self, x, y) -> None: self.block_hashes_no_event.append(trx_store_receipt['blockHash'].hex()) self.block_numbers_no_event.append(hex(trx_store_receipt['blockNumber'])) - def test_get_logs_by_blockHash(self): print("\ntest_get_logs_by_blockHash") receipts = proxy.eth.get_logs({'blockHash': self.block_hashes[0]}) @@ -155,24 +154,25 @@ def test_get_logs_by_blockHash(self): def test_get_no_logs_by_blockHash(self): print("\ntest_get_no_logs_by_blockHash") - receipts = proxy.eth.get_logs({'blockHash': self.block_hashes_no_event[0]}) + receipts = proxy.eth.get_logs({'blockHash': self.block_hashes_no_event[0], + 'address': self.storage_contract.address}) print('receipts: ', receipts) self.assertEqual(len(receipts), 0) def test_get_logs_by_fromBlock(self): - print("\ntest_get_logs_by_fromBlock") - receipts = proxy.eth.get_logs({'fromBlock': self.block_numbers[2]}) + from_block = self.block_numbers[2] + print(f"\ntest_get_logs_by_fromBlock: {from_block}, by storage contract address: {self.storage_contract.address}") + receipts = proxy.eth.get_logs({'fromBlock': from_block, + 'address': self.storage_contract.address}) print('receipts: ', receipts) self.assertEqual(len(receipts), 4) def test_get_logs_complex_request(self): print("\ntest_get_logs_complex_request") - receipts = proxy.eth.get_logs({ - 'fromBlock': 0, - 'toBlock': 'latest', - 'address': self.storage_contract.address, - 'topics': self.topics, - }) + receipts = proxy.eth.get_logs({'fromBlock': 0, + 'toBlock': 'latest', + 'address': self.storage_contract.address, + 'topics': self.topics}) print('receipts: ', receipts) self.assertEqual(len(receipts), 6) @@ -182,5 +182,6 @@ def 
test_get_logs_by_address(self): print('receipts: ', receipts) self.assertEqual(len(receipts), 6) + if __name__ == '__main__': unittest.main() diff --git a/proxy/testing/test_eth_sendRawTransaction.py b/proxy/testing/test_eth_sendRawTransaction.py index 109c1cf3c..2b8caa5e4 100644 --- a/proxy/testing/test_eth_sendRawTransaction.py +++ b/proxy/testing/test_eth_sendRawTransaction.py @@ -222,34 +222,6 @@ def test_03_execute_with_low_gas(self): message = 'gas limit reached' self.assertEqual(response['message'][:len(message)], message) - # @unittest.skip("a.i.") - def test_04_execute_with_bad_nonce(self): - test_nonce_list = [ - ('grade_up_one', 1, 'nonce too high:'), - ('grade_down_one', -1, 'nonce too low: ') - ] - for name, offset, message in test_nonce_list: - with self.subTest(name=name): - print("\ntest_04_execute_with_bad_nonce {} offsets".format(offset)) - bad_nonce = offset + proxy.eth.get_transaction_count(proxy.eth.default_account) - trx_store = self.storage_contract.functions.store(147).buildTransaction({'nonce': bad_nonce}) - print('trx_store:', trx_store) - trx_store_signed = proxy.eth.account.sign_transaction(trx_store, eth_account.key) - print('trx_store_signed:', trx_store_signed) - try: - trx_store_hash = proxy.eth.send_raw_transaction(trx_store_signed.rawTransaction) - print('trx_store_hash:', trx_store_hash) - self.assertTrue(False) - except Exception as e: - print('type(e):', type(e)) - print('e:', e) - response = json.loads(str(e).replace('\'', '\"').replace('None', 'null')) - print('response:', response) - print('code:', response['code']) - self.assertEqual(response['code'], -32002) - print('message:', response['message']) - self.assertEqual(response['message'][:len(message)], message) - # @unittest.skip("a.i.") def test_05_transfer_one_gwei(self): print("\ntest_05_transfer_one_gwei") diff --git a/proxy/testing/test_mempool.py b/proxy/testing/test_mempool.py new file mode 100644 index 000000000..43c5e9b5f --- /dev/null +++ 
b/proxy/testing/test_mempool.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +import asyncio +import logging +from random import randint + +import secrets + +from web3 import Web3, Account +from typing import Tuple, Any, List, Dict + +import unittest +from unittest.mock import patch, MagicMock, call + + +from ..mempool.mempool import MemPool, IMPExecutor +from ..mempool.mempool_api import NeonTxExecCfg, MPRequest, MPTxRequest +from ..mempool.mempool_schedule import MPTxSchedule, MPSenderTxPool +from ..common_neon.eth_proto import Trx as NeonTx + +from ..mempool.mempool_api import MPTxResult, MPResultCode + + +def create_account() -> Account: + private_key = "0x" + secrets.token_hex(32) + return Account.from_key(private_key) + + +def get_transfer_mp_request(*, req_id: str, nonce: int, gas: int, gasPrice: int, from_acc: Account = None, + to_acc: Account = None, value: int = 0, data: bytes = b'') -> MPTxRequest: + if from_acc is None: + from_acc = create_account() + + if to_acc is None: + to_acc = create_account() + to_addr = to_acc.address + w3 = Web3() + signed_tx_data = w3.eth.account.sign_transaction( + dict(nonce=nonce, chainId=111, gas=gas, gasPrice=gasPrice, to=to_addr, value=value, data=data), + from_acc.key + ) + signature = signed_tx_data.hash.hex() + neon_tx = NeonTx.fromString(bytearray(signed_tx_data.rawTransaction)) + tx_cfg = NeonTxExecCfg(is_underpriced_tx_without_chainid=False, steps_executed=100) + mp_tx_request = MPTxRequest(req_id=req_id, signature=signature, neon_tx=neon_tx, neon_tx_exec_cfg=tx_cfg, + emulating_result=dict()) + return mp_tx_request + + +class MockTask: + + def __init__(self, result: Any, is_done: bool = True, exception: Exception = None): + self._result = result + self._is_done = is_done + self._exception = exception + + def done(self): + return self._is_done + + def result(self): + return self._result + + def exception(self): + return self._exception + + +class MockMPExecutor(IMPExecutor): + + def 
submit_mp_request(self, mp_reqeust: MPRequest) -> Tuple[int, MockTask]: + return 1, MockTask(MPTxResult(MPResultCode.Done, None)) + + def is_available(self) -> bool: + return False + + def on_no_liquidity(self, resource_id: int): + pass + + def release_resource(self, resource_id: int): + pass + + +class TestMemPool(unittest.IsolatedAsyncioTestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.turn_logger_off() + + @classmethod + def turn_logger_off(cls) -> None: + neon_logger = logging.getLogger("neon.MemPool") + neon_logger.setLevel(logging.ERROR) + + async def asyncSetUp(self): + self._executor = MockMPExecutor() + self._mempool = MemPool(self._executor) + + @patch.object(MockMPExecutor, "submit_mp_request") + @patch.object(MockMPExecutor, "is_available", return_value=True) + async def test_single_sender_single_tx(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if an enqueued mp_tx_request gets in effect""" + mp_tx_request = get_transfer_mp_request(req_id="0000001", nonce=0, gasPrice=30000, gas=987654321, value=1, data=b'') + await self._mempool.enqueue_mp_request(mp_tx_request) + await asyncio.sleep(0) + + submit_mp_request_mock.assert_called_once() + submit_mp_request_mock.assert_called_with(mp_tx_request) + + @patch.object(MockMPExecutor, "submit_mp_request", return_value=(1, MockTask(MPTxResult(MPResultCode.Done, None)))) + @patch.object(MockMPExecutor, "is_available", return_value=False) + async def test_single_sender_couple_txs(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if an enqueued mp_tx_requests get in effect in the right order""" + from_acc = create_account() + to_acc = create_account() + req_data = [dict(req_id="0000000", nonce=0, gasPrice=30000, gas=987654321, value=1, from_acc=from_acc, to_acc=to_acc), + dict(req_id="0000001", nonce=1, gasPrice=29000, gas=987654321, value=1, from_acc=from_acc, to_acc=to_acc)] + requests = await self._enqueue_requests(req_data) + 
await asyncio.sleep(0) + submit_mp_request_mock.assert_not_called() + is_available_mock.return_value = True + self._mempool.on_resource_got_available(1) + await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC * 10) + + submit_mp_request_mock.assert_has_calls([call(requests[0]), call(requests[1])]) + + @patch.object(MockMPExecutor, "submit_mp_request", return_value=(1, MockTask(MPTxResult(MPResultCode.Done, None)))) + @patch.object(MockMPExecutor, "is_available", return_value=False) + async def test_2_senders_4_txs(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if an enqueued mp_tx_request from different senders gets in effect in the right order""" + acc = [create_account() for i in range(3)] + req_data = [dict(req_id="000", nonce=0, gasPrice=30000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[2]), + dict(req_id="001", nonce=1, gasPrice=21000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[2]), + dict(req_id="002", nonce=0, gasPrice=40000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2]), + dict(req_id="003", nonce=1, gasPrice=25000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2])] + requests = await self._enqueue_requests(req_data) + is_available_mock.return_value = True + self._mempool.on_resource_got_available(1) + await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC * 2) + + submit_mp_request_mock.assert_has_calls([call(requests[2]), call(requests[0]), call(requests[3]), call(requests[1])]) + + @patch.object(MockMPExecutor, "submit_mp_request") + @patch.object(MockMPExecutor, "is_available") + async def test_mp_waits_for_previous_tx_done(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if an enqueued mp_tx_request waits for the previous one from the same sender""" + submit_mp_request_mock.return_value = (1, MockTask(None, is_done=False)) + is_available_mock.return_value = False + acc_0 = create_account() + acc_1 = create_account() + req_data = [dict(req_id="000", nonce=0, gasPrice=10000, 
gas=1000, value=1, from_acc=acc_0, to_acc=acc_1), + dict(req_id="001", nonce=1, gasPrice=10000, gas=1500, value=2, from_acc=acc_0, to_acc=acc_1)] + requests = await self._enqueue_requests(req_data) + is_available_mock.return_value = True + for i in range(2): + await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC) + self._mempool.on_resource_got_available(1) + submit_mp_request_mock.assert_called_once_with(requests[0]) + + @patch.object(MockMPExecutor, "submit_mp_request") + @patch.object(MockMPExecutor, "is_available") + async def test_subst_with_higher_gas_price(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if the transaction with the same nonce but the higher gasPrice substitutes the current one""" + from_acc = create_account() + base_request = get_transfer_mp_request(req_id="0", from_acc=from_acc, nonce=0, gasPrice=30000, gas=987654321, value=1, data=b'') + await self._mempool._schedule_mp_tx_request(base_request) + subst_request = get_transfer_mp_request(req_id="1", from_acc=from_acc, nonce=0, gasPrice=40000, gas=987654321, value=2, data=b'') + await self._mempool._schedule_mp_tx_request(subst_request) + is_available_mock.return_value = True + self._mempool.on_resource_got_available(1) + await asyncio.sleep(0) + submit_mp_request_mock.assert_called_once() + submit_mp_request_mock.assert_called_with(subst_request) + + @patch.object(MockMPExecutor, "submit_mp_request") + @patch.object(MockMPExecutor, "is_available") + async def test_subst_with_lower_gas_price(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if the transaction with the same nonce but the lower gasPrice is ignored""" + from_acc = create_account() + base_request = get_transfer_mp_request(req_id="0", from_acc=from_acc, nonce=0, gasPrice=40000, gas=987654321, value=1, data=b'') + await self._mempool._schedule_mp_tx_request(base_request) + subst_request = get_transfer_mp_request(req_id="1", from_acc=from_acc, nonce=0, 
gasPrice=30000, gas=987654321, value=2, data=b'') + await self._mempool._schedule_mp_tx_request(subst_request) + is_available_mock.return_value = True + self._mempool.on_resource_got_available(1) + await asyncio.sleep(0) + submit_mp_request_mock.assert_called_once() + submit_mp_request_mock.assert_called_with(base_request) + + @patch.object(MockMPExecutor, "is_available") + async def test_check_pending_tx_count(self, is_available_mock: MagicMock): + """Checks if all incoming mp_tx_requests those are not processed are counted as pending""" + acc = [create_account() for i in range(3)] + req_data = [dict(req_id="000", nonce=0, gasPrice=30000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[2]), + dict(req_id="001", nonce=1, gasPrice=21000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[2]), + dict(req_id="002", nonce=0, gasPrice=40000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2]), + dict(req_id="003", nonce=1, gasPrice=25000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2]), + dict(req_id="004", nonce=2, gasPrice=25000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2])] + requests = await self._enqueue_requests(req_data) + acc_0_count = self._mempool.get_pending_trx_count(requests[0].sender_address) + self.assertEqual(acc_0_count, 2) + acc_1_count = self._mempool.get_pending_trx_count(requests[3].sender_address) + self.assertEqual(acc_1_count, 3) + is_available_mock.return_value = True + self._mempool.on_resource_got_available(1) + await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC) + acc_1_count = self._mempool.get_pending_trx_count(requests[3].sender_address) + self.assertEqual(acc_1_count, 2) + + @patch.object(MockMPExecutor, "submit_mp_request", return_value=(1, MockTask(MPTxResult(MPResultCode.Done, None)))) + @patch.object(MockMPExecutor, "is_available") + async def test_over_9000_transfers(self, is_available_mock: MagicMock, submit_mp_request_mock: MagicMock): + """Checks if all mp_tx_requests are processed by the MemPool""" + acc_count_max = 1_000 + 
from_acc_count = 10 + sleep_sec = 2 + nonce_count = 100 + req_count = from_acc_count * nonce_count + acc = [create_account() for i in range(acc_count_max)] + for acc_i in range(0, from_acc_count): + nonces = [i for i in range(0, nonce_count)] + while len(nonces) > 0: + index = randint(0, len(nonces) - 1) + nonce = nonces.pop(index) + request = get_transfer_mp_request(from_acc=acc[acc_i], to_acc=acc[randint(0, acc_count_max-1)], + req_id=str(acc_i) + " " + str(nonce), nonce=nonce, + gasPrice=randint(50000, 100000), gas=randint(4000, 10000)) + await self._mempool.enqueue_mp_request(request) + is_available_mock.return_value = True + self._mempool.on_resource_got_available(1) + await asyncio.sleep(sleep_sec) + for ac in acc[:from_acc_count]: + acc_nonce = 0 + for call in submit_mp_request_mock.call_args_list: + request = call.args[0] + if ac.address.lower() == request.sender_address: + self.assertEqual(request.nonce, acc_nonce) + acc_nonce += 1 + + self.assertEqual(submit_mp_request_mock.call_count, req_count) + + async def _enqueue_requests(self, req_data: List[Dict[str, Any]]) -> List[MPTxRequest]: + requests = [get_transfer_mp_request(**req) for req in req_data] + for req in requests: + await self._mempool.enqueue_mp_request(req) + return requests + + +class TestMPSchedule(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.turn_logger_off() + + @classmethod + def turn_logger_off(cls) -> None: + neon_logger = logging.getLogger("neon.MemPool") + neon_logger.setLevel(logging.ERROR) + + def test_capacity_oversized_simple(self): + """Checks if mp_schedule doesn't get oversized in simple way""" + mp_schedule_capacity = 3 + schedule = MPTxSchedule(mp_schedule_capacity) + acc = [create_account() for i in range(3)] + req_data = [dict(req_id="000", nonce=0, gasPrice=30000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[1]), + dict(req_id="001", nonce=0, gasPrice=25000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2]), + dict(req_id="002", nonce=1, 
gasPrice=30000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[2]), + dict(req_id="003", nonce=1, gasPrice=25000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[1]), + dict(req_id="004", nonce=2, gasPrice=25000, gas=1000, value=1, from_acc=acc[1], to_acc=acc[2]), + dict(req_id="005", nonce=0, gasPrice=50000, gas=1000, value=1, from_acc=acc[2], to_acc=acc[1]), + dict(req_id="006", nonce=1, gasPrice=50000, gas=1000, value=1, from_acc=acc[2], to_acc=acc[1]), + ] + self.requests = [get_transfer_mp_request(**req) for req in req_data] + for request in self.requests: + schedule.add_mp_tx_request(request) + self.assertEqual(2, len(schedule._sender_tx_pools)) + self.assertEqual(1, schedule.get_pending_trx_count(acc[0].address.lower())) + self.assertEqual(0, schedule.get_pending_trx_count(acc[1].address.lower())) + self.assertEqual(2, schedule.get_pending_trx_count(acc[2].address.lower())) + + def test_capacity_oversized(self): + """Checks if mp_schedule doesn't get oversized with a quite big set of mp_tx_requests""" + + acc_count_max = 10 + from_acc_count = 5 + nonce_count = 1000 + mp_schedule_capacity = 4000 + schedule = MPTxSchedule(mp_schedule_capacity) + acc = [create_account() for i in range(acc_count_max)] + for acc_i in range(0, from_acc_count): + nonces = [i for i in range(0, nonce_count)] + while len(nonces) > 0: + index = randint(0, len(nonces) - 1) + nonce = nonces.pop(index) + request = get_transfer_mp_request(from_acc=acc[acc_i], to_acc=acc[randint(0, acc_count_max-1)], + req_id=str(acc_i) + " " + str(nonce), nonce=nonce_count - nonce - 1, + gasPrice=randint(50000, 100000), gas=randint(4000, 10000)) + schedule.add_mp_tx_request(request) + self.assertEqual(mp_schedule_capacity, schedule.get_mp_tx_count()) + + +class TestMPSenderTxPool(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.turn_logger_off() + + @classmethod + def turn_logger_off(cls) -> None: + neon_logger = logging.getLogger("neon.MemPool") + 
neon_logger.setLevel(logging.ERROR) + + def setUp(self) -> None: + self._pool = MPSenderTxPool() + acc = [create_account() for i in range(2)] + req_data = [dict(req_id="000", nonce=3, gasPrice=30000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[1]), + dict(req_id="001", nonce=1, gasPrice=21000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[1]), + dict(req_id="002", nonce=0, gasPrice=40000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[1]), + dict(req_id="003", nonce=2, gasPrice=25000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[1]), + dict(req_id="004", nonce=4, gasPrice=25000, gas=1000, value=1, from_acc=acc[0], to_acc=acc[1])] + self._requests = [get_transfer_mp_request(**req) for req in req_data] + for request in self._requests: + self._pool.add_tx(request) + + def test_drop_last_request(self): + """Checks if transaction pool drops the request with highest nonce properly""" + self._pool.drop_last_request() + self.assertEqual(self._pool.len(), 4) + self.assertEqual(self._pool.get_tx(), self._requests[2]) + self.assertEqual(self._pool._txs[-1], self._requests[0]) + + def test_drop_last_request_if_processing(self): + """Checks if transaction pool doesn't drop the request with the highest nonce if it's in process""" + tx = self._pool.acquire_tx() + self.assertIs(tx, self._requests[2]) + with self.assertLogs("neon.MemPool", logging.WARNING) as logs: + for i in range(0, 5): + self._pool.drop_last_request() + self.assertEqual(1, len(logs.records)) + self.assertEqual(f"Failed to drop last request away: {tx.log_str} - processing", logs.records[0].msg) + diff --git a/proxy/testing/test_neon_rpc_api.py b/proxy/testing/test_neon_rpc_api.py index ee4342af7..ebc9c5dc3 100644 --- a/proxy/testing/test_neon_rpc_api.py +++ b/proxy/testing/test_neon_rpc_api.py @@ -1,13 +1,13 @@ import unittest from logged_groups import logged_group -from ..neon_rpc_api_model import NeonRpcApiModel +from ..neon_rpc_api_model import NeonRpcApiWorker @logged_group("neon.TestCases") class 
SolanaContractTests(unittest.TestCase): def setUp(self): - self.model = NeonRpcApiModel() + self.model = NeonRpcApiWorker() self.owner = '0xc1566af4699928fdf9be097ca3dc47ece39f8f8e' self.token1 = '0x49a449cd7fd8fbcf34d103d98f2c05245020e35b' diff --git a/proxy/testing/test_neon_tx_sender.py b/proxy/testing/test_neon_tx_sender.py index 1ce9a14b9..238ae92d0 100644 --- a/proxy/testing/test_neon_tx_sender.py +++ b/proxy/testing/test_neon_tx_sender.py @@ -5,10 +5,11 @@ from unittest.mock import Mock from ..common_neon.eth_proto import Trx as EthTrx -from ..neon_rpc_api_model.transaction_sender import NeonTxSender from ..common_neon.solana_interactor import SolanaInteractor from ..memdb.memdb import MemDB -from ..neon_rpc_api_model.operator_resource_list import OperatorResourceList + +from ..mempool.operator_resource_list import OperatorResourceList +from ..mempool.transaction_sender import NeonTxSender @logged_groups.logged_group("neon.TestCases") @@ -43,10 +44,10 @@ def test_01_validate_execution_when_not_enough_sols(self): self._resource_list._min_operator_balance_to_warn.side_effect = [1_049_000_000 * 1_000_000_000 * 1_000_000_000 * 2, 1_000_000_000 * 2] self._resource_list._min_operator_balance_to_err.side_effect = [1_049_000_000 * 1_000_000_000 * 1_000_000_000, 1_000_000_000] - with self.assertLogs('neon', level='ERROR') as logs: + with self.assertLogs('neon.MemPool', level='ERROR') as logs: with self._resource_list: print('logs.output:', str(logs.output)) - self.assertRegex(str(logs.output), 'ERROR:neon.Proxy:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') + self.assertRegex(str(logs.output), 'ERROR:neon.MemPool:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') # @unittest.skip("a.i.") def test_02_validate_warning_when_little_sols(self): @@ -59,10 +60,10 @@ def 
test_02_validate_warning_when_little_sols(self): self._resource_list._min_operator_balance_to_warn.side_effect = [1_049_000_000 * 1_000_000_000 * 1_000_000_000, 1_000_000_000 * 2] self._resource_list._min_operator_balance_to_err.side_effect = [1_049_049_000, 1_000_000_000] - with self.assertLogs('neon', level='WARNING') as logs: + with self.assertLogs('neon.MemPool', level='WARNING') as logs: with self._resource_list: print('logs.output:', str(logs.output)) - self.assertRegex(str(logs.output), 'WARNING:neon.Proxy:Operator account [A-Za-z0-9]{40,}:[0-9]+ SOLs are running out; balance = [0-9]+; min_operator_balance_to_warn = 1049000000000000000000000000; min_operator_balance_to_err = 1049049000;') + self.assertRegex(str(logs.output), 'WARNING:neon.MemPool:Operator account [A-Za-z0-9]{40,}:[0-9]+ SOLs are running out; balance = [0-9]+; min_operator_balance_to_warn = 1049000000000000000000000000; min_operator_balance_to_err = 1049049000;') # @unittest.skip("a.i.") def test_03_validate_execution_when_not_enough_sols_for_all_operator_accounts(self): @@ -77,11 +78,11 @@ def test_03_validate_execution_when_not_enough_sols_for_all_operator_accounts(se self._resource_list._min_operator_balance_to_warn.return_value = 1_049_000_000 * 1_000_000_000 * 1_000_000_000 * 2 self._resource_list._min_operator_balance_to_err.return_value = 1_049_000_000 * 1_000_000_000 * 1_000_000_000 - with self.assertLogs('neon', level='ERROR') as logs: + with self.assertLogs('neon.MemPool', level='ERROR') as logs: with self.assertRaises(RuntimeError, msg='Operator has NO resources!'): with self._resource_list: pass print('logs.output:', str(logs.output)) - self.assertRegex(str(logs.output), 'ERROR:neon.Proxy:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') + self.assertRegex(str(logs.output), 'ERROR:neon.MemPool:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; 
min_operator_balance_to_err = 1049000000000000000000000000') diff --git a/proxy/testing/test_pickable_data_transfer.py b/proxy/testing/test_pickable_data_transfer.py new file mode 100644 index 000000000..e69de29bb diff --git a/requirements.txt b/requirements.txt index 56809c7b1..7fce90475 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ ethereum py-solc-x==1.1.0 flask prometheus_client==0.13.1 -git+https://github.com/neonlabsorg/python-logged-groups.git@2.1.4 +git+https://github.com/neonlabsorg/python-logged-groups.git@2.1.5