Skip to content

Commit

Permalink
Pre-open / and /ipfs directory
Browse files Browse the repository at this point in the history
  • Loading branch information
mikelsr committed Feb 3, 2025
1 parent eb4c16d commit 1d35cd4
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 66 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
# ww
# Wetware

Rust implementation of Wetware.

---

## Run Wetware

> TODO: document IPFS dependency
Run a WASM module directly from IPFS with, e.g. the one in `test/wasm/read_from_ipfs`:

```sh
cargo run -- --load /ipfs/QmaGNHr18jf5RiWMYme63XJh5k4TuPSMiGbvAZueQEC8rH
```

The example WASM module will read `QmaGNHr18jf5RiWMYme63XJh5k4TuPSMiGbvAZueQEC8rH` from IPFS and assert its contents.
32 changes: 18 additions & 14 deletions lib/fs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use bytes::BufMut;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::TryStreamExt;
use std::fmt;
use std::io::{self, Cursor, SeekFrom};
use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};
use std::marker::{Send, Sync};
use std::path::{Path, PathBuf};
use std::pin::Pin;
Expand Down Expand Up @@ -110,14 +111,6 @@ impl virtual_fs::FileSystem for IpfsFs {
#[instrument(level = "trace", skip_all, fields(), ret)]
fn new_open_options(&self) -> virtual_fs::OpenOptions {
let mut open_options = virtual_fs::OpenOptions::new(self);
// let x = open_options.open("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH");
// match x {
// Ok(y) => {
// println!("Reading file");
// println!("{}", y.size())
// }
// Err(z) => println!("err: {}", z),
// }
open_options.read(true);
open_options
}
Expand Down Expand Up @@ -190,24 +183,31 @@ impl fmt::Debug for IpfsFile {
impl AsyncRead for IpfsFile {
#[instrument(level = "trace", skip_all, fields(?cx, ?buf), ret)]
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if usize::try_from(self.cursor.position()).unwrap() < self.size {
buf.put(self.cursor.get_mut().as_slice());
}
Poll::Ready(Ok(()))
}
}

// TODO
impl AsyncSeek for IpfsFile {
#[instrument(level = "trace", skip_all, fields(?position), ret)]
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
Ok(())
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
let res = self.cursor.seek(position);
match res {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(0))
Poll::Ready(Ok(self.cursor.position()))
}
}

Expand Down Expand Up @@ -311,7 +311,11 @@ impl virtual_fs::VirtualFile for IpfsFile {

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
if self.size == 0 && self.cursor.position() == 0 {
return Poll::Ready(Ok(0));
}
let left: usize = self.size - usize::try_from(self.cursor.position()).unwrap() - 1;
Poll::Ready(Ok(left))
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
Expand Down
2 changes: 1 addition & 1 deletion lib/net/src/dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn handle_discovered(d: &mut dyn Dialer, peers: Vec<(PeerId, Multiaddr)>) {

let result = d.dial(opts);
match result {
Ok(_) => tracing::info!("Dialed peer: {peer}"),
Ok(_) => tracing::debug!("Dialed peer: {peer}"),
Err(e) => tracing::debug!("Failed to dial peer: {e}"),
}
}
Expand Down
15 changes: 7 additions & 8 deletions lib/proc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ impl WasmRuntime {
&mut self,
bytecode: Vec<u8>,
fs: virtual_fs::TmpFileSystem,
// fs: Box<dyn virtual_fs::FileSystem + Send + Sync>,
) -> Result<WasmProcess, Box<dyn std::error::Error>> {
let module = wasmer::Module::new(&self.store, bytecode).expect("couldn't load WASM module");
let uuid = Uuid::new_v4();
let mut wasi_env = WasiEnv::builder(uuid)
.sandbox_fs(fs)
// .fs(fs)
// .preopen_build(|p| p.directory("/").read(true))?
// .preopen_build(|p| p.directory("./").read(true))?
// .args(&["arg1", "arg2"])
// .env("KEY", "VALUE")
.finalize(self.store_mut())?;
let pre_opens: Vec<String> = ["/", "/ipfs"].iter().map(|&s| s.to_string()).collect();
let mut wasi_env_builder = WasiEnv::builder(uuid);
wasi_env_builder = wasi_env_builder.sandbox_fs(fs);
// wasi_env_builder = wasi_env_builder.fs(fs);
wasi_env_builder.preopen_vfs_dirs(pre_opens).unwrap();
let mut wasi_env = wasi_env_builder.finalize(self.store_mut())?;
let import_object = wasi_env.import_object(self.store_mut(), &module)?;
let instance = wasmer::Instance::new(self.store_mut(), &module, &import_object)?;

Expand Down
46 changes: 11 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
let subscriber = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact() // use abbreviated log format
// .with_file(true)
// .with_thread_ids(true)
// .with_max_level(tracing::Level::TRACE)
.with_max_level(tracing::Level::INFO)
.finish();

// Set the subscriber as global default
tracing::subscriber::set_global_default(subscriber).unwrap();
tracing::subscriber::set_global_default(subscriber)?;

// Create a MDNS network behaviour.
let mdns_behaviour = mdns::tokio::Behaviour::new(mdns::Config::default(), config.peer_id())?;
Expand Down Expand Up @@ -88,22 +87,22 @@ async fn main() -> Result<(), Box<dyn Error>> {
net::dial::default_mdns_handler(&mut swarm, event);
}
swarm::SwarmEvent::Behaviour(DefaultBehaviourEvent::Ping(event)) => {
tracing::info!("got PING event: {event:?}");
tracing::debug!("got PING event: {event:?}");
}
swarm::SwarmEvent::Behaviour(DefaultBehaviourEvent::Kad(event)) => {
tracing::info!("got KAD event: {event:?}");
tracing::debug!("got KAD event: {event:?}");
}
swarm::SwarmEvent::Behaviour(DefaultBehaviourEvent::Identify(event)) => {
tracing::info!("got IDENTIFY event: {event:?}");
tracing::debug!("got IDENTIFY event: {event:?}");
}
event => {
tracing::info!("got event: {event:?}");
tracing::debug!("got event: {event:?}");
}
}
}
});

tracing::info!("Initialize IPFS client...");
tracing::debug!("Initialize IPFS client...");
// The IPFS library we are using, ferristseng/rust-ipfs-api, requires multiformats::Multiaddr.
let ipfs_client = net::ipfs::Client::new(config.ipfs_addr());

Expand All @@ -121,37 +120,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
tracing::info!("Initialize WASM module instance...");
let ipfs_fs = IpfsFs::new(ipfs_client);
let ipfs_path = ipfs_fs.path();
// TODO: now that we have everything we need, we can set up an RPC listener that can be invoked
// an arbitrary number of time and keep server/client functionality appart.
let shared_ipfs_fs = Arc::new(ipfs_fs) as Arc<dyn virtual_fs::FileSystem + Send + Sync>;
let root_fs = RootFileSystemBuilder::new().build();
root_fs.mount(ipfs_path.clone(), &shared_ipfs_fs, ipfs_path)?;
let mut wasm_process = wasm_runtime.build(bytecode, root_fs)?;
// let mut wasm_process = wasm_runtime.build(bytecode, Box::new(ipfs_fs))?;
wasm_process.run(wasm_runtime.store_mut())?;
tracing::info!("WASM module executed successfully.");
Ok(())
}

#[cfg(test)]
mod tests {
use virtual_fs::FileSystem;

use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn read_file() {
let ipfs_fs = IpfsFs::new(net::ipfs::Client::new(
"/ip4/127.0.0.1/tcp/5001".parse().unwrap(),
));
let ipfs_path = ipfs_fs.path();
let shared_ipfs_fs = Arc::new(ipfs_fs) as Arc<dyn virtual_fs::FileSystem + Send + Sync>;
let root_fs = RootFileSystemBuilder::new().build();
root_fs
.mount(ipfs_path.clone(), &shared_ipfs_fs, ipfs_path)
.unwrap();
let file_res = root_fs
.new_open_options()
.read(true)
.open("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH");
assert!(file_res.is_ok());
let file = file_res.unwrap();
assert_eq!(file.size(), 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello, world!
11 changes: 4 additions & 7 deletions tests/wasm/read_from_ipfs/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::fs;

fn main() {
loop {
let file_content =
fs::read_to_string("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH");
match file_content {
Ok(s) => println!("{}", s),
Err(e) => println!("Error reading from file: {}", e),
}
let file_content = fs::read_to_string("/ipfs/QmeeLUVdiSTTKQqhWqsffYDtNvvvcTfJdotkNyi1KDEJtQ");
match file_content {
Ok(s) => assert_eq!(s, String::from("Hello, world!")),
Err(e) => panic!("Error reading from file: {}", e),
}
}

0 comments on commit 1d35cd4

Please sign in to comment.