From 49cc07e5de5997871a4bcf7399c31e69a00325ff Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 20 Sep 2018 08:53:01 +0200 Subject: [PATCH] Networking backport for 2018-09-20 --- Cargo.lock | 215 +++--- substrate/network-libp2p/Cargo.toml | 2 +- substrate/network-libp2p/src/node_handler.rs | 349 +++++---- substrate/network-libp2p/src/service.rs | 27 +- substrate/network-libp2p/src/service_task.rs | 51 +- substrate/network-libp2p/src/swarm.rs | 758 ++++++------------- 6 files changed, 591 insertions(+), 811 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0768239d57e9b..6e796b7cefb67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,7 +444,7 @@ dependencies = [ [[package]] name = "datastore" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "base64 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "chashmap 2.2.1 (git+https://github.com/redox-os/tfs)", @@ -1120,27 +1120,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "libp2p" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-dns 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-floodsub 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-identify 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-kad 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-mplex 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-ping 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-ratelimit 0.1.1 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-relay 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-secio 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-tcp-transport 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-transport-timeout 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-uds 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-websocket 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-yamux 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-dns 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-floodsub 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-identify 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-kad 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-mplex 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-ping 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-ratelimit 0.1.1 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-relay 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-secio 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-tcp-transport 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-transport-timeout 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-uds 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-websocket 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-yamux 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "stdweb 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-current-thread 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1150,20 +1150,20 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bs58 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "multihash 0.8.1-pre (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "multistream-select 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "multihash 0.8.1-pre (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "multistream-select 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "smallvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1173,12 +1173,12 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "tokio-dns-unofficial 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1186,16 +1186,16 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bs58 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1207,15 +1207,15 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1226,20 +1226,20 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "bigint 4.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "datastore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "datastore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-identify 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-ping 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-identify 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-ping 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1253,12 +1253,12 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1269,13 +1269,13 @@ dependencies = [ [[package]] name = "libp2p-peerstore" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bs58 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "datastore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "datastore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1284,14 +1284,14 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "multistream-select 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "multistream-select 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1301,11 +1301,11 @@ dependencies = [ [[package]] name = "libp2p-ratelimit" version = "0.1.1" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "aio-limited 0.1.0 (git+https://github.com/paritytech/aio-limited.git)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1314,14 +1314,14 @@ dependencies = [ [[package]] name = "libp2p-relay" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "protobuf 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1332,7 +1332,7 @@ dependencies = [ [[package]] name = "libp2p-secio" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "aes-ctr 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "asn1_der 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1340,12 +1340,12 @@ dependencies = [ "ctr 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "twofish 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "untrusted 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1354,12 +1354,12 @@ dependencies = [ [[package]] name = "libp2p-tcp-transport" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "tk-listen 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1368,10 +1368,10 @@ dependencies = [ [[package]] name = "libp2p-transport-timeout" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1379,25 +1379,25 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "tokio-uds 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "libp2p-websocket" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", - "rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", + "rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "stdweb 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "websocket 0.20.3 (git+https://github.com/tomaka/rust-websocket?branch=send)", @@ -1406,11 +1406,11 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1552,11 +1552,11 @@ dependencies = [ [[package]] name = "multiaddr" version = "0.3.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bs58 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "multihash 0.8.1-pre (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "multihash 0.8.1-pre (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1564,7 +1564,7 @@ dependencies = [ [[package]] name = "multihash" version = "0.8.1-pre" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "sha1 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1574,7 +1574,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1626,6 +1626,11 @@ name = "nodrop" version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "nohash-hasher" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "num-integer" version = "0.1.38" @@ -2169,7 +2174,7 @@ dependencies = [ "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2354,7 +2359,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.1.0" -source = "git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab#3e53a9dcc728d2e932d731bf90a8e81e0e4257ab" +source = "git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef#f2a5eee5e8363b5cf567206ab2c01c564943d2ef" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2717,7 +2722,7 @@ dependencies = [ "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2817,7 +2822,7 @@ dependencies = [ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)", + "libp2p 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3954,11 +3959,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "yamux" version = "0.1.0" -source = "git+https://github.com/paritytech/yamux#db6285826bed432fa599e6051fadc22920fe6614" +source = "git+https://github.com/paritytech/yamux#5acf79ecfb69ccdcb65c9f624f285b79716a029d" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "nohash-hasher 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4021,7 +4027,7 @@ dependencies = [ "checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda" "checksum ctr 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "50ac3add446ec1f8fe3dc007cd838f5b22bbf33186394feac505451ecc43c018" "checksum ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)" = "" -"checksum datastore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" +"checksum datastore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" "checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8" "checksum digest 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3cae2388d706b52f2f2f9afe280f9d768be36544bd71d1b8120cb34ea6450b55" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" @@ -4085,23 +4091,23 @@ dependencies = [ "checksum lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e6412c5e2ad9584b0b8e979393122026cdd6d2a80b933f890dcd694ddbe73739" "checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" "checksum libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)" = "76e3a3ef172f1a0b9a9ff0dd1491ae5e6c948b94479a3021819ba7d860c8645d" -"checksum libp2p 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-dns 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-floodsub 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-identify 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-kad 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-mplex 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-ping 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-ratelimit 0.1.1 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-relay 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-secio 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-tcp-transport 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-transport-timeout 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-uds 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-websocket 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum libp2p-yamux 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" +"checksum libp2p 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-dns 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-floodsub 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-identify 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-kad 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-mplex 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-peerstore 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-ping 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-ratelimit 0.1.1 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-relay 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-secio 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-tcp-transport 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-transport-timeout 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-uds 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-websocket 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum libp2p-yamux 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" "checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e" "checksum local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1ceb20f39ff7ae42f3ff9795f3986b1daad821caaa1e1732a0944103a5a1a66" "checksum lock_api 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "949826a5ccf18c1b3a7c3d57692778d21768b79e46eb9dd07bfc4c2160036c54" @@ -4117,14 +4123,15 @@ dependencies = [ "checksum mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)" = "6d771e3ef92d58a8da8df7d6976bfca9371ed1de6619d9d5a5ce5b1f29b85bfe" "checksum mio-uds 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "84c7b5caa3a118a6e34dbac36504503b1e8dc5835e833306b9d6af0e05929f79" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" -"checksum multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum multihash 0.8.1-pre (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" -"checksum multistream-select 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" +"checksum multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum multihash 0.8.1-pre (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" +"checksum multistream-select 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" "checksum names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef320dab323286b50fb5cdda23f61c796a72a89998ab565ca32525c5c556f2da" "checksum nan-preserving-float 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34d4f00fcc2f4c9efa8cc971db0da9e28290e28e97af47585e48691ef10ff31f" "checksum native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f74dbadc8b43df7864539cedb7bc91345e532fdd913cfdc23ad94f4d2d40fbc0" "checksum net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)" = "9044faf1413a1057267be51b5afba8eb1090bd2231c693664aa1db716fe1eae0" "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" +"checksum nohash-hasher 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "27593c72432b8cec9ae79e92792a73c38341064d525b6b612a9fccf8b7d17407" "checksum num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac0ea58d64a89d9d6b7688031b3be9358d6c919badcf7fbb0527ccfd891ee45" "checksum num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "775393e285254d2f5004596d69bb8bc1149754570dcc08cf30cabeba67955e28" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" @@ -4181,7 +4188,7 @@ dependencies = [ "checksum rustc-hex 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d2b03280c2813907a030785570c577fb27d3deec8da4c18566751ade94de0ace" "checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" "checksum rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a54aa04a10c68c1c4eacb4337fd883b435997ede17a9385784b990777686b09a" -"checksum rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=3e53a9dcc728d2e932d731bf90a8e81e0e4257ab)" = "" +"checksum rw-stream-sink 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=f2a5eee5e8363b5cf567206ab2c01c564943d2ef)" = "" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum schannel 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "85fd9df495640643ad2d00443b3d78aae69802ad488debab4f1dd52fc1806ade" "checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28" diff --git a/substrate/network-libp2p/Cargo.toml b/substrate/network-libp2p/Cargo.toml index 739d87fc2ccde..d4109c3eb0284 100644 --- a/substrate/network-libp2p/Cargo.toml +++ b/substrate/network-libp2p/Cargo.toml @@ -11,7 +11,7 @@ bytes = "0.4" error-chain = { version = "0.12", default-features = false } fnv = "1.0" futures = "0.1" -libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev = "3e53a9dcc728d2e932d731bf90a8e81e0e4257ab", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] } +libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev = "f2a5eee5e8363b5cf567206ab2c01c564943d2ef", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] } ethereum-types = "0.3" parking_lot = "0.5" libc = "0.2" diff --git a/substrate/network-libp2p/src/node_handler.rs b/substrate/network-libp2p/src/node_handler.rs index 56d5b41cd1331..6890c184fbd8a 100644 --- a/substrate/network-libp2p/src/node_handler.rs +++ b/substrate/network-libp2p/src/node_handler.rs @@ -18,6 +18,7 @@ use bytes::Bytes; use custom_proto::{RegisteredProtocols, RegisteredProtocolOutput}; use futures::{prelude::*, future, task}; use libp2p::core::{ConnectionUpgrade, Endpoint, PeerId, PublicKey, upgrade}; +use libp2p::core::nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}; use libp2p::kad::{KadConnecConfig, KadFindNodeRespond, KadIncomingRequest, KadConnecController}; use libp2p::{identify, ping}; use parking_lot::Mutex; @@ -47,7 +48,7 @@ const DELAY_TO_FIRST_IDENTIFY: Duration = Duration::from_secs(2); /// /// The node will be pinged at a regular interval to determine whether it's still alive. We will /// also regularly query the remote for identification information, for statistics purposes. -pub struct NodeHandler { +pub struct SubstrateNodeHandler { /// List of registered custom protocols. registered_custom: Arc>, /// Substreams open for "custom" protocols (eg. dot). @@ -55,6 +56,8 @@ pub struct NodeHandler { /// Substream open for Kademlia, if any. kademlia_substream: Option<(KadConnecController, Box + Send>)>, + /// If true, we need to send back a `KadOpen` event on the stream (if Kademlia is open). + need_report_kad_open: bool, /// Substream open for sending pings, if any. ping_out_substream: Option<(ping::Pinger, Box + Send>)>, @@ -81,10 +84,15 @@ pub struct NodeHandler { upgrades_in_progress_dial: Vec<(UpgradePurpose, Box, Error = IoError> + Send>)>, /// The substreams we want to open. queued_dial_upgrades: Vec, - /// Number of outbound substreams that the user should open. - /// While this is non-zero, polling the handler will produce `OutboundSubstreamRequested`. + /// Number of outbound substreams the outside should open for us. num_out_user_must_open: usize, + /// The `Multiaddr` event to produce. + multiaddr_event: Option>, + + /// The node has started its shutdown process. + is_shutting_down: bool, + /// Task to notify if we add an element to one of the lists from the public API. to_notify: Option, } @@ -98,8 +106,8 @@ enum UpgradePurpose { Ping, } -/// Event that can happen on the `NodeHandler`. -pub enum NodeEvent { +/// Event that can happen on the `SubstrateNodeHandler`. +pub enum SubstrateOutEvent { /// The node has been determined to be unresponsive. Unresponsive, @@ -112,6 +120,9 @@ pub enum NodeEvent { /// The node has successfully responded to a ping. PingSuccess(Duration), + /// The multiaddress of the node is now known. + Multiaddr(Result), + /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Identifier of the protocol. @@ -151,13 +162,6 @@ pub enum NodeEvent { /// The `IdentificationRequest` object should be used to send the information. IdentificationRequest(IdentificationRequest), - /// The emitter wants a new outbound substream to be opened. - /// - /// In the future, the user should answer that request by calling `inject_substream` with - /// `endpoint` set to `Dialer`. - /// If multiple such events are produced, the user should open a new substream once per event. - OutboundSubstreamRequested, - /// Opened a Kademlia substream with the node. KadOpen(KadConnecController), @@ -224,7 +228,26 @@ impl IdentificationRequest { } } -/// Ideally we would have a method on `NodeHandler` that builds this type, but in practice it's a +/// Event that can be received by a `SubstrateNodeHandler`. +#[derive(Debug, Clone)] +pub enum SubstrateInEvent { + /// Before anything happens on the node, we wait for an `Accept` event. This is used to deny + /// nodes based on their peer ID. + Accept, + + /// Sends a message through a custom protocol substream. + SendCustomMessage { + protocol: ProtocolId, + packet_id: PacketId, + data: Vec, + }, + + /// Requests to open a Kademlia substream. + // TODO: document better + OpenKademlia, +} + +/// Ideally we would have a method on `SubstrateNodeHandler` that builds this type, but in practice it's a /// bit tedious to express, even with the `impl Trait` syntax. /// Therefore we simply use a macro instead. macro_rules! listener_upgrade { @@ -238,7 +261,7 @@ macro_rules! listener_upgrade { ) } -impl NodeHandler +impl SubstrateNodeHandler where TSubstream: AsyncRead + AsyncWrite + Send + 'static, TUserData: Clone + Send + 'static, { @@ -251,9 +274,10 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, .map(|proto| UpgradePurpose::Custom(proto.id())) .collect(); - NodeHandler { + SubstrateNodeHandler { custom_protocols_substreams: Vec::with_capacity(registered_custom_len), kademlia_substream: None, + need_report_kad_open: false, identify_send_back: Arc::new(Mutex::new(Vec::with_capacity(1))), ping_in_substreams: Vec::with_capacity(1), ping_out_substream: None, @@ -265,59 +289,24 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, next_identify: Interval::new(Instant::now() + DELAY_TO_FIRST_IDENTIFY, PERIOD_IDENTIFY), queued_dial_upgrades, num_out_user_must_open: registered_custom_len, + multiaddr_event: None, + is_shutting_down: false, to_notify: None, } } +} - /// Closes the node and returns all the events that should be produced by gracefully closing - /// everything. - // TODO: stronger return type - pub fn close(self) -> Vec> { - let mut events = Vec::new(); - - if let Some(_) = self.kademlia_substream { - events.push(NodeEvent::KadClosed(Ok(()))); - } - - for proto in self.custom_protocols_substreams { - events.push(NodeEvent::CustomProtocolClosed { - protocol_id: proto.protocol_id, - result: Ok(()), - }); - } - - events - } - - /// Sends a message on a custom protocol substream. - pub fn send_custom_message( - &mut self, - protocol: ProtocolId, - packet_id: PacketId, - data: Vec, - ) { - debug_assert!(self.registered_custom.has_protocol(protocol), - "invalid protocol id requested in the API of the libp2p networking"); - let proto = match self.custom_protocols_substreams.iter().find(|p| p.protocol_id == protocol) { - Some(proto) => proto, - None => return, // TODO: diagnostic message? - }; - - let mut message = Bytes::with_capacity(1 + data.len()); - message.extend_from_slice(&[packet_id]); - message.extend_from_slice(&data); - - // TODO: report error? - let _ = proto.outgoing.unbounded_send(message); - } +impl NodeHandler for SubstrateNodeHandler +where TSubstream: AsyncRead + AsyncWrite + Send + 'static, + TUserData: Clone + Send + 'static, +{ + type InEvent = SubstrateInEvent; + type OutEvent = SubstrateOutEvent; + type OutboundOpenInfo = (); - /// Injects a substream that has been successfully opened with this node. - /// - /// If `endpoint` is `Listener`, the remote opened the substream. If `endpoint` is `Dialer`, - /// our node opened it. - pub fn inject_substream(&mut self, substream: TSubstream, endpoint: Endpoint) { + fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint) { // For listeners, propose all the possible upgrades. - if endpoint == Endpoint::Listener { + if endpoint == NodeHandlerEndpoint::Listener { let listener_upgrade = listener_upgrade!(self); // TODO: shouldn't be future::empty() ; requires a change in libp2p let upgrade = upgrade::apply(substream, listener_upgrade, Endpoint::Listener, future::empty()) @@ -332,8 +321,12 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, // If we're the dialer, we have to decide which upgrade we want. let purpose = if self.queued_dial_upgrades.is_empty() { - error!(target: "sub-libp2p", "Logic error: opened an outgoing substream \ - with no purpose"); + // Since we sometimes remove elements from `queued_dial_upgrades` before they succeed + // but after the outbound substream has started opening, it is possible that the queue + // is empty when we receive a substream. This is not an error. + // Example: we want to open a Kademlia substream, we start opening one, but in the + // meanwhile the remote opens a Kademlia substream. When we receive the new substream, + // we don't need it anymore. return; } else { self.queued_dial_upgrades.remove(0) @@ -384,20 +377,133 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } } - /// If we have a Kademlia substream open, returns a copy of the controller. Otherwise, the node - /// will try to open a Kademlia substream and produce a `KadOpen` event containing the - /// controller. - pub fn open_kademlia(&mut self) -> Option { - if let Some((ref ctrl, _)) = self.kademlia_substream { - Some(ctrl.clone()) + #[inline] + fn inject_inbound_closed(&mut self) { + } + + #[inline] + fn inject_outbound_closed(&mut self, _: Self::OutboundOpenInfo) { + } + + #[inline] + fn inject_multiaddr(&mut self, multiaddr: Result) { + self.multiaddr_event = Some(multiaddr); + if let Some(to_notify) = self.to_notify.take() { + to_notify.notify(); + } + } + + fn inject_event(&mut self, event: Self::InEvent) { + match event { + SubstrateInEvent::SendCustomMessage { protocol, packet_id, data } => { + self.send_custom_message(protocol, packet_id, data); + }, + SubstrateInEvent::OpenKademlia => self.open_kademlia(), + SubstrateInEvent::Accept => { + // TODO: implement + }, + } + } + + fn shutdown(&mut self) { + // TODO: close gracefully + self.is_shutting_down = true; + if let Some(to_notify) = self.to_notify.take() { + to_notify.notify(); + } + } + + fn poll(&mut self) -> Poll>, IoError> { + if self.is_shutting_down { + return Ok(Async::Ready(None)); + } + + if let Some(multiaddr_event) = self.multiaddr_event.take() { + let event = SubstrateOutEvent::Multiaddr(multiaddr_event); + return Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))); + } + + // Request new outbound substreams from the user if necessary. + if self.num_out_user_must_open >= 1 { + self.num_out_user_must_open -= 1; + return Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(())))); + } + + match self.poll_upgrades_in_progress()? { + Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))), + Async::NotReady => (), + }; + + match self.poll_custom_protocols()? { + Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))), + Async::NotReady => (), + }; + + match self.poll_kademlia()? { + Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))), + Async::NotReady => (), + }; + + match self.poll_ping()? { + Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))), + Async::NotReady => (), + }; + + match self.poll_identify()? { + Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))), + Async::NotReady => (), + }; + + // Nothing happened. Register our task to be notified and return. + self.to_notify = Some(task::current()); + Ok(Async::NotReady) + } +} + +impl SubstrateNodeHandler +where TSubstream: AsyncRead + AsyncWrite + Send + 'static, + TUserData: Clone + Send + 'static, +{ + /// Sends a message on a custom protocol substream. + fn send_custom_message( + &mut self, + protocol: ProtocolId, + packet_id: PacketId, + data: Vec, + ) { + debug_assert!(self.registered_custom.has_protocol(protocol), + "invalid protocol id requested in the API of the libp2p networking"); + let proto = match self.custom_protocols_substreams.iter().find(|p| p.protocol_id == protocol) { + Some(proto) => proto, + None => { + error!(target: "sub-libp2p", "Protocol {:?} isn't open", protocol); + return + }, + }; + + let mut message = Bytes::with_capacity(1 + data.len()); + message.extend_from_slice(&[packet_id]); + message.extend_from_slice(&data); + + if let Err(_) = proto.outgoing.unbounded_send(message) { + error!(target: "sub-libp2p", "Error while sending custom message to channel"); + } + } + + /// The node will try to open a Kademlia substream and produce a `KadOpen` event containing the + /// controller. If a Kademlia substream is already open, produces the event immediately. + fn open_kademlia(&mut self) { + if self.kademlia_substream.is_some() { + self.need_report_kad_open = true; + if let Some(to_notify) = self.to_notify.take() { + to_notify.notify(); + } } else if self.has_upgrade_purpose(&UpgradePurpose::Kad) { // We are currently upgrading a substream to Kademlia ; nothing more to do except wait. - None } else { // Opening a new substream for Kademlia. self.queued_dial_upgrades.push(UpgradePurpose::Kad); self.num_out_user_must_open += 1; - None } } @@ -429,24 +535,24 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, fn inject_fully_negotiated( &mut self, upgrade: FinalUpgrade - ) -> Option> { + ) -> Option> { match upgrade { FinalUpgrade::IdentifyListener(sender) => - Some(NodeEvent::IdentificationRequest(IdentificationRequest { + Some(SubstrateOutEvent::IdentificationRequest(IdentificationRequest { sender, identify_send_back: self.identify_send_back.clone(), protocols: self.supported_protocol_names(), })), FinalUpgrade::IdentifyDialer(info, observed_addr) => { self.cancel_dial_upgrade(&UpgradePurpose::Identify); - Some(NodeEvent::Identified { info, observed_addr }) + Some(SubstrateOutEvent::Identified { info, observed_addr }) }, FinalUpgrade::PingDialer(pinger, ping_process) => { self.cancel_dial_upgrade(&UpgradePurpose::Ping); // We always open the ping substream for a reason, which is to immediately ping. self.ping_out_substream = Some((pinger, ping_process)); if self.ping_remote() { - Some(NodeEvent::PingStart) + Some(SubstrateOutEvent::PingStart) } else { None } @@ -461,7 +567,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, // Refuse the substream if we already have Kademlia substream open. if self.kademlia_substream.is_none() { self.kademlia_substream = Some((controller.clone(), stream)); - Some(NodeEvent::KadOpen(controller)) + Some(SubstrateOutEvent::KadOpen(controller)) } else { None } @@ -473,7 +579,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, return None; } - let event = NodeEvent::CustomProtocolOpen { + let event = SubstrateOutEvent::CustomProtocolOpen { protocol_id: proto.protocol_id, version: proto.protocol_version, }; @@ -521,7 +627,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } /// Polls the upgrades in progress. - fn poll_upgrades_in_progress(&mut self) -> Poll>, IoError> { + fn poll_upgrades_in_progress(&mut self) -> Poll>, IoError> { // Continue negotiation of newly-opened substreams on the listening side. // We remove each element from `upgrades_in_progress_listen` one by one and add them back // if not ready. @@ -537,7 +643,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, self.upgrades_in_progress_listen.push(in_progress); }, Err(err) => { - return Ok(Async::Ready(Some(NodeEvent::SubstreamUpgradeFail(err)))); + return Ok(Async::Ready(Some(SubstrateOutEvent::SubstreamUpgradeFail(err)))); }, } } @@ -559,11 +665,11 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, // TODO: dispatch depending on actual error ; right now we assume that // error == not supported, which is not necessarily true in theory if let UpgradePurpose::Custom(_) = purpose { - return Ok(Async::Ready(Some(NodeEvent::Useless))); + return Ok(Async::Ready(Some(SubstrateOutEvent::Useless))); } else { let msg = format!("While upgrading to {:?}: {:?}", purpose, err); let err = IoError::new(IoErrorKind::Other, msg); - return Ok(Async::Ready(Some(NodeEvent::SubstreamUpgradeFail(err)))); + return Ok(Async::Ready(Some(SubstrateOutEvent::SubstreamUpgradeFail(err)))); } }, } @@ -573,7 +679,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } /// Polls the upgrades in progress. - fn poll_custom_protocols(&mut self) -> Poll>, IoError> { + fn poll_custom_protocols(&mut self) -> Poll>, IoError> { // Poll for messages on the custom protocol stream. for n in (0 .. self.custom_protocols_substreams.len()).rev() { let mut custom_proto = self.custom_protocols_substreams.swap_remove(n); @@ -582,7 +688,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, Ok(Async::Ready(Some((packet_id, data)))) => { let protocol_id = custom_proto.protocol_id; self.custom_protocols_substreams.push(custom_proto); - return Ok(Async::Ready(Some(NodeEvent::CustomMessage { + return Ok(Async::Ready(Some(SubstrateOutEvent::CustomMessage { protocol_id, packet_id, data, @@ -592,7 +698,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, // Trying to reopen the protocol. self.queued_dial_upgrades.push(UpgradePurpose::Custom(custom_proto.protocol_id)); self.num_out_user_must_open += 1; - return Ok(Async::Ready(Some(NodeEvent::CustomProtocolClosed { + return Ok(Async::Ready(Some(SubstrateOutEvent::CustomProtocolClosed { protocol_id: custom_proto.protocol_id, result: Ok(()), }))) @@ -601,7 +707,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, // Trying to reopen the protocol. self.queued_dial_upgrades.push(UpgradePurpose::Custom(custom_proto.protocol_id)); self.num_out_user_must_open += 1; - return Ok(Async::Ready(Some(NodeEvent::CustomProtocolClosed { + return Ok(Async::Ready(Some(SubstrateOutEvent::CustomProtocolClosed { protocol_id: custom_proto.protocol_id, result: Err(err), }))) @@ -613,18 +719,26 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } /// Polls the open Kademlia substream, if any. - fn poll_kademlia(&mut self) -> Poll>, IoError> { + fn poll_kademlia(&mut self) -> Poll>, IoError> { + // Produce a `KadOpen` event if necessary. + if self.need_report_kad_open { + self.need_report_kad_open = false; + if let Some((ref kad_ctrl, _)) = self.kademlia_substream { + return Ok(Async::Ready(Some(SubstrateOutEvent::KadOpen(kad_ctrl.clone())))); + } + } + // Poll for Kademlia events. if let Some((controller, mut stream)) = self.kademlia_substream.take() { match stream.poll() { Ok(Async::Ready(Some(KadIncomingRequest::FindNode { searched, responder }))) => { - return Ok(Async::Ready(Some(NodeEvent::KadFindNode { searched, responder }))); + return Ok(Async::Ready(Some(SubstrateOutEvent::KadFindNode { searched, responder }))); }, // We don't care about Kademlia pings, they are unused. Ok(Async::Ready(Some(KadIncomingRequest::PingPong))) => {}, Ok(Async::NotReady) => self.kademlia_substream = Some((controller, stream)), - Ok(Async::Ready(None)) => return Ok(Async::Ready(Some(NodeEvent::KadClosed(Ok(()))))), - Err(err) => return Ok(Async::Ready(Some(NodeEvent::KadClosed(Err(err))))), + Ok(Async::Ready(None)) => return Ok(Async::Ready(Some(SubstrateOutEvent::KadClosed(Ok(()))))), + Err(err) => return Ok(Async::Ready(Some(SubstrateOutEvent::KadClosed(Err(err))))), } } @@ -632,7 +746,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } /// Polls the ping substreams. - fn poll_ping(&mut self) -> Poll>, IoError> { + fn poll_ping(&mut self) -> Poll>, IoError> { // Poll for answering pings. for n in (0 .. self.ping_in_substreams.len()).rev() { let mut ping = self.ping_in_substreams.swap_remove(n); @@ -658,10 +772,10 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, match ping_attempt.poll() { Ok(Async::Ready(())) => { self.next_ping.reset(Instant::now() + DELAY_TO_NEXT_PING); - return Ok(Async::Ready(Some(NodeEvent::PingSuccess(started.elapsed())))); + return Ok(Async::Ready(Some(SubstrateOutEvent::PingSuccess(started.elapsed())))); }, Ok(Async::NotReady) => self.active_ping_out = Some((started, ping_attempt)), - Err(_) => return Ok(Async::Ready(Some(NodeEvent::Unresponsive))), + Err(_) => return Ok(Async::Ready(Some(SubstrateOutEvent::Unresponsive))), } } @@ -673,7 +787,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, // it again without having an accident. self.next_ping.reset(Instant::now() + Duration::from_secs(5 * 60)); if self.ping_remote() { - return Ok(Async::Ready(Some(NodeEvent::PingStart))); + return Ok(Async::Ready(Some(SubstrateOutEvent::PingStart))); } }, Err(err) => { @@ -686,7 +800,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } /// Polls the identify substreams. - fn poll_identify(&mut self) -> Poll>, IoError> { + fn poll_identify(&mut self) -> Poll>, IoError> { // Poll the future that fires when we need to identify the node again. loop { match self.next_identify.poll() { @@ -718,51 +832,6 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } } -impl Stream for NodeHandler -where TSubstream: AsyncRead + AsyncWrite + Send + 'static, - TUserData: Clone + Send + 'static, -{ - type Item = NodeEvent; - type Error = IoError; - - fn poll(&mut self) -> Poll, Self::Error> { - // Request new outbound substreams from the user if necessary. - if self.num_out_user_must_open >= 1 { - self.num_out_user_must_open -= 1; - return Ok(Async::Ready(Some(NodeEvent::OutboundSubstreamRequested))); - } - - match self.poll_upgrades_in_progress()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - }; - - match self.poll_custom_protocols()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - }; - - match self.poll_kademlia()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - }; - - match self.poll_ping()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - }; - - match self.poll_identify()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - }; - - // Nothing happened. Register our task to be notified and return. - self.to_notify = Some(task::current()); - Ok(Async::NotReady) - } -} - /// Enum of all the possible protocols our service handles. enum FinalUpgrade { Kad(KadConnecController, Box + Send>), diff --git a/substrate/network-libp2p/src/service.rs b/substrate/network-libp2p/src/service.rs index e8befa459e84b..420ec064746ce 100644 --- a/substrate/network-libp2p/src/service.rs +++ b/substrate/network-libp2p/src/service.rs @@ -372,51 +372,56 @@ fn init_thread( local_address: None, // TODO: fill }); }, - ServiceEvent::NodeClosed { node_index, closed_custom_protocols } => { + ServiceEvent::NodeClosed { node_index, closed_custom_protocols: protocols } | + ServiceEvent::ClosedCustomProtocols { node_index, protocols } => { let old = peers.lock().remove(&node_index); debug_assert!(old.is_some()); - for protocol in closed_custom_protocols { + for protocol in protocols { registered_custom.find_protocol(protocol) - .expect("Invalid protocol ID") + .expect("we passed a list of protocols when building the service, and never \ + modify that list ; therefore all the reported ids should always be valid") .custom_data() .disconnected(&ctxt!(protocol, node_index), &node_index); } }, ServiceEvent::PingDuration(node_index, ping) => peers.lock().get_mut(&node_index) - .expect("State mismatch in the network service") + .expect("peers is kept in sync with the state in the service") .ping = Some(ping), ServiceEvent::NodeInfos { node_index, client_version } => peers.lock().get_mut(&node_index) - .expect("State mismatch in the network service") + .expect("peers is kept in sync with the state in the service") .client_version = Some(client_version), ServiceEvent::NodeAddress { node_index, address } => peers.lock().get_mut(&node_index) - .expect("State mismatch in the network service") + .expect("peers is kept in sync with the state in the service") .remote_address = Some(address), ServiceEvent::OpenedCustomProtocol { node_index, protocol, version } => { peers.lock().get_mut(&node_index) - .expect("State inconsistency in service") + .expect("peers is kept in sync with the state in the service") .protocols .push((protocol, version)); registered_custom.find_protocol(protocol) - .expect("Invalid protocol ID") + .expect("we passed a list of protocols when building the service, and never \ + modify that list ; therefore all the reported ids should always be valid") .custom_data() .connected(&ctxt!(protocol, node_index), &node_index) }, ServiceEvent::ClosedCustomProtocol { node_index, protocol } => { peers.lock().get_mut(&node_index) - .expect("State inconsistency in service") + .expect("peers is kept in sync with the state in the service") .protocols .retain(|&(ref p, _)| p != &protocol); registered_custom.find_protocol(protocol) - .expect("Invalid protocol ID") + .expect("we passed a list of protocols when building the service, and never \ + modify that list ; therefore all the reported ids should always be valid") .custom_data() .disconnected(&ctxt!(protocol, node_index), &node_index) }, ServiceEvent::CustomMessage { node_index, protocol_id, packet_id, data } => { registered_custom.find_protocol(protocol_id) - .expect("Invalid protocol ID") + .expect("we passed a list of protocols when building the service, and never \ + modify that list ; therefore all the reported ids should always be valid") .custom_data() .read(&ctxt!(protocol_id, node_index), &node_index, packet_id, &data) }, diff --git a/substrate/network-libp2p/src/service_task.rs b/substrate/network-libp2p/src/service_task.rs index 3d996a5f393fa..4172a45625f5d 100644 --- a/substrate/network-libp2p/src/service_task.rs +++ b/substrate/network-libp2p/src/service_task.rs @@ -230,6 +230,16 @@ pub enum ServiceEvent { protocol: ProtocolId, }, + /// Sustom protocol substreams has been closed. + /// + /// Same as `ClosedCustomProtocol` but with multiple protocols. + ClosedCustomProtocols { + /// Index of the node. + node_index: NodeIndex, + /// Protocols that have been closed. + protocols: Vec, + }, + /// Receives a message on a custom protocol stream. CustomMessage { /// Index of the node. @@ -333,7 +343,7 @@ impl Service { .nodes() .filter(|&n| { let peer_id = self.swarm.peer_id_of_node(n) - .expect("Logic error ; invalid node index which we just retreived"); + .expect("swarm.nodes() always returns valid node indices"); !self.reserved_peers.contains(peer_id) }) .collect(); @@ -383,12 +393,16 @@ impl Service { // Kill the node from the swarm, and inject an event about it. let closed_custom_protocols = self.swarm.drop_node(node_index) - .expect("Checked right above that node is valid"); + .expect("we checked right above that node is valid"); self.injected_events.push(ServiceEvent::NodeClosed { node_index, closed_custom_protocols, }); + if let Some(to_notify) = self.to_notify.take() { + to_notify.notify(); + } + if let Some(addr) = self.nodes_addresses.remove(&node_index) { let reason = if disable_duration.is_some() { DisconnectReason::Banned @@ -660,6 +674,21 @@ impl Service { } else { None }, + SwarmEvent::Reconnected { node_index, endpoint, closed_custom_protocols } => { + if let Some(addr) = self.nodes_addresses.remove(&node_index) { + self.topology.report_disconnected(&addr, DisconnectReason::ClosedGracefully); + } + if let ConnectedPoint::Dialer { address } = endpoint { + let peer_id = self.swarm.peer_id_of_node(node_index) + .expect("the swarm always produces events containing valid node indices"); + self.nodes_addresses.insert(node_index, address.clone()); + self.topology.report_connected(&address, peer_id); + } + Some(ServiceEvent::ClosedCustomProtocols { + node_index, + protocols: closed_custom_protocols, + }) + }, SwarmEvent::NodeClosed { node_index, peer_id, closed_custom_protocols } => { debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id); if let Some(addr) = self.nodes_addresses.get(&node_index) { @@ -679,7 +708,7 @@ impl Service { }, SwarmEvent::NodeAddress { node_index, address } => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Inconsistent state ; got NodeAddress event about non-existing node"); + .expect("the swarm always produces events containing valid node indices"); self.topology.report_connected(&address, &peer_id); self.nodes_addresses.insert(node_index, address.clone()); Some(ServiceEvent::NodeAddress { @@ -689,7 +718,7 @@ impl Service { }, SwarmEvent::UnresponsiveNode { node_index } => { let closed_custom_protocols = self.swarm.drop_node(node_index) - .expect("Got UnresponsiveNode event about a non-existing node"); + .expect("the swarm always produces events containing valid node indices"); if let Some(addr) = self.nodes_addresses.remove(&node_index) { self.topology.report_disconnected(&addr, DisconnectReason::ClosedGracefully); } @@ -700,10 +729,10 @@ impl Service { }, SwarmEvent::UselessNode { node_index } => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Got UselessNode event about non-existing node") + .expect("the swarm always produces events containing valid node indices") .clone(); let closed_custom_protocols = self.swarm.drop_node(node_index) - .expect("Got UselessNode event about a non-existing node"); + .expect("the swarm always produces events containing valid node indices"); self.topology.report_useless(&peer_id); if let Some(addr) = self.nodes_addresses.remove(&node_index) { self.topology.report_disconnected(&addr, DisconnectReason::ClosedGracefully); @@ -715,13 +744,13 @@ impl Service { }, SwarmEvent::PingDuration(node_index, ping) => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Got PingDuration event about non-existing node"); + .expect("the swarm always produces events containing valid node indices"); self.kad_system.update_kbuckets(peer_id.clone()); Some(ServiceEvent::PingDuration(node_index, ping)) }, SwarmEvent::NodeInfos { node_index, client_version, listen_addrs } => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Inconsistent state ; got NodeAddress event about non-existing node"); + .expect("the swarm always produces events containing valid node indices"); // TODO: wrong function name self.topology.add_kademlia_discovered_addrs( peer_id, @@ -734,7 +763,7 @@ impl Service { }, SwarmEvent::KadFindNode { node_index, searched, responder } => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Got KadFindNode event about non-existing node"); + .expect("the swarm always produces events containing valid node indices"); let response = self.build_kademlia_response(&searched); self.kad_system.update_kbuckets(peer_id.clone()); responder.respond(response); @@ -742,7 +771,7 @@ impl Service { }, SwarmEvent::KadOpen { node_index, controller } => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Got KadOpen event about non-existing node"); + .expect("the swarm always produces events containing valid node indices"); trace!(target: "sub-libp2p", "Opened Kademlia substream with {:?}", peer_id); self.kad_system.update_kbuckets(peer_id.clone()); if let Some(list) = self.kad_pending_ctrls.lock().remove(&peer_id) { @@ -768,7 +797,7 @@ impl Service { }), SwarmEvent::CustomMessage { node_index, protocol_id, packet_id, data } => { let peer_id = self.swarm.peer_id_of_node(node_index) - .expect("Got CustomMessage event about non-existing node"); + .expect("the swarm always produces events containing valid node indices"); self.kad_system.update_kbuckets(peer_id.clone()); Some(ServiceEvent::CustomMessage { node_index, diff --git a/substrate/network-libp2p/src/swarm.rs b/substrate/network-libp2p/src/swarm.rs index 7ae1adc56f8ad..3a359202ce6d4 100644 --- a/substrate/network-libp2p/src/swarm.rs +++ b/substrate/network-libp2p/src/swarm.rs @@ -17,21 +17,18 @@ use bytes::Bytes; use custom_proto::RegisteredProtocols; use fnv::FnvHashMap; -use futures::{prelude::*, Stream, sync::mpsc, task}; +use futures::{prelude::*, Stream}; use libp2p::{Multiaddr, multiaddr::AddrComponent, PeerId}; use libp2p::core::{muxing, Endpoint, PublicKey}; use libp2p::core::nodes::node::Substream; -use libp2p::core::nodes::swarm::{ConnectedPoint, Swarm as Libp2pSwarm}; +use libp2p::core::nodes::swarm::{ConnectedPoint, Swarm as Libp2pSwarm, HandlerFactory}; use libp2p::core::nodes::swarm::{SwarmEvent as Libp2pSwarmEvent, Peer as SwarmPeer}; use libp2p::core::transport::boxed::Boxed; use libp2p::kad::{KadConnecController, KadFindNodeRespond}; use libp2p::secio; -use node_handler::{NodeEvent, NodeHandler, IdentificationRequest}; +use node_handler::{SubstrateOutEvent, SubstrateNodeHandler, SubstrateInEvent, IdentificationRequest}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::mem; -use std::sync::Arc; -use std::time::Duration; -use tokio_executor; +use std::{mem, sync::Arc, time::Duration}; use transport; use {Error, NodeIndex, PacketId, ProtocolId}; @@ -41,7 +38,8 @@ use {Error, NodeIndex, PacketId, ProtocolId}; pub fn start_swarm( registered_custom: Arc>, local_private_key: secio::SecioKeyPair, -) -> Result, Error> { +) -> Result, Error> +where TUserData: Send + Sync + Clone + 'static { // Private and public keys. let local_public_key = local_private_key.to_public_key(); let local_peer_id = local_public_key.clone().into_peer_id(); @@ -50,25 +48,34 @@ pub fn start_swarm( let transport = transport::build_transport(local_private_key); // Build the underlying libp2p swarm. - let swarm = Libp2pSwarm::new(transport); + let swarm = Libp2pSwarm::with_handler_builder(transport, HandlerBuilder(registered_custom)); - let (node_tasks_events_tx, node_tasks_events_rx) = mpsc::unbounded(); Ok(Swarm { swarm, local_public_key, local_peer_id, listening_addrs: Vec::new(), - registered_custom, node_by_peer: Default::default(), nodes_info: Default::default(), next_node_index: 0, - node_tasks_events_rx, - node_tasks_events_tx, - tasks_to_spawn: Vec::new(), - to_notify: None, }) } +/// Dummy structure that exists because we need to be able to express the type. Otherwise we would +/// use a closure. +#[derive(Clone)] +struct HandlerBuilder(Arc>); +impl HandlerFactory for HandlerBuilder +where TUserData: Clone + Send + Sync + 'static +{ + type Handler = SubstrateNodeHandler, TUserData>; + + #[inline] + fn new_handler(&self) -> Self::Handler { + SubstrateNodeHandler::new(self.0.clone()) + } +} + /// Event produced by the swarm. pub enum SwarmEvent { /// We have successfully connected to a node. @@ -84,7 +91,17 @@ pub enum SwarmEvent { endpoint: ConnectedPoint, }, - /// Closed connection to a node. + /// The connection to a peer has changed. + Reconnected { + /// Index of the node. + node_index: NodeIndex, + /// The new endpoint. + endpoint: ConnectedPoint, + /// List of custom protocols that were closed in the process. + closed_custom_protocols: Vec, + }, + + /// Closed connection to a node, either gracefully or because of an error. /// /// It is guaranteed that this node has been opened with a `NewNode` event beforehand. However /// not all `ClosedCustomProtocol` events have been dispatched. @@ -202,7 +219,12 @@ pub enum SwarmEvent { /// Network swarm. Must be polled regularly in order for the networking to work. pub struct Swarm { /// Stream of events of the swarm. - swarm: Libp2pSwarm, Muxer, ()>, + swarm: Libp2pSwarm< + Boxed<(PeerId, Muxer)>, + SubstrateInEvent, + SubstrateOutEvent>, + HandlerBuilder + >, /// Public key of the local node. local_public_key: PublicKey, @@ -213,50 +235,14 @@ pub struct Swarm { /// Addresses we know we're listening on. Only includes NAT traversed addresses. listening_addrs: Vec, - /// List of registered custom protocols. - registered_custom: Arc>, - /// For each peer id, the corresponding node index. - /// This is filled when the swarm receives a node, and is emptied when the node handler closes. - /// - /// Note that we may temporarily have multiple node indices pointing to the same peer ID. This - /// hash map only contains the latest node for each given peer. - /// Nodes that are not in this list should all be in the closing state. node_by_peer: FnvHashMap, /// All the nodes tasks. Must be maintained consistent with `node_by_peer`. - /// - /// # How it works - /// - /// First, the `swarm` generates events about connected nodes. This creates an entry in - /// `nodes_info` and `node_by_peer`, where the node is in pending mode. Accepting the node - /// spawns a background task and puts a sender in `nodes_info`. - /// - /// If the `swarm` tells us that a node is closed, or if the user wants to drop a peer, we - /// destroy that sender, which tells the background task that it needs to stop. - /// - /// Once the background task stops, we remove the entries in `node_by_peer` and `nodes_info`. - /// - /// In order to maintain a consistent state, at no point we should close the sender without - /// removing the peer from the nework first (removing a peer from the network is - /// instantaneous), and at no point should we remove entries before the background task is - /// stopped. nodes_info: FnvHashMap, /// Next key to use when we insert a new entry in `nodes_info`. next_node_index: NodeIndex, - - /// Events received by the node tasks. If `None`, means that the task finished for this node. - node_tasks_events_rx: mpsc::UnboundedReceiver<(NodeIndex, Option>>)>, - - /// Sending side of `node_tasks_events_rx`. Meant to be cloned and sent to background tasks. - node_tasks_events_tx: mpsc::UnboundedSender<(NodeIndex, Option>>)>, - - /// List of tasks to spawn when we're in a polling context. - tasks_to_spawn: Vec>, - - /// Task to notify when an element is added to `tasks_to_spawn`. - to_notify: Option, } /// Local information about a peer. @@ -267,86 +253,11 @@ struct NodeInfo { /// Whether we opened the connection or the remote opened it. endpoint: Endpoint, - /// State of the node. - state: NodeState, - /// List of custom protocol substreams that are open. open_protocols: Vec, } -/// State of the node. -enum NodeState { - /// The node is waiting to be accepted or denied. - /// Contains both ends of a channel, so that we can start appending messages that will be - /// processed once the node gets accepted. - Pending(mpsc::UnboundedSender, mpsc::UnboundedReceiver), - - /// The node is active. The sender can be used to dispatch messages to the background task. - /// Destroying the sender will close the background task. - Accepted(mpsc::UnboundedSender), - - /// The node is closing. We dropped the sender, and thus. - Closing, - - /// The node has been closed by calling `drop_node`. Same as `closing`, except that we must - /// must not generate any event about this node anymore. - Closed, -} - -impl NodeState { - /// Returns the inner sender, if any. - #[inline] - fn as_sender(&mut self) -> Option<&mut mpsc::UnboundedSender> { - match *self { - NodeState::Pending(ref mut tx, _) => Some(tx), - NodeState::Accepted(ref mut tx) => Some(tx), - NodeState::Closing => None, - NodeState::Closed => None, - } - } - - /// Returns `true` for `NodeState::Closed`. - #[inline] - fn is_closed(&self) -> bool { - match *self { - NodeState::Pending(_, _) => false, - NodeState::Accepted(_) => false, - NodeState::Closing => false, - NodeState::Closed => true, - } - } - - /// Switches the state to `Closing`, unless we're already closing or closed. - #[inline] - fn close_if_necessary(&mut self) { - match *self { - NodeState::Pending(_, _) | NodeState::Accepted(_) => (), - NodeState::Closing | NodeState::Closed => return, - }; - - *self = NodeState::Closing; - } -} - -/// Message from the service to one of the background node tasks. -enum OutToTaskMsg { - /// Must call `inject_substream()` on the node handler. - InjectSubstream { - substream: Substream, - endpoint: Endpoint, - }, - - /// Must call `open_kademlia()` on the node handler. - OpenKademlia, - - /// Must call `send_custom_message()` on the node handler. - SendCustomMessage { - protocol: ProtocolId, - packet_id: PacketId, - data: Vec, - }, -} - +/// The muxer used by the transport. type Muxer = muxing::StreamMuxerBox; impl Swarm @@ -406,10 +317,15 @@ impl Swarm data: Vec ) { if let Some(info) = self.nodes_info.get_mut(&node_index) { - if let Some(ref mut sender) = info.state.as_sender() { - let msg = OutToTaskMsg::SendCustomMessage { protocol, packet_id, data }; - let _ = sender.unbounded_send(msg); + if let Some(mut connected) = self.swarm.peer(info.peer_id.clone()).as_connected() { + connected.send_event(SubstrateInEvent::SendCustomMessage { protocol, packet_id, data }); + } else { + error!(target: "sub-libp2p", "Tried to send message to {:?}, but we're not \ + connected to it", info.peer_id); } + } else { + error!(target: "sub-libp2p", "Tried to send message to invalid node index {:?}", + node_index); } } @@ -453,36 +369,24 @@ impl Swarm /// with the specified index. /// /// Returns an error if the node index is invalid, or if it was already accepted. - #[inline] pub fn accept_node(&mut self, node_index: NodeIndex) -> Result<(), ()> { - let info = match self.nodes_info.get_mut(&node_index) { - Some(i) => i, - None => return Err(()), + // TODO: detect if already accepted? + let peer_id = match self.nodes_info.get(&node_index) { + Some(info) => &info.peer_id, + None => return Err(()) }; - let out_commands_rx = match mem::replace(&mut info.state, NodeState::Closing) { - NodeState::Pending(tx, rx) => { - info.state = NodeState::Accepted(tx); - rx + match self.swarm.peer(peer_id.clone()) { + SwarmPeer::Connected(mut peer) => { + peer.send_event(SubstrateInEvent::Accept); + Ok(()) }, - other => { - info.state = other; - return Err(()) + SwarmPeer::PendingConnect(_) | SwarmPeer::NotConnected(_) => { + error!(target: "sub-libp2p", "State inconsistency detected in accept_node ; \ + nodes_info is not in sync with the underlying swarm"); + Err(()) }, - }; - - self.tasks_to_spawn.push(NodeTask { - node_index, - handler: Some(NodeHandler::new(self.registered_custom.clone())), - out_commands_rx, - node_tasks_events_tx: self.node_tasks_events_tx.clone(), - }); - - if let Some(to_notify) = self.to_notify.take() { - to_notify.notify(); } - - Ok(()) } /// Disconnects a peer. @@ -493,40 +397,25 @@ impl Swarm /// Returns the list of custom protocol substreams that were opened. #[inline] pub fn drop_node(&mut self, node_index: NodeIndex) -> Result, ()> { - let mut must_remove = false; - - let ret = { - let info = match self.nodes_info.get_mut(&node_index) { - Some(i) => i, - None => { - error!(target: "sub-libp2p", "Trying to close non-existing node #{}", node_index); - return Err(()); - }, - }; - - if let Some(connected) = self.swarm.peer(info.peer_id.clone()).as_connected() { - connected.close(); - } - - // If we don't have a background task yet, remove the entry immediately. - if let NodeState::Pending(_, _) = info.state { - must_remove = true; - Vec::new() - } else { - // There may be events pending on the rx side about this node, so we switch it to - // the `Closed` state in order to know not emit any more event about it. - info.state = NodeState::Closed; - info.open_protocols.clone() - } + let info = match self.nodes_info.remove(&node_index) { + Some(i) => i, + None => { + error!(target: "sub-libp2p", "Trying to close non-existing node #{}", node_index); + return Err(()); + }, }; - if must_remove { - let info = self.nodes_info.remove(&node_index) - .expect("We checked the key a few lines above"); - self.node_by_peer.remove(&info.peer_id); + let idx_in_hashmap = self.node_by_peer.remove(&info.peer_id); + debug_assert_eq!(idx_in_hashmap, Some(node_index)); + + if let Some(connected) = self.swarm.peer(info.peer_id.clone()).as_connected() { + connected.close(); + } else { + error!(target: "sub-libp2p", "State inconsistency: node_by_peer and nodes_info are \ + not in sync with the underlying swarm"); } - Ok(ret) + Ok(info.open_protocols) } /// Opens a Kademlia substream with the given node. A `KadOpen` event will later be produced @@ -537,13 +426,17 @@ impl Swarm /// Returns an error if the node index is invalid. pub fn open_kademlia(&mut self, node_index: NodeIndex) -> Result<(), ()> { if let Some(info) = self.nodes_info.get_mut(&node_index) { - if let Some(ref mut sender) = info.state.as_sender() { - let _ = sender.unbounded_send(OutToTaskMsg::OpenKademlia); + if let Some(mut connected) = self.swarm.peer(info.peer_id.clone()).as_connected() { + connected.send_event(SubstrateInEvent::OpenKademlia); Ok(()) } else { + error!(target: "sub-libp2p", "Tried to open Kademlia with {:?}, but we're not \ + connected to it", info.peer_id); Err(()) } } else { + error!(target: "sub-libp2p", "Tried to open Kademlia with invalid node index {:?}", + node_index); Err(()) } } @@ -599,75 +492,6 @@ impl Swarm } } - /// Handles the swarm opening a connection to the given peer. - /// - /// Returns the `NewNode` event to produce. - /// - /// > **Note**: Must be called from inside `poll()`, otherwise it will panic. This method - /// > shouldn't be made public because of this requirement. - fn handle_connection( - &mut self, - peer_id: PeerId, - endpoint: ConnectedPoint - ) -> Option { - let (tx, rx) = mpsc::unbounded(); - - // Assign the node index. - let node_index = self.next_node_index.clone(); - self.next_node_index += 1; - self.node_by_peer.insert(peer_id.clone(), node_index); - self.nodes_info.insert(node_index, NodeInfo { - peer_id: peer_id.clone(), - endpoint: match endpoint { - ConnectedPoint::Listener { .. } => Endpoint::Listener, - ConnectedPoint::Dialer { .. } => Endpoint::Dialer, - }, - state: NodeState::Pending(tx, rx), - open_protocols: Vec::new(), - }); - - Some(SwarmEvent::NodePending { - node_index, - peer_id, - endpoint - }) - } - - /// Handles a swarm event about a newly-opened substream for the given peer. - /// - /// Dispatches the substream to the corresponding task. - fn handle_new_substream( - &mut self, - peer_id: PeerId, - substream: Substream, - endpoint: Endpoint, - ) { - let node_index = match self.node_by_peer.get(&peer_id) { - Some(i) => *i, - None => { - error!(target: "sub-libp2p", "Logic error: new substream for closed node"); - return - }, - }; - - let info = match self.nodes_info.get_mut(&node_index) { - Some(i) => i, - None => { - error!(target: "sub-libp2p", "Logic error: new substream for closed node"); - return - }, - }; - - if let Some(ref mut sender) = info.state.as_sender() { - let _ = sender.unbounded_send(OutToTaskMsg::InjectSubstream { - substream, - endpoint, - }); - } else { - error!(target: "sub-libp2p", "Logic error: no background task for {:?}", peer_id); - } - } - /// Processes an event received by the swarm. /// /// Optionally returns an event to report back to the outside. @@ -676,32 +500,72 @@ impl Swarm /// > shouldn't be made public because of this requirement. fn process_network_event( &mut self, - event: Libp2pSwarmEvent, Muxer, ()> + event: Libp2pSwarmEvent, SubstrateOutEvent>> ) -> Option { match event { - Libp2pSwarmEvent::Connected { peer_id, endpoint } => - if let Some(event) = self.handle_connection(peer_id, endpoint) { - return Some(event); - }, + Libp2pSwarmEvent::Connected { peer_id, endpoint } => { + let node_index = self.next_node_index.clone(); + self.next_node_index += 1; + self.node_by_peer.insert(peer_id.clone(), node_index); + self.nodes_info.insert(node_index, NodeInfo { + peer_id: peer_id.clone(), + endpoint: match endpoint { + ConnectedPoint::Listener { .. } => Endpoint::Listener, + ConnectedPoint::Dialer { .. } => Endpoint::Dialer, + }, + open_protocols: Vec::new(), + }); + + return Some(SwarmEvent::NodePending { + node_index, + peer_id, + endpoint + }); + } Libp2pSwarmEvent::Replaced { peer_id, endpoint, .. } => { - let node_index = *self.node_by_peer.get(&peer_id).expect("State inconsistency"); - self.nodes_info.get_mut(&node_index).expect("State inconsistency") - .state.close_if_necessary(); - if let Some(event) = self.handle_connection(peer_id, endpoint) { - return Some(event); - } + let node_index = self.next_node_index.clone(); + self.next_node_index += 1; + debug_assert_eq!(self.node_by_peer.get(&peer_id), Some(&node_index)); + let infos = self.nodes_info.get_mut(&node_index) + .expect("nodes_info is always kept in sync with the swarm"); + debug_assert_eq!(infos.peer_id, peer_id); + infos.endpoint = match endpoint { + ConnectedPoint::Listener { .. } => Endpoint::Listener, + ConnectedPoint::Dialer { .. } => Endpoint::Dialer, + }; + let closed_custom_protocols = mem::replace(&mut infos.open_protocols, Vec::new()); + + return Some(SwarmEvent::Reconnected { + node_index, + endpoint, + closed_custom_protocols, + }); }, Libp2pSwarmEvent::NodeClosed { peer_id, .. } => { debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id); - let node_index = *self.node_by_peer.get(&peer_id).expect("State inconsistency"); - self.nodes_info.get_mut(&node_index).expect("State inconsistency") - .state.close_if_necessary(); + let node_index = self.node_by_peer.remove(&peer_id) + .expect("node_by_peer is always kept in sync with the inner swarm"); + let infos = self.nodes_info.remove(&node_index) + .expect("nodes_info is always kept in sync with the inner swarm"); + debug_assert_eq!(infos.peer_id, peer_id); + return Some(SwarmEvent::NodeClosed { + node_index, + peer_id, + closed_custom_protocols: infos.open_protocols, + }); }, Libp2pSwarmEvent::NodeError { peer_id, error, .. } => { debug!(target: "sub-libp2p", "Closing {:?} because of error: {:?}", peer_id, error); - let node_index = *self.node_by_peer.get(&peer_id).expect("State inconsistency"); - self.nodes_info.get_mut(&node_index).expect("State inconsistency") - .state.close_if_necessary(); + let node_index = self.node_by_peer.remove(&peer_id) + .expect("node_by_peer is always kept in sync with the inner swarm"); + let infos = self.nodes_info.remove(&node_index) + .expect("nodes_info is always kept in sync with the inner swarm"); + debug_assert_eq!(infos.peer_id, peer_id); + return Some(SwarmEvent::NodeClosed { + node_index, + peer_id, + closed_custom_protocols: infos.open_protocols, + }); }, Libp2pSwarmEvent::DialError { multiaddr, error, .. } => return Some(SwarmEvent::DialFail { @@ -732,139 +596,72 @@ impl Swarm warn!(target: "sub-libp2p", "No listener left"); } }, - Libp2pSwarmEvent::NodeMultiaddr { peer_id, address: Ok(address) } => { - trace!(target: "sub-libp2p", "Determined the multiaddr of {:?} => {}", - peer_id, address); - if let Some(&node_index) = self.node_by_peer.get(&peer_id) { - return Some(SwarmEvent::NodeAddress { - node_index, - address, - }); - } else { - error!(target: "sub-libp2p", "Logic error: no index for {:?}", peer_id); - } - }, - Libp2pSwarmEvent::NodeMultiaddr { peer_id, address: Err(err) } => - trace!(target: "sub-libp2p", "Error when determining the multiaddr of {:?} => {:?}", - peer_id, err), + Libp2pSwarmEvent::NodeEvent { peer_id, event } => + if let Some(event) = self.handle_node_event(peer_id, event) { + return Some(event); + }, Libp2pSwarmEvent::IncomingConnection { listen_addr } => trace!(target: "sub-libp2p", "Incoming connection on listener {}", listen_addr), Libp2pSwarmEvent::IncomingConnectionError { listen_addr, error } => trace!(target: "sub-libp2p", "Incoming connection on listener {} errored: {:?}", listen_addr, error), - Libp2pSwarmEvent::InboundSubstream { peer_id, substream } => - self.handle_new_substream(peer_id, substream, Endpoint::Listener), - Libp2pSwarmEvent::OutboundSubstream { peer_id, substream, .. } => - self.handle_new_substream(peer_id, substream, Endpoint::Dialer), - Libp2pSwarmEvent::InboundClosed { .. } => {}, - Libp2pSwarmEvent::OutboundClosed { .. } => {}, } None } - /// Polls for what happened on the main network side. - fn poll_network(&mut self) -> Poll, IoError> { - loop { - match self.swarm.poll() { - Ok(Async::Ready(Some(event))) => - if let Some(event) = self.process_network_event(event) { - return Ok(Async::Ready(Some(event))); - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"), - // TODO: this `Err` contains a `Void` ; remove variant when Rust allows that - Err(_) => unreachable!("The Swarm stream never errors"), - } - } - } - - /// Polls for what happened on the background node tasks. - fn poll_node_tasks(&mut self) -> Poll, IoError> { - loop { - match self.node_tasks_events_rx.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(Some((node_index, event)))) => - if let Some(event) = self.handle_node_event(node_index, event) { - return Ok(Async::Ready(Some(event))); - }, - Ok(Async::Ready(None)) => unreachable!("The tx is in self so the rx never closes"), - Err(()) => unreachable!("An UnboundedReceiver never errors"), - } - } - } - - /// Processes an event obtained by a node background task. - /// - /// If the `event` is `None`, that means that the background task finished. + /// Processes an event obtained by a node in the swarm. /// /// Optionally returns an event that the service must emit. /// - /// > **Note**: The event **must** have been produced by a background task, otherwise state + /// > **Note**: The event **must** have been produced by the swarm, otherwise state /// > inconsistencies will likely happen. fn handle_node_event( &mut self, - node_index: NodeIndex, - event: Option>> + peer_id: PeerId, + event: SubstrateOutEvent> ) -> Option { // Obtain the peer id and whether the node has been closed earlier. // If the node has been closed, do not generate any additional event about it. - let (peer_id, node_is_closed) = { - let info = self.nodes_info.get_mut(&node_index) - .expect("Handlers are created when we fill nodes_info, and nodes_info is cleared \ - only when a background task ends"); - (info.peer_id.clone(), info.state.is_closed()) - }; + let node_index = *self.node_by_peer.get(&peer_id) + .expect("node_by_peer is always kept in sync with the underlying swarm"); match event { - None => { - let _info = self.nodes_info.remove(&node_index).expect("Inconsistent state"); - // It is possible that the entry in `node_by_peer` doesn't match our node, if a new - // node was created with the same peer. - if self.node_by_peer.get(&peer_id) == Some(&node_index) { - self.node_by_peer.remove(&peer_id); - } - - // Only generate an event if `drop_node` hasn't been called with this node index. - if !node_is_closed { - Some(SwarmEvent::NodeClosed { - node_index, - peer_id, - closed_custom_protocols: Vec::new(), - }) - } else { - None - } - }, - Some(NodeEvent::Unresponsive) => { + SubstrateOutEvent::Unresponsive => { debug!(target: "sub-libp2p", "Node {:?} is unresponsive", peer_id); - if !node_is_closed { - Some(SwarmEvent::UnresponsiveNode { node_index }) - } else { - None - } + Some(SwarmEvent::UnresponsiveNode { node_index }) }, - Some(NodeEvent::Useless) => { + SubstrateOutEvent::Useless => { debug!(target: "sub-libp2p", "Node {:?} is useless", peer_id); - if !node_is_closed { - Some(SwarmEvent::UselessNode { node_index }) - } else { - None - } + Some(SwarmEvent::UselessNode { node_index }) }, - Some(NodeEvent::PingStart) => { + SubstrateOutEvent::PingStart => { trace!(target: "sub-libp2p", "Pinging {:?}", peer_id); None }, - Some(NodeEvent::PingSuccess(ping)) => { + SubstrateOutEvent::PingSuccess(ping) => { trace!(target: "sub-libp2p", "Pong from {:?} in {:?}", peer_id, ping); - if !node_is_closed { - Some(SwarmEvent::PingDuration(node_index, ping)) + Some(SwarmEvent::PingDuration(node_index, ping)) + }, + SubstrateOutEvent::Multiaddr(Ok(address)) => { + trace!(target: "sub-libp2p", "Determined the multiaddr of {:?} => {}", + peer_id, address); + if let Some(&node_index) = self.node_by_peer.get(&peer_id) { + Some(SwarmEvent::NodeAddress { + node_index, + address, + }) } else { + error!(target: "sub-libp2p", "Logic error: no index for {:?}", peer_id); None } }, - Some(NodeEvent::Identified { info, observed_addr }) => { + SubstrateOutEvent::Multiaddr(Err(err)) => { + trace!(target: "sub-libp2p", "Error when determining the multiaddr of {:?} => {:?}", + peer_id, err); + None + }, + SubstrateOutEvent::Identified { info, observed_addr } => { self.add_observed_addr(&peer_id, &observed_addr); trace!(target: "sub-libp2p", "Client version of {:?}: {:?}", peer_id, info.agent_version); if !info.agent_version.contains("substrate") { @@ -872,91 +669,57 @@ impl Swarm peer_id, info.agent_version); } - if !node_is_closed { - Some(SwarmEvent::NodeInfos { - node_index, - client_version: info.agent_version, - listen_addrs: info.listen_addrs, - }) - } else { - None - } + Some(SwarmEvent::NodeInfos { + node_index, + client_version: info.agent_version, + listen_addrs: info.listen_addrs, + }) }, - Some(NodeEvent::IdentificationRequest(request)) => { + SubstrateOutEvent::IdentificationRequest(request) => { self.respond_to_identify_request(&peer_id, request); None }, - Some(NodeEvent::KadFindNode { searched, responder }) => { - if !node_is_closed { - Some(SwarmEvent::KadFindNode { node_index, searched, responder }) - } else { - None - } + SubstrateOutEvent::KadFindNode { searched, responder } => { + Some(SwarmEvent::KadFindNode { node_index, searched, responder }) }, - Some(NodeEvent::KadOpen(ctrl)) => { + SubstrateOutEvent::KadOpen(ctrl) => { trace!(target: "sub-libp2p", "Opened Kademlia substream with {:?}", peer_id); - if !node_is_closed { - Some(SwarmEvent::KadOpen { node_index, controller: ctrl }) - } else { - None - } + Some(SwarmEvent::KadOpen { node_index, controller: ctrl }) }, - Some(NodeEvent::KadClosed(result)) => { + SubstrateOutEvent::KadClosed(result) => { trace!(target: "sub-libp2p", "Closed Kademlia substream with {:?}: {:?}", peer_id, result); - if !node_is_closed { - Some(SwarmEvent::KadClosed { node_index, result }) - } else { - None - } + Some(SwarmEvent::KadClosed { node_index, result }) }, - Some(NodeEvent::OutboundSubstreamRequested) => { - if let Some(mut peer) = self.swarm.peer(peer_id.clone()).as_connected() { - peer.open_substream(()); - } else { - error!(target: "sub-libp2p", "Inconsistent state in the service task"); - } - None - }, - Some(NodeEvent::CustomProtocolOpen { protocol_id, version }) => { + SubstrateOutEvent::CustomProtocolOpen { protocol_id, version } => { trace!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id); - self.nodes_info.get_mut(&node_index).expect("Inconsistent state") + self.nodes_info.get_mut(&node_index) + .expect("nodes_info is kept in sync with the underlying swarm") .open_protocols.push(protocol_id); - if !node_is_closed { - Some(SwarmEvent::OpenedCustomProtocol { - node_index, - protocol: protocol_id, - version, - }) - } else { - None - } + Some(SwarmEvent::OpenedCustomProtocol { + node_index, + protocol: protocol_id, + version, + }) }, - Some(NodeEvent::CustomProtocolClosed { protocol_id, result }) => { + SubstrateOutEvent::CustomProtocolClosed { protocol_id, result } => { trace!(target: "sub-libp2p", "Closed custom protocol with {:?}: {:?}", peer_id, result); - self.nodes_info.get_mut(&node_index).expect("Inconsistent state") + self.nodes_info.get_mut(&node_index) + .expect("nodes_info is kept in sync with the underlying swarm") .open_protocols.retain(|p| p != &protocol_id); - if !node_is_closed { - Some(SwarmEvent::ClosedCustomProtocol { - node_index, - protocol: protocol_id, - }) - } else { - None - } + Some(SwarmEvent::ClosedCustomProtocol { + node_index, + protocol: protocol_id, + }) }, - Some(NodeEvent::CustomMessage { protocol_id, packet_id, data }) => { - if !node_is_closed { - Some(SwarmEvent::CustomMessage { - node_index, - protocol_id, - packet_id, - data, - }) - } else { - None - } + SubstrateOutEvent::CustomMessage { protocol_id, packet_id, data } => { + Some(SwarmEvent::CustomMessage { + node_index, + protocol_id, + packet_id, + data, + }) }, - Some(NodeEvent::SubstreamUpgradeFail(err)) => { + SubstrateOutEvent::SubstreamUpgradeFail(err) => { debug!(target: "sub-libp2p", "Error while negotiating final protocol \ with {:?}: {:?}", peer_id, err); None @@ -971,110 +734,17 @@ impl Stream for Swarm type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { - for task in self.tasks_to_spawn.drain(..) { - tokio_executor::spawn(task); - } - - match self.poll_network()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - } - - match self.poll_node_tasks()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - } - - // The only way we reach this is if we went through all the `NotReady` paths above, - // ensuring the current task is registered everywhere. - self.to_notify = Some(task::current()); - Ok(Async::NotReady) - } -} - -/// Wraps around a `NodeHandler` and adds communication with the outside through channels. -struct NodeTask { - node_index: NodeIndex, - handler: Option, TUserData>>, - out_commands_rx: mpsc::UnboundedReceiver, - node_tasks_events_tx: mpsc::UnboundedSender<(NodeIndex, Option>>)>, -} - -impl NodeTask -where TUserData: Clone + Send + Sync + 'static -{ - fn handle_out_command(&mut self, command: OutToTaskMsg) { - match command { - OutToTaskMsg::InjectSubstream { substream, endpoint } => - if let Some(handler) = self.handler.as_mut() { - handler.inject_substream(substream, endpoint); - } else { - error!(target: "sub-libp2p", "Received message after handler is closed"); - }, - OutToTaskMsg::OpenKademlia => - if let Some(handler) = self.handler.as_mut() { - if let Some(ctrl) = handler.open_kademlia() { - let event = NodeEvent::KadOpen(ctrl); - let _ = self.node_tasks_events_tx.unbounded_send((self.node_index, Some(event))); - } - } else { - error!(target: "sub-libp2p", "Received message after handler is closed"); - }, - OutToTaskMsg::SendCustomMessage { protocol, packet_id, data } => - if let Some(handler) = self.handler.as_mut() { - handler.send_custom_message(protocol, packet_id, data); - } else { - error!(target: "sub-libp2p", "Received message after handler is closed"); - }, - } - } -} - -impl Future for NodeTask -where TUserData: Clone + Send + Sync + 'static -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - // Poll for commands sent from the service. loop { - match self.out_commands_rx.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(command))) => self.handle_out_command(command), - Ok(Async::Ready(None)) => { - if let Some(handler) = self.handler.take() { - for event in handler.close() { - let _ = self.node_tasks_events_tx.unbounded_send((self.node_index, Some(event))); - } - } - let _ = self.node_tasks_events_tx.unbounded_send((self.node_index, None)); - return Ok(Async::Ready(())) - }, - Err(_) => unreachable!("An UnboundedReceiver never errors"), - } - } - - // Poll events from the node. - loop { - match self.handler.as_mut().map(|h| h.poll()).unwrap_or(Ok(Async::Ready(None))) { - Ok(Async::Ready(event)) => { - let finished = event.is_none(); - let _ = self.node_tasks_events_tx.unbounded_send((self.node_index, event)); - // If the node's events stream ends, end the task as well. - if finished { - return Ok(Async::Ready(())); + match self.swarm.poll() { + Ok(Async::Ready(Some(event))) => + if let Some(event) = self.process_network_event(event) { + return Ok(Async::Ready(Some(event))); } - }, - Ok(Async::NotReady) => break, - Err(err) => { - warn!(target: "sub-libp2p", "Error in node handler: {:?}", err); - return Ok(Async::Ready(())); - } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"), + // TODO: this `Err` contains a `Void` ; remove variant when Rust allows that + Err(_) => unreachable!("The Swarm stream never errors"), } } - - // If we reach here, that means nothing is ready. - Ok(Async::NotReady) } }