Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.1: fix: use atomic to check if leader bank changed (backport of #4596) #4612

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ impl ConsumeWorker {
.fetch_add(get_bank_us, Ordering::Relaxed);

for work in try_drain_iter(work, &self.consume_receiver) {
if bank.is_complete() {
if bank.is_complete() || {
// check if the bank got interrupted before completion
self.get_consume_bank_id() != Some(bank.bank_id())
} {
let (maybe_new_bank, get_bank_us) = measure_us!(self.get_consume_bank());
if let Some(new_bank) = maybe_new_bank {
self.metrics
Expand Down Expand Up @@ -129,6 +132,11 @@ impl ConsumeWorker {
.upgrade()
}

/// Try to get the id for the bank that should be used for consuming
fn get_consume_bank_id(&self) -> Option<u64> {
self.leader_bank_notifier.get_current_bank_id()
}

/// Retry current batch and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
for work in try_drain_iter(work, &self.consume_receiver) {
Expand Down
44 changes: 42 additions & 2 deletions poh/src/leader_bank_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ use {
solana_runtime::bank::Bank,
solana_sdk::slot_history::Slot,
std::{
sync::{Arc, Condvar, Mutex, MutexGuard, Weak},
sync::{
atomic::{AtomicU64, Ordering},
Arc, Condvar, Mutex, MutexGuard, Weak,
},
time::{Duration, Instant},
},
};

const STAND_BY_SENTINEL_ID: u64 = u64::MAX;

/// Tracks leader status of the validator node and notifies when:
/// 1. A leader bank initiates (=PoH-initiated)
/// 2. A leader slot completes (=PoH-completed)
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct LeaderBankNotifier {
/// Current state (slot, bank, and status) of the system
state: Mutex<SlotAndBankWithStatus>,
/// CondVar to notify status changes and waiting
condvar: Condvar,
/// Lightweight atomic variable that can be used to check the id of the
/// latest leader bank
current_bank_id: AtomicU64,
}

impl Default for LeaderBankNotifier {
fn default() -> Self {
Self {
state: Mutex::default(),
condvar: Condvar::default(),
current_bank_id: AtomicU64::new(STAND_BY_SENTINEL_ID),
}
}
}

/// Leader status state machine for the validator.
Expand Down Expand Up @@ -43,6 +61,8 @@ impl LeaderBankNotifier {
let mut state = self.state.lock().unwrap();
assert_eq!(state.status, Status::StandBy);

self.current_bank_id
.store(bank.bank_id(), Ordering::Relaxed);
*state = SlotAndBankWithStatus {
status: Status::InProgress,
slot: Some(bank.slot()),
Expand All @@ -61,12 +81,26 @@ impl LeaderBankNotifier {
assert_eq!(state.status, Status::InProgress);
assert_eq!(state.slot, Some(slot));

self.current_bank_id
.store(STAND_BY_SENTINEL_ID, Ordering::Relaxed);
state.status = Status::StandBy;
drop(state);

self.condvar.notify_all();
}

/// Fetch the bank id of the bank inside the mutex wrapped state field. Due
/// to the usage of relaxed ordering, this is not a guarantee that the
/// caller thread will see the updated bank in the mutex wrapped state yet.
pub fn get_current_bank_id(&self) -> Option<u64> {
let current_bank_id = self.current_bank_id.load(Ordering::Relaxed);
if current_bank_id == STAND_BY_SENTINEL_ID {
None
} else {
Some(current_bank_id)
}
}

/// If the status is `InProgress`, immediately return a weak reference to the bank.
/// Otherwise, wait up to the `timeout` for the status to become `InProgress`.
/// If the timeout is reached, the weak reference is unupgradable.
Expand Down Expand Up @@ -124,6 +158,7 @@ mod tests {
fn test_leader_bank_notifier_default() {
let leader_bank_notifier = LeaderBankNotifier::default();
let state = leader_bank_notifier.state.lock().unwrap();
assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
assert_eq!(state.status, Status::StandBy);
assert_eq!(state.slot, None);
assert!(state.bank.upgrade().is_none());
Expand All @@ -145,6 +180,10 @@ mod tests {
leader_bank_notifier.set_in_progress(&bank);

let state = leader_bank_notifier.state.lock().unwrap();
assert_eq!(
leader_bank_notifier.get_current_bank_id(),
Some(bank.bank_id())
);
assert_eq!(state.status, Status::InProgress);
assert_eq!(state.slot, Some(bank.slot()));
assert_eq!(state.bank.upgrade(), Some(bank));
Expand Down Expand Up @@ -184,6 +223,7 @@ mod tests {
leader_bank_notifier.set_completed(bank.slot());

let state = leader_bank_notifier.state.lock().unwrap();
assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
assert_eq!(state.status, Status::StandBy);
assert_eq!(state.slot, Some(bank.slot()));
assert_eq!(state.bank.upgrade(), Some(bank));
Expand Down
Loading