Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add raw batch data syncing #1

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Database credentials (orderbook barn/prod and analytics)
BARN_DB_URL=
PROD_DB_URL=
ANALYTICS_DB_URL=


NETWORK=

NODE_URL_ETHEREUM=
NODE_URL_GNOSIS=
NODE_URL_ARBITRUM=

EPSILON_LOWER_ETHEREUM=
EPSILON_UPPER_ETHEREUM=
EPSILON_LOWER_GNOSIS=
EPSILON_UPPER_GNOSIS=
EPSILON_LOWER_ARBITRUM=
EPSILON_UPPER_ARBITRUM=
Empty file added __init__.py
Empty file.
28 changes: 8 additions & 20 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.tables import SyncTable
from src.sync.config import (
BatchDataSyncConfig,
)
from src.sync.batch_data import sync_batch_data
from src.utils import node_suffix

Expand All @@ -37,31 +34,22 @@ def __init__(self) -> None:
arguments, _ = parser.parse_known_args()
self.sync_table: SyncTable = arguments.sync_table


def main() -> None:
    """
    Load the environment, connect to the configured node and run the sync.

    Reads NETWORK (default "mainnet") from the environment, maps it to a
    node suffix with node_suffix, and connects to the URL stored in the
    NODE_URL_<suffix> environment variable. The sync that runs is chosen
    by the sync table parsed by ScriptArgs; an unsupported value is logged
    as an error.
    """
    load_dotenv()
    args = ScriptArgs()
    orderbook = OrderbookFetcher()
    # Resolve the suffix exactly once: it is used both for the NODE_URL_*
    # lookup and as the network argument of the sync itself.
    network = node_suffix(os.environ.get("NETWORK", "mainnet"))
    log.info(f"Network is set to: {network}")
    web3 = Web3(Web3.HTTPProvider(os.environ.get(f"NODE_URL_{network}")))

    if args.sync_table == SyncTable.BATCH_DATA:
        asyncio.run(sync_batch_data(web3, orderbook, network))
    else:
        log.error(f"unsupported sync_table '{args.sync_table}'")


if __name__ == "__main__":
Expand Down
18 changes: 18 additions & 0 deletions src/sql/orderbook/create_batch_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Sample DDL for the intermediate tables in the analytics db that store batch data (the table name below is one example).
-- The name follows the pattern the batch data sync builds:
-- raw_batch_data_latest_{even|odd}_month_{network in lower case}.
-- NOTE(review): the sync writes with pandas to_sql(if_exists="replace"), which
-- drops and recreates the table; confirm whether the constraints declared here
-- are expected to survive a sync run.
create table raw_batch_data_latest_odd_month_gnosis (
-- presumably the orderbook environment the row came from (e.g. barn/prod) - TODO confirm
environment varchar(6) not null,
auction_id bigint not null,
-- nullable, unlike block_deadline
settlement_block bigint,
block_deadline bigint not null,
-- nullable, unlike solver
tx_hash bytea,
solver bytea not null,
-- numeric(78,0): up to 78 decimal digits, no fractional part
-- (presumably sized to hold uint256 amounts - TODO confirm)
execution_cost numeric(78,0),
surplus numeric(78,0),
protocol_fee numeric(78,0),
network_fee numeric(78,0),
uncapped_payment_native_token numeric(78,0) not null,
capped_payment numeric (78,0) not null,
winning_score numeric(78,0) not null,
reference_score numeric(78,0) not null,
-- a row is identified by deadline block, auction and environment together
PRIMARY KEY (block_deadline, auction_id, environment)
);
54 changes: 26 additions & 28 deletions src/sync/batch_data.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,40 @@
"""Main Entry point for batch data sync"""
import os
from dotenv import load_dotenv
from dune_client.client import DuneClient
from web3 import Web3
from src.fetch.orderbook import OrderbookFetcher, OrderbookEnv
from src.logger import set_log
from src.sync.config import BatchDataSyncConfig
from src.sync.common import compute_block_and_month_range
from src.models.block_range import BlockRange


log = set_log(__name__)


# async def sync_batch_data(
# node: Web3,
# orderbook: OrderbookFetcher,
# config: BatchDataSyncConfig,
# ) -> None:
# """Batch data Sync Logic"""
# load_dotenv()
# network = os.environ["NETWORK"]
async def sync_batch_data(
    node: Web3, orderbook: OrderbookFetcher, network: str
) -> None:
    """
    Batch data Sync Logic

    For every (block range, month, parity) entry returned by
    compute_block_and_month_range, fetch the batch data for that block
    range from the orderbook and write it to the analytics database table
    raw_batch_data_latest_{even|odd}_month_{network}, replacing whatever
    the table held before.

    :param node: Web3 client used to compute the block ranges
    :param orderbook: fetcher that runs the batch data query and provides
        the analytics database engine
    :param network: network name; it is lower-cased and used as the table
        name suffix
    """
    block_range_list, months_list, is_even = compute_block_and_month_range(node)
    # The suffix does not change between iterations, so compute it once.
    table_suffix = network.lower()

    for (start_block, end_block), month, month_is_even in zip(
        block_range_list, months_list, is_even
    ):
        parity = "even" if month_is_even else "odd"
        table_name = f"raw_batch_data_latest_{parity}_month_{table_suffix}"
        block_range = BlockRange(block_from=start_block, block_to=end_block)
        log.info(
            f"About to process block range ({start_block}, {end_block}) for month {month}"
        )
        batch_data = orderbook.get_batch_data(block_range)
        log.info("SQL query successfully executed. About to update analytics table.")
        # if_exists="replace" rewrites the whole table on every run.
        batch_data.to_sql(
            table_name,
            orderbook._pg_engine(OrderbookEnv.ANALYTICS),
            if_exists="replace",
        )
        log.info(f"batch data sync run completed successfully for month {month}")
182 changes: 91 additions & 91 deletions src/sync/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,106 +7,106 @@

log = set_log(__name__)

# def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
# """
# This implements binary search and returns the smallest block number
# whose timestamp is at least as large as the time_stamp argument passed in the function
# """
# end_block_number = int(node.eth.get_block("finalized")["number"])
# start_block_number = 1
# close_in_seconds = 30
def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
    """
    Return the smallest block number whose timestamp is at least `time_stamp`.

    This is a lower-bound binary search over the blocks from 1 up to the
    latest finalized block. It relies on block timestamps never decreasing
    as the block number grows. The search always terminates and never asks
    the node for a block outside that range, even when no block lies close
    to the requested timestamp.

    :param node: Web3 client used to fetch blocks
    :param time_stamp: target UNIX timestamp in seconds
    :return: the smallest block number with timestamp >= time_stamp; if
        even the latest finalized block is older than time_stamp, the
        number of that latest finalized block is returned instead
    """
    high = int(node.eth.get_block("finalized")["number"])
    # Keep low <= high even on a chain whose finalized block is block 0.
    low = min(1, high)

    # Invariant: every block below `low` is older than time_stamp, and
    # `high` is the best candidate found so far.
    while low < high:
        mid = (low + high) // 2
        if node.eth.get_block(mid)["timestamp"] >= time_stamp:
            high = mid
        else:
            low = mid + 1
    return low


# def compute_block_and_month_range( # pylint: disable=too-many-locals
# node: Web3,
# ) -> Tuple[List[Tuple[int, int]], List[str], List[bool]]:
# """
# This determines the block range and the relevant months
# for which we will compute and upload data on Dune.
# """
# # We first compute the relevant block range
# # Here, we assume that the job runs at least once every 24h
# # Because of that, if it is the first day of month, we also
# # compute the previous month's table just to be on the safe side
def compute_block_and_month_range(  # pylint: disable=too-many-locals
    node: Web3,
) -> Tuple[List[Tuple[int, int]], List[str], List[bool]]:
    """
    Determine the block ranges and the months for which data is computed.

    The current month (in UTC, taken from the latest finalized block) is
    always included. The job is assumed to run at least once every 24h, so
    when the latest finalized block falls on the first day of a month the
    previous month is included as well, to make sure its last blocks are
    not missed.

    :param node: Web3 client used to fetch blocks
    :return: three parallel lists, current month first:
        - (start_block, end_block) for each month,
        - the month label formatted as "<year>_<month>",
        - whether the month number is even.
    """
    latest_finalized_block = node.eth.get_block("finalized")
    current_month_end_block = int(latest_finalized_block["number"])
    current_month_end_datetime = datetime.fromtimestamp(
        latest_finalized_block["timestamp"], tz=timezone.utc
    )
    year = current_month_end_datetime.year
    month = current_month_end_datetime.month

    current_month_start_timestamp = datetime(
        year, month, 1, tzinfo=timezone.utc
    ).timestamp()
    current_month_start_block = find_block_with_timestamp(
        node, current_month_start_timestamp
    )

    block_range = [(current_month_start_block, current_month_end_block)]
    months_list = [f"{year}_{month}"]
    is_even = [month % 2 == 0]

    if current_month_end_datetime.day == 1:
        # January rolls back to December of the previous year.
        if month == 1:
            previous_year, previous_month_number = year - 1, 12
        else:
            previous_year, previous_month_number = year, month - 1

        previous_month_start_timestamp = datetime(
            previous_year, previous_month_number, 1, tzinfo=timezone.utc
        ).timestamp()
        previous_month_start_block = find_block_with_timestamp(
            node, previous_month_start_timestamp
        )
        # The previous month ends where the current month starts.
        block_range.append((previous_month_start_block, current_month_start_block))
        # Single-line label: a triple-quoted f-string here would embed
        # newlines and indentation in the month name.
        months_list.append(f"{previous_year}_{previous_month_number}")
        # Consecutive months always have opposite parity.
        is_even.append(not is_even[0])

    return block_range, months_list, is_even
15 changes: 0 additions & 15 deletions src/sync/config.py

This file was deleted.