From 1d35cd4892d0ca7e507c0ed04757456267a94537 Mon Sep 17 00:00:00 2001 From: mikelsr Date: Mon, 3 Feb 2025 16:02:50 +0100 Subject: [PATCH] Pre-open / and /ipfs directory --- README.md | 18 +++++++- lib/fs/src/lib.rs | 32 +++++++------ lib/net/src/dial.rs | 2 +- lib/proc/src/lib.rs | 15 +++--- src/main.rs | 46 +++++-------------- ...LUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ | 1 + tests/wasm/read_from_ipfs/src/main.rs | 11 ++--- 7 files changed, 59 insertions(+), 66 deletions(-) create mode 100644 tests/wasm/read_from_ipfs/res/QmeeLUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ diff --git a/README.md b/README.md index 806131c..375917f 100644 --- a/README.md +++ b/README.md @@ -1 +1,17 @@ -# ww \ No newline at end of file +# Wetware + +Rust implementation of Wetware. + +--- + +## Run Wetware + +> TODO: document IPFS dependency + +Run a WASM module directly from IPFS with, e.g. the one in `test/wasm/read_from_ipfs`: + +```sh +cargo run -- --load /ipfs/QmaGNHr18jf5RiWMYme63XJh5k4TuPSMiGbvAZueQEC8rH +``` + +The example WASM module will read `QmaGNHr18jf5RiWMYme63XJh5k4TuPSMiGbvAZueQEC8rH` from IPFS and assert its contents. diff --git a/lib/fs/src/lib.rs b/lib/fs/src/lib.rs index e782cf9..8c24ad9 100644 --- a/lib/fs/src/lib.rs +++ b/lib/fs/src/lib.rs @@ -1,8 +1,9 @@ +use bytes::BufMut; use futures::executor::block_on; use futures::future::BoxFuture; use futures::TryStreamExt; use std::fmt; -use std::io::{self, Cursor, SeekFrom}; +use std::io::{self, Cursor, Read, Seek, SeekFrom, Write}; use std::marker::{Send, Sync}; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -110,14 +111,6 @@ impl virtual_fs::FileSystem for IpfsFs { #[instrument(level = "trace", skip_all, fields(), ret)] fn new_open_options(&self) -> virtual_fs::OpenOptions { let mut open_options = virtual_fs::OpenOptions::new(self); - // let x = open_options.open("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH"); - // match x { - // Ok(y) => { - // println!("Reading file"); - // println!("{}", y.size()) - // } - // Err(z) => println!("err: {}", z), - // } open_options.read(true); open_options } @@ -190,10 +183,13 @@ impl fmt::Debug for IpfsFile { impl AsyncRead for IpfsFile { #[instrument(level = "trace", skip_all, fields(?cx, ?buf), ret)] fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + if usize::try_from(self.cursor.position()).unwrap() < self.size { + buf.put(self.cursor.get_mut().as_slice()); + } Poll::Ready(Ok(())) } } @@ -201,13 +197,17 @@ impl AsyncRead for IpfsFile { // TODO impl AsyncSeek for IpfsFile { #[instrument(level = "trace", skip_all, fields(?position), ret)] - fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { - Ok(()) + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { + let res = self.cursor.seek(position); + match res { + Ok(_) => Ok(()), + Err(e) => Err(e), + } } #[instrument(level = "trace", skip_all, fields(?cx), ret)] fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(0)) + Poll::Ready(Ok(self.cursor.position())) } } @@ -311,7 +311,11 @@ impl virtual_fs::VirtualFile for IpfsFile { #[instrument(level = "trace", skip_all, fields(?cx), ret)] fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(0)) + if self.size == 0 && self.cursor.position() == 0 { + return Poll::Ready(Ok(0)); + } + let left: usize = self.size - usize::try_from(self.cursor.position()).unwrap() - 1; + Poll::Ready(Ok(left)) } #[instrument(level = "trace", skip_all, fields(?cx), ret)] diff --git a/lib/net/src/dial.rs b/lib/net/src/dial.rs index dcb27f1..f173a94 100644 --- a/lib/net/src/dial.rs +++ b/lib/net/src/dial.rs @@ -38,7 +38,7 @@ fn handle_discovered(d: &mut dyn Dialer, peers: Vec<(PeerId, Multiaddr)>) { let result = d.dial(opts); match result { - Ok(_) => tracing::info!("Dialed peer: {peer}"), + Ok(_) => tracing::debug!("Dialed peer: {peer}"), Err(e) => tracing::debug!("Failed to dial peer: {e}"), } } diff --git a/lib/proc/src/lib.rs b/lib/proc/src/lib.rs index 4b032f8..45f314d 100644 --- a/lib/proc/src/lib.rs +++ b/lib/proc/src/lib.rs @@ -44,17 +44,16 @@ impl WasmRuntime { &mut self, bytecode: Vec, fs: virtual_fs::TmpFileSystem, + // fs: Box, ) -> Result> { let module = wasmer::Module::new(&self.store, bytecode).expect("couldn't load WASM module"); let uuid = Uuid::new_v4(); - let mut wasi_env = WasiEnv::builder(uuid) - .sandbox_fs(fs) - // .fs(fs) - // .preopen_build(|p| p.directory("/").read(true))? - // .preopen_build(|p| p.directory("./").read(true))? - // .args(&["arg1", "arg2"]) - // .env("KEY", "VALUE") - .finalize(self.store_mut())?; + let pre_opens: Vec = ["/", "/ipfs"].iter().map(|&s| s.to_string()).collect(); + let mut wasi_env_builder = WasiEnv::builder(uuid); + wasi_env_builder = wasi_env_builder.sandbox_fs(fs); + // wasi_env_builder = wasi_env_builder.fs(fs); + wasi_env_builder.preopen_vfs_dirs(pre_opens).unwrap(); + let mut wasi_env = wasi_env_builder.finalize(self.store_mut())?; let import_object = wasi_env.import_object(self.store_mut(), &module)?; let instance = wasmer::Instance::new(self.store_mut(), &module, &import_object)?; diff --git a/src/main.rs b/src/main.rs index 4c53388..e651dd9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,13 +21,12 @@ async fn main() -> Result<(), Box> { let subscriber = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .compact() // use abbreviated log format - // .with_file(true) - // .with_thread_ids(true) + // .with_max_level(tracing::Level::TRACE) .with_max_level(tracing::Level::INFO) .finish(); // Set the subscriber as global default - tracing::subscriber::set_global_default(subscriber).unwrap(); + tracing::subscriber::set_global_default(subscriber)?; // Create a MDNS network behaviour. let mdns_behaviour = mdns::tokio::Behaviour::new(mdns::Config::default(), config.peer_id())?; @@ -88,22 +87,22 @@ async fn main() -> Result<(), Box> { net::dial::default_mdns_handler(&mut swarm, event); } swarm::SwarmEvent::Behaviour(DefaultBehaviourEvent::Ping(event)) => { - tracing::info!("got PING event: {event:?}"); + tracing::debug!("got PING event: {event:?}"); } swarm::SwarmEvent::Behaviour(DefaultBehaviourEvent::Kad(event)) => { - tracing::info!("got KAD event: {event:?}"); + tracing::debug!("got KAD event: {event:?}"); } swarm::SwarmEvent::Behaviour(DefaultBehaviourEvent::Identify(event)) => { - tracing::info!("got IDENTIFY event: {event:?}"); + tracing::debug!("got IDENTIFY event: {event:?}"); } event => { - tracing::info!("got event: {event:?}"); + tracing::debug!("got event: {event:?}"); } } } }); - tracing::info!("Initialize IPFS client..."); + tracing::debug!("Initialize IPFS client..."); // The IPFS library we are using, ferristseng/rust-ipfs-api, requires multiformats::Multiaddr. let ipfs_client = net::ipfs::Client::new(config.ipfs_addr()); @@ -121,37 +120,14 @@ async fn main() -> Result<(), Box> { tracing::info!("Initialize WASM module instance..."); let ipfs_fs = IpfsFs::new(ipfs_client); let ipfs_path = ipfs_fs.path(); + // TODO: now that we have everything we need, we can set up an RPC listener that can be invoked + // an arbitrary number of time and keep server/client functionality appart. let shared_ipfs_fs = Arc::new(ipfs_fs) as Arc; let root_fs = RootFileSystemBuilder::new().build(); root_fs.mount(ipfs_path.clone(), &shared_ipfs_fs, ipfs_path)?; let mut wasm_process = wasm_runtime.build(bytecode, root_fs)?; + // let mut wasm_process = wasm_runtime.build(bytecode, Box::new(ipfs_fs))?; wasm_process.run(wasm_runtime.store_mut())?; + tracing::info!("WASM module executed successfully."); Ok(()) } - -#[cfg(test)] -mod tests { - use virtual_fs::FileSystem; - - use super::*; - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn read_file() { - let ipfs_fs = IpfsFs::new(net::ipfs::Client::new( - "/ip4/127.0.0.1/tcp/5001".parse().unwrap(), - )); - let ipfs_path = ipfs_fs.path(); - let shared_ipfs_fs = Arc::new(ipfs_fs) as Arc; - let root_fs = RootFileSystemBuilder::new().build(); - root_fs - .mount(ipfs_path.clone(), &shared_ipfs_fs, ipfs_path) - .unwrap(); - let file_res = root_fs - .new_open_options() - .read(true) - .open("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH"); - assert!(file_res.is_ok()); - let file = file_res.unwrap(); - assert_eq!(file.size(), 0); - } -} diff --git a/tests/wasm/read_from_ipfs/res/QmeeLUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ b/tests/wasm/read_from_ipfs/res/QmeeLUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ new file mode 100644 index 0000000..af5626b --- /dev/null +++ b/tests/wasm/read_from_ipfs/res/QmeeLUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ @@ -0,0 +1 @@ +Hello, world! diff --git a/tests/wasm/read_from_ipfs/src/main.rs b/tests/wasm/read_from_ipfs/src/main.rs index 4bb50f9..9f8dabf 100644 --- a/tests/wasm/read_from_ipfs/src/main.rs +++ b/tests/wasm/read_from_ipfs/src/main.rs @@ -1,12 +1,9 @@ use std::fs; fn main() { - loop { - let file_content = - fs::read_to_string("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH"); - match file_content { - Ok(s) => println!("{}", s), - Err(e) => println!("Error reading from file: {}", e), - } + let file_content = fs::read_to_string("/ipfs/QmeeLUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ"); + match file_content { + Ok(s) => assert_eq!(s, String::from("Hello, world!")), + Err(e) => panic!("Error reading from file: {}", e), } }