Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: blocktemplate dao potential inconsistent with transactions #3727

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 66 additions & 40 deletions tx-pool/src/block_assembler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,34 @@ pub(crate) struct TemplateSize {
pub(crate) total: usize,
}

impl TemplateSize {
pub(crate) fn calc_total_by_proposals(&self, new_proposals_size: usize) -> usize {
if new_proposals_size > self.proposals {
self.total
.saturating_add(new_proposals_size - self.proposals)
} else {
self.total
.saturating_sub(self.proposals - new_proposals_size)
}
}

pub(crate) fn calc_total_by_uncles(&self, new_uncles_size: usize) -> usize {
if new_uncles_size > self.uncles {
self.total.saturating_add(new_uncles_size - self.uncles)
} else {
self.total.saturating_sub(self.uncles - new_uncles_size)
}
}

pub(crate) fn calc_total_by_txs(&self, new_txs_size: usize) -> usize {
if new_txs_size > self.txs {
self.total.saturating_add(new_txs_size - self.txs)
} else {
self.total.saturating_sub(self.txs - new_txs_size)
}
}
}

#[derive(Clone)]
pub(crate) struct CurrentTemplate {
pub(crate) template: BlockTemplate,
Expand Down Expand Up @@ -94,8 +122,9 @@ impl BlockAssembler {
let basic_block_size =
Self::basic_block_size(cellbase.data(), &[], iter::empty(), extension.clone());

let dao = Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])
.expect("calc_dao for BlockAssembler initial");
let (dao, _checked_txs) =
Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])
.expect("calc_dao for BlockAssembler initial");

let work_id = AtomicU64::new(0);

Expand Down Expand Up @@ -142,7 +171,7 @@ impl BlockAssembler {
let current_template = &current.template;
let uncles = &current_template.uncles;

let (proposals, txs, txs_size, basic_size) = {
let (proposals, txs, basic_size) = {
let tx_pool_reader = tx_pool.read().await;
if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
return Ok(());
Expand All @@ -162,25 +191,25 @@ impl BlockAssembler {
.ok_or(BlockAssemblerError::Overflow)?;

let max_block_cycles = consensus.max_block_cycles();
let (txs, txs_size, _cycles) =
let (txs, _txs_size, _cycles) =
tx_pool_reader.package_txs(max_block_cycles, txs_size_limit);
(proposals, txs, txs_size, basic_size)
(proposals, txs, basic_size)
};

let proposals_size = proposals.len() * ProposalShortId::serialized_size();
let total_size = basic_size + txs_size;

let dao = Self::calc_dao(
let (dao, checked_txs) = Self::calc_dao(
&current.snapshot,
&current.epoch,
current_template.cellbase.clone(),
txs.clone(),
txs,
)?;
let txs_size = checked_txs.iter().map(|tx| tx.size).sum();
let total_size = basic_size + txs_size;

let mut builder = BlockTemplateBuilder::from_template(&current.template);
builder
.set_proposals(Vec::from_iter(proposals))
.set_transactions(txs)
.set_transactions(checked_txs)
.work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
.current_time(cmp::max(
unix_time_as_millis(),
Expand Down Expand Up @@ -221,7 +250,8 @@ impl BlockAssembler {
let basic_block_size =
Self::basic_block_size(cellbase.data(), &uncles, iter::empty(), extension.clone());

let dao = Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])?;
let (dao, _checked_txs) =
Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])?;

builder
.transactions(vec![])
Expand Down Expand Up @@ -275,9 +305,8 @@ impl BlockAssembler {
let uncles = self.prepare_uncles(&current.snapshot, &current.epoch).await;

let new_uncle_size = uncles.len() * UncleBlockView::serialized_size_in_block();
let incr = new_uncle_size.saturating_sub(current.size.uncles);
let new_total_size = current.size.calc_total_by_uncles(new_uncle_size);

let new_total_size = current.size.total + incr;
if new_total_size < max_block_bytes {
let mut builder = BlockTemplateBuilder::from_template(&current.template);
builder
Expand Down Expand Up @@ -317,10 +346,9 @@ impl BlockAssembler {
};

let new_proposals_size = proposals.len() * ProposalShortId::serialized_size();
let incr = new_proposals_size.saturating_sub(current.size.proposals);
let new_total_size = current.size.calc_total_by_proposals(new_proposals_size);
let max_block_bytes = consensus.max_block_bytes() as usize;
let new_total_size = current.size.total + incr;
if new_total_size + incr < max_block_bytes {
if new_total_size < max_block_bytes {
let mut builder = BlockTemplateBuilder::from_template(&current.template);
builder
.set_proposals(Vec::from_iter(proposals))
Expand Down Expand Up @@ -353,7 +381,7 @@ impl BlockAssembler {
let current_template = &current.template;
let max_block_bytes = consensus.max_block_bytes() as usize;
let extension = Self::build_extension(&current.snapshot)?;
let (txs, new_txs_size) = {
let txs = {
let tx_pool_reader = tx_pool.read().await;
if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
return Ok(());
Expand All @@ -373,23 +401,22 @@ impl BlockAssembler {
}

let max_block_cycles = consensus.max_block_cycles();
let (txs, txs_size, _cycles) = tx_pool_reader
let (txs, _txs_size, _cycles) = tx_pool_reader
.package_txs(max_block_cycles, txs_size_limit.expect("overflow checked"));
(txs, txs_size)
txs
};

let incr = new_txs_size.saturating_sub(current.size.txs);
let new_total_size = current.size.total + incr;

if let Ok(dao) = Self::calc_dao(
if let Ok((dao, checked_txs)) = Self::calc_dao(
&current.snapshot,
&current.epoch,
current_template.cellbase.clone(),
txs.clone(),
txs,
) {
let new_txs_size = checked_txs.iter().map(|tx| tx.size).sum();
let new_total_size = current.size.calc_total_by_txs(new_txs_size);
let mut builder = BlockTemplateBuilder::from_template(&current.template);
builder
.set_transactions(txs)
.set_transactions(checked_txs)
.work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
.current_time(cmp::max(
unix_time_as_millis(),
Expand Down Expand Up @@ -549,20 +576,16 @@ impl BlockAssembler {
current_epoch: &EpochExt,
cellbase: TransactionView,
entries: Vec<TxEntry>,
) -> Result<Byte32, AnyError> {
) -> Result<(Byte32, Vec<TxEntry>), AnyError> {
let tip_header = snapshot.tip_header();
let consensus = snapshot.consensus();

let mut seen_inputs = HashSet::new();
let mut transactions_checker = TransactionsChecker::new(iter::once(&cellbase));

let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0);
let entries_iter = iter::once(dummy_cellbase_entry).chain(entries.into_iter());

let rtxs: Vec<_> = block_in_place(|| {
entries_iter
.enumerate()
.filter_map(|(index, entry)| {
let checked_entries: Vec<_> = block_in_place(|| {
entries
.into_iter()
.filter_map(|entry| {
let overlay_cell_checker =
OverlayCellChecker::new(&transactions_checker, snapshot);
if let Err(err) =
Expand All @@ -580,20 +603,23 @@ impl BlockAssembler {
);
None
} else {
if index != 0 {
transactions_checker.insert(entry.transaction());
}
Some(entry.rtx)
transactions_checker.insert(entry.transaction());
Some(entry)
}
})
.collect()
});

let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0);
let entries_iter = iter::once(&dummy_cellbase_entry)
.chain(checked_entries.iter())
.map(|entry| &entry.rtx);

// Generate DAO fields here
let dao = DaoCalculator::new(consensus, &snapshot.as_data_provider())
.dao_field_with_current_epoch(&rtxs, tip_header, current_epoch)?;
.dao_field_with_current_epoch(entries_iter, tip_header, current_epoch)?;

Ok(dao)
Ok((dao, checked_entries))
}

pub(crate) async fn notify(&self) {
Expand Down
45 changes: 25 additions & 20 deletions util/dao/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
/// Calculates the new dao field with specified [`EpochExt`].
pub fn dao_field_with_current_epoch(
&self,
rtxs: &[ResolvedTransaction],
rtxs: impl Iterator<Item = &'a ResolvedTransaction> + Clone,
parent: &HeaderView,
current_block_epoch: &EpochExt,
) -> Result<Byte32, DaoError> {
// Freed occupied capacities from consumed inputs
let freed_occupied_capacities =
rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
rtxs.clone().try_fold(Capacity::zero(), |capacities, rtx| {
self.input_occupied_capacities(rtx)
.and_then(|c| capacities.safe_add(c))
})?;
let added_occupied_capacities = self.added_occupied_capacities(rtxs)?;
let added_occupied_capacities = self.added_occupied_capacities(rtxs.clone())?;
let withdrawed_interests = self.withdrawed_interests(rtxs)?;

let (parent_ar, parent_c, parent_s, parent_u) = extract_dao_data(parent.dao());
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
.next_epoch_ext(parent, self.data_loader)
.ok_or(DaoError::InvalidHeader)?
.epoch();
self.dao_field_with_current_epoch(rtxs, parent, &current_block_epoch)
self.dao_field_with_current_epoch(rtxs.iter(), parent, &current_block_epoch)
}

/// Returns the total transactions fee of `rtx`.
Expand All @@ -155,20 +155,22 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
.map_err(Into::into)
}

fn added_occupied_capacities(&self, rtxs: &[ResolvedTransaction]) -> CapacityResult<Capacity> {
fn added_occupied_capacities(
&self,
mut rtxs: impl Iterator<Item = &'a ResolvedTransaction>,
) -> CapacityResult<Capacity> {
// Newly added occupied capacities from outputs
let added_occupied_capacities =
rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
rtx.transaction
.outputs_with_data_iter()
.enumerate()
.try_fold(Capacity::zero(), |tx_capacities, (_, (output, data))| {
Capacity::bytes(data.len())
.and_then(|c| output.occupied_capacity(c))
.and_then(|c| tx_capacities.safe_add(c))
})
.and_then(|c| capacities.safe_add(c))
})?;
let added_occupied_capacities = rtxs.try_fold(Capacity::zero(), |capacities, rtx| {
rtx.transaction
.outputs_with_data_iter()
.enumerate()
.try_fold(Capacity::zero(), |tx_capacities, (_, (output, data))| {
Capacity::bytes(data.len())
.and_then(|c| output.occupied_capacity(c))
.and_then(|c| tx_capacities.safe_add(c))
})
.and_then(|c| capacities.safe_add(c))
})?;

Ok(added_occupied_capacities)
}
Expand All @@ -183,12 +185,15 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
.map_err(Into::into)
}

fn withdrawed_interests(&self, rtxs: &[ResolvedTransaction]) -> Result<Capacity, DaoError> {
let maximum_withdraws = rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
fn withdrawed_interests(
&self,
mut rtxs: impl Iterator<Item = &'a ResolvedTransaction> + Clone,
) -> Result<Capacity, DaoError> {
let maximum_withdraws = rtxs.clone().try_fold(Capacity::zero(), |capacities, rtx| {
self.transaction_maximum_withdraw(rtx)
.and_then(|c| capacities.safe_add(c).map_err(Into::into))
})?;
let input_capacities = rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
let input_capacities = rtxs.try_fold(Capacity::zero(), |capacities, rtx| {
let tx_input_capacities = rtx.resolved_inputs.iter().try_fold(
Capacity::zero(),
|tx_capacities, cell_meta| {
Expand Down