Skip to content

Commit

Permalink
feat(host): Interop optimistic block re-execution hint (#983)
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby authored Jan 30, 2025
1 parent 57f59b7 commit ebdc804
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 61 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ kona-proof = { workspace = true, features = ["std"] }
kona-proof-interop.workspace = true
kona-client.workspace = true
kona-providers-alloy.workspace = true
kona-executor.workspace = true
kona-driver.workspace = true

# Maili
maili-rpc.workspace = true
Expand Down
196 changes: 160 additions & 36 deletions bin/host/src/interop/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,46 @@
//! preimages from a remote source serving the super-chain (interop) proof mode.
use super::InteropHostCli;
use alloy_consensus::{Header, TxEnvelope, EMPTY_ROOT_HASH};
use crate::single::SingleChainFetcher;
use alloy_consensus::{Header, Sealed, TxEnvelope, EMPTY_ROOT_HASH};
use alloy_eips::{
eip2718::Encodable2718,
eip4844::{IndexedBlobHash, FIELD_ELEMENTS_PER_BLOB},
BlockId,
};
use alloy_primitives::{address, keccak256, map::HashMap, Address, Bytes, B256};
use alloy_provider::{Provider, ReqwestProvider};
use alloy_rlp::{Decodable, EMPTY_STRING_CODE};
use alloy_rlp::{Decodable, Encodable, EMPTY_STRING_CODE};
use alloy_rpc_types::{
debug::ExecutionWitness, Block, BlockNumberOrTag, BlockTransactions, BlockTransactionsKind,
Transaction,
Block, BlockNumberOrTag, BlockTransactions, BlockTransactionsKind, Transaction,
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use kona_host::KeyValueStore;
use kona_driver::Driver;
use kona_executor::TrieDBProvider;
use kona_host::{KeyValueStore, PreimageServer};
use kona_preimage::{
errors::{PreimageOracleError, PreimageOracleResult},
HintRouter, PreimageFetcher, PreimageKey, PreimageKeyType,
BidirectionalChannel, HintReader, HintRouter, HintWriter, OracleReader, OracleServer,
PreimageFetcher, PreimageKey, PreimageKeyType,
};
use kona_proof::{
executor::KonaExecutor,
l1::{OracleBlobProvider, OracleL1ChainProvider, OraclePipeline},
l2::OracleL2ChainProvider,
sync::new_pipeline_cursor,
CachingOracle,
};
use kona_proof_interop::{Hint, HintType, PreState};
use kona_providers_alloy::{OnlineBeaconClient, OnlineBlobProvider};
use maili_protocol::BlockInfo;
use maili_registry::ROLLUP_CONFIGS;
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::{sync::RwLock, task};
use tracing::{error, trace, warn};

/// The [InteropFetcher] struct is responsible for fetching preimages from a remote source.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InteropFetcher<KV>
where
KV: KeyValueStore + ?Sized,
Expand All @@ -55,7 +64,7 @@ where

impl<KV> InteropFetcher<KV>
where
KV: KeyValueStore + ?Sized,
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
{
/// Create a new [InteropFetcher] with the given [KeyValueStore].
pub fn new(
Expand Down Expand Up @@ -601,39 +610,154 @@ where
Ok::<(), anyhow::Error>(())
})?;
}
HintType::L2PayloadWitness => {
if hint_data.len() < 32 {
HintType::L2BlockData => {
if hint_data.len() != 72 {
anyhow::bail!("Invalid hint data length: {}", hint_data.len());
}
let parent_block_hash = B256::from_slice(&hint_data.as_ref()[..32]);
let payload_attributes: OpPayloadAttributes =
serde_json::from_slice(&hint_data[32..])?;

let execute_payload_response: ExecutionWitness = self
let agreed_block_hash = B256::from_slice(&hint_data.as_ref()[..32]);
let disputed_block_hash = B256::from_slice(&hint_data.as_ref()[32..64]);
let chain_id = u64::from_be_bytes(
hint_data.as_ref()[64..72]
.try_into()
.map_err(|e| anyhow!("Error converting hint data to u64: {e}"))?,
);

let l2_provider = self
.l2_providers
.get(&self.active_l2_chain_id)
.ok_or(anyhow!("No active L2 chain ID"))?
.client()
.request::<(B256, OpPayloadAttributes), ExecutionWitness>(
"debug_executePayload",
(parent_block_hash, payload_attributes),
.get(&chain_id)
.ok_or(anyhow!("No provider found for chain ID {chain_id}"))?;
let rollup_config = ROLLUP_CONFIGS
.get(&chain_id)
.cloned()
.or_else(|| {
let local_cfgs = self.cfg.read_rollup_configs().ok()?;
local_cfgs.get(&chain_id).cloned()
})
.map(Arc::new)
.ok_or(anyhow!("No rollup config found for chain ID: {chain_id}"))?;

// Check if the block is canonical before continuing.
let parent_block = l2_provider
.get_block_by_hash(agreed_block_hash, BlockTransactionsKind::Hashes)
.await?
.ok_or(anyhow!("Block not found."))?;
let disputed_block = l2_provider
.get_block_by_number(
(parent_block.header.number + 1).into(),
BlockTransactionsKind::Hashes,
)
.await
.map_err(|e| anyhow!("Failed to fetch preimage: {e}"))?;
.await?
.ok_or(anyhow!("Block not found."))?;

let mut merged = HashMap::<B256, Bytes>::default();
merged.extend(execute_payload_response.state);
merged.extend(execute_payload_response.codes);
merged.extend(execute_payload_response.keys);
// Return early if the disputed block is canonical.
if disputed_block.header.hash == disputed_block_hash {
return Ok(());
}

let mut kv_write_lock = self.kv_store.write().await;
for (hash, preimage) in merged.into_iter() {
let computed_hash = keccak256(preimage.as_ref());
assert_eq!(computed_hash, hash, "Preimage hash does not match expected hash");
// Reproduce the preimages for the optimistic block's derivation + execution and
// store them in the key-value store.
let hint = BidirectionalChannel::new()?;
let preimage = BidirectionalChannel::new()?;
let fetcher = SingleChainFetcher::new(
self.kv_store.clone(),
self.l1_provider.clone(),
self.blob_provider.clone(),
l2_provider.clone(),
agreed_block_hash,
);
let server_task = task::spawn(
PreimageServer::new(
OracleServer::new(preimage.host),
HintReader::new(hint.host),
self.kv_store.clone(),
Some(Arc::new(RwLock::new(fetcher))),
)
.start(),
);
let client_task = task::spawn({
let InteropHostCli { l1_head, .. } = self.cfg;
async move {
let oracle = Arc::new(CachingOracle::new(
1024,
OracleReader::new(preimage.client),
HintWriter::new(hint.client),
));

let mut l1_provider = OracleL1ChainProvider::new(l1_head, oracle.clone());
let mut l2_provider = OracleL2ChainProvider::new(
agreed_block_hash,
rollup_config.as_ref().clone(),
oracle.clone(),
);
let beacon = OracleBlobProvider::new(oracle.clone());

let safe_head = l2_provider
.header_by_hash(agreed_block_hash)
.map(|header| Sealed::new_unchecked(header, agreed_block_hash))?;
let target_block = safe_head.number + 1;

let cursor = new_pipeline_cursor(
rollup_config.as_ref(),
safe_head,
&mut l1_provider,
&mut l2_provider,
)
.await?;
l2_provider.set_cursor(cursor.clone());

let pipeline = OraclePipeline::new(
rollup_config.clone(),
cursor.clone(),
oracle,
beacon,
l1_provider,
l2_provider.clone(),
);
let executor = KonaExecutor::new(
rollup_config.as_ref(),
l2_provider.clone(),
l2_provider,
None,
None,
);
let mut driver = Driver::new(cursor, executor, pipeline);

driver
.advance_to_target(rollup_config.as_ref(), Some(target_block))
.await?;

Ok::<_, anyhow::Error>(driver.safe_head_artifacts.unwrap_or_default())
}
});

let key = PreimageKey::new(*hash, PreimageKeyType::Keccak256);
kv_write_lock.set(key.into(), preimage.into())?;
}
// Wait on both the server and client tasks to complete.
let (_, client_result) = tokio::try_join!(server_task, client_task)?;
let (execution_artifacts, raw_transactions) = client_result?;

// Store optimistic block hash preimage.
let mut kv_lock = self.kv_store.write().await;
let mut rlp_buf = Vec::with_capacity(execution_artifacts.block_header.length());
execution_artifacts.block_header.encode(&mut rlp_buf);
kv_lock.set(
PreimageKey::new(
*execution_artifacts.block_header.hash(),
PreimageKeyType::Keccak256,
)
.into(),
rlp_buf,
)?;

// Store receipts root preimages.
let raw_receipts = execution_artifacts
.receipts
.into_iter()
.map(|receipt| Ok::<_, anyhow::Error>(receipt.encoded_2718()))
.collect::<Result<Vec<_>>>()?;
self.store_trie_nodes(raw_receipts.as_slice()).await?;

// Store tx root preimages.
self.store_trie_nodes(raw_transactions.as_slice()).await?;
}
}

Expand Down Expand Up @@ -706,7 +830,7 @@ where
#[async_trait]
impl<KV> PreimageFetcher for InteropFetcher<KV>
where
KV: KeyValueStore + Send + Sync + ?Sized,
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
{
/// Get the preimage for the given key.
async fn get_preimage(&self, key: PreimageKey) -> PreimageOracleResult<Vec<u8>> {
Expand Down
32 changes: 19 additions & 13 deletions crates/driver/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
use crate::{DriverError, DriverPipeline, DriverResult, Executor, PipelineCursor, TipCursor};
use alloc::{sync::Arc, vec::Vec};
use alloy_consensus::BlockBody;
use alloy_primitives::B256;
use alloy_primitives::{Bytes, B256};
use alloy_rlp::Decodable;
use core::fmt::Debug;
use kona_derive::{
errors::{PipelineError, PipelineErrorKind},
traits::{Pipeline, SignalReceiver},
types::Signal,
};
use kona_executor::ExecutionArtifacts;
use maili_genesis::RollupConfig;
use maili_protocol::L2BlockInfo;
use maili_rpc::OpAttributesWithParent;
Expand All @@ -25,16 +26,16 @@ where
DP: DriverPipeline<P> + Send + Sync + Debug,
P: Pipeline + SignalReceiver + Send + Sync + Debug,
{
/// Marker for the executor.
_marker: core::marker::PhantomData<E>,
/// Marker for the pipeline.
_marker2: core::marker::PhantomData<P>,
/// A pipeline abstraction.
pub pipeline: DP,
_marker: core::marker::PhantomData<P>,
/// Cursor to keep track of the L2 tip
pub cursor: Arc<RwLock<PipelineCursor>>,
/// The Executor.
pub executor: E,
/// A pipeline abstraction.
pub pipeline: DP,
/// The safe head's execution artifacts + Transactions
pub safe_head_artifacts: Option<(ExecutionArtifacts, Vec<Bytes>)>,
}

impl<E, DP, P> Driver<E, DP, P>
Expand All @@ -47,10 +48,10 @@ where
pub const fn new(cursor: Arc<RwLock<PipelineCursor>>, executor: E, pipeline: DP) -> Self {
Self {
_marker: core::marker::PhantomData,
_marker2: core::marker::PhantomData,
pipeline,
cursor,
executor,
pipeline,
safe_head_artifacts: None,
}
}

Expand All @@ -66,8 +67,8 @@ where
/// - `target`: The target block number.
///
/// ## Returns
/// - `Ok((number, output_root))` - A tuple containing the number of the produced block and the
/// output root.
/// - `Ok((l2_safe_head, output_root))` - A tuple containing the [L2BlockInfo] of the produced
/// block and the output root.
/// - `Err(e)` - An error if the block could not be produced.
pub async fn advance_to_target(
&mut self,
Expand Down Expand Up @@ -162,8 +163,9 @@ where
body: BlockBody {
transactions: attributes
.transactions
.unwrap_or_default()
.into_iter()
.as_ref()
.unwrap_or(&Vec::new())
.iter()
.map(|tx| OpTxEnvelope::decode(&mut tx.as_ref()).map_err(DriverError::Rlp))
.collect::<DriverResult<Vec<OpTxEnvelope>, E::Error>>()?,
ommers: Vec::new(),
Expand All @@ -179,13 +181,17 @@ where
)?;
let tip_cursor = TipCursor::new(
l2_info,
execution_result.block_header,
execution_result.block_header.clone(),
self.executor.compute_output_root().map_err(DriverError::Executor)?,
);

// Advance the derivation pipeline cursor
drop(pipeline_cursor);
self.cursor.write().advance(origin, tip_cursor);

// Update the latest safe head artifacts.
self.safe_head_artifacts =
Some((execution_result, attributes.transactions.unwrap_or_default()));
}
}
}
2 changes: 1 addition & 1 deletion crates/executor/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use util::encode_holocene_eip_1559_params;

/// The [ExecutionArtifacts] holds the produced block header and receipts from the execution of a
/// block.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct ExecutionArtifacts {
/// The block header.
pub block_header: Sealed<Header>,
Expand Down
Loading

0 comments on commit ebdc804

Please sign in to comment.