Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): support eth/68 #1361

Merged
merged 37 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
aab2d1c
chore: impl eth/68
jinsankim Feb 15, 2023
736c57d
fix: clippy and nits
jinsankim Feb 15, 2023
f0b2f92
chore: check version for GetNodeData, NodeData
jinsankim Feb 15, 2023
6e0c6f4
chore: remove TODOs, check NewPooledTransactionHashes68 len of fields
jinsankim Feb 15, 2023
e3d2cad
chore: revise Capabilities & fix tests
jinsankim Feb 15, 2023
7ade142
test: capabilities_supports_eth()
jinsankim Feb 15, 2023
9c74b82
chore: revise types
jinsankim Feb 15, 2023
46963e0
fix: EthStream::new()
jinsankim Feb 15, 2023
012ef53
fix: fuzz tests
jinsankim Feb 15, 2023
9e8543a
chore: revise error handling for GetNodeData, NodeData
jinsankim Feb 15, 2023
651fc4d
chore: not enable Eth68
jinsankim Feb 16, 2023
6e0c51d
fix: test_session_established_with_highest_version
jinsankim Feb 16, 2023
ca74056
chore: use EthVersion instead of u8
jinsankim Feb 16, 2023
5ba25a6
Merge remote-tracking branch 'upstream/main' into feat-eth68
jinsankim Feb 18, 2023
b7b4b14
chore: change eth versions order as desc
jinsankim Feb 18, 2023
011eb73
chore: revise `EthMessage` doc
jinsankim Feb 18, 2023
a94b1da
chore: rename `NewPooledTransactionHashes` with `NewPooledTransaction…
jinsankim Feb 18, 2023
46e438c
fix: doc lint
jinsankim Feb 18, 2023
a8a6ec8
chore: revise comment on `version()`
jinsankim Feb 18, 2023
a9de801
chore: `PooledTransactions66` is not emitted by network
jinsankim Feb 18, 2023
12cc2a0
fix(test): test_send_at_capacity()
jinsankim Feb 18, 2023
6dad68b
chore: re-intro `NewPooledTransactionHashes` message
jinsankim Feb 18, 2023
e3eb950
fix(test): test_send_at_capacity
jinsankim Feb 18, 2023
f0f41fe
chore: use `NewPooledTransactionHashes` internally
jinsankim Feb 18, 2023
63abb51
chore: clean up and revert unnecessary changes
jinsankim Feb 18, 2023
03beade
chore: revert unnecessary change
jinsankim Feb 18, 2023
e124f9a
Update crates/net/network/src/session/active.rs
jinsankim Feb 19, 2023
d5da4f6
chore: use `match` for `try_into()`
jinsankim Feb 19, 2023
e1d007b
chore: rename `pooled_transactions` with `pooled_transaction_hashes` …
jinsankim Feb 19, 2023
231f460
chore: revise EthStreamError::TransactionHashesInvalidLenOfFields
jinsankim Feb 20, 2023
acb0d81
chore: remove `From impls for tuples`
jinsankim Feb 20, 2023
fb528ed
chore(nit): remove explicit type
jinsankim Feb 20, 2023
c70e2d2
chore: revise decode_message Result
jinsankim Feb 20, 2023
517380e
chore(nit): revise debug message
jinsankim Feb 20, 2023
f53e2a7
chore: remove outgoing feature
jinsankim Feb 20, 2023
d43efa0
fix: remove unused imports in test
jinsankim Feb 20, 2023
3227771
test: test_removed_message_at_eth67()
jinsankim Feb 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ impl Capability {
pub fn is_eth_v67(&self) -> bool {
self.name == "eth" && self.version == 67
}

/// Whether this is eth v68.
#[inline]
pub fn is_eth_v68(&self) -> bool {
self.name == "eth" && self.version == 68
}
}
Comment on lines 64 to 73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the greater multi-capability feature let's refactor this into an enum as this is not a very rusty way of doing things

Copy link
Contributor Author

@jinsankim jinsankim Feb 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to open another issue for it because it could be broader than intuition.


#[cfg(any(test, feature = "arbitrary"))]
Expand Down Expand Up @@ -97,6 +103,7 @@ pub struct Capabilities {
inner: Vec<Capability>,
eth_66: bool,
eth_67: bool,
eth_68: bool,
}

impl Capabilities {
Expand All @@ -115,7 +122,7 @@ impl Capabilities {
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub fn supports_eth(&self) -> bool {
self.eth_67 || self.eth_66
self.eth_68 || self.eth_67 || self.eth_66
}

/// Whether this peer supports eth v66 protocol.
Expand All @@ -129,13 +136,20 @@ impl Capabilities {
pub fn supports_eth_v67(&self) -> bool {
self.eth_67
}

/// Whether this peer supports eth v68 protocol.
#[inline]
pub fn supports_eth_v68(&self) -> bool {
self.eth_68
}
}

impl From<Vec<Capability>> for Capabilities {
fn from(value: Vec<Capability>) -> Self {
Self {
eth_66: value.iter().any(Capability::is_eth_v66),
eth_67: value.iter().any(Capability::is_eth_v67),
eth_68: value.iter().any(Capability::is_eth_v68),
inner: value,
}
}
Expand All @@ -154,6 +168,7 @@ impl Decodable for Capabilities {
Ok(Self {
eth_66: inner.iter().any(Capability::is_eth_v66),
eth_67: inner.iter().any(Capability::is_eth_v67),
eth_68: inner.iter().any(Capability::is_eth_v68),
inner,
})
}
Expand Down Expand Up @@ -227,6 +242,15 @@ pub enum SharedCapabilityError {
mod tests {
use super::*;

#[test]
fn from_eth_68() {
let capability = SharedCapability::new("eth", 68, 0).unwrap();

assert_eq!(capability.name(), "eth");
assert_eq!(capability.version(), 68);
assert_eq!(capability, SharedCapability::Eth { version: EthVersion::Eth68, offset: 0 });
}

#[test]
fn from_eth_67() {
let capability = SharedCapability::new("eth", 67, 0).unwrap();
Expand All @@ -244,4 +268,19 @@ mod tests {
assert_eq!(capability.version(), 66);
assert_eq!(capability, SharedCapability::Eth { version: EthVersion::Eth66, offset: 0 });
}

#[test]
fn capabilities_supports_eth() {
let capabilities: Capabilities = vec![
Capability::new("eth".into(), 66),
Capability::new("eth".into(), 67),
Capability::new("eth".into(), 68),
]
.into();

assert!(capabilities.supports_eth());
assert!(capabilities.supports_eth_v66());
assert!(capabilities.supports_eth_v67());
assert!(capabilities.supports_eth_v68());
}
}
10 changes: 9 additions & 1 deletion crates/net/eth-wire/src/errors/eth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Error handling for (`EthStream`)[crate::EthStream]
use crate::{errors::P2PStreamError, DisconnectReason};
use crate::{
errors::P2PStreamError, version::ParseVersionError, DisconnectReason, EthMessageID, EthVersion,
};
use reth_primitives::{Chain, ValidationError, H256};
use std::io;

Expand All @@ -10,9 +12,15 @@ pub enum EthStreamError {
#[error(transparent)]
P2PStreamError(#[from] P2PStreamError),
#[error(transparent)]
ParseVersionError(#[from] ParseVersionError),
#[error(transparent)]
EthHandshakeError(#[from] EthHandshakeError),
#[error("For {0:?} version, message id({1:?}) is invalid")]
EthInvalidMessageError(EthVersion, EthMessageID),
#[error("message size ({0}) exceeds max length (10MB)")]
MessageTooBig(usize),
#[error("TransactionHashes invalid len of fields: hashes_len={hashes_len} types_len={types_len} sizes_len={sizes_len}")]
TransactionHashesInvalidLenOfFields { hashes_len: usize, types_len: usize, sizes_len: usize },
}

// === impl EthStreamError ===
Expand Down
36 changes: 22 additions & 14 deletions crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use crate::{
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
types::{EthMessage, ProtocolMessage, Status},
EthVersion,
};
use futures::{ready, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
use reth_primitives::{
bytes::{Bytes, BytesMut},
ForkFilter,
};
use reth_rlp::{Decodable, Encodable};
use reth_rlp::Encodable;
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -76,11 +77,12 @@ where
return Err(EthStreamError::MessageTooBig(their_msg.len()))
}

let msg = match ProtocolMessage::decode(&mut their_msg.as_ref()) {
let version = EthVersion::try_from(status.version)?;
let msg = match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) {
Ok(m) => m,
Err(err) => {
tracing::debug!("rlp decode error in eth handshake: msg={their_msg:x}");
return Err(err.into())
tracing::debug!("decode error in eth handshake: msg={their_msg:x}");
return Err(err)
}
};

Expand Down Expand Up @@ -120,7 +122,7 @@ where

// now we can create the `EthStream` because the peer has successfully completed
// the handshake
let stream = EthStream::new(self.inner);
let stream = EthStream::new(version, self.inner);

Ok((stream, resp))
}
Expand All @@ -136,15 +138,21 @@ where
#[pin_project]
#[derive(Debug)]
pub struct EthStream<S> {
version: EthVersion,
#[pin]
inner: S,
}

impl<S> EthStream<S> {
/// Creates a new unauthed [`EthStream`] from a provided stream. You will need
/// to manually handshake a peer.
pub fn new(inner: S) -> Self {
Self { inner }
pub fn new(version: EthVersion, inner: S) -> Self {
Self { version, inner }
}

/// Returns the eth version.
pub fn version(&self) -> EthVersion {
self.version
}

/// Returns the underlying stream.
Expand Down Expand Up @@ -203,11 +211,11 @@ where
return Poll::Ready(Some(Err(EthStreamError::MessageTooBig(bytes.len()))))
}

let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
let msg = match ProtocolMessage::decode_message(*this.version, &mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => {
tracing::debug!("rlp decode error: msg={bytes:x}");
return Poll::Ready(Some(Err(err.into())))
tracing::debug!("decode error: msg={bytes:x}");
return Poll::Ready(Some(Err(err)))
}
};

Expand Down Expand Up @@ -337,7 +345,7 @@ mod tests {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = PassthroughCodec::default().framed(incoming);
let mut stream = EthStream::new(stream);
let mut stream = EthStream::new(EthVersion::Eth67, stream);

// use the stream to get the next message
let message = stream.next().await.unwrap().unwrap();
Expand All @@ -346,7 +354,7 @@ mod tests {

let outgoing = TcpStream::connect(local_addr).await.unwrap();
let sink = PassthroughCodec::default().framed(outgoing);
let mut client_stream = EthStream::new(sink);
let mut client_stream = EthStream::new(EthVersion::Eth67, sink);

client_stream.send(test_msg).await.unwrap();

Expand All @@ -372,7 +380,7 @@ mod tests {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
let mut stream = EthStream::new(stream);
let mut stream = EthStream::new(EthVersion::Eth67, stream);

// use the stream to get the next message
let message = stream.next().await.unwrap().unwrap();
Expand All @@ -386,7 +394,7 @@ mod tests {

let outgoing = TcpStream::connect(local_addr).await.unwrap();
let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
let mut client_stream = EthStream::new(outgoing);
let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing);

client_stream.send(test_msg).await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/net/eth-wire/src/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl HelloMessageBuilder {
protocol_version: protocol_version.unwrap_or_default(),
client_version: client_version.unwrap_or_else(|| DEFAULT_CLIENT_VERSION.to_string()),
capabilities: capabilities
.unwrap_or_else(|| vec![EthVersion::Eth66.into(), EthVersion::Eth67.into()]),
.unwrap_or_else(|| vec![EthVersion::Eth67.into(), EthVersion::Eth66.into()]),
port: port.unwrap_or(30303),
id,
}
Expand Down
2 changes: 1 addition & 1 deletion crates/net/eth-wire/src/p2pstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ mod tests {
#[test]
fn test_peer_lower_capability_version() {
let local_capabilities: Vec<Capability> =
vec![EthVersion::Eth66.into(), EthVersion::Eth67.into()];
vec![EthVersion::Eth66.into(), EthVersion::Eth67.into(), EthVersion::Eth68.into()];
let peer_capabilities: Vec<Capability> = vec![EthVersion::Eth66.into()];

let shared_capability =
Expand Down
20 changes: 17 additions & 3 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,33 @@ pub struct SharedTransactions(
#[derive_arbitrary(rlp)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NewPooledTransactionHashes(
pub struct NewPooledTransactionHashes66(
/// Transaction hashes for new transactions that have appeared on the network.
/// Clients should request the transactions with the given hashes using a
/// [`GetPooledTransactions`](crate::GetPooledTransactions) message.
pub Vec<H256>,
);

impl From<Vec<H256>> for NewPooledTransactionHashes {
impl From<Vec<H256>> for NewPooledTransactionHashes66 {
fn from(v: Vec<H256>) -> Self {
NewPooledTransactionHashes(v)
NewPooledTransactionHashes66(v)
}
}

/// Same as [`NewPooledTransactionHashes66`] but extends that that beside the transaction hashes,
/// the node sends the transaction types and their sizes (as defined in EIP-2718) as well.
#[derive_arbitrary(rlp)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NewPooledTransactionHashes68 {
/// Transaction types for new transactions that have appeared on the network.
pub types: Vec<u8>,
/// Transaction sizes for new transactions that have appeared on the network.
pub sizes: Vec<usize>,
/// Transaction hashes for new transactions that have appeared on the network.
pub hashes: Vec<H256>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading