Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: do not attempt to vote on requests more than once #968

Merged
merged 11 commits into from
Nov 28, 2024
6 changes: 4 additions & 2 deletions signer/src/request_decider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ where
.await?
.ok_or(Error::NoChainTip)?;

let signer_public_key = self.signer_public_key();

let span = tracing::Span::current();
span.record("chain_tip", tracing::field::display(chain_tip));

let deposit_requests = db
.get_pending_deposit_requests(&chain_tip, self.context_window)
.get_pending_deposit_requests(&chain_tip, self.context_window, &signer_public_key)
.await?;

for deposit_request in deposit_requests {
Expand All @@ -137,7 +139,7 @@ where
}

let withdraw_requests = db
.get_pending_withdrawal_requests(&chain_tip, self.context_window)
.get_pending_withdrawal_requests(&chain_tip, self.context_window, &signer_public_key)
.await?;

for withdraw_request in withdraw_requests {
Expand Down
221 changes: 128 additions & 93 deletions signer/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
use bitcoin::consensus::Decodable as _;
use bitcoin::OutPoint;
use blockstack_lib::types::chainstate::StacksBlockId;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
Expand Down Expand Up @@ -187,6 +185,96 @@ impl Store {

get_utxo(aggregate_key, sbtc_txs)
}

/// Get all deposit requests that are on the blockchain identified by
/// the chain tip within the context window.
pub fn get_deposit_requests(
&self,
chain_tip: &model::BitcoinBlockHash,
context_window: u16,
) -> Vec<model::DepositRequest> {
(0..context_window)
// Find all tracked transaction IDs in the context window
.scan(chain_tip, |block_hash, _| {
let transaction_ids = self
.bitcoin_block_to_transactions
.get(*block_hash)
.cloned()
.unwrap_or_else(Vec::new);

let block = self.bitcoin_blocks.get(*block_hash)?;
*block_hash = &block.parent_hash;

Some(transaction_ids)
})
.flatten()
// Return all deposit requests associated with any of these transaction IDs
.flat_map(|txid| {
self.deposit_requests
.values()
.filter(move |req| req.txid == txid)
.cloned()
})
.collect()
}

fn get_stacks_chain_tip(
&self,
bitcoin_chain_tip: &model::BitcoinBlockHash,
) -> Option<model::StacksBlock> {
let bitcoin_chain_tip = self.bitcoin_blocks.get(bitcoin_chain_tip)?;

std::iter::successors(Some(bitcoin_chain_tip), |block| {
self.bitcoin_blocks.get(&block.parent_hash)
})
.filter_map(|block| self.bitcoin_anchor_to_stacks_blocks.get(&block.block_hash))
.flatten()
.filter_map(|stacks_block_hash| self.stacks_blocks.get(stacks_block_hash))
.max_by_key(|block| (block.block_height, &block.block_hash))
.cloned()
}

fn get_withdrawal_requests(
&self,
chain_tip: &model::BitcoinBlockHash,
context_window: u16,
) -> Vec<model::WithdrawalRequest> {
let first_block = self.bitcoin_blocks.get(chain_tip);

let context_window_end_block = std::iter::successors(first_block, |block| {
self.bitcoin_blocks.get(&block.parent_hash)
})
.nth(context_window as usize);

let Some(stacks_chain_tip) = self.get_stacks_chain_tip(chain_tip) else {
return Vec::new();
};

std::iter::successors(Some(&stacks_chain_tip), |stacks_block| {
self.stacks_blocks.get(&stacks_block.parent_hash)
})
.take_while(|stacks_block| {
!context_window_end_block.as_ref().is_some_and(|block| {
self.bitcoin_blocks
.get(&stacks_block.bitcoin_anchor)
.is_some_and(|anchor| anchor.block_height <= block.block_height)
})
})
.flat_map(|stacks_block| {
self.stacks_block_to_withdrawal_requests
.get(&stacks_block.block_hash)
.cloned()
.unwrap_or_default()
.into_iter()
.map(|pk| {
self.withdrawal_requests
.get(&pk)
.expect("missing withdraw request")
.clone()
})
})
.collect()
}
}

impl super::DbRead for SharedStore {
Expand Down Expand Up @@ -220,52 +308,32 @@ impl super::DbRead for SharedStore {
&self,
bitcoin_chain_tip: &model::BitcoinBlockHash,
) -> Result<Option<model::StacksBlock>, Error> {
let store = self.lock().await;
let Some(bitcoin_chain_tip) = store.bitcoin_blocks.get(bitcoin_chain_tip) else {
return Ok(None);
};

Ok(std::iter::successors(Some(bitcoin_chain_tip), |block| {
store.bitcoin_blocks.get(&block.parent_hash)
})
.filter_map(|block| store.bitcoin_anchor_to_stacks_blocks.get(&block.block_hash))
.flatten()
.filter_map(|stacks_block_hash| store.stacks_blocks.get(stacks_block_hash))
.max_by_key(|block| (block.block_height, &block.block_hash))
.cloned())
Ok(self.lock().await.get_stacks_chain_tip(bitcoin_chain_tip))
}

async fn get_pending_deposit_requests(
&self,
chain_tip: &model::BitcoinBlockHash,
context_window: u16,
signer_public_key: &PublicKey,
) -> Result<Vec<model::DepositRequest>, Error> {
let store = self.lock().await;

Ok((0..context_window)
// Find all tracked transaction IDs in the context window
.scan(chain_tip, |block_hash, _| {
let transaction_ids = store
.bitcoin_block_to_transactions
.get(*block_hash)
.cloned()
.unwrap_or_else(Vec::new);
let deposits_requests = store.get_deposit_requests(chain_tip, context_window);
let voted: HashSet<(model::BitcoinTxId, u32)> = store
.signer_to_deposit_request
.get(signer_public_key)
.cloned()
.unwrap_or(Vec::new())
.into_iter()
.collect();

let block = store.bitcoin_blocks.get(*block_hash)?;
*block_hash = &block.parent_hash;
let result = deposits_requests
.into_iter()
.filter(|x| !voted.contains(&(x.txid, x.output_index)))
.collect();

Some(transaction_ids)
})
.flatten()
// Return all deposit requests associated with any of these transaction IDs
.flat_map(|txid| {
store
.deposit_requests
.values()
.filter(move |req| req.txid == txid)
.cloned()
})
.collect())
Ok(result)
}

async fn get_pending_accepted_deposit_requests(
Expand All @@ -274,12 +342,10 @@ impl super::DbRead for SharedStore {
context_window: u16,
threshold: u16,
) -> Result<Vec<model::DepositRequest>, Error> {
let pending_deposit_requests = self
.get_pending_deposit_requests(chain_tip, context_window)
.await?;
let store = self.lock().await;
let deposit_requests = store.get_deposit_requests(chain_tip, context_window);

let threshold = threshold as usize;
let store = self.lock().await;

// Add one to the acceptable unlock height because the chain tip is at height one less
// than the height of the next block, which is the block for which we are assessing
Expand All @@ -299,7 +365,7 @@ impl super::DbRead for SharedStore {
.take(context_window as usize)
.collect::<HashSet<_>>();

Ok(pending_deposit_requests
Ok(deposit_requests
.into_iter()
.filter(|deposit_request| {
store
Expand Down Expand Up @@ -433,57 +499,29 @@ impl super::DbRead for SharedStore {
&self,
chain_tip: &model::BitcoinBlockHash,
context_window: u16,
signer_public_key: &PublicKey,
) -> Result<Vec<model::WithdrawalRequest>, Error> {
let Some(bitcoin_chain_tip) = self.get_bitcoin_block(chain_tip).await? else {
return Ok(Vec::new());
};
let store = self.lock().await;
let withdrawal_requests = store.get_withdrawal_requests(chain_tip, context_window);

let context_window_end_block =
futures::stream::try_unfold(bitcoin_chain_tip.block_hash, |block_hash| async move {
self.get_bitcoin_block(&block_hash)
.await
.map(|opt| opt.map(|block| (block.clone(), block.parent_hash)))
// These are the withdrawal requests that this signer has voted on.
let voted: HashSet<(u64, model::StacksBlockHash)> = store
.withdrawal_request_to_signers
.iter()
.filter_map(|(pk, decisions)| {
decisions
.iter()
.find(|decision| &decision.signer_pub_key == signer_public_key)
.map(|_| *pk)
})
.skip(context_window as usize)
.boxed()
.try_next()
.await?;

let Some(stacks_chain_tip) = self.get_stacks_chain_tip(chain_tip).await? else {
return Ok(Vec::new());
};
.collect();

let store = self.lock().await;
let result = withdrawal_requests
.into_iter()
.filter(|x| !voted.contains(&(x.request_id, x.block_hash)))
.collect();

Ok(
std::iter::successors(Some(&stacks_chain_tip), |stacks_block| {
store.stacks_blocks.get(&stacks_block.parent_hash)
})
.take_while(|stacks_block| {
!context_window_end_block.as_ref().is_some_and(|block| {
store
.bitcoin_blocks
.get(&stacks_block.bitcoin_anchor)
.is_some_and(|anchor| anchor.block_height <= block.block_height)
})
})
.flat_map(|stacks_block| {
store
.stacks_block_to_withdrawal_requests
.get(&stacks_block.block_hash)
.cloned()
.unwrap_or_default()
.into_iter()
.map(|pk| {
store
.withdrawal_requests
.get(&pk)
.expect("missing withdraw request")
.clone()
})
})
.collect(),
)
Ok(result)
}

async fn get_pending_accepted_withdrawal_requests(
Expand All @@ -492,14 +530,11 @@ impl super::DbRead for SharedStore {
context_window: u16,
threshold: u16,
) -> Result<Vec<model::WithdrawalRequest>, Error> {
let pending_withdraw_requests = self
.get_pending_withdrawal_requests(chain_tip, context_window)
.await?;
let store = self.lock().await;

let withdraw_requests = store.get_withdrawal_requests(chain_tip, context_window);
let threshold = threshold as usize;

Ok(pending_withdraw_requests
Ok(withdraw_requests
.into_iter()
.filter(|withdraw_request| {
store
Expand Down
10 changes: 10 additions & 0 deletions signer/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ pub trait DbRead {
) -> impl Future<Output = Result<Option<model::StacksBlock>, Error>> + Send;

/// Get pending deposit requests
///
/// These are deposit requests that have been added to our database but
/// where the current signer has not made a decision on whether they
/// will sign for the deposit and sweep in the funds.
fn get_pending_deposit_requests(
&self,
chain_tip: &model::BitcoinBlockHash,
context_window: u16,
signer_public_key: &PublicKey,
) -> impl Future<Output = Result<Vec<model::DepositRequest>, Error>> + Send;

/// Get pending deposit requests that have been accepted by at least
Expand Down Expand Up @@ -142,10 +147,15 @@ pub trait DbRead {
) -> impl Future<Output = Result<Vec<model::WithdrawalSigner>, Error>> + Send;

/// Get pending withdrawal requests
///
/// These are withdrawal requests that have been added to our database
/// but where the current signer has not made a decision on whether
/// they will sweep out the withdrawal funds and sweep transaction.
fn get_pending_withdrawal_requests(
&self,
chain_tip: &model::BitcoinBlockHash,
context_window: u16,
signer_public_key: &PublicKey,
) -> impl Future<Output = Result<Vec<model::WithdrawalRequest>, Error>> + Send;

/// Get pending withdrawal requests that have been accepted by at least
Expand Down
Loading