Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

TransactionScheduler: CLI and hookup for central-scheduler #33890

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 206 additions & 31 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@ use {
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
},
crate::{
banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats,
banking_stage::{
consume_worker::ConsumeWorker,
packet_deserializer::PacketDeserializer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
},
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
validator::BlockProductionMethod,
},
crossbeam_channel::RecvTimeoutError,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::{measure, measure_us},
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::PohRecorder,
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::timing::AtomicInterval,
solana_vote::vote_sender_types::ReplayVoteSender,
Expand Down Expand Up @@ -378,6 +387,20 @@ impl BankingStage {
prioritization_fee_cache,
)
}
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
cluster_info,
poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
),
}
}

Expand Down Expand Up @@ -405,6 +428,15 @@ impl BankingStage {
TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|id| {
Expand Down Expand Up @@ -432,48 +464,182 @@ impl BankingStage {
),
};

let mut packet_receiver =
PacketReceiver::new(id, packet_receiver, bank_forks.clone());
let poh_recorder = poh_recorder.clone();

let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let forwarder = Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
cluster_info.clone(),
connection_cache.clone(),
data_budget.clone(),
);
let consumer = Consumer::new(
committer,

Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
forwarder,
unprocessed_transaction_storage,
)
})
.collect();
Self { bank_thread_hdls }
}

#[allow(clippy::too_many_arguments)]
pub fn new_central_scheduler(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
gossip_vote_receiver: BankingPacketReceiver,
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
let data_budget = Arc::new(DataBudget::default());
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

// + 1 for the central scheduler thread
let mut bank_thread_hdls = Vec::with_capacity(num_threads as usize + 1);

// Spawn legacy voting threads first: 1 gossip, 1 tpu
for (id, packet_receiver, vote_source) in [
(0, gossip_vote_receiver, VoteSource::Gossip),
(1, tpu_vote_receiver, VoteSource::Tpu),
] {
bank_thread_hdls.push(Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
cluster_info.clone(),
connection_cache.clone(),
data_budget.clone(),
),
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
vote_source,
),
));
}

// Create channels for communication between scheduler and workers
let num_workers = (num_threads).saturating_sub(NUM_VOTE_PROCESSING_THREADS);
let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
(0..num_workers).map(|_| unbounded()).unzip();
let (finished_work_sender, finished_work_receiver) = unbounded();

// Spawn the worker threads
for (index, work_receiver) in work_receivers.into_iter().enumerate() {
let id = (index as u32).saturating_add(NUM_VOTE_PROCESSING_THREADS);
let consume_worker = ConsumeWorker::new(
work_receiver,
Consumer::new(
committer.clone(),
poh_recorder.read().unwrap().new_recorder(),
QosService::new(id),
log_messages_bytes_limit,
);
),
finished_work_sender.clone(),
poh_recorder.read().unwrap().new_leader_bank_notifier(),
);

bank_thread_hdls.push(
Builder::new()
.name(format!("solBanknStgTx{id:02}"))
.name(format!("solCoWorker{id:02}"))
.spawn(move || {
Self::process_loop(
&mut packet_receiver,
&decision_maker,
&forwarder,
&consumer,
id,
unprocessed_transaction_storage,
);
let _ = consume_worker.run();
})
.unwrap()
})
.collect();
.unwrap(),
)
}

// Spawn the central scheduler thread
bank_thread_hdls.push({
let packet_deserializer =
PacketDeserializer::new(non_vote_receiver, bank_forks.clone());
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
packet_deserializer,
bank_forks,
scheduler,
);
Builder::new()
.name("solBnkTxSched".to_string())
.spawn(move || match scheduler_controller.run() {
Ok(_) => {}
Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
Err(SchedulerError::DisconnectedSendChannel(_)) => {
warn!("Unexpected worker disconnect from scheduler")
}
})
.unwrap()
});

Self { bank_thread_hdls }
}

fn spawn_thread_local_multi_iterator_thread(
id: u32,
packet_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
decision_maker: DecisionMaker,
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
forwarder: Forwarder,
unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
let consumer = Consumer::new(
committer,
transaction_recorder,
QosService::new(id),
log_messages_bytes_limit,
);

Builder::new()
.name(format!("solBanknStgTx{id:02}"))
.spawn(move || {
Self::process_loop(
&mut packet_receiver,
&decision_maker,
&forwarder,
&consumer,
id,
unprocessed_transaction_storage,
)
})
.unwrap()
}

#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
decision_maker: &DecisionMaker,
Expand Down Expand Up @@ -793,8 +959,7 @@ mod tests {
with_vers.into_iter().map(|(b, _)| b).collect()
}

#[test]
fn test_banking_stage_entries_only() {
fn test_banking_stage_entries_only(block_production_method: BlockProductionMethod) {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
Expand Down Expand Up @@ -829,7 +994,7 @@ mod tests {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();

let banking_stage = BankingStage::new(
BlockProductionMethod::ThreadLocalMultiIterator,
block_production_method,
&cluster_info,
&poh_recorder,
non_vote_receiver,
Expand Down Expand Up @@ -922,6 +1087,16 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_banking_stage_entries_only_thread_local_multi_iterator() {
test_banking_stage_entries_only(BlockProductionMethod::ThreadLocalMultiIterator);
}

#[test]
fn test_banking_stage_entries_only_central_scheduler() {
test_banking_stage_entries_only(BlockProductionMethod::CentralScheduler);
}

#[test]
fn test_banking_stage_entryfication() {
solana_logger::setup();
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub(super) struct PreBalanceInfo {
pub mint_decimals: HashMap<Pubkey, u8>,
}

#[derive(Clone)]
pub struct Committer {
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/decision_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl BufferedPacketsDecision {
}
}

#[derive(Clone)]
pub struct DecisionMaker {
my_pubkey: Pubkey,
poh_recorder: Arc<RwLock<PohRecorder>>,
Expand Down
18 changes: 6 additions & 12 deletions core/src/banking_stage/transaction_scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
mod batch_id_generator;
#[allow(dead_code)]
mod in_flight_tracker;
pub(crate) mod prio_graph_scheduler;
pub(crate) mod scheduler_controller;
pub(crate) mod scheduler_error;
mod thread_aware_account_locks;

mod transaction_id_generator;
mod transaction_priority_id;
#[allow(dead_code)]
mod transaction_state;
#[allow(dead_code)]
mod transaction_state_container;

mod batch_id_generator;
#[allow(dead_code)]
mod in_flight_tracker;
#[allow(dead_code)]
mod prio_graph_scheduler;
#[allow(dead_code)]
mod scheduler_controller;
mod scheduler_error;
#[allow(dead_code)]
mod transaction_id_generator;
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl BlockVerificationMethod {
pub enum BlockProductionMethod {
#[default]
ThreadLocalMultiIterator,
CentralScheduler,
}

impl BlockProductionMethod {
Expand Down