Skip to content

Commit

Permalink
feat: forward-backward message processor
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed May 14, 2024
1 parent 55b808e commit a70fcbd
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 34 deletions.
20 changes: 17 additions & 3 deletions rust/agents/relayer/src/merkle_tree/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
};

use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_core::{HyperlaneDomain, MerkleTreeInsertion};
Expand All @@ -18,12 +17,10 @@ use crate::processor::ProcessorExt;
use super::builder::MerkleTreeBuilder;

/// Finds unprocessed merkle tree insertions and adds them to the prover sync
#[derive(new)]
pub struct MerkleTreeProcessor {
db: HyperlaneRocksDB,
metrics: MerkleTreeProcessorMetrics,
prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
#[new(default)]
leaf_index: u32,
}

Expand Down Expand Up @@ -65,6 +62,23 @@ impl ProcessorExt for MerkleTreeProcessor {
}

impl MerkleTreeProcessor {
pub fn new(
db: HyperlaneRocksDB,
prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
metrics: MerkleTreeProcessorMetrics,
) -> Self {
Self {
db,
prover_sync,
metrics,
leaf_index: db
.retrieve_highest_processed_igp_sequence()
.ok()
.flatten()
.unwrap_or(0),
}
}

fn next_unprocessed_leaf(&mut self) -> Result<Option<MerkleTreeInsertion>> {
let leaf = if let Some(insertion) = self
.db
Expand Down
123 changes: 92 additions & 31 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
};

use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage};
Expand All @@ -20,7 +19,6 @@ use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};
/// Finds unprocessed messages from an origin and submits then through a channel
/// for to the appropriate destination.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct MessageProcessor {
db: HyperlaneRocksDB,
whitelist: Arc<MatchingList>,
Expand All @@ -32,16 +30,16 @@ pub struct MessageProcessor {
/// Needed context to send a message for each destination chain
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
metric_app_contexts: Vec<(MatchingList, String)>,
#[new(default)]
message_nonce: u32,
highest_message_nonce: u32,
lowest_message_nonce: u32,
}

impl Debug for MessageProcessor {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MessageProcessor {{ whitelist: {:?}, blacklist: {:?}, message_nonce: {:?} }}",
self.whitelist, self.blacklist, self.message_nonce
self.whitelist, self.blacklist, self.highest_message_nonce
)
}
}
Expand All @@ -68,28 +66,28 @@ impl ProcessorExt for MessageProcessor {
// Skip if not whitelisted.
if !self.whitelist.msg_matches(&msg, true) {
debug!(?msg, whitelist=?self.whitelist, "Message not whitelisted, skipping");
self.message_nonce += 1;
self.highest_message_nonce += 1;
return Ok(());
}

// Skip if the message is blacklisted
if self.blacklist.msg_matches(&msg, false) {
debug!(?msg, blacklist=?self.blacklist, "Message blacklisted, skipping");
self.message_nonce += 1;
self.highest_message_nonce += 1;
return Ok(());
}

// Skip if the message is intended for this origin
if destination == self.domain().id() {
debug!(?msg, "Message destined for self, skipping");
self.message_nonce += 1;
self.highest_message_nonce += 1;
return Ok(());
}

// Skip if the message is intended for a destination we do not service
if !self.send_channels.contains_key(&destination) {
debug!(?msg, "Message destined for unknown domain, skipping");
self.message_nonce += 1;
self.highest_message_nonce += 1;
return Ok(());
}

Expand All @@ -106,7 +104,7 @@ impl ProcessorExt for MessageProcessor {
app_context,
);
self.send_channels[&destination].send(Box::new(pending_msg) as QueueOperation)?;
self.message_nonce += 1;
self.highest_message_nonce += 1;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand All @@ -115,35 +113,98 @@ impl ProcessorExt for MessageProcessor {
}

impl MessageProcessor {
pub fn new(
db: HyperlaneRocksDB,
whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
metrics: MessageProcessorMetrics,
send_channels: HashMap<u32, UnboundedSender<QueueOperation>>,
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
metric_app_contexts: Vec<(MatchingList, String)>,
) -> Self {
let highest_message_nonce = db
.retrieve_highest_processed_message_nonce()
.ok()
.flatten()
.unwrap_or(0);
Self {
db,
whitelist,
blacklist,
metrics,
send_channels,
destination_ctxs,
metric_app_contexts,
highest_message_nonce,
lowest_message_nonce: highest_message_nonce,
}
}

fn try_get_unprocessed_message(&mut self) -> Result<Option<HyperlaneMessage>> {
loop {
// First, see if we can find the message so we can update the gauge.
if let Some(message) = self.db.retrieve_message_by_nonce(self.message_nonce)? {
// Update the latest nonce gauges
self.metrics
.max_last_known_message_nonce_gauge
.set(message.nonce as i64);
if let Some(metrics) = self.metrics.get(message.destination) {
metrics.set(message.nonce as i64);
}

// If this message has already been processed, on to the next one.
if !self
.db
.retrieve_processed_by_nonce(&self.message_nonce)?
.unwrap_or(false)
{
return Ok(Some(message));
} else {
debug!(nonce=?self.message_nonce, "Message already marked as processed in DB");
self.message_nonce += 1;
}
if let (Some(message), new_nonce) =
self.try_get_next_unprocessed(self.highest_message_nonce, 1)?
{
self.highest_message_nonce = new_nonce;
return Ok(Some(message));
} else if let (Some(message), new_nonce) =
self.try_get_next_unprocessed(self.lowest_message_nonce, -1)?
{
self.lowest_message_nonce = new_nonce;
return Ok(Some(message));
} else {
trace!(nonce=?self.message_nonce, "No message found in DB for nonce");
trace!(nonce=?self.highest_message_nonce, "No message found in DB for nonce");
return Ok(None);
}
}
}

fn try_get_next_unprocessed(
&mut self,
mut nonce: u32,
increment: i32,
) -> Result<(Option<HyperlaneMessage>, u32)> {
if let Some(message) = self.indexed_message_with_nonce(nonce)? {
self.update_max_nonce_gauge(&message);

// If this message has already been processed, on to the next one.
if !self.processed_message_with_nonce(nonce)? {
return Ok((Some(message), nonce));
} else {
debug!(nonce=?nonce, "Message already marked as processed in DB");
nonce = (nonce as i32 + increment) as u32;
}
} else {
trace!(nonce=?nonce, "No message found in DB for nonce");
}
Ok((None, nonce))
}

fn update_max_nonce_gauge(&self, message: &HyperlaneMessage) {
self.metrics
.max_last_known_message_nonce_gauge
.set(message.nonce as i64);
if let Some(metrics) = self.metrics.get(message.destination) {
metrics.set(message.nonce as i64);
}
}

fn indexed_message_with_nonce(&self, nonce: u32) -> Result<Option<HyperlaneMessage>> {
if nonce < 0 {
return Ok(None);
}
let msg = self.db.retrieve_message_by_nonce(nonce)?;
Ok(msg)
}

fn processed_message_with_nonce(&self, nonce: u32) -> Result<bool> {
let processed = self
.db
.retrieve_processed_by_nonce(&nonce)?
.unwrap_or(false);
Ok(processed)
}
}

#[derive(Debug)]
Expand Down
38 changes: 38 additions & 0 deletions rust/hyperlane-base/src/db/rocks/hyperlane_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const MESSAGE_DISPATCHED_BLOCK_NUMBER: &str = "message_dispatched_block_number_"
const MESSAGE: &str = "message_";
const NONCE_PROCESSED: &str = "nonce_processed_";
const GAS_PAYMENT_BY_SEQUENCE: &str = "gas_payment_by_sequence_";
const HIGHEST_PROCESSED_IGP_SEQUENCE: &str = "highest_processed_igp_sequence_";
const HIGHEST_PROCESSED_MESSAGE_NONCE: &str = "highest_processed_message_nonce_";
const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_sequence_for_message_id_v2_";
const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v3_";
const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_";
Expand Down Expand Up @@ -108,6 +110,38 @@ impl HyperlaneRocksDB {
}
}

/// Update the nonce of the highest processed message we're aware of
pub fn update_max_seen_message_nonce(&self, nonce: u32) -> DbResult<()> {
let current_max = self.retrieve_highest_processed_message_nonce()?;
if let Some(current_max) = current_max {
if nonce > current_max {
self.store_highest_processed_message_nonce_number(&Default::default(), &nonce)?;
}
}
Ok(())
}

/// Retrieve the nonce of the highest processed message we're aware of
pub fn retrieve_highest_processed_message_nonce(&self) -> DbResult<Option<u32>> {
self.retrieve_highest_processed_message_nonce_number(&Default::default())
}

/// Update the nonce of the highest processed message we're aware of
pub fn update_max_seen_igp_sequence(&self, nonce: u32) -> DbResult<()> {
let current_max = self.retrieve_highest_processed_igp_sequence()?;
if let Some(current_max) = current_max {
if nonce > current_max {
self.store_highest_processed_igp_sequence_number(&Default::default(), &nonce)?;
}
}
Ok(())
}

/// Retrieve the nonce of the highest processed message we're aware of
pub fn retrieve_highest_processed_igp_sequence(&self) -> DbResult<Option<u32>> {
self.retrieve_highest_processed_igp_sequence_number(&Default::default())
}

/// If the provided gas payment, identified by its metadata, has not been
/// processed, processes the gas payment and records it as processed.
/// Returns whether the gas payment was processed for the first time.
Expand Down Expand Up @@ -479,3 +513,7 @@ make_store_and_retrieve!(
u32,
u64
);
// There's no unit struct Encode/Decode impl, so just use `bool`, have visibility be private (by omitting the first argument), and wrap
// with a function that always uses the `Default::default()` key
make_store_and_retrieve!(, highest_processed_igp_sequence_number, HIGHEST_PROCESSED_IGP_SEQUENCE, bool, u32);
make_store_and_retrieve!(, highest_processed_message_nonce_number, HIGHEST_PROCESSED_MESSAGE_NONCE, bool, u32);

0 comments on commit a70fcbd

Please sign in to comment.