Skip to content

Commit

Permalink
(feat) Poll blocks for transactions - WIP for transit
Browse files Browse the repository at this point in the history
  • Loading branch information
rrw-zilliqa committed Aug 4, 2024
1 parent 1bfda35 commit e089bb5
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 60 deletions.
190 changes: 146 additions & 44 deletions bridge-validators/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,26 @@ use std::{marker::PhantomData, time::Duration};
use async_stream::try_stream;
use async_trait::async_trait;

use anyhow::Result;
use anyhow::{anyhow, Result};
use ethers::{
providers::Middleware,
types::{BlockNumber, Filter, Log, U64},
types::{Block, BlockNumber, Filter, Log, TransactionReceipt, TxHash, ValueOrArray, U64},
};
use ethers_contract::{parse_log, EthEvent};
use futures::{Stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::time::interval;
use tracing::{info, warn};

use crate::client::ChainClient;
use crate::client::{ChainClient, LogStrategy};

#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockByNumberRequest {
#[serde(rename = "blockNumber")]
pub block_number: BlockNumber,
#[serde(rename = "transaction_detail_flag")]
pub transaction_detail_flag: bool,
}

#[async_trait]
pub trait BlockPolling {
Expand All @@ -22,16 +31,100 @@ pub trait BlockPolling {
#[allow(dead_code)]
async fn get_historic_blocks(&self, from: u64, to: u64) -> Result<()>;

async fn get_events<D>(
&self,
event: Filter,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<Vec<D>>
async fn get_events<D>(&self, event: Filter, from_block: u64, to_block: u64) -> Result<Vec<D>>
where
D: EthEvent;
}

impl ChainClient {
async fn get_logs_from_blocks(&self, event: Filter) -> Result<Vec<Log>> {
// Fetch transactions for all the blocks in Filter.
let mut result: Vec<Log> = Vec::new();
let from_block: u64 = event
.get_from_block()
.ok_or(anyhow!(
"from_block is not present in get_logs_from_blocks()"
))?
.as_u64();
let to_block: u64 = event
.get_to_block()
.ok_or(anyhow!("to_block is not present in get_logs_from_blocks()"))?
.as_u64();
for block_number in from_block..to_block + 1 {
// eth_GetBlockReceipts is as broken as eth_getLogs, so we need to check each transaction
// individually. Joy!
let the_block: Option<Block<TxHash>> =
self.client.provider().get_block(block_number).await?;
if let Some(block) = the_block {
// go through alll the transactions
for txn_hash in block.transactions {
// We have a transaction. Did it have any logs?
println!("block {} txn {:#x}", block_number, txn_hash);
// Get the receipt
let maybe_receipt = self
.client
.provider()
.get_transaction_receipt(txn_hash)
.await?;
if let Some(receipt) = maybe_receipt {
// Yay!
println!("Got receipt for txn {:#x}", txn_hash);
for log in receipt.logs {
// Because FML, the filter doesn't actually include the address.
// so we have to include it manually.
let address_matches = log.address == self.chain_gateway_address;
if !address_matches {
println!("Address does not match");
continue;
}
println!("Case 3");
let mut matches: bool = true;
for topic_idx in 0..event.topics.len() {
if let Some(x) = &event.topics[topic_idx] {
if let Some(y) = &log.topics.get(topic_idx) {
let match_this_topic = match x {
ValueOrArray::Value(xv) => {
if let Some(xxv) = xv {
println!("Case 4 {:#x} vs {:#x}", xxv, y);
xxv == *y
} else {
true
}
}
ValueOrArray::Array(xvs) => xvs.iter().any(|cand| {
if let Some(xcand) = cand {
*xcand == **y
} else {
false
}
}),
};
if !match_this_topic {
matches = false;
break;
}
} else {
matches = false;
break;
}
}
// If there's no filter element for this topic, we're fine.
}
if matches {
println!("Match!");
result.push(log);
}
}
} else {
println!("WARNING: txn {:#x} has no receipt", txn_hash);
}
}
}
}
Ok(vec![])
}
}

#[async_trait]
impl BlockPolling for ChainClient {
async fn stream_finalized_blocks(&mut self) -> Result<()> {
Expand All @@ -56,38 +149,36 @@ impl BlockPolling for ChainClient {
Ok(())
}

async fn get_events<D>(
&self,
event: Filter,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<Vec<D>>
async fn get_events<D>(&self, event: Filter, from_block: u64, to_block: u64) -> Result<Vec<D>>
where
D: EthEvent,
{
let event = event.from_block(from_block).to_block(to_block);

let logs: Vec<serde_json::Value> = self
.client
.provider()
.request("eth_getLogs", [event])
.await?;

let logs = logs
.into_iter()
.map(|log| {
// Parse log values
let mut log = log;
match log["removed"].as_str() {
Some("true") => log["removed"] = serde_json::Value::Bool(true),
Some("false") => log["removed"] = serde_json::Value::Bool(false),
Some(&_) => warn!("invalid parsing"),
None => (),
};
let log: Log = serde_json::from_value(log)?;
Ok(log)
})
.collect::<Result<Vec<Log>>>()?;
let logs: Vec<Log> = match self.log_strategy {
LogStrategy::GetLogs => {
let logs: Vec<serde_json::Value> = self
.client
.provider()
.request("eth_getLogs", [event])
.await?;
logs.into_iter()
.map(|log| {
// Parse log values
let mut log = log;
match log["removed"].as_str() {
Some("true") => log["removed"] = serde_json::Value::Bool(true),
Some("false") => log["removed"] = serde_json::Value::Bool(false),
Some(&_) => warn!("invalid parsing"),
None => (),
};
let log: Log = serde_json::from_value(log)?;
Ok(log)
})
.collect::<Result<Vec<Log>>>()?
}
LogStrategy::GetTransactions => self.get_logs_from_blocks(event).await?,
};

let events: Vec<D> = logs
.into_iter()
Expand Down Expand Up @@ -133,25 +224,33 @@ impl<D: EthEvent> EventListener<D> {
where
D: EthEvent,
{
// Some chains (Zilliqa!) can't get it together to broadcast events at the block they are
// currently at, so there is an option to deliberately delay checking back a few blocks,
// until the node we are pointed at has the logs for the block and can therefore reply
// correctly.
let scan_behind_blocks = self.chain_client.scan_behind_blocks;
let new_block: U64 = match self.get_block_number().await {
Err(e) => {
warn!(?e);
let vec = Ok(vec![]);
return vec;
}
// Return early if smaller block
Ok(block) if block <= self.current_block => return Ok(vec![]),
Ok(block) => block,
};

// Don't worry about blocks we've already scanned.
let min_block = self.current_block + 1;
// Don't worry about blocks which are too recent for us to care about.
let max_block = new_block - scan_behind_blocks;
info!("min_block {}, max_block {}", min_block, max_block);
if max_block <= min_block {
// No point in checking, return early
return Ok(vec![]);
}
// `eth_getLogs`'s block_number is inclusive, so `current_block` is already retrieved
let events = match self
.chain_client
.get_events(
self.event.clone(),
(self.current_block + 1).into(),
new_block.into(),
)
.get_events(self.event.clone(), min_block.as_u64(), max_block.as_u64())
.await
{
Err(err) => {
Expand Down Expand Up @@ -182,6 +281,7 @@ impl<D: EthEvent> EventListener<D> {
)
}

info!("Assigning current_block to {}", new_block);
self.current_block = new_block;

Ok(events)
Expand All @@ -191,7 +291,9 @@ impl<D: EthEvent> EventListener<D> {
let stream = try_stream! {
// TODO: update block interval on config
let mut interval = interval(Duration::from_secs(3));
self.current_block = self.chain_client.client.get_block_number().await?;
// Set this down 1 because we (almost) certainly haven't scanned this block
// yet...
self.current_block = self.chain_client.client.get_block_number().await? - 1;

loop {
interval.tick().await;
Expand Down
25 changes: 15 additions & 10 deletions bridge-validators/src/bridge_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl BridgeNode {
async fn get_historic_events<D>(
&self,
event: Event<Arc<Client>, Client, D>,
to_block: BlockNumber,
to_block: u64,
) -> Result<Vec<D>>
where
D: EthEvent,
Expand All @@ -89,7 +89,10 @@ impl BridgeNode {
} else {
BlockNumber::Finalized
};
info!("Getting Historic Events {}", to_block);
info!(
"Getting Historic Events for chainId#{}: {}",
self.chain_client.chain_id, to_block
);

let to_block_number = self
.chain_client
Expand All @@ -107,10 +110,10 @@ impl BridgeNode {
let dispatch_events = self
.get_historic_events(
chain_gateway.event::<DispatchedFilter>(),
BlockNumber::Number(to_block_number.into()),
to_block_number.as_u64(),
)
.await?;
dbg!(dispatch_events.len());
info!(" .. dispatch_events: {}", dispatch_events.len());

for dispatch in dispatch_events {
self.handle_dispatch_event(dispatch)?;
Expand All @@ -119,17 +122,17 @@ impl BridgeNode {
let relay_events = self
.get_historic_events(
chain_gateway.event::<RelayedFilter>(),
BlockNumber::Number(to_block_number.into()),
to_block_number.as_u64(),
)
.await?;

dbg!(relay_events.len());
info!(" .. relay_events: {}", relay_events.len());

for relay in relay_events {
self.handle_relay_event(relay)?;
}

unimplemented!();
Ok(())
}

pub async fn listen_events(&mut self) -> Result<()> {
Expand Down Expand Up @@ -278,7 +281,7 @@ impl BridgeNode {
let Relay { signature, event } = echo;
let nonce = event.nonce;
let event_hash = event.hash();

info!("handling relay {:?}", echo);
let signature = Signature::try_from(signature.to_vec().as_slice())?;

// update validator set in case it has changed
Expand Down Expand Up @@ -329,9 +332,11 @@ impl BridgeNode {
};

info!(
"Handling received: {:?}, collected: {:?}",
"Handling received: {:?}, collected: {:?}, is_leader {:?}, has_supermajority {:?}",
&echo,
event_signatures.signatures.len()
event_signatures.signatures.len(),
self.is_leader,
self.has_supermajority(event_signatures.signatures.len())
);

// if leader and majority, create request to dispatch
Expand Down
19 changes: 19 additions & 0 deletions bridge-validators/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ use crate::ChainConfig;

pub type Client = NonceManagerMiddleware<SignerMiddleware<Provider<Http>, LocalWallet>>;

// ZQ1 seems to have given up responding to getLogs(), so we now have
// a way to query all transactions on the chain to obtain our logs.
#[derive(Debug, Clone)]
pub enum LogStrategy {
GetLogs,
GetTransactions,
}

#[derive(Debug, Clone)]
pub struct ChainClient {
pub client: Arc<Client>,
Expand All @@ -25,6 +33,8 @@ pub struct ChainClient {
pub chain_gateway_block_deployed: u64,
pub block_instant_finality: bool,
pub legacy_gas_estimation: bool,
pub scan_behind_blocks: u64,
pub log_strategy: LogStrategy,
}

impl fmt::Display for ChainClient {
Expand All @@ -50,6 +60,13 @@ impl ChainClient {
// TODO: get the validator_manager_address from chain_gateway itself
let chain_gateway = ChainGateway::new(config.chain_gateway_address, client.clone());
let validator_manager_address: Address = chain_gateway.validator_manager().call().await?;
let strategy = match config.use_get_transactions {
None => LogStrategy::GetLogs,
Some(v) => match v {
false => LogStrategy::GetLogs,
true => LogStrategy::GetTransactions,
},
};
info!("... success!");
Ok(ChainClient {
client,
Expand All @@ -60,6 +77,8 @@ impl ChainClient {
chain_gateway_block_deployed: config.chain_gateway_block_deployed,
block_instant_finality: config.block_instant_finality.unwrap_or_default(),
legacy_gas_estimation: config.legacy_gas_estimation.unwrap_or_default(),
scan_behind_blocks: config.scan_behind_blocks.unwrap_or_default(),
log_strategy: strategy,
})
}
}
Expand Down
Loading

0 comments on commit e089bb5

Please sign in to comment.