Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BundleAccountLocker when handling tip txs #147

Merged
merged 3 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bootstrap
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ RUST_LOG=INFO \
NDEBUG=1 ./multinode-demo/bootstrap-validator.sh \
--wait-for-supermajority 0 \
--expected-bank-hash $bank_hash \
--block-engine-address http://0.0.0.0:13333 \
--block-engine-auth-service-address http://0.0.0.0:14444 \
--relayer-auth-service-address http://0.0.0.0:11226 \
--relayer-address http://0.0.0.0:11226 \
--block-engine-address http://127.0.0.1:1003 \
--block-engine-auth-service-address http://127.0.0.1:1005 \
--relayer-auth-service-address http://127.0.0.1:11226 \
--relayer-address http://127.0.0.1:11226 \
--rpc-pubsub-enable-block-subscription \
--enable-rpc-transaction-history \
--tip-payment-program-pubkey DThZmRNNXh7kvTQW9hXeGoWGPKktK8pgVAyoTLjH7UrT \
--tip-distribution-program-pubkey FjrdANjvo76aCYQ4kf9FM1R8aESUcEE6F8V7qyoVUQcM \
--commission-bps 0 \
--shred-receiver-address 0.0.0.0:13330 \
--shred-receiver-address 127.0.0.1:1002 \
--allow-private-addr \
--trust-relayer-packets \
--trust-block-engine-packets
--trust-block-engine-packets
22 changes: 11 additions & 11 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,19 +1532,19 @@ impl BankingStage {
// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state.
// BundleStage prevents locking ALL accounts in ALL transactions in a bundle mid-execution
// to ensure that avoid race conditions
let mut lock_time = Measure::start("lock_time");

let read_locks = bundle_account_locker.read_locks();
let write_locks = bundle_account_locker.write_locks();

let batch = bank.prepare_sanitized_batch_with_results(
txs,
transactions_qos_results.iter(),
&read_locks,
&write_locks,
);
let batch = {
// BundleStage locks ALL accounts in ALL transactions in a bundle to avoid race
// conditions with BankingStage
let account_locks = bundle_account_locker.account_locks();
bank.prepare_sanitized_batch_with_results(
txs,
transactions_qos_results.iter(),
&account_locks.read_locks(),
&account_locks.write_locks(),
)
};
lock_time.stop();

// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
Expand Down
79 changes: 50 additions & 29 deletions core/src/bundle_account_locker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
/// and commit the results before the bundle completes. By the time the bundle commits the new account
/// state for {A, B, C}, A and B would be incorrect and the entries containing the bundle would be
/// replayed improperly and that leader would have produced an invalid block.
use std::sync::{Arc, Mutex};
use {
solana_runtime::bank::Bank,
solana_sdk::{
bundle::sanitized::SanitizedBundle, pubkey::Pubkey, transaction::TransactionAccountLocks,
},
std::collections::{hash_map::Entry, HashMap, HashSet},
std::sync::{Arc, Mutex, MutexGuard},
};

#[derive(Debug)]
Expand All @@ -27,16 +28,19 @@ pub type BundleAccountLockerResult<T> = Result<T, BundleAccountLockerError>;
pub struct LockedBundle<'a, 'b> {
bundle_account_locker: &'a BundleAccountLocker,
sanitized_bundle: &'b SanitizedBundle,
bank: Arc<Bank>,
}

impl<'a, 'b> LockedBundle<'a, 'b> {
pub fn new(
bundle_account_locker: &'a BundleAccountLocker,
sanitized_bundle: &'b SanitizedBundle,
bank: &Arc<Bank>,
) -> Self {
Self {
bundle_account_locker,
sanitized_bundle,
bank: bank.clone(),
}
}

Expand All @@ -50,119 +54,136 @@ impl<'a, 'b> Drop for LockedBundle<'a, 'b> {
fn drop(&mut self) {
let _ = self
.bundle_account_locker
.unlock_bundle_accounts(self.sanitized_bundle);
.unlock_bundle_accounts(self.sanitized_bundle, &self.bank);
}
}

#[derive(Default, Clone)]
struct BundleAccountLocks {
read_locks: Arc<Mutex<HashMap<Pubkey, u64>>>,
write_locks: Arc<Mutex<HashMap<Pubkey, u64>>>,
pub struct BundleAccountLocks {
read_locks: HashMap<Pubkey, u64>,
write_locks: HashMap<Pubkey, u64>,
}

impl BundleAccountLocks {
pub fn read_locks(&self) -> HashSet<Pubkey> {
self.read_locks.lock().unwrap().keys().cloned().collect()
self.read_locks.keys().cloned().collect()
}

pub fn write_locks(&self) -> HashSet<Pubkey> {
self.write_locks.lock().unwrap().keys().cloned().collect()
self.write_locks.keys().cloned().collect()
}

pub fn lock_accounts(
&self,
&mut self,
read_locks: HashMap<Pubkey, u64>,
write_locks: HashMap<Pubkey, u64>,
) {
let mut read_locks_l = self.read_locks.lock().unwrap();
let mut write_locks_l = self.write_locks.lock().unwrap();
for (acc, count) in read_locks {
*read_locks_l.entry(acc).or_insert(0) += count;
*self.read_locks.entry(acc).or_insert(0) += count;
}
for (acc, count) in write_locks {
*write_locks_l.entry(acc).or_insert(0) += count;
*self.write_locks.entry(acc).or_insert(0) += count;
}
}

pub fn unlock_accounts(
&self,
&mut self,
read_locks: HashMap<Pubkey, u64>,
write_locks: HashMap<Pubkey, u64>,
) {
let mut read_locks_l = self.read_locks.lock().unwrap();
let mut write_locks_l = self.write_locks.lock().unwrap();

for (acc, count) in read_locks {
if let Entry::Occupied(mut entry) = read_locks_l.entry(acc) {
if let Entry::Occupied(mut entry) = self.read_locks.entry(acc) {
let val = entry.get_mut();
*val = val.saturating_sub(count);
if entry.get() == &0 {
let _ = entry.remove();
}
} else {
warn!("error unlocking read-locked account, account: {:?}", acc);
}
}
for (acc, count) in write_locks {
if let Entry::Occupied(mut entry) = write_locks_l.entry(acc) {
if let Entry::Occupied(mut entry) = self.write_locks.entry(acc) {
let val = entry.get_mut();
*val = val.saturating_sub(count);
if entry.get() == &0 {
let _ = entry.remove();
}
} else {
warn!("error unlocking write-locked account, account: {:?}", acc);
}
}
}
}

#[derive(Clone, Default)]
pub struct BundleAccountLocker {
account_locks: BundleAccountLocks,
account_locks: Arc<Mutex<BundleAccountLocks>>,
}

impl BundleAccountLocker {
/// used in BankingStage during TransactionBatch construction to ensure that BankingStage
/// doesn't lock anything currently locked in the BundleAccountLocker
pub fn read_locks(&self) -> HashSet<Pubkey> {
self.account_locks.read_locks()
self.account_locks.lock().unwrap().read_locks()
}

/// used in BankingStage during TransactionBatch construction to ensure that BankingStage
/// doesn't lock anything currently locked in the BundleAccountLocker
pub fn write_locks(&self) -> HashSet<Pubkey> {
self.account_locks.write_locks()
self.account_locks.lock().unwrap().write_locks()
}

/// used in BankingStage during TransactionBatch construction to ensure that BankingStage
/// doesn't lock anything currently locked in the BundleAccountLocker
pub fn account_locks(&self) -> MutexGuard<BundleAccountLocks> {
self.account_locks.lock().unwrap()
}

/// Prepares a locked bundle and returns a LockedBundle containing locked accounts.
/// When a LockedBundle is dropped, the accounts are automatically unlocked
pub fn prepare_locked_bundle<'a, 'b>(
&'a self,
sanitized_bundle: &'b SanitizedBundle,
bank: &Arc<Bank>,
) -> BundleAccountLockerResult<LockedBundle<'a, 'b>> {
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle)?;
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?;

self.account_locks.lock_accounts(read_locks, write_locks);
Ok(LockedBundle::new(self, sanitized_bundle))
self.account_locks
.lock()
.unwrap()
.lock_accounts(read_locks, write_locks);
Ok(LockedBundle::new(self, sanitized_bundle, bank))
}

/// Unlocks bundle accounts. Note that LockedBundle::drop will auto-drop the bundle account locks
fn unlock_bundle_accounts(
&self,
sanitized_bundle: &SanitizedBundle,
bank: &Bank,
) -> BundleAccountLockerResult<()> {
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle)?;
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?;

self.account_locks.unlock_accounts(read_locks, write_locks);
self.account_locks
.lock()
.unwrap()
.unlock_accounts(read_locks, write_locks);
Ok(())
}

/// Returns the read and write locks for this bundle
/// Each lock type contains a HashMap which maps Pubkey to number of locks held
fn get_read_write_locks(
bundle: &SanitizedBundle,
bank: &Bank,
) -> BundleAccountLockerResult<(HashMap<Pubkey, u64>, HashMap<Pubkey, u64>)> {
let transaction_locks: Vec<TransactionAccountLocks> = bundle
.transactions
.iter()
.filter_map(|tx| tx.get_account_locks(64).ok()) // TODO (LB)
.filter_map(|tx| {
tx.get_account_locks(bank.get_transaction_account_lock_limit())
.ok()
})
.collect();

if transaction_locks.len() != bundle.transactions.len() {
Expand Down Expand Up @@ -265,7 +286,7 @@ mod tests {
.expect("sanitize bundle 1");

let locked_bundle0 = bundle_account_locker
.prepare_locked_bundle(&sanitized_bundle0)
.prepare_locked_bundle(&sanitized_bundle0, &bank)
.unwrap();

assert_eq!(
Expand All @@ -278,7 +299,7 @@ mod tests {
);

let locked_bundle1 = bundle_account_locker
.prepare_locked_bundle(&sanitized_bundle1)
.prepare_locked_bundle(&sanitized_bundle1, &bank)
.unwrap();
assert_eq!(
bundle_account_locker.write_locks(),
Expand Down
3 changes: 1 addition & 2 deletions core/src/bundle_sanitizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ pub fn get_sanitized_bundle(
MAX_PROCESSING_AGE,
&mut metrics,
);
if let Some(failure) = check_results.iter().find(|r| r.0.is_err()) {
warn!("bundle check failure: {:?}", failure);
if check_results.iter().any(|r| r.0.is_err()) {
return Err(BundleSanitizerError::FailedCheckResults(packet_bundle.uuid));
}

Expand Down
Loading