Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

713 send tx from mempool worker #757

Merged
merged 61 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
36173d3
Get rid of queue based service
rozhkovdmitrii Apr 16, 2022
30e11cf
Move all data types into data.py
rozhkovdmitrii Apr 16, 2022
cbfc8bb
Simplify start mechanism
rozhkovdmitrii Apr 16, 2022
5444085
Update neon_rpc_api_model
rozhkovdmitrii Apr 16, 2022
d956dc7
mempool init
rozhkovdmitrii Apr 16, 2022
a2ada65
update mempool
rozhkovdmitrii Apr 16, 2022
6e810fe
Get rid of second getting price int test_integration_success_read_price
rozhkovdmitrii Apr 16, 2022
4675a2c
Bring processing onto asyncio
rozhkovdmitrii Apr 17, 2022
76b73d7
Implement asyncio event loop on the MemPool
rozhkovdmitrii Apr 17, 2022
dc11a9d
rename mempool_server.py
rozhkovdmitrii Apr 17, 2022
89d63c8
Correct some things
rozhkovdmitrii Apr 17, 2022
6ae513b
Spit and polish
rozhkovdmitrii Apr 17, 2022
4bfd337
Spit and polish
rozhkovdmitrii Apr 17, 2022
7d49886
Merge branch '712-mempool' into 712-restructure-processing
rozhkovdmitrii Apr 17, 2022
817df22
Spit and polish
rozhkovdmitrii Apr 18, 2022
39ad9c9
Move pickable_data server into utils
rozhkovdmitrii Apr 18, 2022
22b0ac7
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 21, 2022
8b9a821
Introduce a variety of approaches to transfer a pickable data, config
rozhkovdmitrii Apr 21, 2022
017bcde
Spit on MemPool and polish a little
rozhkovdmitrii Apr 21, 2022
1df5786
Move transaction sending functionality
rozhkovdmitrii Apr 21, 2022
3b725ca
.
rozhkovdmitrii Apr 21, 2022
d4b68d7
Bring emulating beck to transaction_validator
rozhkovdmitrii Apr 21, 2022
df68f14
Resolve some remarks made by @a_falaleev
rozhkovdmitrii Apr 21, 2022
406aa6b
spit and polish
rozhkovdmitrii Apr 21, 2022
88376f8
Rename neon_tx_exec_cfg
rozhkovdmitrii Apr 21, 2022
971cb11
Turn down full test suite
rozhkovdmitrii Apr 22, 2022
9dc5d44
Save changes as they are
rozhkovdmitrii Apr 23, 2022
2c0f4eb
Fix pickable data client
rozhkovdmitrii Apr 23, 2022
b4e33e5
Spit ans polish
rozhkovdmitrii Apr 25, 2022
153f3a3
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
634b358
Merge branch '712-mempool' into 713-make-get_receipt-working
rozhkovdmitrii Apr 25, 2022
2464609
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
18af023
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 26, 2022
779019f
Get rid of endless request processing
rozhkovdmitrii Apr 26, 2022
f0d9f1a
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
6dfc5d1
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
dcf699b
Just reraise exception when failed to send
rozhkovdmitrii Apr 29, 2022
e78153b
Just try to fix logs
rozhkovdmitrii Apr 29, 2022
a467a3d
Logging logs
rozhkovdmitrii Apr 29, 2022
60b713a
Provide some extra logging
rozhkovdmitrii Apr 30, 2022
925d6b2
Just add address to eth_getLogs tests filter
rozhkovdmitrii May 1, 2022
f3c99bb
Fix test_neon_tx_sender.py
rozhkovdmitrii May 1, 2022
cdcca71
enable FTS
rozhkovdmitrii May 1, 2022
5366a53
revert some changes
rozhkovdmitrii May 13, 2022
c6b6935
fix one error
rozhkovdmitrii May 13, 2022
07ccac2
fix counters store
rozhkovdmitrii May 13, 2022
a3bf585
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 16, 2022
9f37b0f
Lowercase account before getting transaction count
rozhkovdmitrii May 17, 2022
bd668e1
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
62d553f
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
0b6686b
Rollback some changes
rozhkovdmitrii May 18, 2022
3544bd2
Rollback some changes
rozhkovdmitrii May 18, 2022
260e8f7
Spit and polish
rozhkovdmitrii May 19, 2022
8234bb8
Spit and polish
rozhkovdmitrii May 19, 2022
0a13155
Spit and polish
rozhkovdmitrii May 19, 2022
dbf5ca1
Spit and polish
rozhkovdmitrii May 19, 2022
388f028
Spit and polish
rozhkovdmitrii May 19, 2022
8e7eaf4
Spit and polish
rozhkovdmitrii May 19, 2022
93051f0
Spit and polish
rozhkovdmitrii May 19, 2022
728880a
Spit and polish
rozhkovdmitrii May 19, 2022
3d71178
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ steps:
- label: ":terraform: build infrastructure"
key: "create_infrastructure"
if: &is_fts_enabled |
(build.pull_request.base_branch == "712-mempool" && !build.pull_request.draft) ||
(build.pull_request.base_branch == "develop" && !build.pull_request.draft) ||
(build.source == "trigger_job" && build.env("NEON_EVM_FULL_TEST_SUITE") == "true")
agents:
queue: "testing"
Expand Down
6 changes: 0 additions & 6 deletions proxy/common_neon/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,3 @@ def __init__(self, neon_tx_hash: str, neon_income: int, tx_type: str, is_cancele

def add_instruction(self, sol_tx_hash: str, sol_spent: int, steps: int, bpf: int) -> None:
self.instructions.append((sol_tx_hash, sol_spent, steps, bpf))


@dataclass
class NeonTxData:
tx_signed: str

37 changes: 15 additions & 22 deletions proxy/common_neon/utils/pickable_data_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class PickableDataServerUser(ABC):

@abstractmethod
def on_data_received(self, data: Any) -> Any:
async def on_data_received(self, data: Any) -> Any:
"""Gets neon_tx_data from the neon rpc api service worker"""


Expand All @@ -37,23 +37,25 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter):
while True:
try:
data = await self._recv_pickable_data(reader)
result = self._user.on_data_received(data)
result = await self._user.on_data_received(data)
result_data = encode_pickable(result)
writer.write(result_data)
await writer.drain()
except ConnectionResetError:
self.error(f"Client connection has been closed")
break
except asyncio.exceptions.IncompleteReadError as err:
self.error(f"Incomplete read error: {err}")
break
except Exception as err:
self.error(f"Failed to receive data err: {err}, type: {type(err)}")
break

async def _recv_pickable_data(self, reader: StreamReader):
len_packed: bytes = await reader.readexactly(4)
len_packed: bytes = await reader.read(4)
if len(len_packed) == 0:
raise ConnectionResetError()
payload_len_data = struct.unpack("!I", len_packed)[0]
payload = await reader.readexactly(payload_len_data)
payload = await reader.read(payload_len_data)
data = pickle.loads(payload)

return data
Expand All @@ -78,29 +80,23 @@ def __init__(self, *, user: PickableDataServerUser, srv_sock: socket.socket):
PickableDataServer.__init__(self, user=user)

async def run_server(self):
print("run_server_by_conn")
reader, writer = await asyncio.streams.open_connection(sock=self._srv_sock)
print("Got reader, writer")
await self.handle_client(reader, writer)


@logged_group("neon.Proxy")
class PickableDataClient:

CONNECTION_TIMEOUT_SEC = 5

def __init__(self):
self._client_sock = None

def _set_client_sock(self, client_sock: socket.socket):
self._client_sock = client_sock
self._client_sock.setblocking(False)
self._client_sock.settimeout(self.CONNECTION_TIMEOUT_SEC)

def send_data(self, pickable_object: Any):
try:
payload = encode_pickable(pickable_object)
self._client_sock.send(payload)
sent = self._client_sock.send(payload)
len_packed: bytes = self._client_sock.recv(4)
data_len = struct.unpack("!I", len_packed)[0]
data = self._client_sock.recv(data_len)
Expand All @@ -110,29 +106,26 @@ def send_data(self, pickable_object: Any):
return result
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")
raise

async def send_data_async(self, pickable_object):
reader, writer = await asyncio.streams.open_connection(sock=self._client_sock)
loop = asyncio.get_event_loop()
try:
payload = encode_pickable(pickable_object)
writer.write(payload)
await writer.drain()
len_packed: bytes = await reader.readexactly(4)
await loop.sock_sendall(self._client_sock, payload)

len_packed: bytes = await loop.sock_recv(self._client_sock, 4)
if not len_packed:
return None
data_len = struct.unpack("!I", len_packed)[0]
data = await reader.readexactly(data_len)
data = await loop.sock_recv(self._client_sock, data_len)
if not data:
return None
result = pickle.loads(data)
return result
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def __del__(self):
self._client_sock.close()
raise


class PipePickableDataClient(PickableDataClient):
Expand Down
7 changes: 5 additions & 2 deletions proxy/memdb/transactions_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ def _has_topics(src_topics, dst_topics):
return False

result_list = []
indexed_logs = self._db.get_logs(from_block, to_block, addresses, topics, block_hash)

with self._tx_slot.get_lock():
for data in self._tx_by_neon_sign.values():
tx = pickle.loads(data)
Expand All @@ -97,9 +99,10 @@ def _has_topics(src_topics, dst_topics):
continue
if len(topics) and (not _has_topics(topics, log['topics'])):
continue
if log in indexed_logs:
continue
result_list.append(log)

return result_list + self._db.get_logs(from_block, to_block, addresses, topics, block_hash)
return indexed_logs + result_list

def get_sol_sign_list_by_neon_sign(self, neon_sign: str, is_pended_tx: bool, before_slot: int) -> [str]:
if not is_pended_tx:
Expand Down
8 changes: 4 additions & 4 deletions proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .mempool_client import MemPoolClient
from .mempool_service import MemPoolService
from .mem_pool import MemPool
from .mempool_service import MPService
from .mempool import MemPool
from .mempool_api import *

MEMPOOL_SERVICE_PORT = MemPoolService.MEMPOOL_SERVICE_PORT
MEMPOOL_SERVICE_HOST = MemPoolService.MEMPOOL_SERVICE_HOST
MP_SERVICE_PORT = MPService.MP_SERVICE_PORT
MP_SERVICE_HOST = MPService.MP_SERVICE_HOST
91 changes: 91 additions & 0 deletions proxy/mempool/executor_mng.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import dataclasses
import socket

from collections import deque
from typing import List, Tuple, Deque, Set
from logged_groups import logged_group

from ..common_neon.config import IConfig
from ..common_neon.utils import PipePickableDataClient

from .mempool_api import MPRequest, IMPExecutor
from .mempool_executor import MPExecutor


class MpExecutorClient(PipePickableDataClient):

def __init__(self, client_sock: socket.socket):
PipePickableDataClient.__init__(self, client_sock=client_sock)

async def send_tx_request(self, mempool_tx_request: MPRequest):
return await self.send_data_async(mempool_tx_request)


@logged_group("neon.MemPool")
class MPExecutorMng(IMPExecutor):

BRING_BACK_EXECUTOR_TIMEOUT_SEC = 1800

@dataclasses.dataclass
class ExecutorInfo:
executor: MPExecutor
client: MpExecutorClient
id: int

def __init__(self, executor_count: int, config: IConfig):
self.info(f"Initialize executor mng with executor_count: {executor_count}")
self._available_executor_pool: Deque[int] = deque()
self._busy_executor_pool: Set[int] = set()
self._executors: List[MPExecutorMng.ExecutorInfo] = list()
for i in range(executor_count):
executor_info = MPExecutorMng._create_executor(i, config)
self._executors.append(executor_info)
self._available_executor_pool.appendleft(i)
executor_info.executor.start()

def submit_mp_request(self, mp_reqeust: MPRequest) -> Tuple[int, asyncio.Task]:
executor_id, executor = self._get_executor()
tx_hash = "0x" + mp_reqeust.neon_tx.hash_signed().hex()
self.debug(f"Tx: {tx_hash} - scheduled on executor: {executor_id}")
task = asyncio.get_event_loop().create_task(executor.send_data_async(mp_reqeust))
return executor_id, task

def is_available(self) -> bool:
return self._has_available()

def _has_available(self) -> bool:
return len(self._available_executor_pool) > 0

def _get_executor(self) -> Tuple[int, MpExecutorClient]:
executor_id = self._available_executor_pool.pop()
self.debug(f"Acquire executor: {executor_id}")
self._busy_executor_pool.add(executor_id)
executor_info = self._executors[executor_id]
return executor_id, executor_info.client

def on_no_liquidity(self, resource_id: int):
self.debug(f"No liquidity, executor: {resource_id} - will be unblocked in: {MPExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC} sec")
asyncio.get_event_loop().create_task(self._release_executor_later(resource_id))

async def _release_executor_later(self, executor_id: int):
await asyncio.sleep(MPExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC)
self.release_resource(executor_id)

def release_resource(self, resource_id: int):
self.debug(f"Release executor: {resource_id}")
self._busy_executor_pool.remove(resource_id)
self._available_executor_pool.appendleft(resource_id)

@staticmethod
def _create_executor(executor_id: int, config: IConfig) -> ExecutorInfo:
client_sock, srv_sock = socket.socketpair()
executor = MPExecutor(executor_id, srv_sock, config)
client = MpExecutorClient(client_sock)
return MPExecutorMng.ExecutorInfo(executor=executor, client=client, id=executor_id)

def __del__(self):
for executor_info in self._executors:
executor_info.executor.kill()
self._busy_executor_pool.clear()
self._available_executor_pool.clear()
30 changes: 0 additions & 30 deletions proxy/mempool/mem_pool.py

This file was deleted.

Loading