Skip to content

Commit

Permalink
#713 send tx from mempool worker (#754)
Browse files Browse the repository at this point in the history
  • Loading branch information
rozhkovdmitrii authored Apr 21, 2022
1 parent 99de774 commit 9ec72ec
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 121 deletions.
27 changes: 27 additions & 0 deletions proxy/common_neon/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from abc import ABC, abstractmethod
from typing import Optional
import os


class IConfig(ABC):

@abstractmethod
def get_solana_url(self) -> Optional[str]:
"""Gets the predefinded solana url"""

@abstractmethod
def get_evm_count(self) -> Optional[int]:
"""Gets the evm count"""


class Config(IConfig):

def __init__(self):
from ..environment import read_elf_params, ELF_PARAMS
read_elf_params(ELF_PARAMS)

def get_solana_url(self) -> Optional[str]:
return os.environ.get("SOLANA_URL", "http://localhost:8899")

def get_evm_count(self) -> Optional[int]:
return int(os.environ.get("EVM_STEP_COUNT", 750))
4 changes: 2 additions & 2 deletions proxy/common_neon/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@


@dataclass
class NeonTxPrecheckResult:
class NeonTxExecCfg:
is_underpriced_tx_without_chainid: bool
emulating_result: NeonEmulatingResult
steps_executed: int


NeonEmulatingResult = Dict[str, Any]
Expand Down
3 changes: 2 additions & 1 deletion proxy/common_neon/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .utils import *

from .pickable_data_server import PickableDataServer, PickableDataServerUser, PickableDataClient
from .pickable_data_server import AddrPickableDataSrv, PipePickableDataSrv, PickableDataServerUser, \
AddrPickableDataClient, PipePickableDataClient

154 changes: 111 additions & 43 deletions proxy/common_neon/utils/pickable_data_server.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,152 @@
from typing import Any, Tuple
from abc import ABC, abstractmethod

import asyncio
from asyncio import StreamReader, StreamWriter
import socket
import pickle
import struct
from typing import Any
from logged_groups import logged_group


class PickableDataServerUser(ABC):

@abstractmethod
def on_data_received(self, data: Any):
def on_data_received(self, data: Any) -> Any:
"""Gets neon_tx_data from the neon rpc api service worker"""


def encode_pickable(object) -> bytes:
data = pickle.dumps(object)
len_data = struct.pack("!I", len(data))
return len_data + data


@logged_group("neon.MemPool")
class PickableDataServer(ABC):

def __init__(self, *, user: PickableDataServerUser, host: str, port: int):
def __init__(self, *, user: PickableDataServerUser):
self._user = user
self._port = port
self._host = host
asyncio.get_event_loop().create_task(self.run_server())

@abstractmethod
async def run_server(self):
assert False

async def handle_client(self, client):
loop = asyncio.get_event_loop()
peer_name = client.getpeername()
self.debug(f"Got new incoming connection: {peer_name}")
async def handle_client(self, reader: StreamReader, writer: StreamWriter):
while True:
try:
len_packed: bytes = await loop.sock_recv(client, 4)
if len(len_packed) == 0:
break
# TODO: all the data can be received by parts, handle it
payload_len_data = struct.unpack("!I", len_packed[:4])[0]
payload = await loop.sock_recv(client, payload_len_data)
data = pickle.loads(payload)
self._user.on_data_received(data)
data = await self._recv_pickable_data(reader)
result = self._user.on_data_received(data)
result_data = encode_pickable(result)
writer.write(result_data)
await writer.drain()
except ConnectionResetError:
self.error(f"Client connection: {peer_name} - has been interrupted")
self.error(f"Client connection has been closed")
break
except Exception as err:
self.error(f"Failed to receive data over: {peer_name} - err: {err}")
continue
client.close()
self.error(f"Failed to receive data err: {err}, type: {type(err)}")
break

async def _recv_pickable_data(self, reader: StreamReader):
len_packed: bytes = await reader.readexactly(4)
if len(len_packed) == 0:
raise ConnectionResetError()
payload_len_data = struct.unpack("!I", len_packed)[0]
payload = await reader.readexactly(payload_len_data)
data = pickle.loads(payload)

return data


class AddrPickableDataSrv(PickableDataServer):

def __init__(self, *, user: PickableDataServerUser, address: Tuple[str, int]):
self._address = address
PickableDataServer.__init__(self, user=user)

async def run_server(self):
self.info(f"Listen port: {self._port} on: {self._host}")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self._host, self._port))
server.listen(8)
server.setblocking(False)
host, port = self._address
self.info(f"Listen port: {port} on: {host}")
await asyncio.start_server(self.handle_client, host, port)

loop = asyncio.get_event_loop()
while True:
client, _ = await loop.sock_accept(server)
loop.create_task(self.handle_client(client))

class PipePickableDataSrv(PickableDataServer):

def __init__(self, *, user: PickableDataServerUser, srv_sock: socket.socket):
self._srv_sock = srv_sock
PickableDataServer.__init__(self, user=user)

async def run_server(self):
print("run_server_by_conn")
reader, writer = await asyncio.streams.open_connection(sock=self._srv_sock)
print("Got reader, writer")
await self.handle_client(reader, writer)


@logged_group("neon.Proxy")
class PickableDataClient:

def __init__(self, host: str, port: int):
CONNECTION_TIMEOUT_SEC = 5

self.info(f"Initialize PickableDataClient connecting to: {port} at: {host}")
self._connection = socket.create_connection((host, port))
def __init__(self):
self._client_sock = None

def send_data(self, pickable_data: Any):
def _set_client_sock(self, client_sock: socket.socket):
self._client_sock = client_sock
self._client_sock.setblocking(False)
self._client_sock.settimeout(self.CONNECTION_TIMEOUT_SEC)

def send_data(self, pickable_object: Any):
try:
payload = self._encode_pickable_data(pickable_data)
self._connection.send(payload)
payload = encode_pickable(pickable_object)
self._client_sock.send(payload)
len_packed: bytes = self._client_sock.recv(4)
data_len = struct.unpack("!I", len_packed)[0]
data = self._client_sock.recv(data_len)
if not data:
return None
result = pickle.loads(data)
return result
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def _encode_pickable_data(self, pickable_data: Any):
data = pickle.dumps(pickable_data)
data_len = len(data)
packed_len = struct.pack("!I", data_len)
payload = packed_len + data
return payload
async def send_data_async(self, pickable_object):
reader, writer = await asyncio.streams.open_connection(sock=self._client_sock)
try:
payload = encode_pickable(pickable_object)
writer.write(payload)
await writer.drain()
len_packed: bytes = await reader.readexactly(4)
if not len_packed:
return None
data_len = struct.unpack("!I", len_packed)[0]
data = await reader.readexactly(data_len)
if not data:
return None
result = pickle.loads(data)
return result
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def __del__(self):
self._connection.close()
self._client_sock.close()


class PipePickableDataClient(PickableDataClient):

def __init__(self, client_sock: socket.socket):
PickableDataClient.__init__(self)
self._set_client_sock(client_sock=client_sock)


class AddrPickableDataClient(PickableDataClient):

def __init__(self, addr: Tuple[str, int]):
PickableDataClient.__init__(self)
host, port = addr
client_sock = socket.create_connection((host, port))
self._set_client_sock(client_sock=client_sock)

1 change: 1 addition & 0 deletions proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .mempool_client import MemPoolClient
from .mempool_service import MemPoolService
from .mem_pool import MemPool
from .mempool_api import *

MEMPOOL_SERVICE_PORT = MemPoolService.MEMPOOL_SERVICE_PORT
MEMPOOL_SERVICE_HOST = MemPoolService.MEMPOOL_SERVICE_HOST
30 changes: 16 additions & 14 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import asyncio
import os

from logged_groups import logged_group
from concurrent.futures import ProcessPoolExecutor
import time

from ..common_neon.data import NeonTxData
from ..common_neon.config import IConfig

from .mempool_api import MemPoolTxRequest
from .mempool_tx_executor import MemPoolTxExecutor


@logged_group("neon.MemPool")
class MemPool:

POOL_PROC_COUNT = 8

def __init__(self):
def __init__(self, config: IConfig):
self._pool = ProcessPoolExecutor(self.POOL_PROC_COUNT)
self._event_loop = asyncio.get_event_loop()

def send_raw_transaction(self, neon_tx_data: NeonTxData):
self._pool.submit(MemPool._send_raw_transaction_impl, neon_tx_data)
self._tx_executor = MemPoolTxExecutor(config)

def send_raw_transaction(self, mempool_tx_request: MemPoolTxRequest) -> bool:
try:
self._pool.submit(MemPool._send_raw_transaction_impl, mempool_tx_request)
except Exception as err:
print(f"Failed enqueue mempool_tx_request into the worker pool: {err}")
return False
return True

@staticmethod
def _send_raw_transaction_impl(neon_tx_data: NeonTxData) -> bool:
pid = os.getpid()
print(f"PID: {pid}, neon_tx_data: {neon_tx_data}")
time.sleep(0.1)
def _send_raw_transaction_impl(mempool_tx_request: MemPoolTxRequest) -> bool:
print(f"mempool_tx_request: {mempool_tx_request}")
return True
11 changes: 11 additions & 0 deletions proxy/mempool/mempool_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import dataclass

from ..common_neon.eth_proto import Trx as NeonTx
from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult


@dataclass
class MemPoolTxRequest:
neon_tx: NeonTx
neon_tx_exec_cfg: NeonTxExecCfg
emulating_result: NeonEmulatingResult
10 changes: 5 additions & 5 deletions proxy/mempool/mempool_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from ..common_neon.data import NeonTxData
from ..common_neon.utils import AddrPickableDataClient

from ..common_neon.utils import PickableDataClient
from .mempool_api import MemPoolTxRequest


class MemPoolClient:

def __init__(self, host: str, port: int):
self._pickable_data_client = PickableDataClient(host, port)
self._pickable_data_client = AddrPickableDataClient((host, port))

def send_raw_transaction(self, neon_tx_data: NeonTxData):
self._pickable_data_client.send_data(neon_tx_data)
def send_raw_transaction(self, mempool_tx_request: MemPoolTxRequest):
self._pickable_data_client.send_data(mempool_tx_request)
17 changes: 10 additions & 7 deletions proxy/mempool/mempool_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import asyncio
from multiprocessing import Process

from ..common_neon.utils.pickable_data_server import PickableDataServer, PickableDataServerUser
from ..common_neon.utils.pickable_data_server import AddrPickableDataSrv, PickableDataServerUser
from ..common_neon.config import IConfig

from .mem_pool import MemPool

from typing import Any
Expand All @@ -14,21 +16,22 @@ class MemPoolService(PickableDataServerUser):
MEMPOOL_SERVICE_PORT = 9091
MEMPOOL_SERVICE_HOST = "0.0.0.0"

def __init__(self):
def __init__(self, config: IConfig):
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self._mempool_server = None
self._mempool = None
self._process = Process(target=self.run)
self._config = config

def start(self):
self.info("Run until complete")
self._process.start()

def on_data_received(self, data: Any):
self._mempool.send_raw_transaction(data)
def on_data_received(self, data: Any) -> Any:
return self._mempool.send_raw_transaction(data)

def run(self):
self._mempool_server = PickableDataServer(user=self, host=self.MEMPOOL_SERVICE_HOST, port=self.MEMPOOL_SERVICE_PORT)
self._mempool = MemPool()
self.event_loop.run_until_complete(self._mempool_server.run_server())
self._mempool_server = AddrPickableDataSrv(user=self, address=(self.MEMPOOL_SERVICE_HOST, self.MEMPOOL_SERVICE_PORT))
self._mempool = MemPool(self._config)
self.event_loop.run_forever()
29 changes: 29 additions & 0 deletions proxy/mempool/mempool_tx_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from logged_groups import logged_group

from ..common_neon.solana_interactor import SolanaInteractor
from ..common_neon.config import IConfig
from ..memdb.memdb import MemDB

# TODO: NeonTxSender should be moved out from there
from .transaction_sender import NeonTxSender
from .operator_resource_list import OperatorResourceList
from .mempool_api import MemPoolTxRequest


@logged_group("neon.MemPool")
class MemPoolTxExecutor:

def __init__(self, config: IConfig):

self._solana = SolanaInteractor(config.get_solana_url())
self._db = MemDB(self._solana)
self._config = config

def execute_neon_tx(self, mempool_tx_request: MemPoolTxRequest):
neon_tx = mempool_tx_request.neon_tx
neon_tx_cfg = mempool_tx_request.neon_tx_exec_cfg
emulating_result = mempool_tx_request.emulating_result
emv_step_count = self._config.get_evm_count()
tx_sender = NeonTxSender(self._db, self._solana, neon_tx, steps=emv_step_count)
with OperatorResourceList(tx_sender):
tx_sender.execute(neon_tx_cfg, emulating_result)
File renamed without changes.
Loading

0 comments on commit 9ec72ec

Please sign in to comment.