diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 48cdf7031a1..f0c25124ddb 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -121,6 +121,7 @@ impl fmt::Display for Error { pub struct Timeouts { pub attestation: Duration, pub attester_duties: Duration, + pub attestation_subscriptions: Duration, pub liveness: Duration, pub proposal: Duration, pub proposer_duties: Duration, @@ -137,6 +138,7 @@ impl Timeouts { Timeouts { attestation: timeout, attester_duties: timeout, + attestation_subscriptions: timeout, liveness: timeout, proposal: timeout, proposer_duties: timeout, @@ -2540,7 +2542,12 @@ impl BeaconNodeHttpClient { .push("validator") .push("beacon_committee_subscriptions"); - self.post(path, &subscriptions).await?; + self.post_with_timeout( + path, + &subscriptions, + self.timeouts.attestation_subscriptions, + ) + .await?; Ok(()) } diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 4467b807865..58d7f9d8eef 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -134,6 +134,12 @@ impl fmt::Display for Errors { } } +impl Errors { + pub fn num_errors(&self) -> usize { + self.0.len() + } +} + /// Reasons why a candidate might not be ready. #[derive(Debug, Clone, Copy)] pub enum CandidateError { @@ -599,46 +605,41 @@ impl BeaconNodeFallback { F: Fn(&'a BeaconNodeHttpClient) -> R, R: Future>, { - let mut results = vec![]; let mut to_retry = vec![]; let mut retry_unsynced = vec![]; // Run `func` using a `candidate`, returning the value or capturing errors. - // - // We use a macro instead of a closure here since it is not trivial to move `func` into a - // closure. - macro_rules! try_func { - ($candidate: ident) => {{ - inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]); + let run_on_candidate = |candidate: &'a CandidateBeaconNode| async { + inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]); - // There exists a race condition where `func` may be called when the candidate is - // actually not ready. We deem this an acceptable inefficiency. - match func(&$candidate.beacon_node).await { - Ok(val) => results.push(Ok(val)), - Err(e) => { - // If we have an error on this function, make the client as not-ready. - // - // There exists a race condition where the candidate may have been marked - // as ready between the `func` call and now. We deem this an acceptable - // inefficiency. - if matches!(offline_on_failure, OfflineOnFailure::Yes) { - $candidate.set_offline().await; - } - results.push(Err(( - $candidate.beacon_node.to_string(), - Error::RequestFailed(e), - ))); - inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); + // There exists a race condition where `func` may be called when the candidate is + // actually not ready. We deem this an acceptable inefficiency. + match func(&candidate.beacon_node).await { + Ok(val) => Ok(val), + Err(e) => { + // If we have an error on this function, mark the client as not-ready. + // + // There exists a race condition where the candidate may have been marked + // as ready between the `func` call and now. We deem this an acceptable + // inefficiency. + if matches!(offline_on_failure, OfflineOnFailure::Yes) { + candidate.set_offline().await; } + inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]); + Err((candidate.beacon_node.to_string(), Error::RequestFailed(e))) } - }}; - } + } + }; // First pass: try `func` on all synced and ready candidates. // // This ensures that we always choose a synced node if it is available. + let mut first_batch_futures = vec![]; for candidate in &self.candidates { match candidate.status(RequireSynced::Yes).await { + Ok(_) => { + first_batch_futures.push(run_on_candidate(candidate)); + } Err(CandidateError::NotSynced) if require_synced == false => { // This client is unsynced we will try it after trying all synced clients retry_unsynced.push(candidate); @@ -647,22 +648,24 @@ impl BeaconNodeFallback { // This client was not ready on the first pass, we might try it again later. to_retry.push(candidate); } - Ok(_) => try_func!(candidate), } } + let first_batch_results = futures::future::join_all(first_batch_futures).await; // Second pass: try `func` on ready unsynced candidates. This only runs if we permit // unsynced candidates. // // Due to async race-conditions, it is possible that we will send a request to a candidate // that has been set to an offline/unready status. This is acceptable. - if require_synced == false { - for candidate in retry_unsynced { - try_func!(candidate); - } - } + let second_batch_results = if require_synced == false { + futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await + } else { + vec![] + }; // Third pass: try again, attempting to make non-ready clients become ready. + let mut third_batch_futures = vec![]; + let mut third_batch_results = vec![]; for candidate in to_retry { // If the candidate hasn't luckily transferred into the correct state in the meantime, // force an update of the state. @@ -676,16 +679,21 @@ impl BeaconNodeFallback { }; match new_status { - Ok(()) => try_func!(candidate), - Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate), - Err(e) => { - results.push(Err(( - candidate.beacon_node.to_string(), - Error::Unavailable(e), - ))); + Ok(()) => third_batch_futures.push(run_on_candidate(candidate)), + Err(CandidateError::NotSynced) if require_synced == false => { + third_batch_futures.push(run_on_candidate(candidate)) } + Err(e) => third_batch_results.push(Err(( + candidate.beacon_node.to_string(), + Error::Unavailable(e), + ))), } } + third_batch_results.extend(futures::future::join_all(third_batch_futures).await); + + let mut results = first_batch_results; + results.extend(second_batch_results); + results.extend(third_batch_results); let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect(); diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 880f0eaa488..faa157a8592 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -86,7 +86,8 @@ const _: () = assert!({ /// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the /// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid /// bringing in the entire crate. -const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2); +const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2; +const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD); // The info in the enum variants is displayed in logging, clippy thinks it's dead code. #[derive(Debug)] @@ -121,6 +122,8 @@ pub struct DutyAndProof { pub struct SubscriptionSlots { /// Pairs of `(slot, already_sent)` in slot-descending order. slots: Vec<(Slot, AtomicBool)>, + /// The slot of the duty itself. + duty_slot: Slot, } /// Create a selection proof for `duty`. @@ -172,18 +175,20 @@ impl SubscriptionSlots { .filter(|scheduled_slot| *scheduled_slot > current_slot) .map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false))) .collect(); - Arc::new(Self { slots }) + Arc::new(Self { slots, duty_slot }) } /// Return `true` if we should send a subscription at `slot`. fn should_send_subscription_at(&self, slot: Slot) -> bool { // Iterate slots from smallest to largest looking for one that hasn't been completed yet. - self.slots - .iter() - .rev() - .any(|(scheduled_slot, already_sent)| { - slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed) - }) + slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= self.duty_slot + && self + .slots + .iter() + .rev() + .any(|(scheduled_slot, already_sent)| { + slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed) + }) } /// Update our record of subscribed slots to account for successful subscription at `slot`. @@ -737,7 +742,7 @@ async fn poll_beacon_attesters( // If there are any subscriptions, push them out to beacon nodes if !subscriptions.is_empty() { let subscriptions_ref = &subscriptions; - if let Err(e) = duties_service + let subscription_result = duties_service .beacon_nodes .request( RequireSynced::No, @@ -753,15 +758,8 @@ async fn poll_beacon_attesters( .await }, ) - .await - { - error!( - log, - "Failed to subscribe validators"; - "error" => %e - ) - } else { - // Record that subscriptions were successfully sent. + .await; + if subscription_result.as_ref().is_ok() { debug!( log, "Broadcast attestation subscriptions"; @@ -770,6 +768,25 @@ async fn poll_beacon_attesters( for subscription_slots in subscription_slots_to_confirm { subscription_slots.record_successful_subscription_at(current_slot); } + } else if let Err(e) = subscription_result { + if e.num_errors() < duties_service.beacon_nodes.num_total() { + warn!( + log, + "Some subscriptions failed"; + "error" => %e, + ); + // If subscriptions were sent to at least one node, regard that as a success. + // There is some redundancy built into the subscription schedule to handle failures. + for subscription_slots in subscription_slots_to_confirm { + subscription_slots.record_successful_subscription_at(current_slot); + } + } else { + error!( + log, + "All subscriptions failed"; + "error" => %e + ); + } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 729ff62ee30..dff50582dfe 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12); /// This can help ensure that proper endpoint fallback occurs. const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24; const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; @@ -323,6 +324,8 @@ impl ProductionValidatorClient { Timeouts { attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT, attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT, + attestation_subscriptions: slot_duration + / HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT, liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT, proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT, proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,