This repository was archived by the owner on Jan 22, 2025. It is now read-only.

TransactionScheduler: Schedule Filter #34252

Merged
merged 4 commits on Nov 30, 2023
113 changes: 85 additions & 28 deletions core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -15,6 +15,7 @@ use {
crossbeam_channel::{Receiver, Sender, TryRecvError},
itertools::izip,
prio_graph::{AccessKind, PrioGraph},
solana_measure::measure_us,
solana_sdk::{
pubkey::Pubkey, saturating_add_assign, slot_history::Slot,
transaction::SanitizedTransaction,
@@ -47,6 +48,8 @@ impl PrioGraphScheduler {

/// Schedule transactions from the given `TransactionStateContainer` to be consumed by the
/// worker threads. Returns summary of scheduling, or an error.
/// `filter` is used to filter out transactions that should be dropped:
/// it should set `false` for transactions that should be dropped, and `true` otherwise.
///
/// Uses a `PrioGraph` to perform look-ahead during the scheduling of transactions.
/// This, combined with internal tracking of threads' in-flight transactions, allows
@@ -55,6 +58,7 @@ impl PrioGraphScheduler {
pub(crate) fn schedule(
&mut self,
container: &mut TransactionStateContainer,
filter: impl Fn(&[&SanitizedTransaction], &mut [bool]),
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let mut batches = Batches::new(num_threads);
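For orientation, a minimal call-site sketch of the new signature (not part of this PR). The vote-skipping predicate is purely illustrative: the scheduler controller instead plugs in Bank::check_transactions (second changed file below), and the tests below simply keep every transaction.

// Sketch only: drive `schedule` with a filter closure. Writing `false` into
// `results` drops that transaction from the container before it is scheduled.
fn schedule_non_votes(
    scheduler: &mut PrioGraphScheduler,
    container: &mut TransactionStateContainer,
) -> Result<SchedulingSummary, SchedulerError> {
    scheduler.schedule(container, |txs, results| {
        for (tx, keep) in txs.iter().zip(results.iter_mut()) {
            // Illustrative predicate only; any check over &SanitizedTransaction works.
            *keep = !tx.is_simple_vote_transaction();
        }
    })
}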
@@ -67,15 +71,60 @@ impl PrioGraphScheduler {
let mut blocking_locks = ReadWriteAccountSet::default();
let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id);

// Create the initial look-ahead window.
for _ in 0..self.look_ahead_window_size {
let Some(id) = container.pop() else {
break;
};
// Track metrics on filter.
let mut num_filtered_out: usize = 0;
let mut total_filter_time_us: u64 = 0;

let mut window_budget = self.look_ahead_window_size;
let mut chunked_pops = |container: &mut TransactionStateContainer,
prio_graph: &mut PrioGraph<_, _, _, _>,
window_budget: &mut usize| {
while *window_budget > 0 {
const MAX_FILTER_CHUNK_SIZE: usize = 128;
let mut filter_array = [true; MAX_FILTER_CHUNK_SIZE];
let mut ids = Vec::with_capacity(MAX_FILTER_CHUNK_SIZE);
let mut txs = Vec::with_capacity(MAX_FILTER_CHUNK_SIZE);

let chunk_size = (*window_budget).min(MAX_FILTER_CHUNK_SIZE);
for _ in 0..chunk_size {
if let Some(id) = container.pop() {
ids.push(id);
} else {
break;
}
}
*window_budget = window_budget.saturating_sub(chunk_size);

ids.iter().for_each(|id| {
let transaction = container.get_transaction_ttl(&id.id).unwrap();
txs.push(&transaction.transaction);
});

let (_, filter_us) = measure_us!(filter(&txs, &mut filter_array[..chunk_size]));
saturating_add_assign!(total_filter_time_us, filter_us);

for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) {
if *filter_result {
let transaction = container.get_transaction_ttl(&id.id).unwrap();
prio_graph.insert_transaction(
*id,
Self::get_transaction_account_access(transaction),
);
} else {
saturating_add_assign!(num_filtered_out, 1);
container.remove_by_id(&id.id);
}
}

let transaction = container.get_transaction_ttl(&id.id).unwrap();
prio_graph.insert_transaction(id, Self::get_transaction_account_access(transaction));
}
if ids.len() != chunk_size {
break;
}
}
};

// Create the initial look-ahead window.
// Check transactions against the filter; remove them from the container if they fail.
chunked_pops(container, &mut prio_graph, &mut window_budget);

let mut unblock_this_batch =
Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
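The look-ahead window is now filled through `chunked_pops` under a budget that is consumed in chunks of at most MAX_FILTER_CHUNK_SIZE transactions and refreshed by the batch size after each send. A small self-contained sketch of that arithmetic, using an assumed window size rather than the scheduler's configured `look_ahead_window_size`:

// Sketch only: each chunked_pops call issues at most ceil(budget / 128) filter
// invocations, fewer if the container runs out of transactions first.
fn main() {
    const MAX_FILTER_CHUNK_SIZE: usize = 128;
    let mut window_budget: usize = 300; // assumed initial budget for illustration
    let mut filter_calls = 0;
    while window_budget > 0 {
        let chunk_size = window_budget.min(MAX_FILTER_CHUNK_SIZE);
        window_budget -= chunk_size; // consumes 128, 128, then 44
        filter_calls += 1;
    }
    assert_eq!(filter_calls, 3);
}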
@@ -92,15 +141,6 @@ impl PrioGraphScheduler {
while let Some(id) = prio_graph.pop() {
unblock_this_batch.push(id);

// Push next transaction from container into the `PrioGraph` look-ahead window.
if let Some(next_id) = container.pop() {
let transaction = container.get_transaction_ttl(&next_id.id).unwrap();
prio_graph.insert_transaction(
next_id,
Self::get_transaction_account_access(transaction),
);
}

// Should always be in the container; panic during the initial testing phase if it is not.
// Later, we can replace with a continue in case this does happen.
let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else {
@@ -175,6 +215,10 @@ impl PrioGraphScheduler {
// Send all non-empty batches
saturating_add_assign!(num_sent, self.send_batches(&mut batches)?);

// Refresh the window budget and pop more transactions in chunks.
saturating_add_assign!(window_budget, unblock_this_batch.len());
chunked_pops(container, &mut prio_graph, &mut window_budget);

// Unblock all transactions that were blocked by the transactions that were just sent.
for id in unblock_this_batch.drain(..) {
prio_graph.unblock(&id);
Expand Down Expand Up @@ -202,6 +246,8 @@ impl PrioGraphScheduler {
Ok(SchedulingSummary {
num_scheduled,
num_unschedulable,
num_filtered_out,
filter_time_us: total_filter_time_us,
})
}

@@ -393,6 +439,10 @@ pub(crate) struct SchedulingSummary {
pub num_scheduled: usize,
/// Number of transactions that were not scheduled due to conflicts.
pub num_unschedulable: usize,
/// Number of transactions that were dropped due to the filter.
pub num_filtered_out: usize,
/// Time spent filtering transactions, in microseconds.
pub filter_time_us: u64,
}

struct Batches {
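The two new fields are consumed by the scheduler controller later in this PR. A condensed sketch of that accumulation (not part of the PR; the running totals stand in for the controller's count and timing metrics):

// Sketch only: fold the new summary fields into caller-side running totals,
// mirroring the saturating_add_assign! calls in the controller change below.
fn accumulate_summary(
    summary: &SchedulingSummary,
    total_filtered_out: &mut usize,
    total_filter_time_us: &mut u64,
) {
    saturating_add_assign!(*total_filtered_out, summary.num_filtered_out);
    saturating_add_assign!(*total_filter_time_us, summary.filter_time_us);
}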
@@ -551,14 +601,18 @@ mod tests {
.unzip()
}

fn test_filter(_txs: &[&SanitizedTransaction], results: &mut [bool]) {
results.fill(true);
}

#[test]
fn test_schedule_disconnected_channel() {
let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1);
let mut container = create_container([(&Keypair::new(), &[Pubkey::new_unique()], 1, 1)]);

drop(work_receivers); // explicitly drop receivers
assert_matches!(
scheduler.schedule(&mut container),
scheduler.schedule(&mut container, test_filter),
Err(SchedulerError::DisconnectedSendChannel(_))
);
}
@@ -571,7 +625,7 @@ mod tests {
(&Keypair::new(), &[Pubkey::new_unique()], 2, 2),
]);

let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]);
@@ -586,7 +640,7 @@ mod tests {
(&Keypair::new(), &[pubkey], 1, 2),
]);

let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(
@@ -604,7 +658,7 @@ mod tests {
);

// expect 4 full batches to be scheduled
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(
scheduling_summary.num_scheduled,
4 * TARGET_NUM_TRANSACTIONS_PER_BATCH
@@ -624,7 +678,7 @@ mod tests {
let mut container =
create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i)));

let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 4);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]);
@@ -656,7 +710,7 @@ mod tests {
// fact they eventually join means that the scheduler will schedule them
// onto the same thread to avoid causing [4], which conflicts with both
// chains, to be un-schedulable.
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 5);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(
@@ -697,15 +751,18 @@ mod tests {
// Because the look-ahead window is shortened to a size of 4, the scheduler does
// not have knowledge of the joining at transaction [4] until after [0] and [1]
// have been scheduled.
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 4);
assert_eq!(scheduling_summary.num_unschedulable, 2);
let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]);
assert_eq!(thread_0_ids, [txids!([0, 2])]);
assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1, 3])]);
assert_eq!(thread_0_ids, [txids!([0]), txids!([2])]);
assert_eq!(
collect_work(&work_receivers[1]).1,
[txids!([1]), txids!([3])]
);

// Cannot schedule even on next pass because of lock conflicts
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 0);
assert_eq!(scheduling_summary.num_unschedulable, 2);

@@ -717,7 +774,7 @@ mod tests {
})
.unwrap();
scheduler.receive_completed(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);

Second changed file in this PR (the scheduler controller):
@@ -18,7 +18,7 @@ use {
crossbeam_channel::RecvTimeoutError,
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_measure::measure_us,
solana_runtime::bank_forks::BankForks,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::MAX_PROCESSING_AGE, saturating_add_assign, timing::AtomicInterval,
transaction::SanitizedTransaction,
@@ -113,9 +113,13 @@ impl SchedulerController {
decision: &BufferedPacketsDecision,
) -> Result<(), SchedulerError> {
match decision {
BufferedPacketsDecision::Consume(_bank_start) => {
BufferedPacketsDecision::Consume(bank_start) => {
let (scheduling_summary, schedule_time_us) =
measure_us!(self.scheduler.schedule(&mut self.container)?);
measure_us!(self
.scheduler
.schedule(&mut self.container, |txs, results| {
Self::pre_scheduling_filter(txs, results, &bank_start.working_bank)
})?);
saturating_add_assign!(
self.count_metrics.num_scheduled,
scheduling_summary.num_scheduled
@@ -124,6 +128,14 @@ impl SchedulerController {
self.count_metrics.num_unschedulable,
scheduling_summary.num_unschedulable
);
saturating_add_assign!(
self.count_metrics.num_schedule_filtered_out,
scheduling_summary.num_filtered_out
);
saturating_add_assign!(
self.timing_metrics.schedule_filter_time_us,
scheduling_summary.filter_time_us
);
saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
}
BufferedPacketsDecision::Forward => {
@@ -140,6 +152,25 @@ impl SchedulerController {
Ok(())
}

fn pre_scheduling_filter(
transactions: &[&SanitizedTransaction],
results: &mut [bool],
bank: &Bank,
) {
let lock_results = vec![Ok(()); transactions.len()];
let mut error_counters = TransactionErrorMetrics::default();
let check_results = bank.check_transactions(
transactions,
&lock_results,
MAX_PROCESSING_AGE,
&mut error_counters,
);

for ((check_result, _), result) in check_results.into_iter().zip(results.iter_mut()) {
*result = check_result.is_ok();
}
}

/// Clears the transaction state container.
/// This only clears pending transactions, and does **not** clear in-flight transactions.
fn clear_container(&mut self) {
Expand Down Expand Up @@ -315,6 +346,8 @@ struct SchedulerCountMetrics {
num_scheduled: usize,
/// Number of transactions that were unschedulable.
num_unschedulable: usize,
/// Number of transactions that were filtered out during scheduling.
num_schedule_filtered_out: usize,
/// Number of completed transactions received from workers.
num_finished: usize,
/// Number of transactions that were retryable.
@@ -352,6 +385,11 @@ impl SchedulerCountMetrics {
("num_buffered", self.num_buffered, i64),
("num_scheduled", self.num_scheduled, i64),
("num_unschedulable", self.num_unschedulable, i64),
(
"num_schedule_filtered_out",
self.num_schedule_filtered_out,
i64
),
("num_finished", self.num_finished, i64),
("num_retryable", self.num_retryable, i64),
("num_dropped_on_receive", self.num_dropped_on_receive, i64),
@@ -380,6 +418,7 @@ impl SchedulerCountMetrics {
|| self.num_buffered != 0
|| self.num_scheduled != 0
|| self.num_unschedulable != 0
|| self.num_schedule_filtered_out != 0
|| self.num_finished != 0
|| self.num_retryable != 0
|| self.num_dropped_on_receive != 0
@@ -395,6 +434,7 @@ impl SchedulerCountMetrics {
self.num_buffered = 0;
self.num_scheduled = 0;
self.num_unschedulable = 0;
self.num_schedule_filtered_out = 0;
self.num_finished = 0;
self.num_retryable = 0;
self.num_dropped_on_receive = 0;
@@ -415,6 +455,8 @@ struct SchedulerTimingMetrics {
receive_time_us: u64,
/// Time spent buffering packets.
buffer_time_us: u64,
/// Time spent filtering transactions during scheduling.
schedule_filter_time_us: u64,
/// Time spent scheduling transactions.
schedule_time_us: u64,
/// Time spent clearing transactions from the container.
@@ -442,6 +484,7 @@ impl SchedulerTimingMetrics {
("decision_time_us", self.decision_time_us, i64),
("receive_time_us", self.receive_time_us, i64),
("buffer_time_us", self.buffer_time_us, i64),
("schedule_filter_time_us", self.schedule_filter_time_us, i64),
("schedule_time_us", self.schedule_time_us, i64),
("clear_time_us", self.clear_time_us, i64),
("clean_time_us", self.clean_time_us, i64),
@@ -457,6 +500,7 @@ impl SchedulerTimingMetrics {
self.decision_time_us = 0;
self.receive_time_us = 0;
self.buffer_time_us = 0;
self.schedule_filter_time_us = 0;
self.schedule_time_us = 0;
self.clear_time_us = 0;
self.clean_time_us = 0;