From fc53683b5e7cb1fe5a750c7b55128d32fe7bf810 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Fri, 24 Jan 2025 17:40:32 +0800 Subject: [PATCH] fix: use atomic to check if leader bank changed (#4596) * fix: use atomic to check if leader bank changed * fix default value and tests * ordering update * eedback * add comment (cherry picked from commit 4cc49ac80a83ac86baa2664c2a53c37d7e31c10f) --- core/src/banking_stage/consume_worker.rs | 10 +++++- poh/src/leader_bank_notifier.rs | 44 ++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index 9375e733aae1c4..143106cd1add52 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -80,7 +80,10 @@ impl ConsumeWorker { .fetch_add(get_bank_us, Ordering::Relaxed); for work in try_drain_iter(work, &self.consume_receiver) { - if bank.is_complete() { + if bank.is_complete() || { + // check if the bank got interrupted before completion + self.get_consume_bank_id() != Some(bank.bank_id()) + } { let (maybe_new_bank, get_bank_us) = measure_us!(self.get_consume_bank()); if let Some(new_bank) = maybe_new_bank { self.metrics @@ -129,6 +132,11 @@ impl ConsumeWorker { .upgrade() } + /// Try to get the id for the bank that should be used for consuming + fn get_consume_bank_id(&self) -> Option { + self.leader_bank_notifier.get_current_bank_id() + } + /// Retry current batch and all outstanding batches. fn retry_drain(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { for work in try_drain_iter(work, &self.consume_receiver) { diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 8d950d487b41fe..6751e9f90b79fe 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -2,20 +2,38 @@ use { solana_runtime::bank::Bank, solana_sdk::slot_history::Slot, std::{ - sync::{Arc, Condvar, Mutex, MutexGuard, Weak}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Condvar, Mutex, MutexGuard, Weak, + }, time::{Duration, Instant}, }, }; +const STAND_BY_SENTINEL_ID: u64 = u64::MAX; + /// Tracks leader status of the validator node and notifies when: /// 1. A leader bank initiates (=PoH-initiated) /// 2. A leader slot completes (=PoH-completed) -#[derive(Debug, Default)] +#[derive(Debug)] pub struct LeaderBankNotifier { /// Current state (slot, bank, and status) of the system state: Mutex, /// CondVar to notify status changes and waiting condvar: Condvar, + /// Lightweight atomic variable that can be used to check the id of the + /// latest leader bank + current_bank_id: AtomicU64, +} + +impl Default for LeaderBankNotifier { + fn default() -> Self { + Self { + state: Mutex::default(), + condvar: Condvar::default(), + current_bank_id: AtomicU64::new(STAND_BY_SENTINEL_ID), + } + } } /// Leader status state machine for the validator. @@ -43,6 +61,8 @@ impl LeaderBankNotifier { let mut state = self.state.lock().unwrap(); assert_eq!(state.status, Status::StandBy); + self.current_bank_id + .store(bank.bank_id(), Ordering::Relaxed); *state = SlotAndBankWithStatus { status: Status::InProgress, slot: Some(bank.slot()), @@ -61,12 +81,26 @@ impl LeaderBankNotifier { assert_eq!(state.status, Status::InProgress); assert_eq!(state.slot, Some(slot)); + self.current_bank_id + .store(STAND_BY_SENTINEL_ID, Ordering::Relaxed); state.status = Status::StandBy; drop(state); self.condvar.notify_all(); } + /// Fetch the bank id of the bank inside the mutex wrapped state field. Due + /// to the usage of relaxed ordering, this is not a guarantee that the + /// caller thread will see the updated bank in the mutex wrapped state yet. + pub fn get_current_bank_id(&self) -> Option { + let current_bank_id = self.current_bank_id.load(Ordering::Relaxed); + if current_bank_id == STAND_BY_SENTINEL_ID { + None + } else { + Some(current_bank_id) + } + } + /// If the status is `InProgress`, immediately return a weak reference to the bank. /// Otherwise, wait up to the `timeout` for the status to become `InProgress`. /// If the timeout is reached, the weak reference is unupgradable. @@ -124,6 +158,7 @@ mod tests { fn test_leader_bank_notifier_default() { let leader_bank_notifier = LeaderBankNotifier::default(); let state = leader_bank_notifier.state.lock().unwrap(); + assert_eq!(leader_bank_notifier.get_current_bank_id(), None); assert_eq!(state.status, Status::StandBy); assert_eq!(state.slot, None); assert!(state.bank.upgrade().is_none()); @@ -145,6 +180,10 @@ mod tests { leader_bank_notifier.set_in_progress(&bank); let state = leader_bank_notifier.state.lock().unwrap(); + assert_eq!( + leader_bank_notifier.get_current_bank_id(), + Some(bank.bank_id()) + ); assert_eq!(state.status, Status::InProgress); assert_eq!(state.slot, Some(bank.slot())); assert_eq!(state.bank.upgrade(), Some(bank)); @@ -184,6 +223,7 @@ mod tests { leader_bank_notifier.set_completed(bank.slot()); let state = leader_bank_notifier.state.lock().unwrap(); + assert_eq!(leader_bank_notifier.get_current_bank_id(), None); assert_eq!(state.status, Status::StandBy); assert_eq!(state.slot, Some(bank.slot())); assert_eq!(state.bank.upgrade(), Some(bank));