Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#712 bring historical eth model into separate location #734

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions proxy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
:license: BSD, see LICENSE for more details.
"""

from .proxy import entry_point

import os
from .neon_proxy_app import NeonProxyApp
from .indexer.indexer_app import run_indexer


Expand All @@ -25,8 +26,9 @@
print("Will run in indexer mode")
run_indexer(solana_url)
else:
from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer
PrometheusProxyServer()
neon_proxy_app = NeonProxyApp()
neon_proxy_app.start()




print("Will run in proxy mode")
entry_point()
1 change: 1 addition & 0 deletions proxy/common_neon/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .types import Result
9 changes: 9 additions & 0 deletions proxy/common_neon/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Result:
def __init__(self, reason: str = None):
self._reason = reason

def __bool__(self) -> bool:
return self._reason is None

def __str__(self) -> str:
return self._reason if self._reason is not None else ""
3 changes: 3 additions & 0 deletions proxy/common_neon/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation
from .utils import *

117 changes: 117 additions & 0 deletions proxy/common_neon/utils/queue_based_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import abc
import multiprocessing as mp
import os
import queue
import signal

from multiprocessing.managers import BaseManager
from dataclasses import dataclass, astuple, field
from typing import Tuple, Dict, Any

from logged_groups import logged_group

from ..types import Result


@dataclass
class ServiceInvocation:
method_name: str = None
args: Tuple[Any] = field(default_factory=tuple)
kwargs: Dict[str, Any] = field(default_factory=dict)


@logged_group("neon")
class QueueBasedServiceClient:

def __init__(self, host: str, port: int):
class MemPoolQueueManager(BaseManager):
pass

MemPoolQueueManager.register('get_queue')
queue_manager = MemPoolQueueManager(address=(host, port), authkey=b'abracadabra')
queue_manager.connect()
self._queue = queue_manager.get_queue()

def invoke(self, method_name, *args, **kwargs) -> Result:
try:
self._invoke_impl(method_name, *args, **kwargs)
except queue.Full:
self.error(f"Failed to invoke the method: {method_name}, queue is full")
return Result("Mempool queue full")
return Result()

def _invoke_impl(self, method_name, *args, **kwargs):
invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs)
self._queue.put(invocation)


@logged_group("neon")
class QueueBasedService(abc.ABC):

QUEUE_TIMEOUT_SEC = 0.4
BREAK_PROC_INVOCATION = 0
JOIN_PROC_TIMEOUT_SEC = 5

def __init__(self, *, port: int, is_background: bool):
self._port = port
self._is_back_ground = is_background
self._timeout = self.QUEUE_TIMEOUT_SEC

class MemPoolQueueManager(BaseManager):
pass

self._queue = mp.Queue()
MemPoolQueueManager.register("get_queue", callable=lambda: self._queue)
self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra')
self._mempool_server = self._queue_manager.get_server()
self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_listen_proc")
self._queue_process = mp.Process(target=self.run, name="mempool_queue_proc")

pid = os.getpid()
signal.signal(signal.SIGINT, lambda sif, frame: self.finish() if os.getpid() == pid else 0)

def start(self):
self.info(f"Starting queue server: {self._port}")
self._mempool_server_process.start()
self._queue_process.start()
if not self._is_back_ground:
self._queue_process.join()

def run(self):
self.service_process_init()
while True:
try:
if not self._run_impl():
break
except queue.Empty:
self.do_extras()

def _run_impl(self) -> bool:
invocation = self._queue.get(block=True, timeout=self._timeout)
if invocation == self.BREAK_PROC_INVOCATION:
return False
self.dispatch(invocation)
return True

def dispatch(self, invocation: ServiceInvocation):
method_name, args, kwargs = astuple(invocation)
handler = getattr(self, method_name, None)
if handler is None:
raise NotImplementedError(f"Process has no handler for {handler}")
handler(*args, **kwargs)

def finish(self):
self.info("Finishing the queue and listening processes")
self._mempool_server_process.terminate()
if not self._queue_process.is_alive():
return
self._queue.put_nowait(self.BREAK_PROC_INVOCATION)
self._queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC)

@abc.abstractmethod
def do_extras(self):
assert "To be implemented in derived class"

@abc.abstractmethod
def service_process_init(self):
assert "To be implemented in derived class"
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from eth_utils import big_endian_to_int

from ..environment import EVM_LOADER_ID
#TODO: move it out from here
from ...environment import EVM_LOADER_ID

from ..common_neon.eth_proto import Trx as EthTx
from ..eth_proto import Trx as EthTx


def str_fmt_object(obj):
Expand Down
3 changes: 3 additions & 0 deletions proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .mempool_service import MemPoolService
from .mempool_client import MemPoolClient
from .mem_pool import MemPool
35 changes: 35 additions & 0 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from logged_groups import logged_group
from multiprocessing import Pool


@logged_group("neon.MemPool")
class MemPool:

POOL_PROC_COUNT = 20

def __init__(self):
self._pool = Pool(processes=self.POOL_PROC_COUNT)

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._pool.apply_async(func=self._on_eth_send_raw_transaction_impl, args=(eth_trx_hash,),
callback=self.on_eth_send_raw_transaction_callback, error_callback=self.error_callback)

def error_callback(self, error):
self.error("Failed to invoke on worker process: ", error)

def on_eth_send_raw_transaction_callback(self, result):
pass

def _on_eth_send_raw_transaction_impl(self, eth_trx_hash):
self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}")

def do_extras(self):
pass

def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['_pool']
return self_dict

def __setstate__(self, state):
self.__dict__.update(state)
20 changes: 20 additions & 0 deletions proxy/mempool/mempool_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from logged_groups import logged_group

from ..common_neon.utils import QueueBasedServiceClient
from ..common_neon import Result

from . import MemPoolService


@logged_group("neon.Proxy")
class MemPoolClient(QueueBasedServiceClient):

MEM_POOL_SERVICE_HOST = "127.0.0.1"

def __init__(self):
port, host = (MemPoolService.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST)
self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}")
QueueBasedServiceClient.__init__(self, host, port)

def on_eth_send_raw_transaction(self, eth_trx_signature) -> Result:
return self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature)
26 changes: 26 additions & 0 deletions proxy/mempool/mempool_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from logged_groups import logged_group

from ..common_neon.utils import QueueBasedService

from .mem_pool import MemPool


@logged_group("neon.MemPool")
class MemPoolService(QueueBasedService):

MEM_POOL_SERVICE_PORT = 9091

def __init__(self, *, is_background: bool):
QueueBasedService.__init__(self, port=self.MEM_POOL_SERVICE_PORT, is_background=is_background)
self._mem_pool = None

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._mem_pool.on_eth_send_raw_transaction(eth_trx_hash=eth_trx_hash)

# QueueBasedService abstracts

def service_process_init(self):
self._mem_pool = MemPool()

def do_extras(self):
self._mem_pool.do_extras()
14 changes: 14 additions & 0 deletions proxy/neon_proxy_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from .proxy import entry_point
from .mempool.mempool_service import MemPoolService
from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer


class NeonProxyApp:

def __init__(self):
self._mempool_service = MemPoolService(is_background=True)

def start(self):
PrometheusProxyServer()
self._mempool_service.start()
entry_point()
8 changes: 5 additions & 3 deletions proxy/plugin/solana_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import sha3

from logged_groups import logged_group, logging_context
from typing import Optional, Union
from typing import Union

from ..common.utils import build_http_response
from ..http.codes import httpStatusCodes
from ..http.parser import HttpParser
from ..http.websocket import WebsocketFrame
from ..http.server import HttpWebServerBasePlugin, httpProtocolTypes
from typing import Dict, List, Tuple, Optional
from typing import List, Tuple, Optional

from ..common_neon.transaction_sender import NeonTxSender
from ..common_neon.solana_interactor import SolanaInteractor
Expand All @@ -35,10 +35,10 @@
from ..common_neon.estimate import GasEstimate
from ..common_neon.utils import SolanaBlockInfo
from ..common_neon.keys_storage import KeyStorage
from ..mempool import MemPoolClient
from ..environment import SOLANA_URL, PP_SOLANA_URL, PYTH_MAPPING_ACCOUNT, EVM_STEP_COUNT, CHAIN_ID, ENABLE_PRIVATE_API
from ..environment import NEON_EVM_VERSION, NEON_EVM_REVISION
from ..environment import neon_cli
from ..environment import get_solana_accounts
from ..memdb.memdb import MemDB
from .gas_price_calculator import GasPriceCalculator
from ..common_neon.eth_proto import Trx as EthTrx
Expand All @@ -61,6 +61,7 @@ class EthereumModel:
def __init__(self):
self._solana = SolanaInteractor(SOLANA_URL)
self._db = MemDB(self._solana)
self._mempool_client = MemPoolClient()

if PP_SOLANA_URL == SOLANA_URL:
self.gas_price_calculator = GasPriceCalculator(self._solana, PYTH_MAPPING_ACCOUNT)
Expand Down Expand Up @@ -457,6 +458,7 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str:
tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT)
tx_sender.execute()
self._stat_tx_success()
self._mempool_client.on_eth_send_raw_transaction(eth_signature)
return eth_signature

except PendingTxError as err:
Expand Down