Skip to content

Commit

Permalink
Cache unsolicited address messages, and provide them to Zebra when re…
Browse files Browse the repository at this point in the history
…quested
  • Loading branch information
teor2345 committed Dec 30, 2021
1 parent fa44edf commit c28fd01
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 42 deletions.
137 changes: 95 additions & 42 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use zebra_chain::{

use crate::{
constants,
meta_addr::MetaAddr,
peer::{
error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest,
MustUseOneshotSender, PeerError, SharedPeerError,
Expand Down Expand Up @@ -420,6 +421,15 @@ pub struct Connection<S, Tx> {
/// other state handling.
pub(super) request_timer: Option<Pin<Box<Sleep>>>,

/// A cached copy of the last unsolicited `addr` or `addrv2` message from this peer.
///
/// When Zebra requests peers, the cache is consumed and returned as a synthetic response.
/// This works around `zcashd`'s address response rate-limit.
///
/// Multi-peer `addr` or `addrv2` messages replace single-peer messages in the cache.
/// (`zcashd` also gossips its own address at regular intervals.)
pub(super) cached_addrs: Vec<MetaAddr>,

/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,

Expand Down Expand Up @@ -548,6 +558,52 @@ where
}
}
}

// Check whether the handler is finished before waiting for a response message,
// because the response might be `Nil` or synthetic.
State::AwaitingResponse {
handler: Handler::Finished(_),
ref span,
..
} => {
// We have to get rid of the span reference so we can tamper with the state.
let span = span.clone();
trace!(
parent: &span,
"returning completed response to client request"
);

// Replace the state with a temporary value,
// so we can take ownership of the response sender.
let tmp_state = std::mem::replace(&mut self.state, State::Failed);

if let State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} = tmp_state
{
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}

let _ = tx.send(response.map_err(Into::into));
} else {
unreachable!("already checked for AwaitingResponse");
}

self.state = State::AwaitingRequest;
}

// We're awaiting a response to a client request,
// so wait on either a peer message, or on a request cancellation.
State::AwaitingResponse {
Expand Down Expand Up @@ -600,45 +656,6 @@ where

self.update_state_metrics(None);

// Check whether the handler is finished processing messages,
// and update the state.
// (Some messages can indicate that a response has finished,
// even if the message wasn't consumed as a response or a request.)
//
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } =>
pending
,
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};

self.update_state_metrics(None);

// If the message was not consumed as a response,
// check whether it can be handled as a request.
let unused_msg = if let Some(request_msg) = request_msg {
Expand Down Expand Up @@ -695,6 +712,7 @@ where
}
}
}

// This connection has failed: stop the event loop, and complete the future.
State::Failed => break,
}
Expand Down Expand Up @@ -723,7 +741,7 @@ where
self.shutdown(error);
}

/// Handle an incoming client request, possibly generating outgoing messages to the
/// Handle an internal client request, possibly generating outgoing messages to the
/// remote peer.
///
/// NOTE: the caller should use .instrument(msg.span) to instrument the function.
Expand Down Expand Up @@ -772,6 +790,25 @@ where
pending,
self.client_rx
),

// Consume the cached addresses from the peer,
// to work-around a `zcashd` response rate-limit
(AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
let cached_addrs = std::mem::take(&mut self.cached_addrs);
debug!(
addrs = cached_addrs.len(),
"responding to Peers request using cached addresses",
);

Ok((
AwaitingResponse {
handler: Handler::Finished(Ok(Response::Peers(cached_addrs))),
tx,
span,
},
None,
))}
,
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok((
AwaitingResponse {
Expand All @@ -783,6 +820,7 @@ where
)),
Err(e) => Err((e, tx)),
},

(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Ok((
AwaitingResponse {
Expand Down Expand Up @@ -1012,8 +1050,23 @@ where
}
// Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book.
Message::Addr(_) => {
debug!(%msg, "ignoring unsolicited addr message");
Message::Addr(ref addrs) => {
// Workaround `zcashd`'s `getaddr` response rate-limit
if addrs.len() > 1 {
// Always refresh the cache with multi-addr messages.
debug!(%msg, "caching unsolicited multi-addr message");
self.cached_addrs = addrs.clone();
} else if addrs.len() == 1 && self.cached_addrs.len() <= 1 {
// Only refresh a cached single addr message with another single addr.
// (`zcashd` regularly advertises its own address.)
debug!(%msg, "caching unsolicited single addr message");
self.cached_addrs = addrs.clone();
} else {
debug!(
%msg,
"ignoring unsolicited single addr message: already cached a multi-addr message"
);
}
None
}
Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())),
Expand Down
7 changes: 7 additions & 0 deletions zebra-network/src/peer/connection/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async fn connection_run_loop_ok() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -103,6 +104,7 @@ async fn connection_run_loop_future_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -152,6 +154,7 @@ async fn connection_run_loop_client_close() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -208,6 +211,7 @@ async fn connection_run_loop_client_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -263,6 +267,7 @@ async fn connection_run_loop_inbound_close() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -319,6 +324,7 @@ async fn connection_run_loop_inbound_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -379,6 +385,7 @@ async fn connection_run_loop_failed() {
let connection = Connection {
state: State::Failed,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ where
let server = Connection {
state: connection::State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: inbound_service,
client_rx: server_rx.into(),
error_slot: error_slot.clone(),
Expand Down

0 comments on commit c28fd01

Please sign in to comment.