Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#714 refactoring before extract validation #733

Merged
merged 14 commits into from
Apr 13, 2022
Merged
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ COPY . /opt
ARG PROXY_REVISION
ARG LOG_CFG=log_cfg.json
RUN (cp -f /opt/${LOG_CFG} /opt/log_cfg.json || true)
RUN sed -i 's/NEON_PROXY_REVISION_TO_BE_REPLACED/'"$PROXY_REVISION"'/g' /opt/proxy/plugin/solana_rest_api.py
RUN sed -i 's/NEON_PROXY_REVISION_TO_BE_REPLACED/'"$PROXY_REVISION"'/g' /opt/proxy/plugin/neon_rpc_api_plugin.py

COPY ./proxy/solana-py.patch /opt
RUN cd /usr/local/lib/python3.8/dist-packages/ && patch -p0 </opt/solana-py.patch
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ For run internal implementation for Ethereum tokens start proxy with:
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python3 -m proxy --hostname 127.0.0.1 --port 9090 --enable-web-server --plugins proxy.plugin.SolanaProxyPlugin --num-workers=1
python3 -m proxy --hostname 127.0.0.1 --port 9090 --enable-web-server --plugins proxy.plugin.NeonRpcApiPlugin --num-workers=1
```
(for more information see [From command line using repo source](#from-command-line-using-repo-source)).

Expand Down
10 changes: 4 additions & 6 deletions proxy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
:license: BSD, see LICENSE for more details.
"""

from .proxy import entry_point

import os
from .neon_proxy_app import NeonProxyApp
from .indexer.indexer_app import run_indexer


Expand All @@ -25,8 +26,5 @@
print("Will run in indexer mode")
run_indexer(solana_url)
else:
from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer
PrometheusProxyServer()

print("Will run in proxy mode")
entry_point()
neon_proxy_app = NeonProxyApp()
neon_proxy_app.start()
1 change: 1 addition & 0 deletions proxy/common_neon/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .types import Result
22 changes: 0 additions & 22 deletions proxy/common_neon/eth_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,3 @@ def contract(self) -> Optional[str]:
return None
contract_addr = rlp.encode((self._sender(), self.nonce))
return keccak_256(contract_addr).digest()[-20:].hex()


#class JsonEncoder(json.JSONEncoder):
# def default(self, obj):
# if isinstance(obj, bytes):
# return obj.hex()
# return json.JSONEncoder.default(self.obj)
#
#trx = '0xf8af098539f98e7a0082bfd194b80102fd2d3d1be86823dd36f9c783ad0ee7d89880b844a9059cbb000000000000000000000000c1566af4699928fdf9be097ca3dc47ece39f8f8e00000000000000000000000000000000000000000000000000000000000000328602e92be91e86a0e2c683a38606033cf416cca55575b4080465f1a275aff080b2af1a264b24d56ca02e48a4cb63d8549610d070b02e272ab6a3a680e677c7d7f51045a9cbcf218f0d'
#trx = '0xf8af098539f98e7a0082bfd194b80102fd2d3d1be86823dd36f9c783ad0ee7d89880b844a9059cbb000000000000000000000000c1566af4699928fdf9be097ca3dc47ece39f8f8e00000000000000000000000000000000000000000000000000000000000000328602e92be91e86a0e2c683a38606033cf416cca55575b4080465f1a275aff080b2af1a264b24d56ca02e48a4cb63d8549610d070b02e272ab6a3a680e677c7d7f51045a9cbcf218f0d'
#trx = '0xf87202853946be1c0082520894c1566af4699928fdf9be097ca3dc47ece39f8f8e880de0b6b3a7640000808602e92be91e85a06f350382938df92b987681de78d81f0490ee1d26b18ea968ae42ee4a800711a6a0641672e91b735bd6badd2c51b6a6ecdcd740b78c8bf581aa3f1431cd0f8c02f3'
#
#_trx = Trx.fromString(bytearray.fromhex(trx[2:]))
#print(json.dumps(_trx.__dict__, cls=JsonEncoder, indent=3))
#print(str(_trx))
#print(trx[2:])
#
#msgHash = _trx.hash()
#sig = keys.Signature(vrs=[1 if _trx.v%2==0 else 0, _trx.r, _trx.s])
#pub = sig.recover_public_key_from_msg_hash(msgHash)
#print('SENDER', pub.to_canonical_address().hex())
#print("VERIFY", sig.verify_msg_hash(msgHash, pub))
9 changes: 9 additions & 0 deletions proxy/common_neon/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Result:
def __init__(self, reason: str = None):
self._reason = reason

def __bool__(self) -> bool:
return self._reason is None

def __str__(self) -> str:
return self._reason if self._reason is not None else ""
3 changes: 3 additions & 0 deletions proxy/common_neon/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation
from .utils import *

117 changes: 117 additions & 0 deletions proxy/common_neon/utils/queue_based_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import abc
import multiprocessing as mp
import os
import queue
import signal

from multiprocessing.managers import BaseManager
from dataclasses import dataclass, astuple, field
from typing import Tuple, Dict, Any

from logged_groups import logged_group

from ..types import Result


@dataclass
class ServiceInvocation:
method_name: str = None
args: Tuple[Any] = field(default_factory=tuple)
kwargs: Dict[str, Any] = field(default_factory=dict)


@logged_group("neon")
class QueueBasedServiceClient:

def __init__(self, host: str, port: int):
class MemPoolQueueManager(BaseManager):
pass

MemPoolQueueManager.register('get_queue')
queue_manager = MemPoolQueueManager(address=(host, port), authkey=b'abracadabra')
queue_manager.connect()
self._queue = queue_manager.get_queue()

def invoke(self, method_name, *args, **kwargs) -> Result:
try:
self._invoke_impl(method_name, *args, **kwargs)
except queue.Full:
self.error(f"Failed to invoke the method: {method_name}, queue is full")
return Result("Mempool queue full")
return Result()

def _invoke_impl(self, method_name, *args, **kwargs):
invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs)
self._queue.put(invocation)


@logged_group("neon")
class QueueBasedService(abc.ABC):

QUEUE_TIMEOUT_SEC = 0.4
BREAK_PROC_INVOCATION = 0
JOIN_PROC_TIMEOUT_SEC = 5

def __init__(self, *, port: int, is_background: bool):
self._port = port
self._is_back_ground = is_background
self._timeout = self.QUEUE_TIMEOUT_SEC

class MemPoolQueueManager(BaseManager):
pass

self._queue = mp.Queue()
MemPoolQueueManager.register("get_queue", callable=lambda: self._queue)
self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra')
self._mempool_server = self._queue_manager.get_server()
self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_listen_proc")
self._queue_process = mp.Process(target=self.run, name="mempool_queue_proc")

pid = os.getpid()
signal.signal(signal.SIGINT, lambda sif, frame: self.finish() if os.getpid() == pid else 0)

def start(self):
self.info(f"Starting queue server: {self._port}")
self._mempool_server_process.start()
self._queue_process.start()
if not self._is_back_ground:
self._queue_process.join()

def run(self):
self.service_process_init()
while True:
try:
if not self._run_impl():
break
except queue.Empty:
self.do_extras()

def _run_impl(self) -> bool:
invocation = self._queue.get(block=True, timeout=self._timeout)
if invocation == self.BREAK_PROC_INVOCATION:
return False
self.dispatch(invocation)
return True

def dispatch(self, invocation: ServiceInvocation):
method_name, args, kwargs = astuple(invocation)
handler = getattr(self, method_name, None)
if handler is None:
raise NotImplementedError(f"Process has no handler for {handler}")
handler(*args, **kwargs)

def finish(self):
self.info("Finishing the queue and listening processes")
self._mempool_server_process.terminate()
if not self._queue_process.is_alive():
return
self._queue.put_nowait(self.BREAK_PROC_INVOCATION)
self._queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC)

@abc.abstractmethod
def do_extras(self):
assert "To be implemented in derived class"

@abc.abstractmethod
def service_process_init(self):
assert "To be implemented in derived class"
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from eth_utils import big_endian_to_int

from ..environment import EVM_LOADER_ID
#TODO: move it out from here
from ...environment import EVM_LOADER_ID

from ..common_neon.eth_proto import Trx as EthTx
from ..eth_proto import Trx as EthTx


def str_fmt_object(obj):
Expand Down
3 changes: 3 additions & 0 deletions proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .mempool_service import MemPoolService
from .mempool_client import MemPoolClient
from .mem_pool import MemPool
35 changes: 35 additions & 0 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from logged_groups import logged_group
from multiprocessing import Pool


@logged_group("neon.MemPool")
class MemPool:

POOL_PROC_COUNT = 20

def __init__(self):
self._pool = Pool(processes=self.POOL_PROC_COUNT)

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._pool.apply_async(func=self._on_eth_send_raw_transaction_impl, args=(eth_trx_hash,),
callback=self.on_eth_send_raw_transaction_callback, error_callback=self.error_callback)

def error_callback(self, error):
self.error("Failed to invoke on worker process: ", error)

def on_eth_send_raw_transaction_callback(self, result):
pass

def _on_eth_send_raw_transaction_impl(self, eth_trx_hash):
self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}")

def do_extras(self):
pass

def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['_pool']
return self_dict

def __setstate__(self, state):
self.__dict__.update(state)
20 changes: 20 additions & 0 deletions proxy/mempool/mempool_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from logged_groups import logged_group

from ..common_neon.utils import QueueBasedServiceClient
from ..common_neon import Result

from . import MemPoolService


@logged_group("neon.Proxy")
class MemPoolClient(QueueBasedServiceClient):

MEM_POOL_SERVICE_HOST = "127.0.0.1"

def __init__(self):
port, host = (MemPoolService.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST)
self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}")
QueueBasedServiceClient.__init__(self, host, port)

def on_eth_send_raw_transaction(self, eth_trx_signature) -> Result:
return self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature)
26 changes: 26 additions & 0 deletions proxy/mempool/mempool_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from logged_groups import logged_group

from ..common_neon.utils import QueueBasedService

from .mem_pool import MemPool


@logged_group("neon.MemPool")
class MemPoolService(QueueBasedService):

MEM_POOL_SERVICE_PORT = 9091

def __init__(self, *, is_background: bool):
QueueBasedService.__init__(self, port=self.MEM_POOL_SERVICE_PORT, is_background=is_background)
self._mem_pool = None

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._mem_pool.on_eth_send_raw_transaction(eth_trx_hash=eth_trx_hash)

# QueueBasedService abstracts

def service_process_init(self):
self._mem_pool = MemPool()

def do_extras(self):
self._mem_pool.do_extras()
14 changes: 14 additions & 0 deletions proxy/neon_proxy_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from .proxy import entry_point
from .mempool.mempool_service import MemPoolService
from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer


class NeonProxyApp:

def __init__(self):
self._mempool_service = MemPoolService(is_background=True)
afalaleev marked this conversation as resolved.
Show resolved Hide resolved

def start(self):
PrometheusProxyServer()
self._mempool_service.start()
afalaleev marked this conversation as resolved.
Show resolved Hide resolved
entry_point()
1 change: 1 addition & 0 deletions proxy/neon_rpc_api_model/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . neon_rpc_api_model import NeonRpcApiModel, NEON_PROXY_PKG_VERSION, NEON_PROXY_REVISION
Loading