Skip to content

Commit

Permalink
#712 bring tx processing scheme onto asyncio platform
Browse files Browse the repository at this point in the history
  • Loading branch information
rozhkovdmitrii authored Apr 18, 2022
1 parent 3ffa382 commit 99de774
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 215 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ steps:
- label: ":terraform: build infrastructure"
key: "create_infrastructure"
if: &is_fts_enabled |
(build.pull_request.base_branch == "develop" && !build.pull_request.draft) ||
(build.pull_request.base_branch == "712-mempool" && !build.pull_request.draft) ||
(build.source == "trigger_job" && build.env("NEON_EVM_FULL_TEST_SUITE") == "true")
agents:
queue: "testing"
Expand Down
1 change: 0 additions & 1 deletion proxy/common_neon/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .types import Result
18 changes: 18 additions & 0 deletions proxy/common_neon/data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class NeonTxPrecheckResult:
is_underpriced_tx_without_chainid: bool
emulating_result: NeonEmulatingResult


NeonEmulatingResult = Dict[str, Any]


class NeonTxStatData:
Expand All @@ -10,3 +22,9 @@ def __init__(self, neon_tx_hash: str, neon_income: int, tx_type: str, is_cancele

def add_instruction(self, sol_tx_hash: str, sol_spent: int, steps: int, bpf: int) -> None:
self.instructions.append((sol_tx_hash, sol_spent, steps, bpf))


@dataclass
class NeonTxData:
tx_signed: str

2 changes: 1 addition & 1 deletion proxy/common_neon/emulator_interactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..environment import neon_cli, NEON_TOKEN_MINT, CHAIN_ID

from .errors import EthereumError
from .types import NeonEmulatingResult
from .data import NeonEmulatingResult


@logged_group("neon.Proxy")
Expand Down
24 changes: 0 additions & 24 deletions proxy/common_neon/types.py

This file was deleted.

3 changes: 2 additions & 1 deletion proxy/common_neon/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation
from .utils import *

from .pickable_data_server import PickableDataServer, PickableDataServerUser, PickableDataClient

84 changes: 84 additions & 0 deletions proxy/common_neon/utils/pickable_data_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from abc import ABC, abstractmethod
import asyncio
import socket
import pickle
import struct
from typing import Any
from logged_groups import logged_group


class PickableDataServerUser(ABC):

@abstractmethod
def on_data_received(self, data: Any):
"""Gets neon_tx_data from the neon rpc api service worker"""


@logged_group("neon.MemPool")
class PickableDataServer(ABC):

def __init__(self, *, user: PickableDataServerUser, host: str, port: int):
self._user = user
self._port = port
self._host = host

async def handle_client(self, client):
loop = asyncio.get_event_loop()
peer_name = client.getpeername()
self.debug(f"Got new incoming connection: {peer_name}")
while True:
try:
len_packed: bytes = await loop.sock_recv(client, 4)
if len(len_packed) == 0:
break
# TODO: all the data can be received by parts, handle it
payload_len_data = struct.unpack("!I", len_packed[:4])[0]
payload = await loop.sock_recv(client, payload_len_data)
data = pickle.loads(payload)
self._user.on_data_received(data)
except ConnectionResetError:
self.error(f"Client connection: {peer_name} - has been interrupted")
break
except Exception as err:
self.error(f"Failed to receive data over: {peer_name} - err: {err}")
continue
client.close()

async def run_server(self):
self.info(f"Listen port: {self._port} on: {self._host}")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self._host, self._port))
server.listen(8)
server.setblocking(False)

loop = asyncio.get_event_loop()
while True:
client, _ = await loop.sock_accept(server)
loop.create_task(self.handle_client(client))


@logged_group("neon.Proxy")
class PickableDataClient:

def __init__(self, host: str, port: int):

self.info(f"Initialize PickableDataClient connecting to: {port} at: {host}")
self._connection = socket.create_connection((host, port))

def send_data(self, pickable_data: Any):
try:
payload = self._encode_pickable_data(pickable_data)
self._connection.send(payload)
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def _encode_pickable_data(self, pickable_data: Any):
data = pickle.dumps(pickable_data)
data_len = len(data)
packed_len = struct.pack("!I", data_len)
payload = packed_len + data
return payload

def __del__(self):
self._connection.close()
117 changes: 0 additions & 117 deletions proxy/common_neon/utils/queue_based_service.py

This file was deleted.

5 changes: 4 additions & 1 deletion proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from .mempool_service import MemPoolService
from .mempool_client import MemPoolClient
from .mempool_service import MemPoolService
from .mem_pool import MemPool

MEMPOOL_SERVICE_PORT = MemPoolService.MEMPOOL_SERVICE_PORT
MEMPOOL_SERVICE_HOST = MemPoolService.MEMPOOL_SERVICE_HOST
47 changes: 20 additions & 27 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
import asyncio
import os

from logged_groups import logged_group
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
import time

from ..common_neon.data import NeonTxData


@logged_group("neon.MemPool")
class MemPool:

POOL_PROC_COUNT = 20
POOL_PROC_COUNT = 8

def __init__(self):
self._pool = Pool(processes=self.POOL_PROC_COUNT)

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._pool.apply_async(func=self._on_eth_send_raw_transaction_impl, args=(eth_trx_hash,),
callback=self.on_eth_send_raw_transaction_callback, error_callback=self.error_callback)

def error_callback(self, error):
self.error("Failed to invoke on worker process: ", error)

def on_eth_send_raw_transaction_callback(self, result):
pass

def _on_eth_send_raw_transaction_impl(self, eth_trx_hash):
self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}")

def do_extras(self):
pass

def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['_pool']
return self_dict

def __setstate__(self, state):
self.__dict__.update(state)
self._pool = ProcessPoolExecutor(self.POOL_PROC_COUNT)
self._event_loop = asyncio.get_event_loop()

def send_raw_transaction(self, neon_tx_data: NeonTxData):
self._pool.submit(MemPool._send_raw_transaction_impl, neon_tx_data)

@staticmethod
def _send_raw_transaction_impl(neon_tx_data: NeonTxData) -> bool:
pid = os.getpid()
print(f"PID: {pid}, neon_tx_data: {neon_tx_data}")
time.sleep(0.1)
return True
22 changes: 7 additions & 15 deletions proxy/mempool/mempool_client.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
from logged_groups import logged_group
from ..common_neon.data import NeonTxData

from ..common_neon.utils import QueueBasedServiceClient
from ..common_neon import Result
from ..common_neon.utils import PickableDataClient

from . import MemPoolService

class MemPoolClient:

@logged_group("neon.Proxy")
class MemPoolClient(QueueBasedServiceClient):
def __init__(self, host: str, port: int):
self._pickable_data_client = PickableDataClient(host, port)

MEM_POOL_SERVICE_HOST = "127.0.0.1"

def __init__(self):
port, host = (MemPoolService.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST)
self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}")
QueueBasedServiceClient.__init__(self, host, port)

def on_eth_send_raw_transaction(self, eth_trx_signature) -> Result:
return self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature)
def send_raw_transaction(self, neon_tx_data: NeonTxData):
self._pickable_data_client.send_data(neon_tx_data)
Loading

0 comments on commit 99de774

Please sign in to comment.