Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exponential delays when attempting to catch up, in order to prevent excess retries / flooding #41

Merged
merged 3 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 56 additions & 7 deletions consensus/service/src/byzantine_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator};
use retry::delay::Fibonacci;
use scp::{scp_log::LoggingScpNode, slot::Phase, Msg, Node, QuorumSet, ScpNode, SlotIndex};
use std::{
cmp::min,
collections::{btree_map::Entry::Vacant, BTreeMap, BTreeSet},
iter::FromIterator,
path::PathBuf,
Expand Down Expand Up @@ -253,7 +254,13 @@ enum LedgerSyncState {
MaybeBehind(Instant),

/// We are behind the network and need to perform catchup.
IsBehind,
IsBehind {
// Time when we should attempt to sync.
attempt_sync_at: Instant,

// Number of attempts made so far.
num_sync_attempts: u64,
},
}

struct ByzantineLedgerThread<
Expand Down Expand Up @@ -411,7 +418,10 @@ impl<
);
counters::CATCHUP_INITIATED.inc();
self.is_behind.store(true, Ordering::SeqCst);
self.ledger_sync_state = LedgerSyncState::IsBehind;
self.ledger_sync_state = LedgerSyncState::IsBehind {
attempt_sync_at: Instant::now(),
num_sync_attempts: 0,
};

// Continue on the next tick.
return true;
Expand All @@ -428,10 +438,34 @@ impl<
}

// We think we're behind and sync service confirms that, attempt to sync.
(LedgerSyncState::IsBehind, true) => {
(
LedgerSyncState::IsBehind {
attempt_sync_at,
num_sync_attempts,
},
true,
) => {
// See if it's time to attempt syncing.
let now = Instant::now();
if attempt_sync_at > now {
// Not yet. Continue on to the next tick and then try again. We sleep here to
// throttle the event loop as it won't be doing anything until we reach the next
// attempt_sync_at milestone.
log::trace!(
self.logger,
"sync_service reported we're behind, but deadline {:?} not reached yet (attempt {})!",
attempt_sync_at,
num_sync_attempts,
);

thread::sleep(Duration::from_secs(1));
return true;
}

log::info!(
self.logger,
"sync_service reported we're behind, attempting catchup!"
"sync_service reported we're behind, attempting catchup (attempt {})!",
num_sync_attempts,
);

// Attempt incremental catch-up.
Expand All @@ -441,15 +475,28 @@ impl<
.attempt_ledger_sync(&self.network_state, blocks_per_attempt)
{
log::error!(self.logger, "Could not sync ledger: {:?}", err);
thread::sleep(Duration::from_secs(1));

// The next time we attempt to sync is a linear back-off based on how many
// attempts we've done so far, capped at 60 seconds.
let next_sync_at = now + Duration::from_secs(min(num_sync_attempts + 1, 60));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like linear (as opposed to exponential) backoff, but that works, too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's correct. I changed the comment.

self.ledger_sync_state = LedgerSyncState::IsBehind {
attempt_sync_at: next_sync_at,
num_sync_attempts: num_sync_attempts + 1,
};
} else {
// We successfully synced a chunk of blocks, so reset our attempts to zero for the next chunk.
self.ledger_sync_state = LedgerSyncState::IsBehind {
attempt_sync_at,
num_sync_attempts: 0,
};
}

// Continue on the next tick.
return true;
}

// We think we're behind but sync service indicates we're back to being in sync.
(LedgerSyncState::IsBehind, false) => {
(LedgerSyncState::IsBehind { .. }, false) => {
log::info!(self.logger, "sync_service reports we are no longer behind!");

// Reset scp state.
Expand Down Expand Up @@ -490,7 +537,9 @@ impl<

// Sanity - code here should never run if we're behind.
assert!(!self.is_behind.load(Ordering::SeqCst));
assert!(self.ledger_sync_state != LedgerSyncState::IsBehind);
if let LedgerSyncState::IsBehind { .. } = &self.ledger_sync_state {
unreachable!();
}

// Nominate values for current slot.
self.nominate_pending_values();
Expand Down
3 changes: 3 additions & 0 deletions ledger/sync/src/ledger_sync_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum LedgerSyncError {

#[fail(display = "Invalid block ID.")]
InvalidBlockId,

#[fail(display = "No transaction data.")]
NoTransactionData,
}

impl<TFE: TransactionFetcherError + 'static> From<TFE> for LedgerSyncError {
Expand Down
39 changes: 34 additions & 5 deletions ledger/sync/src/ledger_sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<L: Ledger, BC: BlockchainConnection + 'static, TF: TransactionsFetcher + 's
return Err(LedgerSyncError::EmptyBlockVec);
}

let num_potentially_safe_blocks = potentially_safe_blocks.len();

// Get transactions.
let block_index_to_opt_transactions: BTreeMap<BlockIndex, Option<Vec<RedactedTx>>> =
get_transactions(
Expand Down Expand Up @@ -160,6 +162,15 @@ impl<L: Ledger, BC: BlockchainConnection + 'static, TF: TransactionsFetcher + 's
}
}

if blocks_with_transactions.is_empty() {
log::error!(
self.logger,
"Identified {} safe blocks but was unable to get transaction data",
num_potentially_safe_blocks,
);
return Err(LedgerSyncError::NoTransactionData);
}

// Process safe blocks.
log::trace!(
&self.logger,
Expand Down Expand Up @@ -486,7 +497,13 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(
type ResultsMap = BTreeMap<BlockIndex, Option<Vec<RedactedTx>>>;

enum Msg {
ProcessBlock(Block),
ProcessBlock {
// Block we are trying to fetch transactions for.
block: Block,

// Number of attempts made so far (used for calculating retry delays).
num_attempts: u64,
},
Stop,
}

Expand All @@ -495,7 +512,10 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(
let (sender, receiver) = crossbeam_channel::bounded(blocks.len());
for block in blocks.iter().cloned() {
sender
.send(Msg::ProcessBlock(block))
.send(Msg::ProcessBlock {
block,
num_attempts: 0,
})
.expect("failed sending to channel");
}

Expand All @@ -522,7 +542,10 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(

for msg in thread_receiver.iter() {
match msg {
Msg::ProcessBlock(block) => {
Msg::ProcessBlock {
block,
num_attempts,
} => {
// Check for timeout.
if Instant::now() > deadline {
log::error!(
Expand Down Expand Up @@ -598,10 +621,16 @@ fn get_transactions<TF: TransactionsFetcher + 'static>(
err
);

// Sleep with a linearly increasing delay before retrying. This keeps a failing
// fetch from being retried in a tight loop while the deadline has not yet passed.
thread::sleep(Duration::from_secs(num_attempts + 1));

// Put back to queue for a retry
// TODO sleep?
thread_sender
.send(Msg::ProcessBlock(block))
.send(Msg::ProcessBlock {
block,
num_attempts: num_attempts + 1,
})
.expect("failed sending to channel");
}
}
Expand Down