Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(preimage): Async server components #183

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ kona-primitives = { path = "../primitives", version = "0.0.1" }
alloy-sol-types = { version = "0.7.1", default-features = false }
op-alloy-consensus = { git = "https://github.com/clabby/op-alloy", branch = "refcell/consensus-port", default-features = false }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
async-trait = "0.1.77"
async-trait = "0.1.80"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }
Expand Down
2 changes: 1 addition & 1 deletion crates/plasma/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ kona-derive = { path = "../derive" }
# External
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
alloy-primitives = { workspace = true, features = ["rlp"] }
async-trait = "0.1.77"
async-trait = "0.1.80"

# `serde` feature dependencies
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/preimage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ alloy-primitives.workspace = true
# local
kona-common = { path = "../common", version = "0.0.1" }

# external
async-trait = "0.1.80"

[dev-dependencies]
tokio = { version = "1.36.0", features = ["full"] }
tempfile = "3.10.0"
48 changes: 32 additions & 16 deletions crates/preimage/src/hint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{traits::HintWriterClient, HintReaderServer, PipeHandle};
use alloc::{string::String, vec};
use alloc::{boxed::Box, string::String, vec};
use anyhow::Result;
use core::future::Future;
use tracing::{debug, error};

/// A [HintWriter] is a high-level interface to the hint pipe. It provides a way to write hints to
Expand Down Expand Up @@ -58,8 +59,13 @@ impl HintReader {
}
}

#[async_trait::async_trait]
impl HintReaderServer for HintReader {
fn next_hint(&self, mut route_hint: impl FnMut(String) -> Result<()>) -> Result<()> {
async fn next_hint<F, Fut>(&mut self, mut route_hint: F) -> Result<()>
where
F: FnMut(String) -> Fut + Send,
Fut: Future<Output = Result<()>> + Send,
{
// Read the length of the raw hint payload.
let mut len_buf = [0u8; 4];
self.pipe_handle.read_exact(&mut len_buf)?;
Expand All @@ -74,7 +80,7 @@ impl HintReaderServer for HintReader {
debug!(target: "hint_reader", "Successfully read hint: \"{payload}\"");

// Route the hint
if let Err(e) = route_hint(payload) {
if let Err(e) = route_hint(payload).await {
// Write back on error to prevent blocking the client.
self.pipe_handle.write(&[0x00])?;

Expand All @@ -90,15 +96,18 @@ impl HintReaderServer for HintReader {
Ok(())
}
}

#[cfg(test)]
mod test {
extern crate std;

use super::*;
use alloc::vec::Vec;
use alloc::{sync::Arc, vec::Vec};
use core::pin::Pin;
use kona_common::FileDescriptor;
use std::{fs::File, os::fd::AsRawFd};
use tempfile::tempfile;
use tokio::sync::Mutex;

/// Test struct containing the [HintReader] and [HintWriter]. The [File]s are stored in this
/// struct so that they are not dropped until the end of the test.
Expand Down Expand Up @@ -132,20 +141,27 @@ mod test {
const MOCK_DATA: &str = "test-hint 0xfacade";

let sys = client_and_host();
let (hint_writer, hint_reader) = (sys.hint_writer, sys.hint_reader);
let (hint_writer, mut hint_reader) = (sys.hint_writer, sys.hint_reader);
let incoming_hints = Arc::new(Mutex::new(Vec::new()));

let client = tokio::task::spawn(async move { hint_writer.write(MOCK_DATA) });
let host = tokio::task::spawn(async move {
let mut v = Vec::new();
let route_hint = |hint: String| {
v.push(hint.clone());
Ok(())
};
hint_reader.next_hint(route_hint).unwrap();

assert_eq!(v.len(), 1);

v.remove(0)
let host = tokio::task::spawn({
let incoming_hints_ref = Arc::clone(&incoming_hints);
async move {
let route_hint =
move |hint: String| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let hints = Arc::clone(&incoming_hints_ref);
Box::pin(async move {
hints.lock().await.push(hint.clone());
Ok(())
})
};
hint_reader.next_hint(&route_hint).await.unwrap();

let mut hints = incoming_hints.lock().await;
assert_eq!(hints.len(), 1);
hints.remove(0)
}
});

let (_, h) = tokio::join!(client, host);
Expand Down
47 changes: 34 additions & 13 deletions crates/preimage/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer};
use alloc::vec::Vec;
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use anyhow::{bail, Result};
use core::future::Future;
use tracing::debug;

/// An [OracleReader] is a high-level interface to the preimage oracle.
Expand Down Expand Up @@ -85,11 +86,13 @@ impl OracleServer {
}
}

#[async_trait::async_trait]
impl PreimageOracleServer for OracleServer {
fn next_preimage_request<'a>(
&self,
mut get_preimage: impl FnMut(PreimageKey) -> Result<&'a Vec<u8>>,
) -> Result<()> {
async fn next_preimage_request<F, Fut>(&mut self, mut get_preimage: F) -> Result<()>
where
F: FnMut(PreimageKey) -> Fut + Send,
Fut: Future<Output = Result<Arc<Vec<u8>>>> + Send,
{
// Read the preimage request from the client, and throw early if there isn't is any.
let mut buf = [0u8; 32];
self.pipe_handle.read_exact(&mut buf)?;
Expand All @@ -98,7 +101,7 @@ impl PreimageOracleServer for OracleServer {
debug!(target: "oracle_server", "Fetching preimage for key {preimage_key}");

// Fetch the preimage value from the preimage getter.
let value = get_preimage(preimage_key)?;
let value = get_preimage(preimage_key).await?;

// Write the length as a big-endian u64 followed by the data.
let data = [(value.len() as u64).to_be_bytes().as_ref(), value.as_ref()]
Expand All @@ -121,9 +124,11 @@ mod test {
use super::*;
use crate::PreimageKeyType;
use alloy_primitives::keccak256;
use core::pin::Pin;
use kona_common::FileDescriptor;
use std::{collections::HashMap, fs::File, os::fd::AsRawFd};
use tempfile::tempfile;
use tokio::sync::Mutex;

/// Test struct containing the [OracleReader] and a [OracleServer] for the host, plus the open
/// [File]s. The [File]s are stored in this struct so that they are not dropped until the
Expand Down Expand Up @@ -167,12 +172,15 @@ mod test {
let key_b: PreimageKey =
PreimageKey::new(*keccak256(MOCK_DATA_B), PreimageKeyType::Keccak256);

let mut preimages = HashMap::new();
preimages.insert(key_a, MOCK_DATA_A.to_vec());
preimages.insert(key_b, MOCK_DATA_B.to_vec());
let preimages = {
let mut preimages = HashMap::new();
preimages.insert(key_a, Arc::new(MOCK_DATA_A.to_vec()));
preimages.insert(key_b, Arc::new(MOCK_DATA_B.to_vec()));
Arc::new(Mutex::new(preimages))
};

let sys = client_and_host();
let (oracle_reader, oracle_server) = (sys.oracle_reader, sys.oracle_server);
let (oracle_reader, mut oracle_server) = (sys.oracle_reader, sys.oracle_server);

let client = tokio::task::spawn(async move {
let contents_a = oracle_reader.get(key_a).unwrap();
Expand All @@ -185,11 +193,24 @@ mod test {
(contents_a, contents_b)
});
let host = tokio::task::spawn(async move {
let get_preimage =
|key| preimages.get(&key).ok_or(anyhow::anyhow!("Preimage not available"));
#[allow(clippy::type_complexity)]
let get_preimage = move |key: PreimageKey| -> Pin<
Box<dyn Future<Output = Result<Arc<Vec<u8>>>> + Send>,
> {
let preimages = Arc::clone(&preimages);
Box::pin(async move {
// Simulate fetching preimage data
preimages
.lock()
.await
.get(&key)
.ok_or(anyhow::anyhow!("Preimage not available"))
.cloned()
})
};

loop {
if oracle_server.next_preimage_request(get_preimage).is_err() {
if oracle_server.next_preimage_request(&get_preimage).await.is_err() {
break;
}
}
Expand Down
18 changes: 12 additions & 6 deletions crates/preimage/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::PreimageKey;
use alloc::{string::String, vec::Vec};
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use anyhow::Result;
use core::future::Future;

/// A [PreimageOracleClient] is a high-level interface to read data from the host, keyed by a
/// [PreimageKey].
Expand Down Expand Up @@ -36,26 +37,31 @@ pub trait HintWriterClient {

/// A [PreimageOracleServer] is a high-level interface to accept read requests from the client and
/// write the preimage data to the client pipe.
#[async_trait::async_trait]
pub trait PreimageOracleServer {
/// Get the next preimage request and return the response to the client.
///
/// # Returns
/// - `Ok(())` if the data was successfully written into the client pipe.
/// - `Err(_)` if the data could not be written to the client.
fn next_preimage_request<'a>(
&self,
get_preimage: impl FnMut(PreimageKey) -> Result<&'a Vec<u8>>,
) -> Result<()>;
async fn next_preimage_request<F, Fut>(&mut self, get_preimage: F) -> Result<()>
where
F: FnMut(PreimageKey) -> Fut + Send,
Fut: Future<Output = Result<Arc<Vec<u8>>>> + Send;
}

/// A [HintReaderServer] is a high-level interface to read preimage hints from the
/// [HintWriterClient] and prepare them for consumption by the client program.
#[async_trait::async_trait]
pub trait HintReaderServer {
/// Get the next hint request and return the acknowledgement to the client.
///
/// # Returns
/// - `Ok(())` if the hint was received and the client was notified of the host's
/// acknowledgement.
/// - `Err(_)` if the hint was not received correctly.
fn next_hint(&self, route_hint: impl FnMut(String) -> Result<()>) -> Result<()>;
async fn next_hint<F, Fut>(&mut self, route_hint: F) -> Result<()>
where
F: FnMut(String) -> Fut + Send,
Fut: Future<Output = Result<()>> + Send;
}
Loading