From fd505004ac2a143d0cd5bb30df7092a6bbb09c58 Mon Sep 17 00:00:00 2001 From: clabby Date: Sat, 15 Jun 2024 19:29:43 -0400 Subject: [PATCH] feat(client/host): Oracle-backed Blob fetcher Implements the oracle-backed blob fetcher for the client program --- Cargo.lock | 1 + bin/host/Cargo.toml | 1 + bin/host/src/fetcher/mod.rs | 76 ++++++++++++++++- bin/host/src/main.rs | 35 +++++++- bin/programs/client/src/hint.rs | 3 +- bin/programs/client/src/l1/blob_provider.rs | 90 +++++++++++++++++++++ bin/programs/client/src/l1/mod.rs | 3 + crates/common/src/io.rs | 2 +- crates/derive/src/online/beacon_client.rs | 2 +- crates/derive/src/online/blob_provider.rs | 71 +++++++++------- 10 files changed, 247 insertions(+), 37 deletions(-) create mode 100644 bin/programs/client/src/l1/blob_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 908c46013..46de1e5f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1771,6 +1771,7 @@ dependencies = [ "futures", "kona-client", "kona-common", + "kona-derive", "kona-mpt", "kona-preimage", "reqwest", diff --git a/bin/host/Cargo.toml b/bin/host/Cargo.toml index 183ae954a..87ce616f5 100644 --- a/bin/host/Cargo.toml +++ b/bin/host/Cargo.toml @@ -22,6 +22,7 @@ kona-client = { path = "../programs/client", version = "0.1.0" } kona-common = { path = "../../crates/common", version = "0.0.1" } kona-preimage = { path = "../../crates/preimage", version = "0.0.1" } kona-mpt = { path = "../../crates/mpt", version = "0.0.1" } +kona-derive = { path = "../../crates/derive", version = "0.0.1", features = ["online"] } # external alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "cb95183" } diff --git a/bin/host/src/fetcher/mod.rs b/bin/host/src/fetcher/mod.rs index 06bbc3e59..2587f984c 100644 --- a/bin/host/src/fetcher/mod.rs +++ b/bin/host/src/fetcher/mod.rs @@ -3,13 +3,17 @@ use crate::{kv::KeyValueStore, util}; use alloy_consensus::{Header, TxEnvelope}; -use alloy_eips::eip2718::Encodable2718; +use alloy_eips::{eip2718::Encodable2718, eip4844::FIELD_ELEMENTS_PER_BLOB}; use alloy_primitives::{address, keccak256, Address, Bytes, B256}; use alloy_provider::{Provider, ReqwestProvider}; use alloy_rlp::Decodable; use alloy_rpc_types::{Block, BlockTransactions}; use anyhow::{anyhow, Result}; use kona_client::HintType; +use kona_derive::{ + online::{OnlineBeaconClient, OnlineBlobProvider, SimpleSlotDerivation}, + types::{BlockInfo, IndexedBlobHash}, +}; use kona_preimage::{PreimageKey, PreimageKeyType}; use std::sync::Arc; use tokio::sync::RwLock; @@ -26,6 +30,8 @@ where kv_store: Arc>, /// L1 chain provider. l1_provider: ReqwestProvider, + /// The blob provider + blob_provider: OnlineBlobProvider, /// L2 chain provider. /// TODO: OP provider, N = Optimism l2_provider: ReqwestProvider, @@ -43,10 +49,11 @@ where pub fn new( kv_store: Arc>, l1_provider: ReqwestProvider, + blob_provider: OnlineBlobProvider, l2_provider: ReqwestProvider, l2_head: B256, ) -> Self { - Self { kv_store, l1_provider, l2_provider, l2_head, last_hint: None } + Self { kv_store, l1_provider, blob_provider, l2_provider, l2_head, last_hint: None } } /// Set the last hint to be received. @@ -149,7 +156,70 @@ where .map_err(|e| anyhow!(e))?; self.store_trie_nodes(raw_receipts.as_slice()).await?; } - HintType::L1Blob => todo!(), + HintType::L1Blob => { + if hint_data.len() != 48 { + anyhow::bail!("Invalid hint data length: {}", hint_data.len()); + } + + let hash: B256 = hint_data[0..32] + .as_ref() + .try_into() + .map_err(|e| anyhow!("Failed to convert bytes to B256: {e}"))?; + let index = u64::from_be_bytes( + hint_data[32..40] + .as_ref() + .try_into() + .map_err(|e| anyhow!("Failed to convert bytes to u64: {e}"))?, + ); + let timestamp = u64::from_be_bytes( + hint_data[40..48] + .as_ref() + .try_into() + .map_err(|e| anyhow!("Failed to convert bytes to u64: {e}"))?, + ); + + let partial_block_ref = BlockInfo { timestamp, ..Default::default() }; + let indexed_hash = IndexedBlobHash { index: index as usize, hash }; + + // Fetch the blob sidecar from the blob provider. + let mut sidecars = self + .blob_provider + .fetch_filtered_sidecars(&partial_block_ref, &[indexed_hash]) + .await + .map_err(|e| anyhow!("Failed to fetch blob sidecars: {e}"))?; + if sidecars.len() != 1 { + anyhow::bail!("Expected 1 sidecar, got {}", sidecars.len()); + } + let sidecar = sidecars.remove(0); + + // Acquire a lock on the key-value store and set the preimages. + let mut kv_write_lock = self.kv_store.write().await; + + // Set the preimage for the blob commitment. + kv_write_lock.set( + PreimageKey::new(*hash, PreimageKeyType::Sha256).into(), + sidecar.kzg_commitment.to_vec(), + ); + + // Write all the field elements to the key-value store. There should be 4096. + // The preimage oracle key for each field element is the keccak256 hash of + // `abi.encodePacked(sidecar.KZGCommitment, uint256(i))` + let mut blob_key = [0u8; 80]; + blob_key[..48].copy_from_slice(sidecar.kzg_commitment.as_ref()); + for i in 0..FIELD_ELEMENTS_PER_BLOB { + blob_key[72..].copy_from_slice(i.to_be_bytes().as_ref()); + let blob_key_hash = keccak256(blob_key.as_ref()); + + kv_write_lock.set( + PreimageKey::new(*blob_key_hash, PreimageKeyType::Keccak256).into(), + blob_key.into(), + ); + kv_write_lock.set( + PreimageKey::new(*blob_key_hash, PreimageKeyType::Blob).into(), + sidecar.blob[(i as usize) << 5..(i as usize + 1) << 5].to_vec(), + ); + } + } HintType::L1Precompile => { // Validate the hint data length. if hint_data.len() < 20 { diff --git a/bin/host/src/main.rs b/bin/host/src/main.rs index 235d0f778..b6ec726ea 100644 --- a/bin/host/src/main.rs +++ b/bin/host/src/main.rs @@ -13,6 +13,7 @@ use command_fds::{CommandFdExt, FdMapping}; use fetcher::Fetcher; use futures::FutureExt; use kona_common::FileDescriptor; +use kona_derive::online::{OnlineBeaconClient, OnlineBlobProvider}; use kona_preimage::{HintReader, OracleServer, PipeHandle}; use kv::KeyValueStore; use std::{ @@ -59,10 +60,25 @@ async fn start_server(cfg: HostCli) -> Result<()> { let kv_store = cfg.construct_kv_store(); + let beacon_client = OnlineBeaconClient::new_http( + cfg.l1_beacon_address.clone().expect("Beacon API URL must be set"), + ); + let mut blob_provider = OnlineBlobProvider::new(beacon_client, None, None); + blob_provider + .load_configs() + .await + .map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?; + let fetcher = (!cfg.is_offline()).then(|| { let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set")); let l2_provider = util::http_provider(&cfg.l2_node_address.expect("Provider must be set")); - Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider, cfg.l2_head))) + Arc::new(RwLock::new(Fetcher::new( + kv_store.clone(), + l1_provider, + blob_provider, + l2_provider, + cfg.l2_head, + ))) }); // Start the server and wait for it to complete. @@ -80,12 +96,27 @@ async fn start_server_and_native_client(cfg: HostCli) -> Result<()> { let (preimage_pipe, hint_pipe, files) = util::create_native_pipes()?; let kv_store = cfg.construct_kv_store(); + let beacon_client = OnlineBeaconClient::new_http( + cfg.l1_beacon_address.clone().expect("Beacon API URL must be set"), + ); + let mut blob_provider = OnlineBlobProvider::new(beacon_client, None, None); + blob_provider + .load_configs() + .await + .map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?; + let fetcher = (!cfg.is_offline()).then(|| { let l1_provider = util::http_provider(cfg.l1_node_address.as_ref().expect("Provider must be set")); let l2_provider = util::http_provider(cfg.l2_node_address.as_ref().expect("Provider must be set")); - Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider, cfg.l2_head))) + Arc::new(RwLock::new(Fetcher::new( + kv_store.clone(), + l1_provider, + blob_provider, + l2_provider, + cfg.l2_head, + ))) }); // Create the server and start it. diff --git a/bin/programs/client/src/hint.rs b/bin/programs/client/src/hint.rs index ceea76631..de7efab93 100644 --- a/bin/programs/client/src/hint.rs +++ b/bin/programs/client/src/hint.rs @@ -1,9 +1,8 @@ //! This module contains the [HintType] enum. -use core::fmt::Display; - use alloc::{string::String, vec::Vec}; use alloy_primitives::hex; +use core::fmt::Display; /// The [HintType] enum is used to specify the type of hint that was received. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] diff --git a/bin/programs/client/src/l1/blob_provider.rs b/bin/programs/client/src/l1/blob_provider.rs new file mode 100644 index 000000000..cdf3a3f4c --- /dev/null +++ b/bin/programs/client/src/l1/blob_provider.rs @@ -0,0 +1,90 @@ +//! Contains the concrete implementation of the [BlobProvider] trait for the client program. + +use crate::{CachingOracle, HintType, HINT_WRITER}; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use alloy_consensus::Blob; +use alloy_eips::eip4844::FIELD_ELEMENTS_PER_BLOB; +use alloy_primitives::keccak256; +use async_trait::async_trait; +use kona_derive::{ + traits::BlobProvider, + types::{BlobProviderError, IndexedBlobHash}, +}; +use kona_preimage::{HintWriterClient, PreimageKey, PreimageKeyType, PreimageOracleClient}; +use kona_primitives::BlockInfo; + +/// An oracle-backed blob provider. +#[derive(Debug)] +pub struct OracleBlobProvider { + oracle: Arc, +} + +impl OracleBlobProvider { + /// Constructs a new `OracleBlobProvider`. + pub fn new(oracle: Arc) -> Self { + Self { oracle } + } + + /// Retrieves a blob from the oracle. + /// + /// ## Takes + /// - `block_ref`: The block reference. + /// - `blob_hash`: The blob hash. + /// + /// ## Returns + /// - `Ok(blob)`: The blob. + /// - `Err(e)`: The blob could not be retrieved. + async fn get_blob( + &self, + block_ref: &BlockInfo, + blob_hash: &IndexedBlobHash, + ) -> Result { + let mut blob_req_meta = [0u8; 48]; + blob_req_meta[0..32].copy_from_slice(blob_hash.hash.as_ref()); + blob_req_meta[32..40].copy_from_slice((blob_hash.index as u64).to_be_bytes().as_ref()); + blob_req_meta[40..48].copy_from_slice(block_ref.timestamp.to_be_bytes().as_ref()); + + // Send a hint for the blob commitment and field elements. + HINT_WRITER.write(&HintType::L1Blob.encode_with(&[blob_req_meta.as_ref()])).await?; + + // Fetch the blob commitment. + let mut commitment = [0u8; 48]; + self.oracle + .get_exact(PreimageKey::new(*blob_hash.hash, PreimageKeyType::Sha256), &mut commitment) + .await?; + + // Reconstruct the blob from the 4096 field elements. + let mut blob = Blob::default(); + let mut field_element_key = [0u8; 80]; + field_element_key[..48].copy_from_slice(commitment.as_ref()); + for i in 0..FIELD_ELEMENTS_PER_BLOB { + field_element_key[72..].copy_from_slice(i.to_be_bytes().as_ref()); + + let mut field_element = [0u8; 32]; + self.oracle + .get_exact( + PreimageKey::new(*keccak256(field_element_key), PreimageKeyType::Blob), + &mut field_element, + ) + .await?; + blob[(i as usize) << 5..(i as usize + 1) << 5].copy_from_slice(field_element.as_ref()); + } + + Ok(blob) + } +} + +#[async_trait] +impl BlobProvider for OracleBlobProvider { + async fn get_blobs( + &mut self, + block_ref: &BlockInfo, + blob_hashes: &[IndexedBlobHash], + ) -> Result, BlobProviderError> { + let mut blobs = Vec::with_capacity(blob_hashes.len()); + for hash in blob_hashes { + blobs.push(self.get_blob(block_ref, hash).await?); + } + Ok(blobs) + } +} diff --git a/bin/programs/client/src/l1/mod.rs b/bin/programs/client/src/l1/mod.rs index 262b49a66..355850feb 100644 --- a/bin/programs/client/src/l1/mod.rs +++ b/bin/programs/client/src/l1/mod.rs @@ -1,4 +1,7 @@ //! Contains the L1 constructs of the client program. +mod blob_provider; +pub use blob_provider::OracleBlobProvider; + mod chain_provider; pub use chain_provider::OracleL1ChainProvider; diff --git a/crates/common/src/io.rs b/crates/common/src/io.rs index 8393c1691..889c419f4 100644 --- a/crates/common/src/io.rs +++ b/crates/common/src/io.rs @@ -83,7 +83,7 @@ mod native_io { .map_err(|e| anyhow!("Error writing to buffer to file descriptor: {e}"))?; // Reset the cursor back to before the data we just wrote for the reader's consumption. - file.seek(SeekFrom::Current(-(buf.len() as i64))) + file.seek(SeekFrom::Current(-(n as i64))) .map_err(|e| anyhow!("Failed to reset file cursor to 0: {e}"))?; Ok(n) diff --git a/crates/derive/src/online/beacon_client.rs b/crates/derive/src/online/beacon_client.rs index 62ddf7e92..d27354e9b 100644 --- a/crates/derive/src/online/beacon_client.rs +++ b/crates/derive/src/online/beacon_client.rs @@ -41,7 +41,7 @@ pub trait BeaconClient { pub struct OnlineBeaconClient { /// The base URL of the beacon API. base: String, - /// The inner Ethereum JSON-RPC provider. + /// The inner reqwest client. inner: Client, } diff --git a/crates/derive/src/online/blob_provider.rs b/crates/derive/src/online/blob_provider.rs index d56e8bef6..9bdcded90 100644 --- a/crates/derive/src/online/blob_provider.rs +++ b/crates/derive/src/online/blob_provider.rs @@ -64,6 +64,42 @@ impl OnlineBlobProvider { .await .map_err(BlobProviderError::Custom) } + + /// Fetches blob sidecars for the given block reference and blob hashes. + pub async fn fetch_filtered_sidecars( + &self, + block_ref: &BlockInfo, + blob_hashes: &[IndexedBlobHash], + ) -> Result, BlobProviderError> { + if blob_hashes.is_empty() { + return Ok(Vec::new()); + } + + // Extract the genesis timestamp and slot interval from the loaded configs. + let genesis = self.genesis_time.expect("Genesis Config Loaded"); + let interval = self.slot_interval.expect("Config Spec Loaded"); + + // Calculate the slot for the given timestamp. + let slot = + S::slot(genesis, interval, block_ref.timestamp).map_err(BlobProviderError::Slot)?; + + // Fetch blob sidecars for the slot using the given blob hashes. + let sidecars = self.fetch_sidecars(slot, blob_hashes).await?; + + // Filter blob sidecars that match the indicies in the specified list. + let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); + let filtered = sidecars + .into_iter() + .filter(|s| blob_hash_indicies.contains(&(s.inner.index as usize))) + .collect::>(); + + // Validate the correct number of blob sidecars were retrieved. + if blob_hashes.len() != filtered.len() { + return Err(BlobProviderError::SidecarLengthMismatch(blob_hashes.len(), filtered.len())); + } + + Ok(filtered.into_iter().map(|s| s.inner).collect::>()) + } } /// Minimal slot derivation implementation. @@ -97,39 +133,14 @@ where block_ref: &BlockInfo, blob_hashes: &[IndexedBlobHash], ) -> Result, BlobProviderError> { - if blob_hashes.is_empty() { - return Ok(Vec::new()); - } - // Fetches the genesis timestamp and slot interval from the // [BeaconGenesis] and [ConfigSpec] if not previously loaded. self.load_configs().await?; - // Extract the genesis timestamp and slot interval from the loaded configs. - let genesis = self.genesis_time.expect("Genesis Config Loaded"); - let interval = self.slot_interval.expect("Config Spec Loaded"); - - // Calculate the slot for the given timestamp. - let slot = - S::slot(genesis, interval, block_ref.timestamp).map_err(BlobProviderError::Slot)?; - - // Fetch blob sidecars for the slot using the given blob hashes. - let sidecars = self.fetch_sidecars(slot, blob_hashes).await?; - - // Filter blob sidecars that match the indicies in the specified list. - let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); - let filtered = sidecars - .into_iter() - .filter(|s| blob_hash_indicies.contains(&(s.inner.index as usize))) - .collect::>(); - - // Validate the correct number of blob sidecars were retrieved. - if blob_hashes.len() != filtered.len() { - return Err(BlobProviderError::SidecarLengthMismatch(blob_hashes.len(), filtered.len())); - } + // Fetch the blob sidecars for the given block reference and blob hashes. + let sidecars = self.fetch_filtered_sidecars(block_ref, blob_hashes).await?; // Validate the blob sidecars straight away with the `IndexedBlobHash`es. - let sidecars = filtered.into_iter().map(|s| s.inner).collect::>(); let blobs = sidecars .into_iter() .enumerate() @@ -214,7 +225,11 @@ mod tests { #[tokio::test] async fn test_get_blobs_empty_hashes() { - let beacon_client = MockBeaconClient::default(); + let beacon_client = MockBeaconClient { + beacon_genesis: Some(APIGenesisResponse::new(10)), + config_spec: Some(APIConfigResponse::new(12)), + ..Default::default() + }; let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> = OnlineBlobProvider::new(beacon_client, None, None); let block_ref = BlockInfo::default();