Skip to content

Commit

Permalink
Merge pull request #111 from Actyx/libp2p-0.48
Browse files Browse the repository at this point in the history
libp2p 0.48
  • Loading branch information
rkuhn authored Oct 18, 2022
2 parents 87ac5ac + 1b385e9 commit 5871589
Show file tree
Hide file tree
Showing 16 changed files with 1,605 additions and 1,049 deletions.
785 changes: 430 additions & 355 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ description = "small embeddable ipfs implementation"
repository = "https://github.com/ipfs-rust/ipfs-embed"

[features]
default = ["async_global"]
async_global = ["async-global-executor", "libp2p/tcp-async-io", "libp2p/dns-async-std"]
tokio = ["tokio-crate", "libp2p/tcp-tokio", "libp2p/dns-tokio"]
default = ["async_global", "rsa"]
rsa = ["libp2p/rsa"]
async_global = ["async-global-executor", "libp2p/tcp-async-io", "libp2p/dns-async-std", "libp2p/mdns-async-io"]
tokio = ["tokio-crate", "libp2p/tcp-tokio", "libp2p/dns-tokio", "libp2p/mdns-tokio"]
telemetry = ["tide", "async_global"]
# Makes it possible to exchange data via Bitswap with a go-ipfs node
compat = ["libp2p-bitswap/compat"]
Expand All @@ -28,11 +29,11 @@ chrono = "0.4.19"
fnv = "1.0.7"
futures = "0.3.21"
futures-timer = "3.0.2"
ipfs-sqlite-block-store = "0.12.0"
ipfs-sqlite-block-store = "0.13.0"
lazy_static = "1.4.0"
libipld = { version = "0.12.0", default-features = false }
libp2p-bitswap = "0.22.0"
libp2p-broadcast = "0.9.1"
libipld = { version = "0.14.0", default-features = false }
libp2p-bitswap = "0.23.0"
libp2p-broadcast = "0.10.0"
names = "0.13.0"
parking_lot = "0.11.2"
pin-project = "1.0.10"
Expand All @@ -42,30 +43,29 @@ thiserror = "1.0.30"
tide = { version = "0.16.0", optional = true }
tokio-crate = { package = "tokio", version = "1.17.0", features = ["rt"], optional = true }
tracing = "0.1.32"
trust-dns-resolver = "0.20"
trust-dns-resolver = "0.21.2"
void = "1.0.2"

[dependencies.libp2p]
version = "0.43.0"
version = "0.48.0"
default-features = false
features = [
"gossipsub",
"identify",
"kad",
"mdns",
"ping",
#"relay",
"mplex",
"noise",
"pnet",
"yamux",
]

[dev-dependencies]
anyhow = { version = "1", features = ["backtrace"] }
async-std = { version = "1.11.0", features = ["attributes"] }
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
libp2p-bitswap = { version = "0.22.0", default-features = false, features = ["compat"] }
multihash = { version = "0.14.0", default-features = false, features = ["blake3"] }
libipld = { version = "0.14.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
libp2p-bitswap = { version = "0.23.0", default-features = false, features = ["compat"] }
multihash = { version = "0.16.1", default-features = false, features = ["blake3"] }
rand = "0.8.5"
regex = "1.5.5"
tempdir = "0.3.7"
Expand Down
5 changes: 3 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ anyhow = "1.0.56"
async-process = "1.3.0"
async-std = { version = "1.11.0", features = ["attributes"] }
chrono = "0.4.19"
futures = "0.3.24"
ipfs-embed = { path = ".." }
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor"] }
multihash = { version = "0.14.0", default-features = false, features = ["blake3"] }
libipld = { version = "0.14.0", default-features = false, features = ["dag-cbor"] }
multihash = { version = "0.16.1", default-features = false, features = ["blake3"] }
parking_lot = "0.11.2"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
Expand Down
61 changes: 32 additions & 29 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use async_std::stream::StreamExt;
use futures::TryFutureExt;
use ipfs_embed::{DefaultParams, Ipfs, NetworkConfig, StorageConfig};
use ipfs_embed_cli::{keypair, Command, Config, Event};
use parking_lot::Mutex;
Expand Down Expand Up @@ -45,11 +46,11 @@ async fn run() -> Result<()> {
};
network.identify.as_mut().unwrap().agent_version = node_name;

let ipfs = Ipfs::<DefaultParams>::new(ipfs_embed::Config { storage, network }).await?;
let mut events = ipfs.swarm_events();
let mut ipfs = Ipfs::<DefaultParams>::new(ipfs_embed::Config { storage, network }).await?;
let mut events = ipfs.swarm_events().await?;

for addr in config.listen_on {
let _ = ipfs.listen_on(addr)?;
let _ = ipfs.listen_on(addr);
}

for addr in config.external {
Expand Down Expand Up @@ -111,36 +112,38 @@ async fn run() -> Result<()> {
loop {
line.clear();
stdin.read_line(&mut line)?;
match line.parse()? {
Command::AddAddress(peer, addr) => {
ipfs.lock().add_address(&peer, addr);
#[allow(clippy::unit_arg)]
let result = match line.parse() {
Ok(Command::AddAddress(peer, addr)) => Ok(ipfs.lock().add_address(peer, addr)),
Ok(Command::Dial(peer)) => Ok(ipfs.lock().dial(peer)),
Ok(Command::PrunePeers) => Ok(ipfs.lock().prune_peers(Duration::ZERO)),
Ok(Command::Get(cid)) => ipfs
.lock()
.get(&cid)
.map(|block| writeln!(stdout, "{}", Event::Block(block)).expect("print")),
Ok(Command::Insert(block)) => ipfs.lock().insert(block),
Ok(Command::Alias(alias, cid)) => ipfs.lock().alias(&alias, cid.as_ref()),
Ok(Command::Flush) => {
let f = ipfs
.lock()
.flush()
.inspect_ok(|_| writeln!(stdout, "{}", Event::Flushed).expect("print"));
f.await
}
Command::Dial(peer) => {
ipfs.lock().dial(&peer);
}
Command::PrunePeers => {
ipfs.lock().prune_peers(Duration::ZERO);
}
Command::Get(cid) => {
let block = ipfs.lock().get(&cid)?;
writeln!(stdout, "{}", Event::Block(block))?;
}
Command::Insert(block) => {
ipfs.lock().insert(block)?;
}
Command::Alias(alias, cid) => {
ipfs.lock().alias(&alias, cid.as_ref())?;
}
Command::Flush => {
ipfs.lock().flush().await?;
writeln!(stdout, "{}", Event::Flushed)?;
}
Command::Sync(cid) => {
Ok(Command::Sync(cid)) => {
let providers = ipfs.lock().peers();
tracing::debug!("sync {} from {:?}", cid, providers);
ipfs.lock().sync(&cid, providers).await?;
writeln!(stdout, "{}", Event::Synced)?;
let f = ipfs
.lock()
.sync(&cid, providers)
.and_then(|f| f)
.inspect_ok(|_| writeln!(stdout, "{}", Event::Synced).expect("print"));
f.await
}
Err(err) => Err(err),
};
if let Err(err) = result {
eprintln!("main loop error (line = {}): {}", line, err);
}
}
}
6 changes: 3 additions & 3 deletions examples/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ fn tracing_try_init() {
async fn main() -> anyhow::Result<()> {
tracing_try_init();
let config = Config::default();
let ipfs = Ipfs::<Sp>::new(config).await?;
let mut ipfs = Ipfs::<Sp>::new(config).await?;
let peer: PeerId = "QmRSGx67Kq8w7xSBDia7hQfbfuvauMQGgxcwSWw976x4BS".parse()?;
let addr: Multiaddr = "/ip4/54.173.33.96/tcp/4001".parse()?;
ipfs.dial_address(&peer, addr);
ipfs.dial_address(peer, addr);

// 10 random bytes
let _cid_rand10: Cid = "QmXQsqVRpp2W7fbYZHi4aB2Xkqfd3DpwWskZoLVEYigMKC".parse()?;
Expand All @@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
let block = ipfs.fetch(&cid_simple_dag, vec![peer]).await?;
println!("got single block. len = {}", block.data().len());

let mut updates = ipfs.sync(&cid_simple_dag, vec![peer]);
let mut updates = ipfs.sync(&cid_simple_dag, vec![peer]).await?;
println!("starting sync of large file");
while let Some(update) = updates.next().await {
println!("{:?}", update);
Expand Down
5 changes: 3 additions & 2 deletions examples/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ async fn main() -> Result<()> {
.init();
let mut config = Config::new("/tmp/local1".as_ref(), Keypair::generate());
config.network.kad = None;
let a = Ipfs::<DefaultParams>::new(config).await?;
a.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?
let mut a = Ipfs::<DefaultParams>::new(config).await?;
a.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)
.next()
.await
.unwrap();
Expand Down Expand Up @@ -76,6 +76,7 @@ async fn main() -> Result<()> {

b.alias(ROOT, builder.prev.as_ref())?;
b.sync(builder.prev.as_ref().unwrap(), vec![a.local_peer_id()])
.await?
.await?;
b.flush().await?;

Expand Down
6 changes: 3 additions & 3 deletions harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ async-std = "1.11.0"
escargot = "0.5.7"
futures = "0.3.21"
ipfs-embed-cli = { path = "../cli" }
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
libp2p = { version = "0.43.0", default-features = false }
libipld = { version = "0.14.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
libp2p = { version = "0.48.0", default-features = false }
maplit = "1.0.2"
multihash = { version = "0.14.0", default-features = false, features = ["blake3"] }
multihash = { version = "0.16.1", default-features = false, features = ["blake3"] }
netsim-embed = "0.7.1"
rand = "0.8.5"
structopt = "0.3.26"
Expand Down
37 changes: 24 additions & 13 deletions harness/src/bin/discover_nat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ fn main() -> anyhow::Result<()> {
// we can’t attempt to dial while the connection exists
i.addresses.get(&i.connections[0]).map(|s| s.as_str()) ==
Some("Candidate")
// can’t check for full hashmap equality since the state where only the
// Candidate is present may be lost to output race conditions
// can’t check for full hashmap equality since the state where only the
// Candidate is present may be lost to output race conditions
|| i.addresses.is_empty()
// if consumer sent identify first, then the NAT address wasn’t known
// and only falsifiable listen addresses are left
))
.then(|| ())
})
Expand All @@ -137,22 +140,30 @@ fn main() -> anyhow::Result<()> {
.deadline(started, 30)
.await
.unwrap();
m.drain_matching(|e| matches!(e, Event::DialFailure(p, ..) if p == peer));
m.drain_matching(|e| matches!(e, Event::DialFailure(p, ..) | Event::Unreachable(p) if p == peer));
tracing::info!("provider {} saw close from {}", id, m_id);
m.send(Command::Dial(*peer));
m.select(|e| matches!(e, Event::DialFailure(p, ..) if p == peer).then(|| ()))
let alive = m
.select(|e| match e {
Event::DialFailure(p, ..) | Event::Unreachable(p) if p == peer => Some(true),
Event::PeerRemoved(p) if p == peer => Some(false),
_ => None,
})
.timeout(10)
.await
.unwrap()
.unwrap();
m.send(Command::PrunePeers);
m.select(|e| {
// prune_peers will remove the peer when a failure happens while not
// connected
matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ())
})
.timeout(10)
.await
.unwrap();
if alive {
m.send(Command::PrunePeers);
m.select(|e| {
// prune_peers will remove the peer when a failure happens while not
// connected
matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ())
})
.timeout(10)
.await
.unwrap();
}
tracing::info!("provider {} done with {}", id, m_id);
}
}
Expand Down
27 changes: 20 additions & 7 deletions harness/src/bin/discover_nat_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ fn main() -> anyhow::Result<()> {
Event::PeerInfo(p, i) if p == peer => Some(i.connections[0].clone()),
_ => None
}).timeout(1).await.unwrap().unwrap();
tracing::info!("first address is {}", a_1);
// the NAT may give us the correct port in a_1 already, so no second entry to
// check
let a_nat = a_1
.replace(1, |_| Some(Protocol::Tcp(30000)))
.filter(|a| *m_id == m_nat && *a != a_1);
tracing::info!("first address is {}, a_nat={:?}", a_1, a_nat);
m.select(|e| {
matches!(e, Event::PeerInfo(p, i) if p == peer && (
// port_reuse unfortunately means that the NATed port is added to
Expand All @@ -156,10 +156,13 @@ fn main() -> anyhow::Result<()> {
a_nat.iter().all(|a_nat| {
i.addresses.get(a_nat).map(|x| x.as_str()) == Some("Dial")
}))
// if consumer sent identify first, then the NAT address wasn’t known
// and only falsifiable listen addresses are left
|| i.addresses.is_empty()
)
.then(|| ())
})
.deadline(started, 5).await.unwrap();
.deadline(started, 10).await.unwrap();
tracing::info!("provider {} identified {}", id, m_id);
}
m.drain();
Expand Down Expand Up @@ -190,11 +193,21 @@ fn main() -> anyhow::Result<()> {
let m = sim.machine(*id);
for (m_id, (peer, _addr)) in consumers.iter() {
m.send(Command::Dial(*peer));
m.select(|e| matches!(e, Event::DialFailure(p, ..) if p == peer).then(|| ()))
.timeout(10).await.unwrap();
m.send(Command::PrunePeers);
m.select(|e| matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ()))
.timeout(10).await.unwrap();
let alive = m
.select(|e| match e {
Event::DialFailure(p, ..) | Event::Unreachable(p) if p == peer => Some(true),
Event::PeerRemoved(p) if p == peer => Some(false),
_ => None,
})
.timeout(10)
.await
.unwrap()
.unwrap();
if alive {
m.send(Command::PrunePeers);
m.select(|e| matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ()))
.timeout(10).await.unwrap();
}
tracing::info!("provider {} done with {}", id, m_id);
}
}
Expand Down
34 changes: 16 additions & 18 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,31 +275,29 @@ where
self.rw("missing_blocks", |x| x.missing_blocks(cid))
}

pub async fn evict(&self) -> Result<()> {
pub fn evict(&self) -> impl Future<Output = Result<()>> {
let store = self.inner.store.clone();
let gc_min_blocks = self.inner.gc_min_blocks;
let gc_target_duration = self.inner.gc_target_duration;
self.inner
.executor
.spawn_blocking(move || {
while !store
.lock()
.incremental_gc(gc_min_blocks, gc_target_duration)?
{
tracing::trace!("x");
}
Ok(())
})
.await?
}

pub async fn flush(&self) -> Result<()> {
let evict = self.inner.executor.spawn_blocking(move || {
while !store
.lock()
.incremental_gc(gc_min_blocks, gc_target_duration)?
{
tracing::trace!("x");
}
Ok(())
});
async { evict.await? }
}

pub fn flush(&self) -> impl Future<Output = Result<()>> {
let store = self.inner.store.clone();
let flush = self
.inner
.executor
.spawn_blocking(move || store.lock().flush());
Ok(observe_future("flush", flush).await??)
async { Ok(observe_future("flush", flush).await??) }
}

pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
Expand Down Expand Up @@ -347,7 +345,7 @@ where
} else {
timer.stop_and_discard();
}
Ok(res?)
res
}

struct SqliteStoreCollector<S: StoreParams> {
Expand Down
Loading

0 comments on commit 5871589

Please sign in to comment.