Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.18: client: Timeout resends during send_and_confirm_in_parallel (backport of #358) #384

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 32 additions & 22 deletions client/src/send_and_confirm_transactions_in_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
tokio::{sync::RwLock, task::JoinHandle, time::Instant},
};

const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
const SEND_INTERVAL: Duration = Duration::from_millis(10);
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
Expand Down Expand Up @@ -330,21 +330,20 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
);
}

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}

// wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
while !unconfirmed_transaction_map.is_empty()
&& current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
{
let block_height = current_block_height.load(Ordering::Relaxed);

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}

if let Some(tpu_client) = tpu_client {
let instant = Instant::now();
// retry sending transaction only over TPU port
Expand All @@ -353,10 +352,29 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
.iter()
.filter(|x| block_height < x.last_valid_block_height)
.map(|x| x.serialized_transaction.clone())
.collect();
let _ = tpu_client
.try_send_wire_transaction_batch(txs_to_resend_over_tpu)
.await;
.collect::<Vec<_>>();
let num_txs_to_resend = txs_to_resend_over_tpu.len();
// This is a "reasonable" constant for how long it should
// take to fan the transactions out, taken from
// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let message = if tokio::time::timeout(
SEND_TIMEOUT_INTERVAL,
tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu),
)
.await
.is_err()
{
format!("Timed out resending {num_txs_to_resend} transactions...")
} else {
format!("Resent {num_txs_to_resend} transactions...")
};

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(progress_bar, &message);
}

let elapsed = instant.elapsed();
if elapsed < TPU_RESEND_REFRESH_RATE {
Expand All @@ -374,14 +392,6 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
}
}

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}
}
}

Expand Down
Loading