Skip to content

Commit

Permalink
refactor!: Add own timestamp to peer handshake
Browse files Browse the repository at this point in the history
In case we want this value in the future, it's convenient to have the
peer's reported timestamp in the handshake.

Currently, we only use this value to log a warning if peer's reported
time is different from our. But we store it on the `PeerInfo` struct in
case we want to use it to e.g. have a network-adjusted time.
  • Loading branch information
Sword-Smith committed Jan 31, 2025
1 parent 4161557 commit 487e8b3
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 42 deletions.
30 changes: 15 additions & 15 deletions src/connect_to_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,8 @@ mod connect_tests {
#[tokio::test]
async fn test_outgoing_connection_succeed() -> Result<()> {
let network = Network::Alpha;
let other_handshake = get_dummy_handshake_data_for_genesis(network).await;
let own_handshake = get_dummy_handshake_data_for_genesis(network).await;
let other_handshake = get_dummy_handshake_data_for_genesis(network);
let own_handshake = get_dummy_handshake_data_for_genesis(network);
let mock = Builder::new()
.write(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
Expand Down Expand Up @@ -627,8 +627,8 @@ mod connect_tests {
) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;

// Get an address for a peer that's not already connected
let (other_handshake, peer_sa) = get_dummy_peer_connection_data_genesis(network, 1).await;
let own_handshake = get_dummy_handshake_data_for_genesis(network).await;
let (other_handshake, peer_sa) = get_dummy_peer_connection_data_genesis(network, 1);
let own_handshake = get_dummy_handshake_data_for_genesis(network);

let mut status = check_if_connection_is_allowed(
state_lock.clone(),
Expand Down Expand Up @@ -771,8 +771,8 @@ mod connect_tests {
// object will panic, and the `await` operator will evaluate
// to Error.
let network = Network::Alpha;
let other_handshake = get_dummy_handshake_data_for_genesis(network).await;
let own_handshake = get_dummy_handshake_data_for_genesis(network).await;
let other_handshake = get_dummy_handshake_data_for_genesis(network);
let own_handshake = get_dummy_handshake_data_for_genesis(network);
let mock = Builder::new()
.read(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
Expand Down Expand Up @@ -812,8 +812,8 @@ mod connect_tests {
#[tokio::test]
async fn test_incoming_connection_fail_bad_magic_value() -> Result<()> {
let network = Network::Alpha;
let other_handshake = get_dummy_handshake_data_for_genesis(network).await;
let own_handshake = get_dummy_handshake_data_for_genesis(network).await;
let other_handshake = get_dummy_handshake_data_for_genesis(network);
let own_handshake = get_dummy_handshake_data_for_genesis(network);
let mock = Builder::new()
.read(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_RESPONSE.to_vec(),
Expand Down Expand Up @@ -841,8 +841,8 @@ mod connect_tests {
#[traced_test]
#[tokio::test]
async fn test_incoming_connection_fail_bad_network() -> Result<()> {
let other_handshake = get_dummy_handshake_data_for_genesis(Network::Testnet).await;
let own_handshake = get_dummy_handshake_data_for_genesis(Network::Alpha).await;
let other_handshake = get_dummy_handshake_data_for_genesis(Network::Testnet);
let own_handshake = get_dummy_handshake_data_for_genesis(Network::Alpha);
let mock = Builder::new()
.read(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
Expand Down Expand Up @@ -874,7 +874,7 @@ mod connect_tests {
#[traced_test]
#[tokio::test]
async fn test_incoming_connection_fail_bad_version() {
let mut other_handshake = get_dummy_handshake_data_for_genesis(Network::Testnet).await;
let mut other_handshake = get_dummy_handshake_data_for_genesis(Network::Testnet);
let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
get_test_genesis_setup(Network::Alpha, 0, cli_args::Args::default())
.await
Expand Down Expand Up @@ -943,8 +943,8 @@ mod connect_tests {
// In this scenario a node attempts to make an ingoing connection but the max
// peer count should prevent a new incoming connection from being accepted.
let network = Network::Alpha;
let other_handshake = get_dummy_handshake_data_for_genesis(network).await;
let own_handshake = get_dummy_handshake_data_for_genesis(network).await;
let other_handshake = get_dummy_handshake_data_for_genesis(network);
let own_handshake = get_dummy_handshake_data_for_genesis(network);
let mock = Builder::new()
.read(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
Expand Down Expand Up @@ -993,8 +993,8 @@ mod connect_tests {
// In this scenario a peer has been banned, and is attempting to make an ingoing
// connection. This should not be possible.
let network = Network::Alpha;
let other_handshake = get_dummy_handshake_data_for_genesis(network).await;
let own_handshake = get_dummy_handshake_data_for_genesis(network).await;
let other_handshake = get_dummy_handshake_data_for_genesis(network);
let own_handshake = get_dummy_handshake_data_for_genesis(network);
let mock = Builder::new()
.read(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
Expand Down
2 changes: 1 addition & 1 deletion src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2338,7 +2338,7 @@ mod test {

// simulate incoming connection
let (peer_handshake_data, peer_socket_address) =
get_dummy_peer_connection_data_genesis(network, 1).await;
get_dummy_peer_connection_data_genesis(network, 1);
let own_handshake_data = main_loop_handler
.global_state_lock
.lock_guard()
Expand Down
6 changes: 6 additions & 0 deletions src/models/peer/handshake_data.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::SystemTime;

use arraystring::typenum::U30;
use arraystring::ArrayString;
use serde::Deserialize;
Expand All @@ -18,4 +20,8 @@ pub(crate) struct HandshakeData {
pub instance_id: u128,
pub version: VersionString,
pub is_archival_node: bool,

/// Client's timestamp when the handshake was generated. Can be used to
/// compare own timestamp to peer's or to a list of peers.
pub timestamp: SystemTime,
}
67 changes: 57 additions & 10 deletions src/models/peer/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::net::SocketAddr;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use serde::Deserialize;
use serde::Serialize;

use super::handshake_data::VersionString;
use super::InstanceId;
use super::PeerStanding;
use crate::HandshakeData;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub(crate) struct PeerConnectionInfo {
Expand All @@ -33,7 +35,8 @@ impl PeerConnectionInfo {
pub struct PeerInfo {
peer_connection_info: PeerConnectionInfo,
instance_id: InstanceId,
connection_established: SystemTime,
pub(crate) own_timestamp_connection_established: SystemTime,
pub(crate) peer_timestamp_connection_established: SystemTime,
pub(crate) standing: PeerStanding,
version: VersionString,
is_archival_node: bool,
Expand All @@ -42,24 +45,49 @@ pub struct PeerInfo {
impl PeerInfo {
pub(crate) fn new(
peer_connection_info: PeerConnectionInfo,
instance_id: InstanceId,
peer_handshake: &HandshakeData,
connection_established: SystemTime,
version: VersionString,
is_archival_node: bool,
peer_tolerance: u16,
) -> Self {
assert!(peer_tolerance > 0, "Peer tolerance must be positive");
let standing = PeerStanding::new(peer_tolerance);
Self {
peer_connection_info,
instance_id,
connection_established,
instance_id: peer_handshake.instance_id,
own_timestamp_connection_established: connection_established,
peer_timestamp_connection_established: peer_handshake.timestamp,
standing,
version,
is_archival_node,
version: peer_handshake.version,
is_archival_node: peer_handshake.is_archival_node,
}
}

/// Infallible absolute difference between two timestamps, in seconds.
fn system_time_diff_seconds(peer: SystemTime, own: SystemTime) -> i128 {
let peer = peer
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i128)
.unwrap_or_else(|e| -(e.duration().as_secs() as i128));

let own = own
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i128)
.unwrap_or_else(|e| -(e.duration().as_secs() as i128));

own - peer
}

/// Return the difference in time as reported by peer and client in seconds.
/// The returned value is `peer clock - own clock`. So the amount of time
/// that the connected peer is ahead of this client's clock. Negative value
/// if peer clock is behind our clock.
pub(crate) fn time_difference_in_seconds(&self) -> i128 {
Self::system_time_diff_seconds(
self.peer_timestamp_connection_established,
self.own_timestamp_connection_established,
)
}

pub(crate) fn with_standing(mut self, standing: PeerStanding) -> Self {
self.standing = standing;
self
Expand All @@ -78,7 +106,7 @@ impl PeerInfo {
}

pub fn connection_established(&self) -> SystemTime {
self.connection_established
self.own_timestamp_connection_established
}

pub fn is_archival_node(&self) -> bool {
Expand All @@ -99,6 +127,25 @@ impl PeerInfo {

#[cfg(test)]
pub(crate) fn set_connection_established(&mut self, new_timestamp: SystemTime) {
self.connection_established = new_timestamp;
self.own_timestamp_connection_established = new_timestamp;
}
}

#[cfg(test)]
mod tests {
use test_strategy::proptest;

use super::*;

#[test]
fn time_difference_in_seconds_simple() {
let now = SystemTime::now();
let and_now = SystemTime::now();
assert!(PeerInfo::system_time_diff_seconds(now, and_now) < 10);
}

#[proptest]
fn time_difference_doesnt_crash(now: SystemTime, and_now: SystemTime) {
PeerInfo::system_time_diff_seconds(now, and_now);
}
}
1 change: 1 addition & 0 deletions src/models/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ impl GlobalState {
}),
// For now, all nodes are archival nodes
is_archival_node: self.chain.is_archival_node(),
timestamp: SystemTime::now(),
}
}

Expand Down
36 changes: 27 additions & 9 deletions src/peer_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::time::SystemTime;

use anyhow::bail;
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use futures::sink::Sink;
use futures::sink::SinkExt;
use futures::stream::TryStream;
Expand Down Expand Up @@ -1696,14 +1698,30 @@ impl PeerLoopHandler {
);
let new_peer = PeerInfo::new(
peer_connection_info,
self.peer_handshake_data.instance_id,
&self.peer_handshake_data,
SystemTime::now(),
self.peer_handshake_data.version,
self.peer_handshake_data.is_archival_node,
cli_args.peer_tolerance,
)
.with_standing(standing);

// If timestamps are different, we currently just log a warning.
const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
|| own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
{
let own_datetime_utc: DateTime<Utc> =
new_peer.own_timestamp_connection_established.into();
let peer_datetime_utc: DateTime<Utc> =
new_peer.peer_timestamp_connection_established.into();
warn!(
"New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
new_peer.connected_address(),
peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
}

// There is potential for a race-condition in the peer_map here, as we've previously
// counted the number of entries and checked if instance ID was already connected. But
// this check could have been invalidated by other tasks so we perform it again
Expand Down Expand Up @@ -1849,7 +1867,7 @@ mod peer_loop_tests {
peer_infos[1].instance_id(),
);

let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2).await;
let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
let expected_response = vec![
(peer_address0, instance_id0),
(peer_address1, instance_id1),
Expand Down Expand Up @@ -2722,7 +2740,7 @@ mod peer_loop_tests {
cli.sync_mode_threshold = 2;
state_lock.set_cli(cli).await;

let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1).await;
let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
let [block_1, _block_2, block_3, block_4] =
fake_valid_sequence_of_blocks_for_tests(&genesis_block, Timestamp::hours(1), rng.gen())
.await;
Expand Down Expand Up @@ -3071,7 +3089,7 @@ mod peer_loop_tests {
.await;
state_lock.set_new_tip(block_1.clone()).await?;

let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1).await;
let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
let expected_peer_list_resp = vec![
(
peer_infos[0].listen_address().unwrap(),
Expand Down Expand Up @@ -3187,7 +3205,7 @@ mod peer_loop_tests {
Action::Read(PeerMessage::Bye),
]);

let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1).await;
let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);

// Mock a timestamp to allow transaction to be considered valid
let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
Expand Down Expand Up @@ -3259,7 +3277,7 @@ mod peer_loop_tests {
.await
.unwrap();

let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1).await;
let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
let mut peer_loop_handler = PeerLoopHandler::new(
to_main_tx,
state_lock.clone(),
Expand Down Expand Up @@ -3325,7 +3343,7 @@ mod peer_loop_tests {
get_test_genesis_setup(network, 0, cli_args::Args::default())
.await
.unwrap();
let peer_hsd = get_dummy_handshake_data_for_genesis(network).await;
let peer_hsd = get_dummy_handshake_data_for_genesis(network);
let peer_loop_handler = PeerLoopHandler::new(
to_main_tx.clone(),
alice.clone(),
Expand Down
14 changes: 7 additions & 7 deletions src/tests/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,11 @@ pub fn get_dummy_socket_address(count: u8) -> SocketAddr {
/// Get a dummy-peer representing an outgoing connection.
pub(crate) fn get_dummy_peer(address: SocketAddr) -> PeerInfo {
let peer_connection_info = PeerConnectionInfo::new(Some(8080), address, false);
let peer_handhake = get_dummy_handshake_data_for_genesis(Network::Main);
PeerInfo::new(
peer_connection_info,
rand::random(),
&peer_handhake,
SystemTime::now(),
get_dummy_version(),
true,
cli_args::Args::default().peer_tolerance,
)
}
Expand All @@ -161,14 +160,15 @@ pub fn get_dummy_version() -> VersionString {
}

/// Return a handshake object with a randomly set instance ID
pub(crate) async fn get_dummy_handshake_data_for_genesis(network: Network) -> HandshakeData {
pub(crate) fn get_dummy_handshake_data_for_genesis(network: Network) -> HandshakeData {
HandshakeData {
instance_id: rand::random(),
tip_header: Block::genesis(network).header().to_owned(),
listen_port: Some(8080),
network,
version: get_dummy_version(),
is_archival_node: true,
timestamp: SystemTime::now(),
}
}

Expand All @@ -180,11 +180,11 @@ pub(crate) fn to_bytes(message: &PeerMessage) -> Result<Bytes> {
Ok(buf.freeze())
}

pub(crate) async fn get_dummy_peer_connection_data_genesis(
pub(crate) fn get_dummy_peer_connection_data_genesis(
network: Network,
id: u8,
) -> (HandshakeData, SocketAddr) {
let handshake = get_dummy_handshake_data_for_genesis(network).await;
let handshake = get_dummy_handshake_data_for_genesis(network);
let socket_address = get_dummy_socket_address(id);

(handshake, socket_address)
Expand Down Expand Up @@ -272,7 +272,7 @@ pub(crate) async fn get_test_genesis_setup(
to_main_tx,
_to_main_rx1,
state,
get_dummy_handshake_data_for_genesis(network).await,
get_dummy_handshake_data_for_genesis(network),
))
}

Expand Down

0 comments on commit 487e8b3

Please sign in to comment.