From 6c0c41c7acecbc4d149a3715eccae24d3d3de933 Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Thu, 7 Dec 2023 04:39:59 -0500 Subject: [PATCH] upgrade libp2p to v0.53.* (#4935) * update libp2p and address compiler errors * remove bandwidth logging from transport * use libp2p registry * make clippy happy * use rust 1.73 * correct rpc keep alive * remove comments and obsolte code * remove libp2p prefix * make clippy happy * use quic under facade * remove fast msg id * bubble up close statements * fix wrong comment --- Cargo.lock | 527 +++++++++--------- Dockerfile | 4 +- beacon_node/client/src/builder.rs | 14 +- beacon_node/lighthouse_network/Cargo.toml | 9 +- beacon_node/lighthouse_network/src/config.rs | 7 - .../lighthouse_network/src/discovery/mod.rs | 22 +- beacon_node/lighthouse_network/src/lib.rs | 1 - beacon_node/lighthouse_network/src/metrics.rs | 46 -- .../src/peer_manager/network_behaviour.rs | 22 +- .../lighthouse_network/src/rpc/handler.rs | 210 +++---- beacon_node/lighthouse_network/src/rpc/mod.rs | 88 ++- .../lighthouse_network/src/service/mod.rs | 91 +-- .../lighthouse_network/src/service/utils.rs | 48 +- .../lighthouse_network/tests/common.rs | 2 +- beacon_node/network/src/metrics.rs | 29 +- beacon_node/network/src/service.rs | 9 +- lighthouse/Cargo.toml | 2 +- 17 files changed, 526 insertions(+), 605 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a1af70bb15..fba036d7086 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -336,6 +336,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "attohttpc" version = "0.16.3" @@ -478,9 +491,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "base64ct" @@ -1748,7 +1761,7 @@ dependencies = [ "hex", "hkdf", "lazy_static", - "libp2p-core", + "libp2p-core 0.40.1", "libp2p-identity", "lru 0.7.8", "more-asserts", @@ -1932,7 +1945,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe81b5c06ecfdbc71dd845216f225f53b62a10cb8a16c946836a3467f701d05b" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "ed25519-dalek", "hex", @@ -1945,18 +1958,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "enum-as-inner" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "enum-as-inner" version = "0.6.0" @@ -2115,7 +2116,7 @@ dependencies = [ "proto_array", "psutil", "reqwest", - "ring", + "ring 0.16.20", "sensitive_url", "serde", "serde_json", @@ -2156,7 +2157,7 @@ dependencies = [ "bls", "hex", "num-bigint-dig", - "ring", + "ring 0.16.20", "sha2 0.9.9", "zeroize", ] @@ -2329,7 +2330,7 @@ checksum = "233dc6f434ce680dbabf4451ee3380cec46cb3c45d66660445a435619710dd35" dependencies = [ "cpufeatures", "lazy_static", - "ring", + "ring 0.16.20", "sha2 0.10.8", ] @@ -2780,9 +2781,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -2795,9 +2796,9 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.1.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b07bbbe7d7e78809544c6f718d875627addc73a7c3582447abc052cd3dc67e0" +checksum = "e1e2774cc104e198ef3d3e1ff4ab40f86fa3245d6cb6a3a46174f21463cee173" dependencies = [ "futures-timer", "futures-util", @@ -2805,9 +2806,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -2815,15 +2816,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -2833,9 +2834,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-lite" @@ -2854,9 +2855,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -2875,15 +2876,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-ticker" @@ -2904,9 +2905,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -3141,7 +3142,7 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "headers-core", "http", @@ -3192,6 +3193,52 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" +[[package]] +name = "hickory-proto" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "once_cell", + "rand", + "socket2 0.5.5", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot 0.12.1", + "rand", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "hkdf" version = "0.12.3" @@ -3449,17 +3496,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.4.0" @@ -3690,7 +3726,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2 0.5.4", + "socket2 0.5.5", "widestring 1.0.2", "windows-sys 0.48.0", "winreg", @@ -3772,9 +3808,9 @@ version = "8.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ - "base64 0.21.4", - "pem", - "ring", + "base64 0.21.5", + "pem 1.1.1", + "ring 0.16.20", "serde", "serde_json", "simple_asn1", @@ -3848,7 +3884,7 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" dependencies = [ - "spin", + "spin 0.5.2", ] [[package]] @@ -3979,9 +4015,9 @@ dependencies = [ [[package]] name = "libp2p" -version = "0.52.4" +version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94495eb319a85b70a68b85e2389a95bb3555c71c49025b78c691a854a7e6464" +checksum = "1252a34c693386829c34d44ccfbce86679d2a9a2c61f582863649bbf57f26260" dependencies = [ "bytes", "either", @@ -3991,7 +4027,7 @@ dependencies = [ "instant", "libp2p-allow-block-list", "libp2p-connection-limits", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-dns", "libp2p-gossipsub", "libp2p-identify", @@ -4013,11 +4049,11 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55b46558c5c0bf99d3e2a1a38fd54ff5476ca66dd1737b12466a1824dd219311" +checksum = "107b238b794cb83ab53b74ad5dcf7cca3200899b72fe662840cfb52f5b0a32e6" dependencies = [ - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-swarm", "void", @@ -4025,11 +4061,11 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f5107ad45cb20b2f6c3628c7b6014b996fcb13a88053f4569c872c6e30abf58" +checksum = "f2af4b1e1a1d6c5005a59b42287c0a526bcce94d8d688e2e9233b18eb843ceb4" dependencies = [ - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-swarm", "void", @@ -4063,30 +4099,58 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-core" +version = "0.41.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59c61b924474cf2c7edccca306693e798d797b85d004f4fef5689a7a3e6e8fe5" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "instant", + "libp2p-identity", + "multiaddr", + "multihash", + "multistream-select", + "once_cell", + "parking_lot 0.12.1", + "pin-project", + "quick-protobuf", + "rand", + "rw-stream-sink", + "smallvec", + "thiserror", + "tracing", + "unsigned-varint 0.7.2", + "void", +] + [[package]] name = "libp2p-dns" -version = "0.40.1" +version = "0.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6a18db73084b4da2871438f6239fef35190b05023de7656e877c18a00541a3b" +checksum = "d17cbcf7160ff35c3e8e560de4a068fe9d6cb777ea72840e48eb76ff9576c4b6" dependencies = [ "async-trait", "futures", - "libp2p-core", + "hickory-resolver", + "libp2p-core 0.41.1", "libp2p-identity", - "log", "parking_lot 0.12.1", "smallvec", - "trust-dns-resolver", + "tracing", ] [[package]] name = "libp2p-gossipsub" -version = "0.45.2" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f9624e2a843b655f1c1b8262b8d5de6f309413fca4d66f01bb0662429f84dc" +checksum = "201f0626acd8985fae7fdd318e86c954574b9eef2e5dec433936a19a0338393d" dependencies = [ - "asynchronous-codec", - "base64 0.21.4", + "asynchronous-codec 0.6.2", + "base64 0.21.5", "byteorder", "bytes", "either", @@ -4096,10 +4160,9 @@ dependencies = [ "getrandom", "hex_fmt", "instant", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-swarm", - "log", "prometheus-client", "quick-protobuf", "quick-protobuf-codec", @@ -4107,30 +4170,31 @@ dependencies = [ "regex", "sha2 0.10.8", "smallvec", + "tracing", "unsigned-varint 0.7.2", "void", ] [[package]] name = "libp2p-identify" -version = "0.43.1" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a96638a0a176bec0a4bcaebc1afa8cf909b114477209d7456ade52c61cd9cd" +checksum = "0544703553921214556f7567278b4f00cdd5052d29b0555ab88290cbfe54d81c" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "either", "futures", "futures-bounded", "futures-timer", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-swarm", - "log", "lru 0.12.0", "quick-protobuf", "quick-protobuf-codec", "smallvec", "thiserror", + "tracing", "void", ] @@ -4159,72 +4223,73 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.44.0" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a2567c305232f5ef54185e9604579a894fd0674819402bb0ac0246da82f52a" +checksum = "68f273a551ee9d0a79695f75afaeafb1371459dec69c29555e8a73a35608e96a" dependencies = [ "data-encoding", "futures", + "hickory-proto", "if-watch", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-swarm", - "log", "rand", "smallvec", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", - "trust-dns-proto 0.22.0", + "tracing", "void", ] [[package]] name = "libp2p-metrics" -version = "0.13.1" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239ba7d28f8d0b5d77760dc6619c05c7e88e74ec8fbbe97f856f20a56745e620" +checksum = "fdac91ae4f291046a3b2660c039a2830c931f84df2ee227989af92f7692d3357" dependencies = [ + "futures", "instant", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-swarm", - "once_cell", + "pin-project", "prometheus-client", ] [[package]] name = "libp2p-mplex" -version = "0.40.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93959ed08b6caf9810e067655e25f1362098797fef7c44d3103e63dcb6f0fabe" +checksum = "a5e895765e27e30217b25f7cb7ac4686dad1ff80bf2fdeffd1d898566900a924" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes", "futures", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", - "log", "nohash-hasher", "parking_lot 0.12.1", "rand", "smallvec", + "tracing", "unsigned-varint 0.7.2", ] [[package]] name = "libp2p-noise" -version = "0.43.2" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2eeec39ad3ad0677551907dd304b2f13f17208ccebe333bef194076cd2e8921" +checksum = "8ecd0545ce077f6ea5434bcb76e8d0fe942693b4380aaad0d34a358c2bd05793" dependencies = [ + "asynchronous-codec 0.7.0", "bytes", "curve25519-dalek", "futures", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", - "log", "multiaddr", "multihash", "once_cell", @@ -4234,81 +4299,81 @@ dependencies = [ "snow", "static_assertions", "thiserror", + "tracing", "x25519-dalek", "zeroize", ] [[package]] name = "libp2p-plaintext" -version = "0.40.1" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53cc5390cc2f77b7de2452fb6105892d0bb64e3cafa3bb346abb603f4cc93a09" +checksum = "67330af40b67217e746d42551913cfb7ad04c74fa300fb329660a56318590b3f" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes", "futures", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", - "log", "quick-protobuf", - "unsigned-varint 0.7.2", + "quick-protobuf-codec", + "tracing", ] [[package]] name = "libp2p-quic" -version = "0.9.3" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130d451d83f21b81eb7b35b360bc7972aeafb15177784adc56528db082e6b927" +checksum = "c02570b9effbc7c33331803104a8e9e53af7f2bdb4a2b61be420d6667545a0f5" dependencies = [ "bytes", "futures", "futures-timer", "if-watch", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-tls", - "log", "parking_lot 0.12.1", "quinn", "rand", - "ring", + "ring 0.16.20", "rustls", - "socket2 0.5.4", + "socket2 0.5.5", "thiserror", "tokio", + "tracing", ] [[package]] name = "libp2p-swarm" -version = "0.43.6" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48ff0e918a45fec0b6f27b30b0547a57c6c214aa8b13be3647b7701bfd8b8797" +checksum = "643ce11d87db56387631c9757b61b83435b434f94dc52ec267c1666e560e78b0" dependencies = [ "either", "fnv", "futures", "futures-timer", "instant", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "libp2p-swarm-derive", - "log", "multistream-select", "once_cell", "rand", "smallvec", "tokio", + "tracing", "void", ] [[package]] name = "libp2p-swarm-derive" -version = "0.33.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4d5ec2a3df00c7836d7696c136274c9c59705bac69133253696a6c932cd1d74" +checksum = "9b27d257436d01433a21da8da7688c83dba35826726161a328ff0989cd7af2dd" dependencies = [ "heck", - "proc-macro-warning", "proc-macro2", "quote", "syn 2.0.38", @@ -4316,33 +4381,33 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.40.1" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b558dd40d1bcd1aaaed9de898e9ec6a436019ecc2420dd0016e712fbb61c5508" +checksum = "8b2460fc2748919adff99ecbc1aab296e4579e41f374fb164149bd2c9e529d4c" dependencies = [ "futures", "futures-timer", "if-watch", "libc", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", - "log", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", + "tracing", ] [[package]] name = "libp2p-tls" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8218d1d5482b122ccae396bbf38abdcb283ecc96fa54760e1dfd251f0546ac61" +checksum = "93ce7e3c2e7569d685d08ec795157981722ff96e9e9f9eae75df3c29d02b07a5" dependencies = [ "futures", "futures-rustls", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-identity", "rcgen", - "ring", + "ring 0.16.20", "rustls", "rustls-webpki", "thiserror", @@ -4352,30 +4417,30 @@ dependencies = [ [[package]] name = "libp2p-upnp" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82775a47b34f10f787ad3e2a22e2c1541e6ebef4fe9f28f3ac553921554c94c1" +checksum = "963eb8a174f828f6a51927999a9ab5e45dfa9aa2aa5fed99aa65f79de6229464" dependencies = [ "futures", "futures-timer", "igd-next", - "libp2p-core", + "libp2p-core 0.41.1", "libp2p-swarm", - "log", "tokio", + "tracing", "void", ] [[package]] name = "libp2p-yamux" -version = "0.44.1" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eedcb62824c4300efb9cfd4e2a6edaf3ca097b9e68b36dabe45a44469fd6a85" +checksum = "751f4778f71bc3db1ccf2451e7f4484463fec7f00c1ac2680e39c8368c23aae8" dependencies = [ "futures", - "libp2p-core", - "log", + "libp2p-core 0.41.1", "thiserror", + "tracing", "yamux", ] @@ -4518,7 +4583,6 @@ dependencies = [ "lazy_static", "libp2p", "libp2p-mplex", - "libp2p-quic", "lighthouse_metrics", "lighthouse_version", "lru 0.7.8", @@ -5604,6 +5668,16 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "pem" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" +dependencies = [ + "base64 0.21.5", + "serde", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5790,7 +5864,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "byteorder", "bytes", "fallible-iterator", @@ -5928,17 +6002,6 @@ version = "0.5.20+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" -[[package]] -name = "proc-macro-warning" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1eaa7fa0aa1929ffdf7eeb6eac234dde6268914a14ad44d23521ab6a9b258e" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.38", -] - [[package]] name = "proc-macro2" version = "1.0.69" @@ -5980,9 +6043,9 @@ dependencies = [ [[package]] name = "prometheus-client" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" +checksum = "510c4f1c9d81d556458f94c98f857748130ea9737bbd6053da497503b26ea63c" dependencies = [ "dtoa", "itoa", @@ -6060,7 +6123,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ededb1cd78531627244d51dd0c7139fbe736c7d57af0092a76f0ffb2f56e98" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes", "quick-protobuf", "thiserror", @@ -6115,7 +6178,7 @@ checksum = "2c78e758510582acc40acb90458401172d41f1016f8c9dde89e49677afb7eec1" dependencies = [ "bytes", "rand", - "ring", + "ring 0.16.20", "rustc-hash", "rustls", "slab", @@ -6132,7 +6195,7 @@ checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" dependencies = [ "bytes", "libc", - "socket2 0.5.4", + "socket2 0.5.5", "tracing", "windows-sys 0.48.0", ] @@ -6240,12 +6303,12 @@ dependencies = [ [[package]] name = "rcgen" -version = "0.10.0" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffbe84efe2f38dea12e9bfc1f65377fdf03e53a18cb3b995faedf7934c7e785b" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" dependencies = [ - "pem", - "ring", + "pem 3.0.2", + "ring 0.16.20", "time", "yasna", ] @@ -6338,7 +6401,7 @@ version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "encoding_rs", "futures-core", @@ -6417,12 +6480,26 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] +[[package]] +name = "ring" +version = "0.17.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys 0.48.0", +] + [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -6568,12 +6645,12 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.7" +version = "0.21.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" +checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", - "ring", + "ring 0.17.5", "rustls-webpki", "sct", ] @@ -6584,17 +6661,17 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", ] [[package]] name = "rustls-webpki" -version = "0.101.6" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring", - "untrusted", + "ring 0.17.5", + "untrusted 0.9.0", ] [[package]] @@ -6714,8 +6791,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -7267,7 +7344,7 @@ dependencies = [ "chacha20poly1305", "curve25519-dalek", "rand_core", - "ring", + "ring 0.16.20", "rustc_version", "sha2 0.10.8", "subtle", @@ -7285,9 +7362,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys 0.48.0", @@ -7299,6 +7376,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "spki" version = "0.6.0" @@ -7682,18 +7765,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", @@ -7827,7 +7910,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -7883,7 +7966,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", "tokio-util 0.7.9", "whoami", @@ -8136,78 +8219,6 @@ dependencies = [ "rlp", ] -[[package]] -name = "trust-dns-proto" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner 0.5.1", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.2.3", - "ipnet", - "lazy_static", - "rand", - "smallvec", - "socket2 0.4.9", - "thiserror", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "trust-dns-proto" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "559ac980345f7f5020883dd3bcacf176355225e01916f8c2efecad7534f682c6" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner 0.6.0", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.4.0", - "ipnet", - "once_cell", - "rand", - "smallvec", - "thiserror", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "trust-dns-resolver" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c723b0e608b24ad04c73b2607e0241b2c98fd79795a95e98b068b6966138a29d" -dependencies = [ - "cfg-if", - "futures-util", - "ipconfig", - "lru-cache", - "once_cell", - "parking_lot 0.12.1", - "rand", - "resolv-conf", - "smallvec", - "thiserror", - "tokio", - "tracing", - "trust-dns-proto 0.23.1", -] - [[package]] name = "try-lock" version = "0.2.4" @@ -8359,7 +8370,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes", ] @@ -8369,6 +8380,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "unused_port" version = "0.1.0" @@ -8385,7 +8402,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" dependencies = [ "form_urlencoded", - "idna 0.4.0", + "idna", "percent-encoding", ] @@ -8432,7 +8449,7 @@ dependencies = [ "parking_lot 0.12.1", "rand", "reqwest", - "ring", + "ring 0.16.20", "safe_arith", "sensitive_url", "serde", diff --git a/Dockerfile b/Dockerfile index 878a3602bd2..a8dadf2ad57 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.69.0-bullseye AS builder +FROM rust:1.73.0-bullseye AS builder RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev COPY . lighthouse ARG FEATURES @@ -15,4 +15,4 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-reco ca-certificates \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -COPY --from=builder /usr/local/cargo/bin/lighthouse /usr/local/bin/lighthouse \ No newline at end of file +COPY --from=builder /usr/local/cargo/bin/lighthouse /usr/local/bin/lighthouse diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index cedf347b9a8..bfd55c3beb0 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -68,7 +68,7 @@ pub struct ClientBuilder { eth1_service: Option, network_globals: Option>>, network_senders: Option>, - gossipsub_registry: Option, + libp2p_registry: Option, db_path: Option, freezer_db_path: Option, http_api_config: http_api::Config, @@ -102,7 +102,7 @@ where eth1_service: None, network_globals: None, network_senders: None, - gossipsub_registry: None, + libp2p_registry: None, db_path: None, freezer_db_path: None, http_api_config: <_>::default(), @@ -531,7 +531,7 @@ where .ok_or("network requires beacon_processor_channels")?; // If gossipsub metrics are required we build a registry to record them - let mut gossipsub_registry = if config.metrics_enabled { + let mut libp2p_registry = if config.metrics_enabled { Some(Registry::default()) } else { None @@ -541,9 +541,7 @@ where beacon_chain, config, context.executor, - gossipsub_registry - .as_mut() - .map(|registry| registry.sub_registry_with_prefix("gossipsub")), + libp2p_registry.as_mut(), beacon_processor_channels.beacon_processor_tx.clone(), beacon_processor_channels.work_reprocessing_tx.clone(), ) @@ -552,7 +550,7 @@ where self.network_globals = Some(network_globals); self.network_senders = Some(network_senders); - self.gossipsub_registry = gossipsub_registry; + self.libp2p_registry = libp2p_registry; Ok(self) } @@ -718,7 +716,7 @@ where chain: self.beacon_chain.clone(), db_path: self.db_path.clone(), freezer_db_path: self.freezer_db_path.clone(), - gossipsub_registry: self.gossipsub_registry.take().map(std::sync::Mutex::new), + gossipsub_registry: self.libp2p_registry.take().map(std::sync::Mutex::new), log: log.clone(), }); diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 125bbe9bc2f..356a6a203b6 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -39,17 +39,16 @@ directory = { workspace = true } regex = { workspace = true } strum = { workspace = true } superstruct = { workspace = true } -prometheus-client = "0.21.0" +prometheus-client = "0.22.0" unused_port = { workspace = true } delay_map = { workspace = true } void = "1" -libp2p-quic= { version = "0.9.2", features=["tokio"]} -libp2p-mplex = "0.40.0" +libp2p-mplex = "0.41.0" [dependencies.libp2p] -version = "0.52" +version = "0.53" default-features = false -features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa"] +features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"] [dev-dependencies] slog-term = { workspace = true } diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 47bd7b86679..f24b94c9ecb 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -455,12 +455,6 @@ pub fn gossipsub_config( fork_context: Arc, gossipsub_config_params: GossipsubConfigParams, ) -> gossipsub::Config { - // The function used to generate a gossipsub message id - // We use the first 8 bytes of SHA256(topic, data) for content addressing - let fast_gossip_message_id = |message: &gossipsub::RawMessage| { - let data = [message.topic.as_str().as_bytes(), &message.data].concat(); - gossipsub::FastMessageId::from(&Sha256::digest(&data)[..8]) - }; fn prefix( prefix: [u8; 4], message: &gossipsub::Message, @@ -518,7 +512,6 @@ pub fn gossipsub_config( .validation_mode(gossipsub::ValidationMode::Anonymous) .duplicate_cache_time(DUPLICATE_CACHE_TIME) .message_id_fn(gossip_message_id) - .fast_message_id_fn(fast_gossip_message_id) .allow_self_origin(true) .build() .expect("valid gossipsub configuration") diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 388790568f0..0894dc65bd6 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -29,7 +29,7 @@ pub use libp2p::{ identity::PeerId, swarm::{ dummy::ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, - PollParameters, SubstreamProtocol, ToSwarm, + SubstreamProtocol, ToSwarm, }, }; use lru::LruCache; @@ -955,11 +955,7 @@ impl NetworkBehaviour for Discovery { } // Main execution loop to drive the behaviour - fn poll( - &mut self, - cx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { if !self.started { return Poll::Pending; } @@ -1041,7 +1037,7 @@ impl NetworkBehaviour for Discovery { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { self.on_dial_failure(peer_id, error) @@ -1114,17 +1110,7 @@ impl NetworkBehaviour for Discovery { Err(e) => warn!(self.log, "Failed to update ENR"; "error" => ?e), } } - FromSwarm::ConnectionEstablished(_) - | FromSwarm::ConnectionClosed(_) - | FromSwarm::AddressChange(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => { + _ => { // Ignore events not relevant to discovery } } diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index 7467fb7f067..ea1ab07e3e3 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -115,7 +115,6 @@ pub use config::Config as NetworkConfig; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discv5; pub use libp2p; -pub use libp2p::bandwidth::BandwidthSinks; pub use libp2p::gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index 4650553d89d..ae02b689d81 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -1,6 +1,3 @@ -use libp2p::bandwidth::BandwidthSinks; -use std::sync::Arc; - pub use lighthouse_metrics::*; lazy_static! { @@ -187,46 +184,3 @@ pub fn scrape_discovery_metrics() { set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64); set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64); } - -/// Aggregated `BandwidthSinks` of tcp and quic transports -/// used in libp2p. -pub struct AggregatedBandwidthSinks { - tcp_sinks: Arc, - quic_sinks: Option>, -} - -impl AggregatedBandwidthSinks { - /// Create a new `AggregatedBandwidthSinks`. - pub fn new(tcp_sinks: Arc, quic_sinks: Option>) -> Self { - AggregatedBandwidthSinks { - tcp_sinks, - quic_sinks, - } - } - - /// Total QUIC inbound bandwidth. - pub fn total_quic_inbound(&self) -> u64 { - self.quic_sinks - .as_ref() - .map(|q| q.total_inbound()) - .unwrap_or_default() - } - - /// Total TCP inbound bandwidth. - pub fn total_tcp_inbound(&self) -> u64 { - self.tcp_sinks.total_inbound() - } - - /// Total QUIC outbound bandwidth. - pub fn total_quic_outbound(&self) -> u64 { - self.quic_sinks - .as_ref() - .map(|q| q.total_outbound()) - .unwrap_or_default() - } - - /// Total TCP outbound bandwidth. - pub fn total_tcp_outbound(&self) -> u64 { - self.tcp_sinks.total_outbound() - } -} diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 0617c8fa372..da205d169ac 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -9,7 +9,7 @@ use libp2p::identity::PeerId; use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::dummy::ConnectionHandler; -use libp2p::swarm::{ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, ToSwarm}; +use libp2p::swarm::{ConnectionDenied, ConnectionId, NetworkBehaviour, ToSwarm}; use slog::{debug, error, trace}; use types::EthSpec; @@ -36,11 +36,7 @@ impl NetworkBehaviour for PeerManager { // no events from the dummy handler } - fn poll( - &mut self, - cx: &mut Context<'_>, - _params: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { // perform the heartbeat when necessary while self.heartbeat.poll_tick(cx).is_ready() { self.heartbeat(); @@ -121,7 +117,7 @@ impl NetworkBehaviour for PeerManager { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, @@ -155,15 +151,9 @@ impl NetworkBehaviour for PeerManager { // TODO: we likely want to check this against our assumed external tcp // address } - FromSwarm::AddressChange(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) => { + _ => { + // NOTE: FromSwarm is a non exhaustive enum so updates should be based on release + // notes more than compiler feedback // The rest of the events we ignore since they are handled in their associated // `SwarmEvent` } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 8d1563fafa8..03f4761ff08 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -12,8 +12,7 @@ use futures::prelude::*; use futures::{Sink, SinkExt}; use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, - SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p::swarm::Stream; use slog::{crit, debug, trace, warn}; @@ -51,7 +50,12 @@ impl SubstreamId { type InboundSubstream = InboundFramed; /// Events the handler emits to the behaviour. -pub type HandlerEvent = Result, HandlerErr>; +#[derive(Debug)] +pub enum HandlerEvent { + Ok(RPCReceived), + Err(HandlerErr), + Close(RPCError), +} /// An error encountered by the handler. #[derive(Debug)] @@ -249,11 +253,12 @@ where } // We now drive to completion communications already dialed/established while let Some((id, req)) = self.dial_queue.pop() { - self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::Disconnected, - proto: req.versioned_protocol().protocol(), - id, - })); + self.events_out + .push(HandlerEvent::Err(HandlerErr::Outbound { + error: RPCError::Disconnected, + proto: req.versioned_protocol().protocol(), + id, + })); } // Queue our goodbye message. @@ -273,11 +278,13 @@ where HandlerState::Active => { self.dial_queue.push((id, req)); } - _ => self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::Disconnected, - proto: req.versioned_protocol().protocol(), - id, - })), + _ => self + .events_out + .push(HandlerEvent::Err(HandlerErr::Outbound { + error: RPCError::Disconnected, + proto: req.versioned_protocol().protocol(), + id, + })), } } @@ -296,7 +303,7 @@ where }; // If the response we are sending is an error, report back for handling if let RPCCodedResponse::Error(ref code, ref reason) = response { - self.events_out.push(Err(HandlerErr::Inbound { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { error: RPCError::ErrorResponse(*code, reason.to_string()), proto: inbound_info.protocol, id: inbound_id, @@ -320,7 +327,6 @@ where { type FromBehaviour = RPCSend; type ToBehaviour = HandlerEvent; - type Error = RPCError; type InboundProtocol = RPCProtocol; type OutboundProtocol = OutboundRequestContainer; type OutboundOpenInfo = (Id, OutboundRequest); // Keep track of the id and the request @@ -342,28 +348,23 @@ where } } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { // Check that we don't have outbound items pending for dialing, nor dialing, nor // established. Also check that there are no established inbound substreams. // Errors and events need to be reported back, so check those too. - let should_shutdown = match self.state { + match self.state { HandlerState::ShuttingDown(_) => { - self.dial_queue.is_empty() - && self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - && self.events_out.is_empty() - && self.dial_negotiated == 0 + !self.dial_queue.is_empty() + || !self.outbound_substreams.is_empty() + || !self.inbound_substreams.is_empty() + || !self.events_out.is_empty() + || !self.dial_negotiated != 0 } HandlerState::Deactivated => { // Regardless of events, the timeout has expired. Force the disconnect. - true + false } - _ => false, - }; - if should_shutdown { - KeepAlive::No - } else { - KeepAlive::Yes + _ => true, } } @@ -371,12 +372,7 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { if let Some(waker) = &self.waker { if waker.will_wake(cx.waker()) { @@ -400,7 +396,9 @@ where Poll::Ready(_) => { self.state = HandlerState::Deactivated; debug!(self.log, "Handler deactivated"); - return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::Disconnected)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Close(RPCError::Disconnected), + )); } Poll::Pending => {} }; @@ -414,7 +412,7 @@ where if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) { // the delay has been removed info.delay_key = None; - self.events_out.push(Err(HandlerErr::Inbound { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { error: RPCError::StreamTimeout, proto: info.protocol, id: *inbound_id.get_ref(), @@ -432,9 +430,11 @@ where Poll::Ready(Some(Err(e))) => { warn!(self.log, "Inbound substream poll failed"; "error" => ?e); // drops the peer if we cannot read the delay queue - return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError( - "Could not poll inbound stream timer", - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Close(RPCError::InternalError( + "Could not poll inbound stream timer", + )), + )); } Poll::Pending | Poll::Ready(None) => break, } @@ -453,18 +453,20 @@ where error: RPCError::StreamTimeout, }; // notify the user - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( - outbound_err, - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Err(outbound_err), + )); } else { crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref()); } } Poll::Ready(Some(Err(e))) => { warn!(self.log, "Outbound substream poll failed"; "error" => ?e); - return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError( - "Could not poll outbound stream timer", - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Close(RPCError::InternalError( + "Could not poll outbound stream timer", + )), + )); } Poll::Pending | Poll::Ready(None) => break, } @@ -516,7 +518,7 @@ where // If there was an error in shutting down the substream report the // error if let Err(error) = res { - self.events_out.push(Err(HandlerErr::Inbound { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { error, proto: info.protocol, id: *id, @@ -528,7 +530,7 @@ where if info.pending_items.back().map(|l| l.close_after()) == Some(false) { // if the request was still active, report back to cancel it - self.events_out.push(Err(HandlerErr::Inbound { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { error: RPCError::Disconnected, proto: info.protocol, id: *id, @@ -613,7 +615,7 @@ where self.inbound_substreams_delay.remove(delay_key); } // Report the error that occurred during the send process - self.events_out.push(Err(HandlerErr::Inbound { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { error, proto: info.protocol, id: *id, @@ -666,11 +668,12 @@ where } if deactivated => { // the handler is deactivated. Close the stream entry.get_mut().state = OutboundSubstreamState::Closing(substream); - self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::Disconnected, - proto: entry.get().proto, - id: entry.get().req_id, - })) + self.events_out + .push(HandlerEvent::Err(HandlerErr::Outbound { + error: RPCError::Disconnected, + proto: entry.get().proto, + id: entry.get().req_id, + })) } OutboundSubstreamState::RequestPendingResponse { mut substream, @@ -711,14 +714,18 @@ where let received = match response { RPCCodedResponse::StreamTermination(t) => { - Ok(RPCReceived::EndOfStream(id, t)) + HandlerEvent::Ok(RPCReceived::EndOfStream(id, t)) + } + RPCCodedResponse::Success(resp) => { + HandlerEvent::Ok(RPCReceived::Response(id, resp)) + } + RPCCodedResponse::Error(ref code, ref r) => { + HandlerEvent::Err(HandlerErr::Outbound { + id, + proto, + error: RPCError::ErrorResponse(*code, r.to_string()), + }) } - RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)), - RPCCodedResponse::Error(ref code, ref r) => Err(HandlerErr::Outbound { - id, - proto, - error: RPCError::ErrorResponse(*code, r.to_string()), - }), }; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(received)); @@ -736,9 +743,12 @@ where // notify the application error if request.expected_responses() > 1 { // return an end of stream result - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok( - RPCReceived::EndOfStream(request_id, request.stream_termination()), - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Ok(RPCReceived::EndOfStream( + request_id, + request.stream_termination(), + )), + )); } // else we return an error, stream should not have closed early. @@ -747,9 +757,9 @@ where proto: request.versioned_protocol().protocol(), error: RPCError::IncompleteStream, }; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( - outbound_err, - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Err(outbound_err), + )); } Poll::Pending => { entry.get_mut().state = @@ -765,9 +775,9 @@ where error: e, }; entry.remove_entry(); - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( - outbound_err, - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Err(outbound_err), + )); } }, OutboundSubstreamState::Closing(mut substream) => { @@ -788,9 +798,12 @@ where // termination to the application if let Some(termination) = protocol.terminator() { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok( - RPCReceived::EndOfStream(request_id, termination), - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Ok(RPCReceived::EndOfStream( + request_id, + termination, + )), + )); } } Poll::Pending => { @@ -831,7 +844,9 @@ where && self.events_out.is_empty() && self.dial_negotiated == 0 { - return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::Disconnected)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Close(RPCError::Disconnected), + )); } } @@ -859,24 +874,9 @@ where ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { self.on_dial_upgrade_error(info, error) } - ConnectionEvent::ListenUpgradeError(libp2p::swarm::handler::ListenUpgradeError { - info: _, - error: _, /* RPCError */ - }) => { - // This is going to be removed in the next libp2p release. I think its fine to do - // nothing. - } - ConnectionEvent::LocalProtocolsChange(_) => { - // This shouldn't effect this handler, we will still negotiate streams if we support - // the protocol as usual. - } - ConnectionEvent::RemoteProtocolsChange(_) => { - // This shouldn't effect this handler, we will still negotiate streams if we support - // the protocol as usual. - } - ConnectionEvent::AddressChange(_) => { - // We dont care about these changes as they have no bearing on our RPC internal - // logic. + _ => { + // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on + // release notes more than compiler feedback } } } @@ -919,7 +919,7 @@ where }, ); } else { - self.events_out.push(Err(HandlerErr::Inbound { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { id: self.current_inbound_substream_id, proto: req.versioned_protocol().protocol(), error: RPCError::HandlerRejected, @@ -933,7 +933,7 @@ where self.shutdown(None); } - self.events_out.push(Ok(RPCReceived::Request( + self.events_out.push(HandlerEvent::Ok(RPCReceived::Request( self.current_inbound_substream_id, req, ))); @@ -953,11 +953,12 @@ where // accept outbound connections only if the handler is not deactivated if matches!(self.state, HandlerState::Deactivated) { - self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::Disconnected, - proto, - id, - })); + self.events_out + .push(HandlerEvent::Err(HandlerErr::Outbound { + error: RPCError::Disconnected, + proto, + id, + })); } // add the stream to substreams if we expect a response, otherwise drop the stream. @@ -1030,11 +1031,12 @@ where self.dial_negotiated -= 1; self.outbound_io_error_retries = 0; - self.events_out.push(Err(HandlerErr::Outbound { - error, - proto: req.versioned_protocol().protocol(), - id, - })); + self.events_out + .push(HandlerEvent::Err(HandlerErr::Outbound { + error, + proto: req.versioned_protocol().protocol(), + id, + })); } } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index ab87a533d69..d6686ff1b11 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -5,10 +5,9 @@ //! syncing. use futures::future::FutureExt; -use handler::{HandlerEvent, RPCHandler}; +use handler::RPCHandler; use libp2p::swarm::{ - handler::ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, - ToSwarm, + handler::ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm, }; use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent}; use libp2p::PeerId; @@ -20,7 +19,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use types::{EthSpec, ForkContext}; -pub(crate) use handler::HandlerErr; +pub(crate) use handler::{HandlerErr, HandlerEvent}; pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse}; pub(crate) use protocol::InboundRequest; @@ -282,25 +281,9 @@ where Ok(handler) } - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionClosed(_) - | FromSwarm::ConnectionEstablished(_) - | FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => { - // Rpc Behaviour does not act on these swarm events. We use a comprehensive match - // statement to ensure future events are dealt with appropriately. - } - } + fn on_swarm_event(&mut self, _event: FromSwarm) { + // NOTE: FromSwarm is a non exhaustive enum so updates should be based on release notes more + // than compiler feedback } fn on_connection_handler_event( @@ -309,7 +292,7 @@ where conn_id: ConnectionId, event: ::ToBehaviour, ) { - if let Ok(RPCReceived::Request(ref id, ref req)) = event { + if let HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) = event { if let Some(limiter) = self.limiter.as_mut() { // check if the request is conformant to the quota match limiter.allows(&peer_id, req) { @@ -374,11 +357,7 @@ where } } - fn poll( - &mut self, - cx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { // let the rate limiter prune. if let Some(limiter) = self.limiter.as_mut() { let _ = limiter.poll_unpin(cx); @@ -409,27 +388,38 @@ where serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?; - let (msg_kind, protocol) = match &self.event { - Ok(received) => match received { - RPCReceived::Request(_, req) => ("request", req.versioned_protocol().protocol()), - RPCReceived::Response(_, res) => ("response", res.protocol()), - RPCReceived::EndOfStream(_, end) => ( - "end_of_stream", - match end { - ResponseTermination::BlocksByRange => Protocol::BlocksByRange, - ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, - ResponseTermination::BlobsByRange => Protocol::BlobsByRange, - ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, - }, - ), - }, - Err(error) => match &error { - HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto), - HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto), - }, + match &self.event { + HandlerEvent::Ok(received) => { + let (msg_kind, protocol) = match received { + RPCReceived::Request(_, req) => { + ("request", req.versioned_protocol().protocol()) + } + RPCReceived::Response(_, res) => ("response", res.protocol()), + RPCReceived::EndOfStream(_, end) => ( + "end_of_stream", + match end { + ResponseTermination::BlocksByRange => Protocol::BlocksByRange, + ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::BlobsByRange => Protocol::BlobsByRange, + ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, + }, + ), + }; + serializer.emit_str("msg_kind", msg_kind)?; + serializer.emit_arguments("protocol", &format_args!("{}", protocol))?; + } + HandlerEvent::Err(error) => { + let (msg_kind, protocol) = match &error { + HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto), + HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto), + }; + serializer.emit_str("msg_kind", msg_kind)?; + serializer.emit_arguments("protocol", &format_args!("{}", protocol))?; + } + HandlerEvent::Close(err) => { + serializer.emit_arguments("handler_close", &format_args!("{}", err))?; + } }; - serializer.emit_str("msg_kind", msg_kind)?; - serializer.emit_arguments("protocol", &format_args!("{}", protocol))?; slog::Result::Ok(()) } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index a38b7b2f2ef..3c2a3f5a95f 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -4,7 +4,6 @@ use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad}; use crate::discovery::{ subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS, }; -use crate::metrics::AggregatedBandwidthSinks; use crate::peer_manager::{ config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, @@ -127,8 +126,6 @@ pub struct Network { /// The interval for updating gossipsub scores update_gossipsub_scores: tokio::time::Interval, gossip_cache: GossipCache, - /// The bandwidth logger for the underlying libp2p transport. - pub bandwidth: AggregatedBandwidthSinks, /// This node's PeerId. pub local_peer_id: PeerId, /// Logger for behaviour actions. @@ -139,10 +136,11 @@ pub struct Network { impl Network { pub async fn new( executor: task_executor::TaskExecutor, - ctx: ServiceContext<'_>, + mut ctx: ServiceContext<'_>, log: &slog::Logger, ) -> error::Result<(Self, Arc>)> { let log = log.new(o!("service"=> "libp2p")); + let mut config = ctx.config.clone(); trace!(log, "Libp2p Service starting"); // initialise the node's ID @@ -257,10 +255,13 @@ impl Network { gossipsub_config_params, ); - // If metrics are enabled for gossipsub build the configuration - let gossipsub_metrics = ctx - .gossipsub_registry - .map(|registry| (registry, Default::default())); + // If metrics are enabled for libp2p build the configuration + let gossipsub_metrics = ctx.libp2p_registry.as_mut().map(|registry| { + ( + registry.sub_registry_with_prefix("gossipsub"), + Default::default(), + ) + }); let snappy_transform = SnappyTransform::new(config.gs_config.max_transmit_size()); let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform( @@ -366,9 +367,8 @@ impl Network { }; // Set up the transport - tcp/quic with noise and mplex - let (transport, bandwidth) = - build_transport(local_keypair.clone(), !config.disable_quic_support) - .map_err(|e| format!("Failed to build transport: {:?}", e))?; + let transport = build_transport(local_keypair.clone(), !config.disable_quic_support) + .map_err(|e| format!("Failed to build transport: {:?}", e))?; // use the executor for libp2p struct Executor(task_executor::TaskExecutor); @@ -379,20 +379,41 @@ impl Network { } // sets up the libp2p swarm. - let swarm = SwarmBuilder::with_existing_identity(local_keypair) - .with_tokio() - .with_other_transport(|_key| transport) - .expect("infalible") - .with_behaviour(|_| behaviour) - .expect("infalible") - .with_swarm_config(|_| { - libp2p::swarm::Config::with_executor(Executor(executor)) - .with_notify_handler_buffer_size( - std::num::NonZeroUsize::new(7).expect("Not zero"), - ) - .with_per_connection_event_buffer_size(4) - }) - .build(); + + let swarm = { + let builder = SwarmBuilder::with_existing_identity(local_keypair) + .with_tokio() + .with_other_transport(|_key| transport) + .expect("infalible"); + + // NOTE: adding bandwidth metrics changes the generics of the swarm, so types diverge + if let Some(libp2p_registry) = ctx.libp2p_registry { + builder + .with_bandwidth_metrics(libp2p_registry) + .with_behaviour(|_| behaviour) + .expect("infalible") + .with_swarm_config(|_| { + libp2p::swarm::Config::with_executor(Executor(executor)) + .with_notify_handler_buffer_size( + std::num::NonZeroUsize::new(7).expect("Not zero"), + ) + .with_per_connection_event_buffer_size(4) + }) + .build() + } else { + builder + .with_behaviour(|_| behaviour) + .expect("infalible") + .with_swarm_config(|_| { + libp2p::swarm::Config::with_executor(Executor(executor)) + .with_notify_handler_buffer_size( + std::num::NonZeroUsize::new(7).expect("Not zero"), + ) + .with_per_connection_event_buffer_size(4) + }) + .build() + } + }; let mut network = Network { swarm, @@ -403,7 +424,6 @@ impl Network { score_settings, update_gossipsub_scores, gossip_cache, - bandwidth, local_peer_id, log, }; @@ -1251,7 +1271,7 @@ impl Network { let handler_id = event.conn_id; // The METADATA and PING RPC responses are handled within the behaviour and not propagated match event.event { - Err(handler_err) => { + HandlerEvent::Err(handler_err) => { match handler_err { HandlerErr::Inbound { id: _, @@ -1286,7 +1306,7 @@ impl Network { } } } - Ok(RPCReceived::Request(id, request)) => { + HandlerEvent::Ok(RPCReceived::Request(id, request)) => { let peer_request_id = (handler_id, id); match request { /* Behaviour managed protocols: Ping and Metadata */ @@ -1385,7 +1405,7 @@ impl Network { } } } - Ok(RPCReceived::Response(id, resp)) => { + HandlerEvent::Ok(RPCReceived::Response(id, resp)) => { match resp { /* Behaviour managed protocols */ RPCResponse::Pong(ping) => { @@ -1422,7 +1442,7 @@ impl Network { } } } - Ok(RPCReceived::EndOfStream(id, termination)) => { + HandlerEvent::Ok(RPCReceived::EndOfStream(id, termination)) => { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), @@ -1431,6 +1451,11 @@ impl Network { }; self.build_response(id, peer_id, response) } + HandlerEvent::Close(_) => { + let _ = self.swarm.disconnect_peer_id(peer_id); + // NOTE: we wait for the swarm to report the connection as actually closed + None + } } } @@ -1624,7 +1649,11 @@ impl Network { None } } - SwarmEvent::Dialing { .. } => None, + _ => { + // NOTE: SwarmEvent is a non exhaustive enum so updates should be based on + // release notes more than compiler feedback + None + } }; if let Some(ev) = maybe_event { diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index ab6b3a771bd..5fe5946ce29 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -1,4 +1,3 @@ -use crate::metrics::AggregatedBandwidthSinks; use crate::multiaddr::Protocol; use crate::rpc::{MetaData, MetaDataV1, MetaDataV2}; use crate::types::{ @@ -9,8 +8,8 @@ use futures::future::Either; use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed}; use libp2p::gossipsub; use libp2p::identity::{secp256k1, Keypair}; -use libp2p::{core, noise, yamux, PeerId, Transport, TransportExt}; -use libp2p_quic; +use libp2p::quic; +use libp2p::{core, noise, yamux, PeerId, Transport}; use prometheus_client::registry::Registry; use slog::{debug, warn}; use ssz::Decode; @@ -34,7 +33,7 @@ pub struct Context<'a> { pub enr_fork_id: EnrForkId, pub fork_context: Arc, pub chain_spec: &'a ChainSpec, - pub gossipsub_registry: Option<&'a mut Registry>, + pub libp2p_registry: Option<&'a mut Registry>, } type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; @@ -44,7 +43,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; pub fn build_transport( local_private_key: Keypair, quic_support: bool, -) -> std::io::Result<(BoxedTransport, AggregatedBandwidthSinks)> { +) -> std::io::Result { // mplex config let mut mplex_config = libp2p_mplex::MplexConfig::new(); mplex_config.set_max_buffer_size(256); @@ -53,44 +52,35 @@ pub fn build_transport( // yamux config let mut yamux_config = yamux::Config::default(); yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read()); - // Creates the TCP transport layer - let (tcp, tcp_bandwidth) = - libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) - .upgrade(core::upgrade::Version::V1) - .authenticate(generate_noise_config(&local_private_key)) - .multiplex(core::upgrade::SelectUpgrade::new( - yamux_config, - mplex_config, - )) - .timeout(Duration::from_secs(10)) - .with_bandwidth_logging(); - - let (transport, bandwidth) = if quic_support { + let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) + .upgrade(core::upgrade::Version::V1) + .authenticate(generate_noise_config(&local_private_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux_config, + mplex_config, + )) + .timeout(Duration::from_secs(10)); + let transport = if quic_support { // Enables Quic // The default quic configuration suits us for now. - let quic_config = libp2p_quic::Config::new(&local_private_key); - let (quic, quic_bandwidth) = - libp2p_quic::tokio::Transport::new(quic_config).with_bandwidth_logging(); + let quic_config = quic::Config::new(&local_private_key); + let quic = quic::tokio::Transport::new(quic_config); let transport = tcp .or_transport(quic) .map(|either_output, _| match either_output { Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed(); - ( - transport, - AggregatedBandwidthSinks::new(tcp_bandwidth, Some(quic_bandwidth)), - ) + }); + transport.boxed() } else { - (tcp, AggregatedBandwidthSinks::new(tcp_bandwidth, None)) + tcp.boxed() }; // Enables DNS over the transport. let transport = libp2p::dns::tokio::Transport::system(transport)?.boxed(); - Ok((transport, bandwidth)) + Ok(transport) } // Useful helper functions for debugging. Currently not used in the client. diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index dc77e3efe21..9585dcf5af1 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -113,7 +113,7 @@ pub async fn build_libp2p_instance( enr_fork_id: EnrForkId::default(), fork_context: Arc::new(fork_context(fork_name)), chain_spec: spec, - gossipsub_registry: None, + libp2p_registry: None, }; Libp2pInstance( LibP2PService::new(executor, libp2p_context, &log) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 799953538a4..0509ed1ea7d 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -7,8 +7,8 @@ use beacon_chain::{ use fnv::FnvHashMap; pub use lighthouse_metrics::*; use lighthouse_network::{ - metrics::AggregatedBandwidthSinks, peer_manager::peerdb::client::ClientKind, types::GossipKind, - GossipTopic, Gossipsub, NetworkGlobals, + peer_manager::peerdb::client::ClientKind, types::GossipKind, GossipTopic, Gossipsub, + NetworkGlobals, }; use std::sync::Arc; use strum::IntoEnumIterator; @@ -223,12 +223,6 @@ lazy_static! { lazy_static! { - /* - * Bandwidth metrics - */ - pub static ref LIBP2P_BYTES: Result = - try_create_int_counter_vec("libp2p_inbound_bytes", "The bandwidth over libp2p", &["direction", "transport"]); - /* * Sync related metrics */ @@ -327,25 +321,6 @@ lazy_static! { ); } -pub fn update_bandwidth_metrics(bandwidth: &AggregatedBandwidthSinks) { - if let Some(tcp_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "tcp"]) { - tcp_in_bandwidth.reset(); - tcp_in_bandwidth.inc_by(bandwidth.total_tcp_inbound()); - } - if let Some(tcp_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "tcp"]) { - tcp_out_bandwidth.reset(); - tcp_out_bandwidth.inc_by(bandwidth.total_tcp_outbound()); - } - if let Some(quic_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "quic"]) { - quic_in_bandwidth.reset(); - quic_in_bandwidth.inc_by(bandwidth.total_quic_inbound()); - } - if let Some(quic_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "quic"]) { - quic_out_bandwidth.reset(); - quic_out_bandwidth.inc_by(bandwidth.total_quic_outbound()); - } -} - pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) { inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]); } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 03715dd99f2..17760cef592 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -219,7 +219,7 @@ impl NetworkService { beacon_chain: Arc>, config: &NetworkConfig, executor: task_executor::TaskExecutor, - gossipsub_registry: Option<&'_ mut Registry>, + libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, beacon_processor_reprocess_tx: mpsc::Sender, ) -> error::Result<( @@ -285,7 +285,7 @@ impl NetworkService { enr_fork_id, fork_context: fork_context.clone(), chain_spec: &beacon_chain.spec, - gossipsub_registry, + libp2p_registry, }; // launch libp2p service @@ -380,7 +380,7 @@ impl NetworkService { beacon_chain: Arc>, config: &NetworkConfig, executor: task_executor::TaskExecutor, - gossipsub_registry: Option<&'_ mut Registry>, + libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, beacon_processor_reprocess_tx: mpsc::Sender, ) -> error::Result<(Arc>, NetworkSenders)> { @@ -388,7 +388,7 @@ impl NetworkService { beacon_chain, config, executor.clone(), - gossipsub_registry, + libp2p_registry, beacon_processor_send, beacon_processor_reprocess_tx, ) @@ -497,7 +497,6 @@ impl NetworkService { } } } - metrics::update_bandwidth_metrics(&self.libp2p.bandwidth); } }; executor.spawn(service_fut, "network"); diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 48b4eb037ab..42c8bea038a 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -4,7 +4,7 @@ version = "4.5.0" authors = ["Sigma Prime "] edition = { workspace = true } autotests = false -rust-version = "1.69.0" +rust-version = "1.73.0" [features] default = ["slasher-lmdb"]