Skip to content

Commit

Permalink
add linearly increasing delays when attempting to catch up in order to prevent…
Browse files Browse the repository at this point in the history
… excess retries / flooding (#41)

* add linearly increasing delays when attempting to catch up, in order to prevent excess retries / flooding

* clippy

* fix comment
  • Loading branch information
eranrund authored Apr 21, 2020
1 parent acf85dd commit 3884aee
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 12 deletions.
63 changes: 56 additions & 7 deletions consensus/service/src/byzantine_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator};
use retry::delay::Fibonacci;
use scp::{scp_log::LoggingScpNode, slot::Phase, Msg, Node, QuorumSet, ScpNode, SlotIndex};
use std::{
cmp::min,
collections::{btree_map::Entry::Vacant, BTreeMap, BTreeSet},
iter::FromIterator,
path::PathBuf,
Expand Down Expand Up @@ -253,7 +254,13 @@ enum LedgerSyncState {
MaybeBehind(Instant),

/// We are behind the network and need to perform catchup.
IsBehind,
IsBehind {
// Time when we should attempt to sync.
attempt_sync_at: Instant,

// Number of sync attempts made so far.
num_sync_attempts: u64,
},
}

struct ByzantineLedgerThread<
Expand Down Expand Up @@ -411,7 +418,10 @@ impl<
);
counters::CATCHUP_INITIATED.inc();
self.is_behind.store(true, Ordering::SeqCst);
self.ledger_sync_state = LedgerSyncState::IsBehind;
self.ledger_sync_state = LedgerSyncState::IsBehind {
attempt_sync_at: Instant::now(),
num_sync_attempts: 0,
};

// Continue on the next tick.
return true;
Expand All @@ -428,10 +438,34 @@ impl<
}

// We think we're behind and sync service confirms that, attempt to sync.
(LedgerSyncState::IsBehind, true) => {
(
LedgerSyncState::IsBehind {
attempt_sync_at,
num_sync_attempts,
},
true,
) => {
// See if it's time to attempt syncing.
let now = Instant::now();
if attempt_sync_at > now {
// Not yet. Continue on to the next tick and then try again. We sleep here to
// throttle the event loop as it won't be doing anything until we reach the next
// attempt_sync_at milestone.
log::trace!(
self.logger,
"sync_service reported we're behind, but deadline {:?} not reached yet (attempt {})!",
attempt_sync_at,
num_sync_attempts,
);

thread::sleep(Duration::from_secs(1));
return true;
}

log::info!(
self.logger,
"sync_service reported we're behind, attempting catchup!"
"sync_service reported we're behind, attempting catchup (attempt {})!",
num_sync_attempts,
);

// Attempt incremental catch-up.
Expand All @@ -441,15 +475,28 @@ impl<
.attempt_ledger_sync(&self.network_state, blocks_per_attempt)
{
log::error!(self.logger, "Could not sync ledger: {:?}", err);
thread::sleep(Duration::from_secs(1));

// Schedule the next sync attempt using a linear back-off: the delay grows by one
// second for every attempt made so far, and is capped at 60 seconds.
let next_sync_at = now + Duration::from_secs(min(num_sync_attempts + 1, 60));
self.ledger_sync_state = LedgerSyncState::IsBehind {
attempt_sync_at: next_sync_at,
num_sync_attempts: num_sync_attempts + 1,
};
} else {
// We successfully synced a chunk of blocks, so reset our attempts to zero for the next chunk.
self.ledger_sync_state = LedgerSyncState::IsBehind {
attempt_sync_at,
num_sync_attempts: 0,
};
}

// Continue on the next tick.
return true;
}

// We think we're behind but sync service indicates we're back to being in sync.
(LedgerSyncState::IsBehind, false) => {
(LedgerSyncState::IsBehind { .. }, false) => {
log::info!(self.logger, "sync_service reports we are no longer behind!");

// Reset scp state.
Expand Down Expand Up @@ -490,7 +537,9 @@ impl<

// Sanity - code here should never run if we're behind.
assert!(!self.is_behind.load(Ordering::SeqCst));
assert!(self.ledger_sync_state != LedgerSyncState::IsBehind);
if let LedgerSyncState::IsBehind { .. } = &self.ledger_sync_state {
unreachable!();
}

// Nominate values for current slot.
self.nominate_pending_values();
Expand Down
3 changes: 3 additions & 0 deletions ledger/sync/src/ledger_sync_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum LedgerSyncError {

#[fail(display = "Invalid block ID.")]
InvalidBlockId,

#[fail(display = "No transaction data.")]
NoTransactionData,
}

impl<TFE: TransactionFetcherError + 'static> From<TFE> for LedgerSyncError {
Expand Down
39 changes: 34 additions & 5 deletions ledger/sync/src/ledger_sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<L: Ledger, BC: BlockchainConnection + 'static, TF: TransactionsFetcher + 's
return Err(LedgerSyncError::EmptyBlockVec);
}

let num_potentially_safe_blocks = potentially_safe_blocks.len();

// Get transactions.
let block_index_to_opt_transactions: BTreeMap<BlockIndex, Option<Vec<RedactedTx>>> =
get_transactions(
Expand Down Expand Up @@ -160,6 +162,15 @@ impl<L: Ledger, BC: BlockchainConnection + 'static, TF: TransactionsFetcher + 's
}
}

if blocks_with_transactions.is_empty() {
log::error!(
self.logger,
"Identified {} safe blocks but was unable to get transaction data",
num_potentially_safe_blocks,
);
return Err(LedgerSyncError::NoTransactionData);
}

// Process safe blocks.
log::trace!(
&self.logger,
Expand Down Expand Up @@ -486,7 +497,13 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(
type ResultsMap = BTreeMap<BlockIndex, Option<Vec<RedactedTx>>>;

enum Msg {
ProcessBlock(Block),
ProcessBlock {
// Block we are trying to fetch transactions for.
block: Block,

// How many attempts have we made so far (this is used for calculating retry delays).
num_attempts: u64,
},
Stop,
}

Expand All @@ -495,7 +512,10 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(
let (sender, receiver) = crossbeam_channel::bounded(blocks.len());
for block in blocks.iter().cloned() {
sender
.send(Msg::ProcessBlock(block))
.send(Msg::ProcessBlock {
block,
num_attempts: 0,
})
.expect("failed sending to channel");
}

Expand All @@ -522,7 +542,10 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(

for msg in thread_receiver.iter() {
match msg {
Msg::ProcessBlock(block) => {
Msg::ProcessBlock {
block,
num_attempts,
} => {
// Check for timeout.
if Instant::now() > deadline {
log::error!(
Expand Down Expand Up @@ -598,10 +621,16 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(
err
);

// Sleep with a linearly increasing delay. This keeps us from retrying in a tight
// loop (and flooding the fetcher) while the deadline has not yet been exceeded.
thread::sleep(Duration::from_secs(num_attempts + 1));

// Put back to queue for a retry
// TODO sleep?
thread_sender
.send(Msg::ProcessBlock(block))
.send(Msg::ProcessBlock {
block,
num_attempts: num_attempts + 1,
})
.expect("failed sending to channel");
}
}
Expand Down

0 comments on commit 3884aee

Please sign in to comment.