Skip to content

Commit

Permalink
preallocate_squash
Browse files Browse the repository at this point in the history
  • Loading branch information
jedleggett committed Feb 27, 2023
1 parent 383b788 commit 706915d
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 32 deletions.
8 changes: 6 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1656,8 +1656,12 @@ impl BankingStage {

let transaction_costs = qos_service.compute_transaction_costs(txs.iter());

let (transactions_qos_results, num_included) =
qos_service.select_transactions_per_cost(txs.iter(), transaction_costs.iter(), bank);
let (transactions_qos_results, num_included) = qos_service.select_transactions_per_cost(
txs.iter(),
transaction_costs.iter(),
bank.slot(),
&mut bank.write_cost_tracker().unwrap(),
);

let cost_model_throttled_transactions_count = txs.len().saturating_sub(num_included);

Expand Down
131 changes: 119 additions & 12 deletions core/src/bundle_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use {
TransactionBalancesSet, TransactionExecutionResult,
},
bank_utils,
block_cost_limits::{MAX_BLOCK_UNITS, MAX_VOTE_UNITS, MAX_WRITABLE_ACCOUNT_UNITS},
cost_model::{CostModel, TransactionCost},
transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
Expand Down Expand Up @@ -122,6 +123,48 @@ struct AllExecutionResults {
pub post_balances: (TransactionBalances, TransactionTokenBalances),
}

struct BundleReservedSpace {
current_tx_block_limit: u64,
current_bundle_block_limit: u64,
initial_allocated_cost: u64,
}

impl BundleReservedSpace {
fn reset_reserved_cost(&mut self, working_bank: &Arc<Bank>) {
self.current_bundle_block_limit = MAX_BLOCK_UNITS;
self.current_tx_block_limit = MAX_BLOCK_UNITS - self.initial_allocated_cost;

working_bank.write_cost_tracker().unwrap().set_limits(
MAX_WRITABLE_ACCOUNT_UNITS,
self.current_tx_block_limit,
MAX_VOTE_UNITS,
);

info!(
"Slot: {}. Cost Limits Reset. Bundle: {}, TX: {}",
working_bank.slot(),
self.current_bundle_block_limit,
self.current_tx_block_limit,
);
}

fn bundle_block_limit(&self) -> u64 {
self.current_bundle_block_limit
}

fn tx_block_limit(&self) -> u64 {
self.current_tx_block_limit
}

fn update_reserved_cost(&mut self, working_bank: &Arc<Bank>) {
if self.current_tx_block_limit != self.current_bundle_block_limit
&& working_bank.tick_height() > working_bank.ticks_per_slot() * 4 / 5
{
self.current_tx_block_limit = self.current_bundle_block_limit;
}
}
}

pub struct BundleStage {
bundle_thread: JoinHandle<()>,
}
Expand All @@ -140,6 +183,7 @@ impl BundleStage {
tip_manager: TipManager,
bundle_account_locker: BundleAccountLocker,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
preallocated_bundle_cost: u64,
) -> Self {
Self::start_bundle_thread(
cluster_info,
Expand All @@ -153,6 +197,7 @@ impl BundleStage {
bundle_account_locker,
MAX_BUNDLE_RETRY_DURATION,
block_builder_fee_info,
preallocated_bundle_cost,
)
}

Expand All @@ -169,6 +214,7 @@ impl BundleStage {
bundle_account_locker: BundleAccountLocker,
max_bundle_retry_duration: Duration,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
preallocated_bundle_cost: u64,
) -> Self {
const BUNDLE_STAGE_ID: u32 = 10_000;
let poh_recorder = poh_recorder.clone();
Expand All @@ -191,6 +237,7 @@ impl BundleStage {
bundle_account_locker,
max_bundle_retry_duration,
block_builder_fee_info,
preallocated_bundle_cost,
);
})
.unwrap();
Expand Down Expand Up @@ -292,16 +339,32 @@ impl BundleStage {
bank_start: &BankStart,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
max_bundle_retry_duration: &Duration,
reserved_space: &mut BundleReservedSpace,
) -> BundleStageResult<()> {
if sanitized_bundle.transactions.is_empty() {
return Ok(());
}

let tx_costs = qos_service.compute_transaction_costs(sanitized_bundle.transactions.iter());
let bundle_cost: u64 = tx_costs.iter().map(|c| c.sum()).sum();
let cost_tracker = &mut bank_start.working_bank.write_cost_tracker().unwrap();
// Increase blcok cost limit for bundles
cost_tracker.set_limits(
MAX_WRITABLE_ACCOUNT_UNITS,
reserved_space.bundle_block_limit(),
MAX_VOTE_UNITS,
);
let (transactions_qos_results, num_included) = qos_service.select_transactions_per_cost(
sanitized_bundle.transactions.iter(),
tx_costs.iter(),
&bank_start.working_bank,
bank_start.working_bank.slot(),
cost_tracker,
);
// Reset block cost limit for normal txs
cost_tracker.set_limits(
MAX_WRITABLE_ACCOUNT_UNITS,
reserved_space.tx_block_limit(),
MAX_VOTE_UNITS,
);

// accumulates QoS to metrics
Expand All @@ -312,14 +375,22 @@ impl BundleStage {
),
);

// either qos rate-limited a tx in here or bundle exceeds max cost, drop the bundle
// qos rate-limited a tx in here, drop the bundle
if sanitized_bundle.transactions.len() != num_included {
QosService::remove_transaction_costs(
tx_costs.iter(),
transactions_qos_results.iter(),
&bank_start.working_bank,
);
qos_service.report_metrics(bank_start.working_bank.clone());
warn!(
"Bundle dropped, QoS rate limit!! Bundle Cost {bundle_cost}, Block Cost {}",
&bank_start
.working_bank
.read_cost_tracker()
// Using unwrap here is scary, but that's the way it's done in QoS
.unwrap()
.block_cost()
);
return Err(BundleExecutionError::ExceedsCostModel);
}

Expand Down Expand Up @@ -885,6 +956,7 @@ impl BundleStage {
last_tip_update_slot: &mut Slot,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut BundleReservedSpace,
) {
let (sanitized_bundles, sanitized_bundle_elapsed) = measure!(
unprocessed_bundles
Expand Down Expand Up @@ -983,7 +1055,8 @@ impl BundleStage {
max_bundle_retry_duration,
last_tip_update_slot,
bundle_stage_leader_stats,
block_builder_fee_info
block_builder_fee_info,
reserved_space,
),
"execute_locked_bundles_elapsed"
);
Expand Down Expand Up @@ -1056,6 +1129,7 @@ impl BundleStage {
tip_manager: &TipManager,
max_bundle_retry_duration: &Duration,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
reserved_space: &mut BundleReservedSpace,
) -> BundleStageResult<()> {
let initialize_tip_accounts_bundle = SanitizedBundle {
transactions: Self::get_initialize_tip_accounts_transactions(
Expand All @@ -1079,6 +1153,7 @@ impl BundleStage {
bank_start,
bundle_stage_leader_stats,
max_bundle_retry_duration,
reserved_space,
);

match &result {
Expand Down Expand Up @@ -1118,6 +1193,7 @@ impl BundleStage {
max_bundle_retry_duration: &Duration,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut BundleReservedSpace,
) -> BundleStageResult<()> {
let start_handle_tips = Instant::now();

Expand Down Expand Up @@ -1155,6 +1231,7 @@ impl BundleStage {
bank_start,
bundle_stage_leader_stats,
max_bundle_retry_duration,
reserved_space,
);

bundle_stage_leader_stats
Expand Down Expand Up @@ -1198,6 +1275,7 @@ impl BundleStage {
last_tip_update_slot: &mut Slot,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut BundleReservedSpace,
) -> Vec<BundleStageResult<()>> {
let tip_pdas = tip_manager.get_tip_accounts();

Expand Down Expand Up @@ -1235,6 +1313,7 @@ impl BundleStage {
tip_manager,
max_bundle_retry_duration,
bundle_stage_leader_stats,
reserved_space,
)?;

Self::maybe_change_tip_receiver(
Expand All @@ -1249,6 +1328,7 @@ impl BundleStage {
max_bundle_retry_duration,
bundle_stage_leader_stats,
block_builder_fee_info,
reserved_space,
)?;

*last_tip_update_slot = bank_start.working_bank.slot();
Expand All @@ -1263,6 +1343,7 @@ impl BundleStage {
bank_start,
bundle_stage_leader_stats,
max_bundle_retry_duration,
reserved_space,
)
}
})
Expand Down Expand Up @@ -1302,6 +1383,7 @@ impl BundleStage {
bundle_stage_stats: &mut BundleStageLoopStats,
id: u32,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut BundleReservedSpace,
) {
const DROP_BUNDLE_SLOT_OFFSET: u64 = 4;

Expand Down Expand Up @@ -1335,13 +1417,19 @@ impl BundleStage {
(None, Some(_)) => true,
(_, _) => false,
};
if is_new_slot && !cost_model_failed_bundles.is_empty() {
debug!(
"Slot {}: Re-buffering {} bundles that failed cost model!",
&bank_start.working_bank.slot(),
cost_model_failed_bundles.len()
);
unprocessed_bundles.extend(cost_model_failed_bundles.drain(..));
if is_new_slot {
reserved_space.reset_reserved_cost(&bank_start.working_bank);
// Re-Buffer any bundles that didn't fit into last block
if !cost_model_failed_bundles.is_empty() {
debug!(
"Slot {}: Re-buffering {} bundles that failed cost model!",
&bank_start.working_bank.slot(),
cost_model_failed_bundles.len()
);
unprocessed_bundles.extend(cost_model_failed_bundles.drain(..));
}
} else {
reserved_space.update_reserved_cost(&bank_start.working_bank);
}

Self::execute_bundles_until_empty_or_end_of_slot(
Expand All @@ -1361,6 +1449,7 @@ impl BundleStage {
last_tip_update_slot,
bundle_stage_leader_stats.bundle_stage_leader_stats(),
block_builder_fee_info,
reserved_space,
);
}
}
Expand All @@ -1379,6 +1468,7 @@ impl BundleStage {
bundle_account_locker: BundleAccountLocker,
max_bundle_retry_duration: Duration,
block_builder_fee_info: Arc<Mutex<BlockBuilderFeeInfo>>,
preallocated_bundle_cost: u64,
) {
const LOOP_STATS_METRICS_PERIOD: Duration = Duration::from_secs(1);

Expand All @@ -1399,6 +1489,12 @@ impl BundleStage {

let mut unprocessed_bundles: VecDeque<PacketBundle> = VecDeque::with_capacity(1000);
let mut cost_model_failed_bundles: VecDeque<PacketBundle> = VecDeque::with_capacity(1000);
let mut reserved_space = BundleReservedSpace {
current_bundle_block_limit: MAX_BLOCK_UNITS,
current_tx_block_limit: MAX_BLOCK_UNITS - preallocated_bundle_cost,
initial_allocated_cost: preallocated_bundle_cost,
};

while !exit.load(Ordering::Relaxed) {
if !unprocessed_bundles.is_empty()
|| last_leader_slots_update_time.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
Expand All @@ -1422,7 +1518,8 @@ impl BundleStage {
&mut bundle_stage_leader_stats,
&mut bundle_stage_stats,
id,
&block_builder_fee_info
&block_builder_fee_info,
&mut reserved_space,
),
"process_buffered_bundles_elapsed"
);
Expand Down Expand Up @@ -1642,6 +1739,11 @@ mod tests {
&bank_start,
&mut bundle_stage_leader_stats,
&TEST_MAX_RETRY_DURATION,
&mut BundleReservedSpace {
current_tx_block_limit: MAX_BLOCK_UNITS,
current_bundle_block_limit: MAX_BLOCK_UNITS,
initial_allocated_cost: 0,
},
);

// This is ugly, not really an option for testing but a test itself.
Expand Down Expand Up @@ -1973,6 +2075,11 @@ mod tests {
&bank_start,
&mut bundle_stage_leader_stats,
&TEST_MAX_RETRY_DURATION,
&mut BundleReservedSpace {
current_tx_block_limit: MAX_BLOCK_UNITS,
current_bundle_block_limit: MAX_BLOCK_UNITS,
initial_allocated_cost: 0,
},
);
info!("test_bundle_max_retries result: {:?}", result);
assert!(matches!(
Expand Down
9 changes: 9 additions & 0 deletions core/src/leader_slot_banking_stage_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,15 @@ impl LeaderSlotMetricsTracker {

// Our leader slot has begain, time to create a new slot tracker
(None, Some(bank_start)) => {
info!(
"Fuck. New Slot: {}, Block Clost: {}",
bank_start.working_bank.slot(),
bank_start
.working_bank
.read_cost_tracker()
.unwrap()
.block_cost()
);
MetricsTrackerAction::NewTracker(Some(LeaderSlotMetrics::new(
self.id,
bank_start.working_bank.slot(),
Expand Down
Loading

0 comments on commit 706915d

Please sign in to comment.