From 851c56d7c5fee0914daadde3c737380f3da6d620 Mon Sep 17 00:00:00 2001 From: mertwole <33563701+mertwole@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:55:48 +0400 Subject: [PATCH] feat(relayer): Introduce manual relayers (#243) --- relayer/src/cli/common.rs | 103 ++++ relayer/src/cli/mod.rs | 210 +++++++ relayer/src/ethereum_checkpoints/mod.rs | 280 ++++----- relayer/src/hex_utils.rs | 8 +- relayer/src/main.rs | 531 +++++++----------- .../ethereum/deposit_event_extractor.rs | 8 +- .../common/gear/message_sender/mod.rs | 43 +- .../eth_to_gear/all_token_transfers.rs | 10 +- .../src/message_relayer/eth_to_gear/manual.rs | 79 +++ .../src/message_relayer/eth_to_gear/mod.rs | 1 + .../eth_to_gear/paid_token_transfers.rs | 6 + .../src/message_relayer/gear_to_eth/manual.rs | 72 +++ .../src/message_relayer/gear_to_eth/mod.rs | 1 + 13 files changed, 865 insertions(+), 487 deletions(-) create mode 100644 relayer/src/cli/common.rs create mode 100644 relayer/src/cli/mod.rs create mode 100644 relayer/src/message_relayer/eth_to_gear/manual.rs create mode 100644 relayer/src/message_relayer/gear_to_eth/manual.rs diff --git a/relayer/src/cli/common.rs b/relayer/src/cli/common.rs new file mode 100644 index 00000000..13028a2b --- /dev/null +++ b/relayer/src/cli/common.rs @@ -0,0 +1,103 @@ +use clap::Args; + +#[derive(Args)] +pub struct ProofStorageArgs { + /// Gear fee payer. If not set, proofs are saved to file system + #[arg(long = "gear-fee-payer", env = "GEAR_FEE_PAYER")] + pub gear_fee_payer: Option, +} + +#[derive(Args)] +pub struct GenesisConfigArgs { + /// Authority set hash used in genesis config + #[arg(long = "authority-set-hash", env = "GENESIS_CONFIG_AUTHORITY_SET_HASH")] + pub authority_set_hash: String, + /// Authority set id used in genesis config + #[arg(long = "authority-set-id", env = "GENESIS_CONFIG_AUTHORITY_SET_ID")] + pub authority_set_id: u64, +} + +#[derive(Args)] +pub struct GearSignerArgs { + #[clap(flatten)] + pub common: GearArgs, + + /// Substrate URI that identifies a user by a mnemonic phrase or + /// provides default users from the keyring (e.g., "//Alice", "//Bob", + /// etc.). The password for URI should be specified in the same `suri`, + /// separated by the ':' char + #[arg(long = "gear-suri", env = "GEAR_SURI")] + pub suri: String, +} + +#[derive(Args)] +pub struct GearArgs { + /// Domain of the Gear RPC endpoint + #[arg( + long = "gear-domain", + default_value = "ws://127.0.0.1", + env = "GEAR_DOMAIN" + )] + pub domain: String, + + /// Port of the Gear RPC endpoint + #[arg(long = "gear-port", default_value = "9944", env = "GEAR_PORT")] + pub port: u16, + + /// Retry count of the Gear RPC client + #[arg( + long = "gear-rpc-retries", + default_value = "3", + env = "GEAR_RPC_RETRIES" + )] + pub retries: u8, +} + +#[derive(Args)] +pub struct EthereumSignerArgs { + #[clap(flatten)] + pub ethereum_args: EthereumArgs, + + /// Private key for fee payer + #[arg(long = "eth-fee-payer", env = "ETH_FEE_PAYER")] + pub fee_payer: String, +} + +#[derive(Args)] +pub struct EthereumArgs { + /// Address of the ethereum endpoint + #[arg(long = "ethereum-endpoint", env = "ETH_RPC")] + pub eth_endpoint: String, + /// Ethereum address of relayer contract + #[arg(long = "relayer-address", env = "ETH_RELAYER_ADDRESS")] + pub relayer_address: String, + /// Ethereum address of message queue contract + #[arg(long = "mq-address", env = "ETH_MESSAGE_QUEUE_ADDRESS")] + pub mq_address: String, +} + +#[derive(Args)] +pub struct BeaconRpcArgs { + /// Address of the ethereum beacon RPC endpoint + #[arg(long = "ethereum-beacon-rpc", env = "ETH_BEACON_RPC")] + pub beacon_endpoint: String, + + /// Timeout in seconds for requests to the ethereum beacon RPC + #[arg( + long = "ethereum-beacon-rpc-timeout", + env = "ETH_BEACON_RPC_TIMEOUT", + default_value = "10" + )] + pub beacon_timeout: Option, +} + +#[derive(Args)] +pub struct PrometheusArgs { + /// Address of the prometheus endpoint + #[arg( + long = "prometheus-endpoint", + default_value = "0.0.0.0:9090", + env = "PROMETHEUS_ENDPOINT" + )] + pub endpoint: String, +} diff --git a/relayer/src/cli/mod.rs b/relayer/src/cli/mod.rs new file mode 100644 index 00000000..c8050189 --- /dev/null +++ b/relayer/src/cli/mod.rs @@ -0,0 +1,210 @@ +use clap::{Args, Parser, Subcommand}; + +mod common; + +pub use common::{ + BeaconRpcArgs, EthereumArgs, EthereumSignerArgs, GearArgs, GearSignerArgs, GenesisConfigArgs, + PrometheusArgs, ProofStorageArgs, +}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +#[command(propagate_version = true)] +pub struct Cli { + #[command(subcommand)] + pub command: CliCommands, +} + +#[allow(clippy::enum_variant_names)] +#[derive(Subcommand)] +pub enum CliCommands { + /// Start core protocol gear to ethereum relayer + GearEthCore(GearEthCoreArgs), + /// Start core protocol ethereum to gear relayer + EthGearCore(EthGearCoreArgs), + + /// Relay tokens from gear to ethereum + GearEthTokens(GearEthTokensArgs), + /// Relay tokens from ethereum to gear + EthGearTokens(EthGearTokensArgs), + + /// Manually relay message from gear to ethereum + GearEthManual(GearEthManualArgs), + /// Manually relay message from ethereum to gear + EthGearManual(EthGearManualArgs), + + /// Start kill switch relayer + KillSwitch(KillSwitchArgs), +} + +#[derive(Args)] +pub struct GearEthCoreArgs { + #[clap(flatten)] + pub gear_args: GearArgs, + #[clap(flatten)] + pub ethereum_args: EthereumSignerArgs, + #[clap(flatten)] + pub genesis_config_args: GenesisConfigArgs, + #[clap(flatten)] + pub prometheus_args: PrometheusArgs, + #[clap(flatten)] + pub proof_storage_args: ProofStorageArgs, +} + +#[derive(Args)] +pub struct EthGearCoreArgs { + /// ProgramId of the checkpoint-light-client program + #[arg(long, env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS")] + pub program_id: String, + #[clap(flatten)] + pub beacon_args: BeaconRpcArgs, + #[clap(flatten)] + pub gear_args: GearSignerArgs, + #[clap(flatten)] + pub prometheus_args: PrometheusArgs, +} + +#[derive(Args)] +pub struct GearEthTokensArgs { + #[clap(subcommand)] + pub command: GearEthTokensCommands, + + /// Block number to start relaying from. If not specified equals to the latest finalized block + #[arg(long = "from-block")] + pub from_block: Option, + + #[clap(flatten)] + pub gear_args: GearArgs, + #[clap(flatten)] + pub ethereum_args: EthereumSignerArgs, + #[clap(flatten)] + pub prometheus_args: PrometheusArgs, +} + +#[derive(Subcommand)] +pub enum GearEthTokensCommands { + /// Relay all the messages + AllTokenTransfers, + /// Relay only messages sent through bridging-payment + PaidTokenTransfers { + /// Address of the bridging-payment program + #[arg(long = "bridging-payment-address", env = "BRIDGING_PAYMENT_ADDRESS")] + bridging_payment_address: String, + }, +} + +#[derive(Args)] +pub struct EthGearTokensArgs { + #[command(subcommand)] + pub command: EthGearTokensCommands, + + /// Address of the checkpoint-light-client program on gear + #[arg( + long = "checkpoint-light-client-address", + env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS" + )] + pub checkpoint_light_client_address: String, + + #[arg(long = "historical-proxy-address", env = "HISTORICAL_PROXY_ADDRESS")] + pub historical_proxy_address: String, + + #[arg(long = "vft-manager-address", env = "VFT_MANAGER_ADDRESS")] + pub vft_manager_address: String, + + #[clap(flatten)] + pub gear_args: GearSignerArgs, + #[clap(flatten)] + pub ethereum_args: EthereumArgs, + #[clap(flatten)] + pub beacon_rpc: BeaconRpcArgs, + #[clap(flatten)] + pub prometheus_args: PrometheusArgs, +} + +#[derive(Subcommand)] +pub enum EthGearTokensCommands { + /// Relay all the transactions + AllTokenTransfers { + /// Address of the ERC20Manager contract on ethereum + #[arg(long = "erc20-manager-address", env = "ERC20_MANAGER_ADDRESS")] + erc20_manager_address: String, + }, + /// Relay only transactions sent to BridgingPayment + PaidTokenTransfers { + /// Address of the BridgingPayment contract on ethereum + #[arg(long = "bridging-payment-address", env = "BRIDGING_PAYMENT_ADDRESS")] + bridging_payment_address: String, + }, +} + +#[derive(Args)] +pub struct GearEthManualArgs { + /// Nonce of the target message + #[arg(long = "message-nonce", short = 'n')] + pub nonce: String, + + /// Block where target message was sent + #[arg(long = "message-block", short = 'b')] + pub block: u32, + + /// Ethereum block number to start listening for merkle roots from. If not specified equals to the latest finalized block + #[arg(long = "from-eth-block")] + pub from_eth_block: Option, + + #[clap(flatten)] + pub gear_args: GearArgs, + #[clap(flatten)] + pub ethereum_args: EthereumSignerArgs, +} + +#[derive(Args)] +pub struct EthGearManualArgs { + /// Transaction hash of the target message + #[arg(long = "tx-hash", short = 't')] + pub tx_hash: String, + + /// Ethereum slot containing target message + #[arg(long = "slot", short = 's')] + pub slot: u64, + + /// ProgramId of the checkpoint-light-client program + #[arg(long = "checkpoint-light-client")] + pub checkpoint_light_client: String, + + /// ProgramId of the historical-proxy program + #[arg(long = "historical-proxy")] + pub historical_proxy: String, + + /// ProgramId of the program that will receive target message + #[arg(long = "receiver-program")] + pub receiver_program: String, + + /// Route of the function that will be called on receiver-program + #[arg(long = "receiver-route")] + pub receiver_route: String, + + #[clap(flatten)] + pub gear_args: GearSignerArgs, + #[clap(flatten)] + pub ethereum_args: EthereumArgs, + #[clap(flatten)] + pub beacon_args: BeaconRpcArgs, +} + +#[derive(Args)] +pub struct KillSwitchArgs { + /// Eth block number to start kill switch relayer read events from. If not specified equals to the latest finalized block + #[arg(long = "from-eth-block")] + pub from_eth_block: Option, + + #[clap(flatten)] + pub gear_args: GearArgs, + #[clap(flatten)] + pub ethereum_args: EthereumSignerArgs, + #[clap(flatten)] + pub genesis_config_args: GenesisConfigArgs, + #[clap(flatten)] + pub prometheus_args: PrometheusArgs, + #[clap(flatten)] + pub proof_storage_args: ProofStorageArgs, +} diff --git a/relayer/src/ethereum_checkpoints/mod.rs b/relayer/src/ethereum_checkpoints/mod.rs index d73e94bc..efaac354 100644 --- a/relayer/src/ethereum_checkpoints/mod.rs +++ b/relayer/src/ethereum_checkpoints/mod.rs @@ -1,24 +1,26 @@ -use super::*; use anyhow::{anyhow, Result as AnyResult}; -use checkpoint_light_client_io::{ - ethereum_common::{utils as eth_utils, MAX_REQUEST_LIGHT_CLIENT_UPDATES, SLOTS_PER_EPOCH}, - meta::ReplayBack, - tree_hash::Hash256, - Handle, HandleResult, Slot, SyncCommitteeUpdate, G2, -}; -use ethereum_beacon_client::{slots_batch::Iter as SlotsBatchIter, BeaconClient}; use futures::{ future::{self, Either}, pin_mut, }; -use gclient::{EventProcessor, GearApi, WSAddress}; +use gclient::{EventProcessor, GearApi}; use parity_scale_codec::Decode; +use primitive_types::H256; use tokio::{ signal::unix::{self, SignalKind}, sync::mpsc::{self, Sender}, time::{self, Duration}, }; +use checkpoint_light_client_io::{ + ethereum_common::{utils as eth_utils, MAX_REQUEST_LIGHT_CLIENT_UPDATES, SLOTS_PER_EPOCH}, + meta::ReplayBack, + tree_hash::Hash256, + Handle, HandleResult, Slot, SyncCommitteeUpdate, G2, +}; +use ethereum_beacon_client::{slots_batch::Iter as SlotsBatchIter, BeaconClient}; +use utils_prometheus::MeteredService; + #[cfg(test)] mod tests; @@ -33,163 +35,165 @@ const DELAY_SECS_UPDATE_REQUEST: u64 = 30; // The constant is intentionally duplicated since vara-runtime is too heavy dependency. const UNITS: u128 = 1_000_000_000_000; -pub async fn relay(args: RelayCheckpointsArgs) { - log::info!("Started"); - - let RelayCheckpointsArgs { - program_id, - beacon_endpoint, - beacon_timeout, - vara_args: - VaraArgs { - vara_domain, - vara_port, - vara_rpc_retries, - }, - vara_suri, - prometheus_args: PrometheusArgs { - endpoint: endpoint_prometheus, - }, - } = args; - - let program_id = - crate::hex_utils::decode_byte_array(&program_id).expect("Failed to parse ProgramId"); - - let timeout = Some(Duration::from_secs(beacon_timeout)); - let beacon_client = BeaconClient::new(beacon_endpoint.clone(), timeout) - .await - .expect("Failed to connect to beacon node"); - - let mut signal_interrupt = unix::signal(SignalKind::interrupt()).expect("Set SIGINT handler"); - - let (sender, mut receiver) = mpsc::channel(SIZE_CHANNEL); +pub struct Relayer { + program_id: H256, - sync_update::spawn_receiver(beacon_client.clone(), sender); + beacon_client: BeaconClient, + gear_api: GearApi, - let client = GearApi::builder() - .retries(vara_rpc_retries) - .suri(vara_suri) - .build(WSAddress::new(vara_domain, vara_port)) - .await - .expect("GearApi client should be created"); + metrics: metrics::Updates, +} - let gas_limit_block = client - .block_gas_limit() - .expect("Block gas limit should be determined"); +impl MeteredService for Relayer { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics.get_sources() + } +} - // use 95% of block gas limit for all extrinsics - let gas_limit = gas_limit_block / 100 * 95; - log::info!("Gas limit for extrinsics: {gas_limit}"); +impl Relayer { + pub fn new(program_id: H256, beacon_client: BeaconClient, gear_api: GearApi) -> Self { + Self { + program_id, + beacon_client, + gear_api, + metrics: metrics::Updates::new(), + } + } - let sync_update = receiver - .recv() - .await - .expect("Updates receiver should be open before the loop"); + pub async fn run(self) { + log::info!("Started"); - let mut slot_last = sync_update.finalized_header.slot; + let mut signal_interrupt = + unix::signal(SignalKind::interrupt()).expect("Set SIGINT handler"); - match sync_update::try_to_apply(&client, program_id, sync_update.clone(), gas_limit).await { - Err(e) => { - log::error!("{e:?}"); - return; - } - Ok(Err(sync_update::Error::ReplayBackRequired { - replay_back, - checkpoint, - })) => { - if let Err(e) = replay_back::execute( - &beacon_client, - &client, - program_id, - gas_limit, - replay_back, - checkpoint, - sync_update, - ) - .await - { - log::error!("{e:?}. Exiting"); - return; - } - } - Ok(Ok(_) | Err(sync_update::Error::NotActual)) => (), - _ => { - slot_last = 0; - } - } + let (sender, mut receiver) = mpsc::channel(SIZE_CHANNEL); - let update_metrics = metrics::Updates::new(); - MetricsBuilder::new() - .register_service(&update_metrics) - .build() - .run(endpoint_prometheus) - .await; + sync_update::spawn_receiver(self.beacon_client.clone(), sender); - log::info!("Metrics service spawned"); + let gas_limit_block = self + .gear_api + .block_gas_limit() + .expect("Block gas limit should be determined"); - update_total_balance(&client, &update_metrics).await; + // use 95% of block gas limit for all extrinsics + let gas_limit = gas_limit_block / 100 * 95; + log::info!("Gas limit for extrinsics: {gas_limit}"); - loop { - let future_interrupt = signal_interrupt.recv(); - pin_mut!(future_interrupt); + let sync_update = receiver + .recv() + .await + .expect("Updates receiver should be open before the loop"); - let future_update = receiver.recv(); - pin_mut!(future_update); + let mut slot_last = sync_update.finalized_header.slot; - let sync_update = match future::select(future_interrupt, future_update).await { - Either::Left((_interrupted, _)) => { - log::info!("Caught SIGINT. Exiting"); + match sync_update::try_to_apply( + &self.gear_api, + self.program_id.0, + sync_update.clone(), + gas_limit, + ) + .await + { + Err(e) => { + log::error!("{e:?}"); return; } - - Either::Right((Some(sync_update), _)) => sync_update, - Either::Right((None, _)) => { - log::info!("Updates receiver has been closed. Exiting"); - return; + Ok(Err(sync_update::Error::ReplayBackRequired { + replay_back, + checkpoint, + })) => { + if let Err(e) = replay_back::execute( + &self.beacon_client, + &self.gear_api, + self.program_id.0, + gas_limit, + replay_back, + checkpoint, + sync_update, + ) + .await + { + log::error!("{e:?}. Exiting"); + return; + } + } + Ok(Ok(_) | Err(sync_update::Error::NotActual)) => (), + _ => { + slot_last = 0; } - }; - let slot = sync_update.finalized_header.slot; + } - update_metrics - .fetched_sync_update_slot - .set(i64::from_le_bytes(slot.to_le_bytes())); + update_total_balance(&self.gear_api, &self.metrics).await; - let committee_update = sync_update.sync_committee_next_pub_keys.is_some(); - if !committee_update { - update_metrics.total_fetched_finality_updates.inc(); - } + loop { + let future_interrupt = signal_interrupt.recv(); + pin_mut!(future_interrupt); - if slot == slot_last { - continue; - } + let future_update = receiver.recv(); + pin_mut!(future_update); - match sync_update::try_to_apply(&client, program_id, sync_update, gas_limit).await { - Ok(Ok(_)) => { - slot_last = slot; + let sync_update = match future::select(future_interrupt, future_update).await { + Either::Left((_interrupted, _)) => { + log::info!("Caught SIGINT. Exiting"); + return; + } - if committee_update { - update_metrics.processed_committee_updates.inc(); - } else { - update_metrics.processed_finality_updates.inc(); + Either::Right((Some(sync_update), _)) => sync_update, + Either::Right((None, _)) => { + log::info!("Updates receiver has been closed. Exiting"); + return; } + }; + let slot = sync_update.finalized_header.slot; + + self.metrics + .fetched_sync_update_slot + .set(i64::from_le_bytes(slot.to_le_bytes())); + + let committee_update = sync_update.sync_committee_next_pub_keys.is_some(); + if !committee_update { + self.metrics.total_fetched_finality_updates.inc(); } - Ok(Err(sync_update::Error::ReplayBackRequired { .. })) => { - log::error!("Replay back within the main loop. Exiting"); - return; + + if slot == slot_last { + continue; } - Ok(Err(e)) => { - log::error!("The program failed with: {e:?}. Skipping"); - if let sync_update::Error::NotActual = e { + + match sync_update::try_to_apply( + &self.gear_api, + self.program_id.0, + sync_update, + gas_limit, + ) + .await + { + Ok(Ok(_)) => { slot_last = slot; + + if committee_update { + self.metrics.processed_committee_updates.inc(); + } else { + self.metrics.processed_finality_updates.inc(); + } + } + Ok(Err(sync_update::Error::ReplayBackRequired { .. })) => { + log::error!("Replay back within the main loop. Exiting"); + return; + } + Ok(Err(e)) => { + log::error!("The program failed with: {e:?}. Skipping"); + if let sync_update::Error::NotActual = e { + slot_last = slot; + } + } + Err(e) => { + log::error!("{e:?}"); + return; } } - Err(e) => { - log::error!("{e:?}"); - return; - } - } - update_total_balance(&client, &update_metrics).await; + update_total_balance(&self.gear_api, &self.metrics).await; + } } } diff --git a/relayer/src/hex_utils.rs b/relayer/src/hex_utils.rs index 8c6ffd0b..4e83ab37 100644 --- a/relayer/src/hex_utils.rs +++ b/relayer/src/hex_utils.rs @@ -11,8 +11,12 @@ pub fn decode_h160(hex: &str) -> anyhow::Result { } pub fn decode_byte_array(hex: &str) -> anyhow::Result<[u8; LEN]> { - let address = if &hex[..2] == "0x" { &hex[2..] } else { hex }; - hex::decode(address)? + decode_byte_vec(hex)? .try_into() .map_err(|_| anyhow::anyhow!("Invalid length")) } + +pub fn decode_byte_vec(hex: &str) -> anyhow::Result> { + let address = if &hex[..2] == "0x" { &hex[2..] } else { hex }; + Ok(hex::decode(address)?) +} diff --git a/relayer/src/main.rs b/relayer/src/main.rs index f13f9dfc..ade59cf2 100644 --- a/relayer/src/main.rs +++ b/relayer/src/main.rs @@ -1,8 +1,8 @@ use std::time::Duration; -use clap::{Args, Parser, Subcommand}; -use ethereum_beacon_client::BeaconClient; +use clap::Parser; +use ethereum_beacon_client::BeaconClient; use ethereum_client::EthApi; use gear_rpc_client::GearApi; use kill_switch::KillSwitchRelayer; @@ -12,6 +12,7 @@ use prover::proving::GenesisConfig; use relay_merkle_roots::MerkleRootRelayer; use utils_prometheus::MetricsBuilder; +mod cli; mod common; mod ethereum_checkpoints; mod hex_utils; @@ -21,247 +22,11 @@ mod proof_storage; mod prover_interface; mod relay_merkle_roots; -const DEFAULT_ETH_BEACON_RPC: &str = "http://localhost:50000"; -const DEFAULT_ETH_RPC: &str = "http://localhost:8545"; -const DEFAULT_PROMETHEUS_ENDPOINT: &str = "0.0.0.0:9090"; -const DEFAULT_VARA_SURI: &str = "//Alice"; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -#[command(propagate_version = true)] -struct Cli { - #[command(subcommand)] - command: CliCommands, -} - -#[allow(clippy::enum_variant_names)] -#[derive(Subcommand)] -enum CliCommands { - /// Start service constantly relaying messages to ethereum - #[clap(visible_alias("rr"))] - RelayMerkleRoots(RelayMerkleRootsArgs), - /// Relay message to ethereum - #[clap(visible_alias("rm"))] - RelayMessages(RelayMessagesArgs), - /// Start service constantly relaying Ethereum checkpoints to the Vara program - RelayCheckpoints(RelayCheckpointsArgs), - /// Relay the ERC20 tokens to the Vara network - RelayErc20(RelayErc20Args), - /// Kill switch relayer - #[clap(visible_alias("ks"))] - KillSwitch(KillSwitchArgs), -} - -#[derive(Args)] -struct RelayMessagesArgs { - #[clap(flatten)] - vara_args: VaraArgs, - #[clap(flatten)] - ethereum_args: EthereumArgs, - #[clap(flatten)] - prometheus_args: PrometheusArgs, - /// Block number to start relaying from. If not specified equals to the latest finalized block - #[arg(long = "from-block")] - from_block: Option, - /// Address of bridging payment program (if not specified, relayer will relay all messages) - #[arg(long = "bridging-payment-address", env = "BRIDGING_PAYMENT_ADDRESS")] - bridging_payment_address: Option, -} - -#[derive(Args)] -struct RelayMerkleRootsArgs { - #[clap(flatten)] - vara_args: VaraArgs, - #[clap(flatten)] - ethereum_args: EthereumArgs, - #[clap(flatten)] - genesis_config_args: GenesisConfigArgs, - #[clap(flatten)] - prometheus_args: PrometheusArgs, - #[clap(flatten)] - proof_storage_args: ProofStorageArgs, -} - -#[derive(Args)] -struct KillSwitchArgs { - #[clap(flatten)] - vara_args: VaraArgs, - #[clap(flatten)] - ethereum_args: EthereumArgs, - #[clap(flatten)] - genesis_config_args: GenesisConfigArgs, - /// Eth block number to start kill switch relayer read events from. If not specified equals to the latest finalized block - #[arg(long = "from-eth-block")] - from_eth_block: Option, - #[clap(flatten)] - prometheus_args: PrometheusArgs, - #[clap(flatten)] - proof_storage_args: ProofStorageArgs, -} - -#[derive(Args)] -struct VaraArgs { - /// Domain of the VARA RPC endpoint - #[arg(long, default_value = "ws://127.0.0.1", env = "VARA_DOMAIN")] - vara_domain: String, - - /// Port of the VARA RPC endpoint - #[arg(long, default_value = "9944", env = "VARA_PORT")] - vara_port: u16, - - /// Set retries of the VARA RPC client - #[arg(long, default_value = "3", env = "VARA_RPC_RETRIES")] - vara_rpc_retries: u8, -} - -#[derive(Args)] -struct EthereumArgs { - /// Address of the ethereum endpoint - #[arg( - long = "ethereum-endpoint", - default_value = DEFAULT_ETH_RPC, - env = "ETH_RPC" - )] - eth_endpoint: String, - /// Private key for fee payer - #[arg(long = "eth-fee-payer", env = "ETH_FEE_PAYER")] - fee_payer: Option, - /// Ethereum address of relayer contract - #[arg(long = "relayer-address", env = "ETH_RELAYER_ADDRESS")] - relayer_address: String, - /// Ethereum address of message queue contract - #[arg(long = "mq-address", env = "ETH_MESSAGE_QUEUE_ADDRESS")] - mq_address: String, -} - -#[derive(Args)] -struct BeaconRpcArgs { - /// Address of the ethereum beacon RPC endpoint - #[arg( - long = "ethereum-beacon-rpc", - default_value = DEFAULT_ETH_BEACON_RPC, - env = "ETH_BEACON_RPC" - )] - beacon_endpoint: String, - - /// Timeout in seconds for requests to the ethereum beacon RPC - #[arg(long = "ethereum-beacon-rpc-timeout", env = "ETH_BEACON_RPC_TIMEOUT")] - beacon_timeout: Option, -} - -#[derive(Args)] -struct PrometheusArgs { - /// Address of the prometheus endpoint - #[arg( - long = "prometheus-endpoint", - default_value = DEFAULT_PROMETHEUS_ENDPOINT, - env = "PROMETHEUS_ENDPOINT" - )] - endpoint: String, -} - -#[derive(Args)] -struct ProofStorageArgs { - /// Gear fee payer. If not set, proofs are saved to file system - #[arg(long = "gear-fee-payer", env = "GEAR_FEE_PAYER")] - gear_fee_payer: Option, -} - -#[derive(Args)] -struct GenesisConfigArgs { - /// Authority set hash used in genesis config - #[arg(long = "authority-set-hash", env = "GENESIS_CONFIG_AUTHORITY_SET_HASH")] - authority_set_hash: String, - /// Authority set id used in genesis config - #[arg(long = "authority-set-id", env = "GENESIS_CONFIG_AUTHORITY_SET_ID")] - authority_set_id: u64, -} - -#[derive(Args)] -struct RelayCheckpointsArgs { - /// Specify ProgramId of the Checkpoint-light-client program - #[arg(long, env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS")] - program_id: String, - - /// Specify the endpoint providing Beacon API - #[arg(long, env = "BEACON_ENDPOINT")] - beacon_endpoint: String, - - /// Specify the timeout in seconds for requests to the Beacon API endpoint - #[arg(long, default_value = "120", env = "BEACON_TIMEOUT")] - beacon_timeout: u64, - - #[clap(flatten)] - vara_args: VaraArgs, - - /// Substrate URI that identifies a user by a mnemonic phrase or - /// provides default users from the keyring (e.g., "//Alice", "//Bob", - /// etc.). The password for URI should be specified in the same `suri`, - /// separated by the ':' char - #[arg(long, default_value = DEFAULT_VARA_SURI, env = "VARA_SURI")] - vara_suri: String, - - #[clap(flatten)] - prometheus_args: PrometheusArgs, -} - -#[derive(Args)] -struct RelayErc20Args { - #[clap(flatten)] - common: RelayErc20ArgsCommon, - - #[command(subcommand)] - command: RelayErc20Commands, -} - -#[derive(Subcommand)] -enum RelayErc20Commands { - /// Relay all the transactions - AllTokenTransfers { - /// Address of the ERC20Treasury contract on ethereum - #[arg(long = "erc20-treasury-address", env = "ERC20_TREASURY_ADDRESS")] - erc20_treasury_address: String, - }, - /// Relay only transactions sent to BridgingPayment - PaidTokenTransfers { - /// Address of the BridgingPayment contract on ethereum - #[arg(long = "bridging-payment-address", env = "BRIDGING_PAYMENT_ADDRESS")] - bridging_payment_address: String, - }, -} - -#[derive(Args)] -struct RelayErc20ArgsCommon { - /// Address of the checkpoint-light-client program on gear - #[arg( - long = "checkpoint-light-client-address", - env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS" - )] - checkpoint_light_client_address: String, - - #[arg(long = "historical-proxy-address", env = "HISTORICAL_PROXY_ADDRESS")] - historical_proxy_address: String, - #[arg(long = "vft-manager-address", env = "VFT_MANAGER_ADDRESS")] - vft_manager_address: String, - #[clap(flatten)] - vara_args: VaraArgs, - - /// Substrate URI that identifies a user by a mnemonic phrase or - /// provides default users from the keyring (e.g., "//Alice", "//Bob", - /// etc.). The password for URI should be specified in the same `suri`, - /// separated by the ':' char - #[arg(long, default_value = DEFAULT_VARA_SURI, env = "VARA_SURI")] - vara_suri: String, - - #[clap(flatten)] - ethereum_args: EthereumArgs, - - #[clap(flatten)] - beacon_rpc: BeaconRpcArgs, - - #[clap(flatten)] - prometheus_args: PrometheusArgs, -} +use cli::{ + BeaconRpcArgs, Cli, CliCommands, EthGearManualArgs, EthGearTokensArgs, EthGearTokensCommands, + EthereumArgs, EthereumSignerArgs, GearArgs, GearEthTokensCommands, GearSignerArgs, + GenesisConfigArgs, ProofStorageArgs, +}; #[tokio::main] async fn main() { @@ -281,14 +46,14 @@ async fn main() { let cli = Cli::parse(); match cli.command { - CliCommands::RelayMerkleRoots(args) => { - let gear_api = create_gear_client(&args.vara_args).await; - let eth_api = create_eth_client(&args.ethereum_args); + CliCommands::GearEthCore(args) => { + let gear_api = create_gear_client(&args.gear_args).await; + let eth_api = create_eth_signer_client(&args.ethereum_args); let metrics = MetricsBuilder::new(); let (proof_storage, metrics) = - create_proof_storage(&args.proof_storage_args, &args.vara_args, metrics).await; + create_proof_storage(&args.proof_storage_args, &args.gear_args, metrics).await; let genesis_config = create_genesis_config(&args.genesis_config_args); @@ -304,13 +69,13 @@ async fn main() { relayer.run().await.expect("Merkle root relayer failed"); } CliCommands::KillSwitch(args) => { - let gear_api = create_gear_client(&args.vara_args).await; - let eth_api = create_eth_client(&args.ethereum_args); + let gear_api = create_gear_client(&args.gear_args).await; + let eth_api = create_eth_signer_client(&args.ethereum_args); let metrics = MetricsBuilder::new(); let (proof_storage, metrics) = - create_proof_storage(&args.proof_storage_args, &args.vara_args, metrics).await; + create_proof_storage(&args.proof_storage_args, &args.gear_args, metrics).await; let genesis_config = create_genesis_config(&args.genesis_config_args); @@ -335,92 +100,124 @@ async fn main() { kill_switch.run().await.expect("Kill switch relayer failed"); } - CliCommands::RelayMessages(args) => { - let gear_api = create_gear_client(&args.vara_args).await; - let eth_api = create_eth_client(&args.ethereum_args); + CliCommands::GearEthTokens(args) => { + let gear_api = create_gear_client(&args.gear_args).await; + let eth_api = create_eth_signer_client(&args.ethereum_args); let gsdk_args = message_relayer::common::GSdkArgs { - vara_domain: args.vara_args.vara_domain, - vara_port: args.vara_args.vara_port, - vara_rpc_retries: args.vara_args.vara_rpc_retries, + vara_domain: args.gear_args.domain, + vara_port: args.gear_args.port, + vara_rpc_retries: args.gear_args.retries, }; - if let Some(bridging_payment_address) = args.bridging_payment_address { - let bridging_payment_address = hex_utils::decode_h256(&bridging_payment_address) - .expect("Failed to parse address"); - let relayer = gear_to_eth::paid_token_transfers::Relayer::new( - gear_api, - gsdk_args, - eth_api, - args.from_block, + let mut metrics_builder = MetricsBuilder::new(); + + match args.command { + GearEthTokensCommands::AllTokenTransfers => { + let relayer = gear_to_eth::all_token_transfers::Relayer::new( + gear_api, + gsdk_args, + eth_api, + args.from_block, + ) + .await + .unwrap(); + + metrics_builder = metrics_builder.register_service(&relayer); + + relayer.run(); + } + GearEthTokensCommands::PaidTokenTransfers { bridging_payment_address, - ) - .await - .unwrap(); - - MetricsBuilder::new() - .register_service(&relayer) - .build() - .run(args.prometheus_args.endpoint) - .await; - - relayer.run(); - } else { - let relayer = gear_to_eth::all_token_transfers::Relayer::new( - gear_api, - gsdk_args, - eth_api, - args.from_block, - ) - .await - .unwrap(); - - MetricsBuilder::new() - .register_service(&relayer) - .build() - .run(args.prometheus_args.endpoint) - .await; - - relayer.run(); + } => { + let bridging_payment_address = + hex_utils::decode_h256(&bridging_payment_address) + .expect("Failed to parse address"); + + let relayer = gear_to_eth::paid_token_transfers::Relayer::new( + gear_api, + gsdk_args, + eth_api, + args.from_block, + bridging_payment_address, + ) + .await + .unwrap(); + + metrics_builder = metrics_builder.register_service(&relayer); + + relayer.run(); + } } + metrics_builder + .build() + .run(args.prometheus_args.endpoint) + .await; + loop { // relayer.run() spawns thread and exits, so we need to add this loop after calling run. std::thread::sleep(Duration::from_millis(100)); } } - CliCommands::RelayCheckpoints(args) => ethereum_checkpoints::relay(args).await, - CliCommands::RelayErc20(RelayErc20Args { common, command }) => { - let eth_api = create_eth_client(&common.ethereum_args); - let beacon_client = create_beacon_client(&common.beacon_rpc).await; + CliCommands::EthGearCore(args) => { + let gear_api = create_gclient_client(&args.gear_args).await; + + let beacon_client = create_beacon_client(&args.beacon_args).await; + + let program_id = + hex_utils::decode_h256(&args.program_id).expect("Failed to decode program_id"); + + let relayer = ethereum_checkpoints::Relayer::new(program_id, beacon_client, gear_api); + + MetricsBuilder::new() + .register_service(&relayer) + .build() + .run(args.prometheus_args.endpoint) + .await; + + relayer.run().await; + } + CliCommands::EthGearTokens(EthGearTokensArgs { + command, + checkpoint_light_client_address, + historical_proxy_address, + vft_manager_address, + gear_args, + ethereum_args, + beacon_rpc, + prometheus_args, + }) => { + let eth_api = create_eth_client(ðereum_args); + let beacon_client = create_beacon_client(&beacon_rpc).await; let gsdk_args = message_relayer::common::GSdkArgs { - vara_domain: common.vara_args.vara_domain, - vara_port: common.vara_args.vara_port, - vara_rpc_retries: common.vara_args.vara_rpc_retries, + vara_domain: gear_args.common.domain, + vara_port: gear_args.common.port, + vara_rpc_retries: gear_args.common.retries, }; let checkpoint_light_client_address = - hex_utils::decode_h256(&common.checkpoint_light_client_address) + hex_utils::decode_h256(&checkpoint_light_client_address) .expect("Failed to parse address"); - let historical_proxy_address = hex_utils::decode_h256(&common.historical_proxy_address) - .expect("Failed to parse address"); - let vft_manager_address = hex_utils::decode_h256(&common.vft_manager_address) - .expect("Failed to parse address"); + let historical_proxy_address = + hex_utils::decode_h256(&historical_proxy_address).expect("Failed to parse address"); + let vft_manager_address = + hex_utils::decode_h256(&vft_manager_address).expect("Failed to parse address"); match command { - RelayErc20Commands::AllTokenTransfers { - erc20_treasury_address, + EthGearTokensCommands::AllTokenTransfers { + erc20_manager_address, } => { - let erc20_treasury_address = hex_utils::decode_h160(&erc20_treasury_address) + let erc20_manager_address = hex_utils::decode_h160(&erc20_manager_address) .expect("Failed to parse address"); let relayer = eth_to_gear::all_token_transfers::Relayer::new( gsdk_args, - common.vara_suri, + gear_args.suri, eth_api, beacon_client, - erc20_treasury_address, + erc20_manager_address, checkpoint_light_client_address, historical_proxy_address, vft_manager_address, @@ -431,13 +228,12 @@ async fn main() { MetricsBuilder::new() .register_service(&relayer) .build() - .run(common.prometheus_args.endpoint) + .run(prometheus_args.endpoint) .await; relayer.run(); } - - RelayErc20Commands::PaidTokenTransfers { + EthGearTokensCommands::PaidTokenTransfers { bridging_payment_address, } => { let bridging_payment_address = @@ -446,7 +242,7 @@ async fn main() { let relayer = eth_to_gear::paid_token_transfers::Relayer::new( gsdk_args, - common.vara_suri, + gear_args.suri, eth_api, beacon_client, bridging_payment_address, @@ -460,7 +256,7 @@ async fn main() { MetricsBuilder::new() .register_service(&relayer) .build() - .run(common.prometheus_args.endpoint) + .run(prometheus_args.endpoint) .await; relayer.run(); @@ -472,30 +268,115 @@ async fn main() { std::thread::sleep(Duration::from_millis(100)); } } + CliCommands::GearEthManual(args) => { + let nonce = hex_utils::decode_h256(&args.nonce).expect("Failed to parse message nonce"); + let eth_api = create_eth_signer_client(&args.ethereum_args); + let gear_api = create_gear_client(&args.gear_args).await; + + gear_to_eth::manual::relay(gear_api, eth_api, nonce, args.block, args.from_eth_block) + .await; + + loop { + // relay() spawns thread and exits, so we need to add this loop after calling run. + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + CliCommands::EthGearManual(EthGearManualArgs { + tx_hash, + slot, + checkpoint_light_client, + historical_proxy, + receiver_program, + receiver_route, + gear_args, + ethereum_args, + beacon_args, + }) => { + let gear_client_args = message_relayer::common::GSdkArgs { + vara_domain: gear_args.common.domain, + vara_port: gear_args.common.port, + vara_rpc_retries: gear_args.common.retries, + }; + let eth_api = create_eth_client(ðereum_args); + let beacon_client = create_beacon_client(&beacon_args).await; + let checkpoint_light_client_address = hex_utils::decode_h256(&checkpoint_light_client) + .expect("Failed to parse checkpoint light client address"); + let historical_proxy_address = hex_utils::decode_h256(&historical_proxy) + .expect("Failed to parse historical proxy address"); + let receiver_address = hex_utils::decode_h256(&receiver_program) + .expect("Failed to parse receiver program address"); + let receiver_route = hex_utils::decode_byte_vec(&receiver_route) + .expect("Failed to decode receiver route"); + let tx_hash = hex_utils::decode_h256(&tx_hash) + .expect("Failed to decode tx hash") + .0 + .into(); + + eth_to_gear::manual::relay( + gear_client_args, + gear_args.suri, + eth_api, + beacon_client, + checkpoint_light_client_address, + historical_proxy_address, + receiver_address, + receiver_route, + tx_hash, + slot, + ) + .await; + + loop { + // relay() spawns thread and exits, so we need to add this loop after calling run. + tokio::time::sleep(Duration::from_secs(1)).await; + } + } }; } -async fn create_gear_client(args: &VaraArgs) -> GearApi { - GearApi::new(&args.vara_domain, args.vara_port, args.vara_rpc_retries) +async fn create_gclient_client(args: &GearSignerArgs) -> gclient::GearApi { + gclient::GearApi::builder() + .retries(args.common.retries) + .suri(&args.suri) + .build(gclient::WSAddress::new( + &args.common.domain, + args.common.port, + )) + .await + .expect("GearApi client should be created") +} + +async fn create_gear_client(args: &GearArgs) -> GearApi { + GearApi::new(&args.domain, args.port, args.retries) .await .unwrap_or_else(|err| panic!("Error while creating gear client: {}", err)) } -fn create_eth_client(args: &EthereumArgs) -> EthApi { +fn create_eth_signer_client(args: &EthereumSignerArgs) -> EthApi { let EthereumArgs { eth_endpoint, - fee_payer, relayer_address, mq_address, - } = args; + } = &args.ethereum_args; EthApi::new( eth_endpoint, mq_address, relayer_address, - fee_payer.as_deref(), + Some(&args.fee_payer), ) - .unwrap_or_else(|err| panic!("Error while creating ethereum client: {}", err)) + .expect("Error while creating ethereum client") +} + +fn create_eth_client(args: &EthereumArgs) -> EthApi { + let EthereumArgs { + eth_endpoint, + relayer_address, + mq_address, + } = args; + + EthApi::new(eth_endpoint, mq_address, relayer_address, None) + .expect("Error while creating ethereum client") } async fn create_beacon_client(args: &BeaconRpcArgs) -> BeaconClient { @@ -508,15 +389,15 @@ async fn create_beacon_client(args: &BeaconRpcArgs) -> BeaconClient { async fn create_proof_storage( proof_storage_args: &ProofStorageArgs, - vara_args: &VaraArgs, + gear_args: &GearArgs, mut metrics: MetricsBuilder, ) -> (Box, MetricsBuilder) { let proof_storage: Box = if let Some(fee_payer) = proof_storage_args.gear_fee_payer.as_ref() { let proof_storage = GearProofStorage::new( - &vara_args.vara_domain, - vara_args.vara_port, - vara_args.vara_rpc_retries, + &gear_args.domain, + gear_args.port, + gear_args.retries, fee_payer, "./onchain_proof_storage_data".into(), ) diff --git a/relayer/src/message_relayer/common/ethereum/deposit_event_extractor.rs b/relayer/src/message_relayer/common/ethereum/deposit_event_extractor.rs index 1f74c775..5b405df7 100644 --- a/relayer/src/message_relayer/common/ethereum/deposit_event_extractor.rs +++ b/relayer/src/message_relayer/common/ethereum/deposit_event_extractor.rs @@ -16,7 +16,7 @@ pub struct DepositEventExtractor { eth_api: EthApi, beacon_client: BeaconClient, - erc20_treasury_address: H160, + erc20_manager_address: H160, metrics: Metrics, } @@ -37,12 +37,12 @@ impl_metered_service! { } impl DepositEventExtractor { - pub fn new(eth_api: EthApi, beacon_client: BeaconClient, erc20_treasury_address: H160) -> Self { + pub fn new(eth_api: EthApi, beacon_client: BeaconClient, erc20_manager_address: H160) -> Self { Self { eth_api, beacon_client, - erc20_treasury_address, + erc20_manager_address, metrics: Metrics::new(), } @@ -80,7 +80,7 @@ impl DepositEventExtractor { ) -> anyhow::Result<()> { let events = self .eth_api - .fetch_deposit_events(self.erc20_treasury_address, block.0) + .fetch_deposit_events(self.erc20_manager_address, block.0) .await?; if events.is_empty() { diff --git a/relayer/src/message_relayer/common/gear/message_sender/mod.rs b/relayer/src/message_relayer/common/gear/message_sender/mod.rs index c3ccd74c..012622b4 100644 --- a/relayer/src/message_relayer/common/gear/message_sender/mod.rs +++ b/relayer/src/message_relayer/common/gear/message_sender/mod.rs @@ -33,7 +33,9 @@ pub struct MessageSender { eth_api: EthApi, beacon_client: BeaconClient, historical_proxy_address: H256, - vft_manager_address: H256, + receiver_address: H256, + receiver_route: Vec, + decode_reply: bool, waiting_checkpoint: Vec, @@ -64,22 +66,26 @@ impl_metered_service! { } impl MessageSender { + #[allow(clippy::too_many_arguments)] pub fn new( args: GSdkArgs, suri: String, eth_api: EthApi, beacon_client: BeaconClient, historical_proxy_address: H256, - vft_manager_address: H256, + receiver_address: H256, + receiver_route: Vec, + decode_reply: bool, ) -> Self { Self { args, suri, eth_api, beacon_client, - historical_proxy_address, - vft_manager_address, + receiver_address, + receiver_route, + decode_reply, waiting_checkpoint: vec![], @@ -170,12 +176,12 @@ impl MessageSender { let mut proxy_service = HistoricalProxy::new(remoting.clone()); - let (_, vft_manager_reply) = proxy_service + let (_, receiver_reply) = proxy_service .redirect( payload.proof_block.block.slot, payload.encode(), - self.vft_manager_address.into(), - ::ROUTE.to_vec(), + self.receiver_address.into(), + self.receiver_route.clone(), ) .with_gas_limit(gas_limit) .send_recv(self.historical_proxy_address.into()) @@ -188,17 +194,22 @@ impl MessageSender { })? .map_err(|e| anyhow::anyhow!("Internal historical proxy error: {:?}", e))?; - let reply = SubmitReceipt::decode_reply(&vft_manager_reply) - .map_err(|e| anyhow::anyhow!("Failed to decode vft-manager reply: {:?}", e))?; + // TODO: Refactor this approach. #255 + if self.decode_reply { + let reply = SubmitReceipt::decode_reply(&receiver_reply) + .map_err(|e| anyhow::anyhow!("Failed to decode vft-manager reply: {:?}", e))?; - match reply { - Ok(_) => {} - Err(vft_manager_client::Error::NotSupportedEvent) => { - log::warn!("Dropping message for {} as it's considered invalid by vft-manager(probably unsupported ERC20 token)", message.tx_hash); - } - Err(e) => { - anyhow::bail!("Internal vft-manager error: {:?}", e); + match reply { + Ok(_) => {} + Err(vft_manager_client::Error::NotSupportedEvent) => { + log::warn!("Dropping message for {} as it's considered invalid by vft-manager(probably unsupported ERC20 token)", message.tx_hash); + } + Err(e) => { + anyhow::bail!("Internal vft-manager error: {:?}", e); + } } + } else { + log::info!("Received reply: {}", hex::encode(&receiver_reply)); } Ok(()) diff --git a/relayer/src/message_relayer/eth_to_gear/all_token_transfers.rs b/relayer/src/message_relayer/eth_to_gear/all_token_transfers.rs index 3997068c..a5e009df 100644 --- a/relayer/src/message_relayer/eth_to_gear/all_token_transfers.rs +++ b/relayer/src/message_relayer/eth_to_gear/all_token_transfers.rs @@ -1,6 +1,7 @@ use std::iter; use primitive_types::{H160, H256}; +use sails_rs::calls::ActionIo; use ethereum_beacon_client::BeaconClient; use ethereum_client::EthApi; @@ -47,7 +48,7 @@ impl Relayer { suri: String, eth_api: EthApi, beacon_client: BeaconClient, - erc20_treasury_address: H160, + erc20_manager_address: H160, checkpoint_light_client_address: H256, historical_proxy_address: H256, vft_manager_address: H256, @@ -67,12 +68,15 @@ impl Relayer { let deposit_event_extractor = DepositEventExtractor::new( eth_api.clone(), beacon_client.clone(), - erc20_treasury_address, + erc20_manager_address, ); let checkpoints_extractor = CheckpointsExtractor::new(args.clone(), checkpoint_light_client_address); + let route = + ::ROUTE.to_vec(); + let gear_message_sender = MessageSender::new( args, suri, @@ -80,6 +84,8 @@ impl Relayer { beacon_client, historical_proxy_address, vft_manager_address, + route, + true, ); Ok(Self { diff --git a/relayer/src/message_relayer/eth_to_gear/manual.rs b/relayer/src/message_relayer/eth_to_gear/manual.rs new file mode 100644 index 00000000..fc9c8739 --- /dev/null +++ b/relayer/src/message_relayer/eth_to_gear/manual.rs @@ -0,0 +1,79 @@ +use std::sync::mpsc::channel; + +use primitive_types::H256; + +use ethereum_beacon_client::BeaconClient; +use ethereum_client::{EthApi, TxHash}; +use gear_rpc_client::GearApi; + +use crate::message_relayer::common::{ + gear::{ + block_listener::BlockListener as GearBlockListener, + checkpoints_extractor::CheckpointsExtractor, message_sender::MessageSender, + }, + EthereumSlotNumber, GSdkArgs, TxHashWithSlot, +}; + +#[allow(clippy::too_many_arguments)] +pub async fn relay( + gear_client_args: GSdkArgs, + gear_suri: String, + + eth_api: EthApi, + beacon_client: BeaconClient, + + checkpoint_light_client_address: H256, + historical_proxy_address: H256, + receiver_address: H256, + + receiver_route: Vec, + + tx_hash: TxHash, + slot: u64, +) { + let gear_api = GearApi::new( + &gear_client_args.vara_domain, + gear_client_args.vara_port, + gear_client_args.vara_rpc_retries, + ) + .await + .expect("Failed to create GearApi"); + + let from_gear_block = gear_api + .latest_finalized_block() + .await + .expect("Failed to fetch latest finalized block"); + + let from_gear_block = gear_api + .block_hash_to_number(from_gear_block) + .await + .expect("Failed to fetch block number by hash"); + + let gear_block_listener = GearBlockListener::new(gear_client_args.clone(), from_gear_block); + + let checkpoints_extractor = + CheckpointsExtractor::new(gear_client_args.clone(), checkpoint_light_client_address); + + let gear_message_sender = MessageSender::new( + gear_client_args, + gear_suri, + eth_api, + beacon_client, + historical_proxy_address, + receiver_address, + receiver_route, + false, + ); + + let [gear_blocks] = gear_block_listener.run(); + let (deposit_events_sender, deposit_events_receiver) = channel(); + let checkpoints = checkpoints_extractor.run(gear_blocks); + gear_message_sender.run(deposit_events_receiver, checkpoints); + + deposit_events_sender + .send(TxHashWithSlot { + tx_hash, + slot_number: EthereumSlotNumber(slot), + }) + .expect("Failed to send message to channel"); +} diff --git a/relayer/src/message_relayer/eth_to_gear/mod.rs b/relayer/src/message_relayer/eth_to_gear/mod.rs index a85883fb..d960405e 100644 --- a/relayer/src/message_relayer/eth_to_gear/mod.rs +++ b/relayer/src/message_relayer/eth_to_gear/mod.rs @@ -1,2 +1,3 @@ pub mod all_token_transfers; +pub mod manual; pub mod paid_token_transfers; diff --git a/relayer/src/message_relayer/eth_to_gear/paid_token_transfers.rs b/relayer/src/message_relayer/eth_to_gear/paid_token_transfers.rs index fe4c0dd0..23123314 100644 --- a/relayer/src/message_relayer/eth_to_gear/paid_token_transfers.rs +++ b/relayer/src/message_relayer/eth_to_gear/paid_token_transfers.rs @@ -1,6 +1,7 @@ use std::iter; use primitive_types::{H160, H256}; +use sails_rs::calls::ActionIo; use ethereum_beacon_client::BeaconClient; use ethereum_client::EthApi; @@ -73,6 +74,9 @@ impl Relayer { let checkpoints_extractor = CheckpointsExtractor::new(args.clone(), checkpoint_light_client_address); + let route = + ::ROUTE.to_vec(); + let gear_message_sender = MessageSender::new( args, suri, @@ -80,6 +84,8 @@ impl Relayer { beacon_client, historical_proxy_address, vft_manager_address, + route, + true, ); Ok(Self { diff --git a/relayer/src/message_relayer/gear_to_eth/manual.rs b/relayer/src/message_relayer/gear_to_eth/manual.rs new file mode 100644 index 00000000..00b38c5e --- /dev/null +++ b/relayer/src/message_relayer/gear_to_eth/manual.rs @@ -0,0 +1,72 @@ +use std::sync::mpsc::channel; + +use primitive_types::H256; + +use ethereum_client::EthApi; +use gear_rpc_client::GearApi; + +use crate::message_relayer::common::{ + ethereum::{ + block_listener::BlockListener as EthereumBlockListener, + merkle_root_extractor::MerkleRootExtractor, message_sender::MessageSender, + }, + GearBlockNumber, MessageInBlock, +}; + +pub async fn relay( + gear_api: GearApi, + eth_api: EthApi, + message_nonce: H256, + gear_block: u32, + from_eth_block: Option, +) { + let from_eth_block = if let Some(block) = from_eth_block { + block + } else { + eth_api + .finalized_block_number() + .await + .expect("Failed to get finalized block number on ethereum") + }; + + let gear_block_hash = gear_api + .block_number_to_hash(gear_block) + .await + .expect("Failed to fetch block hash by number"); + + let message_queued_events = gear_api + .message_queued_events(gear_block_hash) + .await + .expect("Failed to fetch MessageQueued events from gear block"); + + let message = message_queued_events + .into_iter() + .find(|m| m.nonce_le == message_nonce.as_bytes()) + .unwrap_or_else(|| { + panic!( + "Message with nonce {} is not found in gear block {}", + hex::encode(message_nonce.as_bytes()), + gear_block + ) + }); + + let message_in_block = MessageInBlock { + message, + block: GearBlockNumber(gear_block), + block_hash: gear_block_hash, + }; + + let (queued_messages_sender, queued_messages_receiver) = channel(); + + let ethereum_block_listener = EthereumBlockListener::new(eth_api.clone(), from_eth_block); + let merkle_root_extractor = MerkleRootExtractor::new(eth_api.clone(), gear_api.clone()); + let message_sender = MessageSender::new(eth_api, gear_api); + + let ethereum_blocks = ethereum_block_listener.run(); + let merkle_roots = merkle_root_extractor.run(ethereum_blocks); + message_sender.run(queued_messages_receiver, merkle_roots); + + queued_messages_sender + .send(message_in_block) + .expect("Failed to send message to channel"); +} diff --git a/relayer/src/message_relayer/gear_to_eth/mod.rs b/relayer/src/message_relayer/gear_to_eth/mod.rs index a85883fb..d960405e 100644 --- a/relayer/src/message_relayer/gear_to_eth/mod.rs +++ b/relayer/src/message_relayer/gear_to_eth/mod.rs @@ -1,2 +1,3 @@ pub mod all_token_transfers; +pub mod manual; pub mod paid_token_transfers;