Skip to content

Commit

Permalink
fix: blocktemplate dao potential inconsistent with transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Nov 25, 2022
1 parent f206799 commit 86ec06c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 55 deletions.
82 changes: 47 additions & 35 deletions tx-pool/src/block_assembler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ impl BlockAssembler {
let basic_block_size =
Self::basic_block_size(cellbase.data(), &[], iter::empty(), extension.clone());

let dao = Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])
.expect("calc_dao for BlockAssembler initial");
let (dao, _checked, _txs_size) =
Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])
.expect("calc_dao for BlockAssembler initial");

let work_id = AtomicU64::new(0);

Expand Down Expand Up @@ -142,7 +143,7 @@ impl BlockAssembler {
let current_template = &current.template;
let uncles = &current_template.uncles;

let (proposals, txs, txs_size, basic_size) = {
let (proposals, txs, basic_size) = {
let tx_pool_reader = tx_pool.read().await;
if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
return Ok(());
Expand All @@ -162,25 +163,25 @@ impl BlockAssembler {
.ok_or(BlockAssemblerError::Overflow)?;

let max_block_cycles = consensus.max_block_cycles();
let (txs, txs_size, _cycles) =
let (txs, _txs_size, _cycles) =
tx_pool_reader.package_txs(max_block_cycles, txs_size_limit);
(proposals, txs, txs_size, basic_size)
(proposals, txs, basic_size)
};

let proposals_size = proposals.len() * ProposalShortId::serialized_size();
let total_size = basic_size + txs_size;

let dao = Self::calc_dao(
let (dao, checked, txs_size) = Self::calc_dao(
&current.snapshot,
&current.epoch,
current_template.cellbase.clone(),
txs.clone(),
txs,
)?;

let total_size = basic_size + txs_size;

let mut builder = BlockTemplateBuilder::from_template(&current.template);
builder
.set_proposals(Vec::from_iter(proposals))
.set_transactions(txs)
.set_transactions(checked)
.work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
.current_time(cmp::max(
unix_time_as_millis(),
Expand Down Expand Up @@ -221,7 +222,8 @@ impl BlockAssembler {
let basic_block_size =
Self::basic_block_size(cellbase.data(), &uncles, iter::empty(), extension.clone());

let dao = Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])?;
let (dao, _checked, _txs_size) =
Self::calc_dao(&snapshot, &current_epoch, cellbase.clone(), vec![])?;

builder
.transactions(vec![])
Expand Down Expand Up @@ -353,7 +355,7 @@ impl BlockAssembler {
let current_template = &current.template;
let max_block_bytes = consensus.max_block_bytes() as usize;
let extension = Self::build_extension(&current.snapshot)?;
let (txs, new_txs_size) = {
let txs = {
let tx_pool_reader = tx_pool.read().await;
if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() {
return Ok(());
Expand All @@ -373,23 +375,32 @@ impl BlockAssembler {
}

let max_block_cycles = consensus.max_block_cycles();
let (txs, txs_size, _cycles) = tx_pool_reader
let (txs, _txs_size, _cycles) = tx_pool_reader
.package_txs(max_block_cycles, txs_size_limit.expect("overflow checked"));
(txs, txs_size)
txs
};

let incr = new_txs_size.saturating_sub(current.size.txs);
let new_total_size = current.size.total + incr;

if let Ok(dao) = Self::calc_dao(
if let Ok((dao, checked, new_txs_size)) = Self::calc_dao(
&current.snapshot,
&current.epoch,
current_template.cellbase.clone(),
txs.clone(),
txs,
) {
let new_total_size = if new_txs_size > current.size.txs {
current
.size
.total
.saturating_add(new_txs_size - current.size.txs)
} else {
current
.size
.total
.saturating_sub(current.size.txs - new_txs_size)
};

let mut builder = BlockTemplateBuilder::from_template(&current.template);
builder
.set_transactions(txs)
.set_transactions(checked)
.work_id(self.work_id.fetch_add(1, Ordering::SeqCst))
.current_time(cmp::max(
unix_time_as_millis(),
Expand Down Expand Up @@ -549,20 +560,17 @@ impl BlockAssembler {
current_epoch: &EpochExt,
cellbase: TransactionView,
entries: Vec<TxEntry>,
) -> Result<Byte32, AnyError> {
) -> Result<(Byte32, Vec<TxEntry>, usize), AnyError> {
let tip_header = snapshot.tip_header();
let consensus = snapshot.consensus();

let mut size: usize = 0;
let mut seen_inputs = HashSet::new();
let mut transactions_checker = TransactionsChecker::new(iter::once(&cellbase));

let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0);
let entries_iter = iter::once(dummy_cellbase_entry).chain(entries.into_iter());

let rtxs: Vec<_> = block_in_place(|| {
entries_iter
.enumerate()
.filter_map(|(index, entry)| {
let checked: Vec<_> = block_in_place(|| {
entries
.into_iter()
.filter_map(|entry| {
let overlay_cell_checker =
OverlayCellChecker::new(&transactions_checker, snapshot);
if let Err(err) =
Expand All @@ -580,20 +588,24 @@ impl BlockAssembler {
);
None
} else {
if index != 0 {
transactions_checker.insert(entry.transaction());
}
Some(entry.rtx)
transactions_checker.insert(entry.transaction());
size = size.saturating_add(entry.size);
Some(entry)
}
})
.collect()
});

let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0);
let entries_iter = iter::once(&dummy_cellbase_entry)
.chain(checked.iter())
.map(|entry| &entry.rtx);

// Generate DAO fields here
let dao = DaoCalculator::new(consensus, &snapshot.as_data_provider())
.dao_field_with_current_epoch(&rtxs, tip_header, current_epoch)?;
.dao_field_with_current_epoch(entries_iter, tip_header, current_epoch)?;

Ok(dao)
Ok((dao, checked, size))
}

pub(crate) async fn notify(&self) {
Expand Down
45 changes: 25 additions & 20 deletions util/dao/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
/// Calculates the new dao field with specified [`EpochExt`].
pub fn dao_field_with_current_epoch(
&self,
rtxs: &[ResolvedTransaction],
rtxs: impl Iterator<Item = &'a ResolvedTransaction> + Clone,
parent: &HeaderView,
current_block_epoch: &EpochExt,
) -> Result<Byte32, DaoError> {
// Freed occupied capacities from consumed inputs
let freed_occupied_capacities =
rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
rtxs.clone().try_fold(Capacity::zero(), |capacities, rtx| {
self.input_occupied_capacities(rtx)
.and_then(|c| capacities.safe_add(c))
})?;
let added_occupied_capacities = self.added_occupied_capacities(rtxs)?;
let added_occupied_capacities = self.added_occupied_capacities(rtxs.clone())?;
let withdrawed_interests = self.withdrawed_interests(rtxs)?;

let (parent_ar, parent_c, parent_s, parent_u) = extract_dao_data(parent.dao());
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
.next_epoch_ext(parent, self.data_loader)
.ok_or(DaoError::InvalidHeader)?
.epoch();
self.dao_field_with_current_epoch(rtxs, parent, &current_block_epoch)
self.dao_field_with_current_epoch(rtxs.iter(), parent, &current_block_epoch)
}

/// Returns the total transactions fee of `rtx`.
Expand All @@ -155,20 +155,22 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
.map_err(Into::into)
}

fn added_occupied_capacities(&self, rtxs: &[ResolvedTransaction]) -> CapacityResult<Capacity> {
fn added_occupied_capacities(
&self,
mut rtxs: impl Iterator<Item = &'a ResolvedTransaction>,
) -> CapacityResult<Capacity> {
// Newly added occupied capacities from outputs
let added_occupied_capacities =
rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
rtx.transaction
.outputs_with_data_iter()
.enumerate()
.try_fold(Capacity::zero(), |tx_capacities, (_, (output, data))| {
Capacity::bytes(data.len())
.and_then(|c| output.occupied_capacity(c))
.and_then(|c| tx_capacities.safe_add(c))
})
.and_then(|c| capacities.safe_add(c))
})?;
let added_occupied_capacities = rtxs.try_fold(Capacity::zero(), |capacities, rtx| {
rtx.transaction
.outputs_with_data_iter()
.enumerate()
.try_fold(Capacity::zero(), |tx_capacities, (_, (output, data))| {
Capacity::bytes(data.len())
.and_then(|c| output.occupied_capacity(c))
.and_then(|c| tx_capacities.safe_add(c))
})
.and_then(|c| capacities.safe_add(c))
})?;

Ok(added_occupied_capacities)
}
Expand All @@ -183,12 +185,15 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a
.map_err(Into::into)
}

fn withdrawed_interests(&self, rtxs: &[ResolvedTransaction]) -> Result<Capacity, DaoError> {
let maximum_withdraws = rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
fn withdrawed_interests(
&self,
mut rtxs: impl Iterator<Item = &'a ResolvedTransaction> + Clone,
) -> Result<Capacity, DaoError> {
let maximum_withdraws = rtxs.clone().try_fold(Capacity::zero(), |capacities, rtx| {
self.transaction_maximum_withdraw(rtx)
.and_then(|c| capacities.safe_add(c).map_err(Into::into))
})?;
let input_capacities = rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| {
let input_capacities = rtxs.try_fold(Capacity::zero(), |capacities, rtx| {
let tx_input_capacities = rtx.resolved_inputs.iter().try_fold(
Capacity::zero(),
|tx_capacities, cell_meta| {
Expand Down

0 comments on commit 86ec06c

Please sign in to comment.