Skip to content

Commit

Permalink
Remove timeout locks (sigp#6048)
Browse files Browse the repository at this point in the history
* Remove locks with timeouts

* Readd test

* Update docs

* Merge remote-tracking branch 'origin/unstable' into pk-cache-timeout
  • Loading branch information
dapplion authored Jul 26, 2024
1 parent 28e3b86 commit b949db0
Show file tree
Hide file tree
Showing 18 changed files with 60 additions and 214 deletions.
11 changes: 2 additions & 9 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
mod batch;

use crate::{
beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
metrics,
observed_aggregates::{ObserveOutcome, ObservedAttestationKey},
observed_attesters::Error as ObservedAttestersError,
Expand Down Expand Up @@ -1174,10 +1173,7 @@ pub fn verify_attestation_signature<T: BeaconChainTypes>(
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES);

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let fork = chain
.spec
Expand Down Expand Up @@ -1272,10 +1268,7 @@ pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: &IndexedAttestation<T::EthSpec>,
) -> Result<bool, Error> {
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let aggregator_index = signed_aggregate.message().aggregator_index();
if aggregator_index >= pubkey_cache.len() as u64 {
Expand Down
15 changes: 3 additions & 12 deletions beacon_node/beacon_chain/src/attestation_verification/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use super::{
CheckAttestationSignature, Error, IndexedAggregatedAttestation, IndexedUnaggregatedAttestation,
VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation,
};
use crate::{
beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, BeaconChain, BeaconChainError,
BeaconChainTypes,
};
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use bls::verify_signature_sets;
use state_processing::signature_sets::{
indexed_attestation_signature_set_from_pubkeys, signed_aggregate_selection_proof_signature_set,
Expand Down Expand Up @@ -60,10 +57,7 @@ where
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES);

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let mut signature_sets = Vec::with_capacity(num_indexed * 3);
// Iterate, flattening to get only the `Ok` values.
Expand Down Expand Up @@ -169,10 +163,7 @@ where
&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES,
);

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let mut signature_sets = Vec::with_capacity(num_partially_verified);

Expand Down
94 changes: 28 additions & 66 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{
get_slot_delay_ms, timestamp_now, ValidatorMonitor,
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
Expand Down Expand Up @@ -132,17 +131,6 @@ pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
/// Alias to appease clippy.
type HashBlockTuple<E> = (Hash256, RpcBlock<E>);

/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

/// The time-out before failure during an operation to take a read/write RwLock on the
/// validator pubkey cache.
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

/// The timeout for the eth1 finalization cache
pub const ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_millis(200);

// These keys are all zero because they get stored in different columns, see `DBColumn` type.
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
Expand Down Expand Up @@ -465,13 +453,13 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: TimeoutRwLock<ShufflingCache>,
pub shuffling_cache: RwLock<ShufflingCache>,
/// A cache of eth1 deposit data at epoch boundaries for deposit finalization
pub eth1_finalization_cache: TimeoutRwLock<Eth1FinalizationCache>,
pub eth1_finalization_cache: RwLock<Eth1FinalizationCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
pub(crate) validator_pubkey_cache: RwLock<ValidatorPubkeyCache<T>>,
/// A cache used when producing attestations.
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
Expand Down Expand Up @@ -1472,10 +1460,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Blocks until a read-lock on the `validator_pubkey_cache` is acquired; acquiring the lock can no longer time out.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get_index(pubkey))
}
Expand All @@ -1488,10 +1473,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

validator_pubkeys
.map(|pubkey| {
Expand All @@ -1516,10 +1498,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Blocks until a read-lock on the `validator_pubkey_cache` is acquired; acquiring the lock can no longer time out.
pub fn validator_pubkey(&self, validator_index: usize) -> Result<Option<PublicKey>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get(validator_index).cloned())
}
Expand All @@ -1529,10 +1508,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_index: usize,
) -> Result<Option<PublicKeyBytes>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get_pubkey_bytes(validator_index).copied())
}
Expand All @@ -1546,10 +1522,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_indices: &[usize],
) -> Result<HashMap<usize, PublicKeyBytes>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

let mut map = HashMap::with_capacity(validator_indices.len());
for &validator_index in validator_indices {
Expand Down Expand Up @@ -3506,11 +3479,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
.import_new_pubkeys(&state)?;
let mut ops = {
let _timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_PUBKEY_CACHE_LOCK);
self.validator_pubkey_cache
.write()
.import_new_pubkeys(&state)?
};

// Apply the state to the attester cache, only if it is from the previous epoch or later.
//
Expand Down Expand Up @@ -4116,18 +4090,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

let shuffling_is_cached = self
.shuffling_cache
.try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.contains(&shuffling_id);
let shuffling_is_cached = self.shuffling_cache.read().contains(&shuffling_id);

if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.write()
.insert_committee_cache(shuffling_id, committee_cache);
}
}
Expand Down Expand Up @@ -4174,14 +4143,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
};

if let Some(finalized_eth1_data) = self
.eth1_finalization_cache
.try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT)
.and_then(|mut cache| {
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
})
{
let finalized_eth1_data = {
let mut cache = self.eth1_finalization_cache.write();
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
};
if let Some(finalized_eth1_data) = finalized_eth1_data {
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
let finalized_deposit_count = finalized_eth1_data.deposit_count;
eth1_chain.finalize_eth1_data(finalized_eth1_data);
Expand Down Expand Up @@ -6365,15 +6332,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})?;

// Obtain the shuffling cache, timing how long we wait.
let cache_wait_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);

let mut shuffling_cache = self
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?;

metrics::stop_timer(cache_wait_timer);
let mut shuffling_cache = {
    // Bind the timer to a named guard. `let _ = ...` does not bind the value, so the timer
    // would be dropped at the end of that statement, before the lock is even requested.
    // `_timer` lives until the end of this block, i.e. until `write()` has returned, so the
    // time spent waiting for the write lock is covered by the timer's lifetime.
    let _timer =
        metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
    self.shuffling_cache.write()
};

if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
// The shuffling cache is no longer required, drop the write-lock to allow concurrent
Expand Down Expand Up @@ -6481,8 +6444,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let shuffling_decision_block = shuffling_id.shuffling_decision_block;

self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.write()
.insert_committee_cache(shuffling_id, &committee_cache);

metrics::stop_timer(committee_building_timer);
Expand Down
7 changes: 2 additions & 5 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::observed_block_producers::SeenBlock;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
beacon_chain::{BeaconForkChoice, ForkChoiceError, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT},
beacon_chain::{BeaconForkChoice, ForkChoiceError},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
Expand Down Expand Up @@ -2096,10 +2096,7 @@ pub fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec, Err: BlockBlobEr
pub fn get_validator_pubkey_cache<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<RwLockReadGuard<ValidatorPubkeyCache<T>>, BeaconChainError> {
chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)
Ok(chain.validator_pubkey_cache.read())
}

/// Produces an _empty_ `BlockSignatureVerifier`.
Expand Down
7 changes: 3 additions & 4 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::ChainConfig;
Expand Down Expand Up @@ -935,16 +934,16 @@ where
fork_choice_signal_rx,
event_handler: self.event_handler,
head_tracker,
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(
shuffling_cache: RwLock::new(ShufflingCache::new(
shuffling_cache_size,
head_shuffling_ids,
log.clone(),
)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
eth1_finalization_cache: RwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache,
block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
reqresp_pre_import_cache: <_>::default(),
Expand Down
20 changes: 4 additions & 16 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
//! the head block root. This is unacceptable for fast-responding functions like the networking
//! stack.
use crate::beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::BlockShufflingIds;
use crate::{
Expand Down Expand Up @@ -817,21 +816,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_snapshot.beacon_block_root,
&new_snapshot.beacon_state,
) {
Ok(head_shuffling_ids) => {
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.map(|mut shuffling_cache| {
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids)
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "shuffling_cache",
"task" => "update head shuffling decision root"
);
});
}
Ok(head_shuffling_ids) => self
.shuffling_cache
.write()
.update_head_shuffling_ids(head_shuffling_ids),
Err(e) => {
error!(
self.log,
Expand Down
3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub struct ChainConfig {
pub weak_subjectivity_checkpoint: Option<Checkpoint>,
/// Determine whether to reconstruct historic states, usually after a checkpoint sync.
pub reconstruct_historic_states: bool,
/// Whether timeouts on `TimeoutRwLock`s are enabled or not.
pub enable_lock_timeouts: bool,
/// The max size of a message that can be sent over the network.
pub max_network_size: usize,
/// Maximum percentage of the head committee weight at which to attempt re-orging the canonical head.
Expand Down Expand Up @@ -94,7 +92,6 @@ impl Default for ChainConfig {
import_max_skip_slots: None,
weak_subjectivity_checkpoint: None,
reconstruct_historic_states: false,
enable_lock_timeouts: true,
max_network_size: 10 * 1_048_576, // 10M
re_org_head_threshold: Some(DEFAULT_RE_ORG_HEAD_THRESHOLD),
re_org_parent_threshold: Some(DEFAULT_RE_ORG_PARENT_THRESHOLD),
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub mod state_advance_timer;
pub mod sync_committee_rewards;
pub mod sync_committee_verification;
pub mod test_utils;
mod timeout_rw_lock;
pub mod validator_monitor;
pub mod validator_pubkey_cache;

Expand Down Expand Up @@ -98,5 +97,4 @@ pub use state_processing::per_block_processing::errors::{
ExitValidationError, ProposerSlashingValidationError,
};
pub use store;
pub use timeout_rw_lock::TimeoutRwLock;
pub use types;
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ lazy_static! {
"Time spent running fork choice's `get_head` during block import",
exponential_buckets(1e-3, 2.0, 8)
);
pub static ref BLOCK_PROCESSING_PUBKEY_CACHE_LOCK: Result<Histogram> = try_create_histogram(
"beacon_block_processing_pubkey_cache_lock_seconds",
"Time spent waiting or holding the pubkey cache write lock",
);
pub static ref BLOCK_SYNC_AGGREGATE_SET_BITS: Result<IntGauge> = try_create_int_gauge(
"block_sync_aggregate_set_bits",
"The number of true bits in the last sync aggregate in a block"
Expand Down
Loading

0 comments on commit b949db0

Please sign in to comment.