diff --git a/tx-pool/src/block_assembler/mod.rs b/tx-pool/src/block_assembler/mod.rs index 5fb74b0dd7..9b134830e7 100644 --- a/tx-pool/src/block_assembler/mod.rs +++ b/tx-pool/src/block_assembler/mod.rs @@ -94,8 +94,9 @@ impl BlockAssembler { let basic_block_size = Self::basic_block_size(cellbase.data(), &[], iter::empty(), extension.clone()); - let dao = Self::calc_dao(&snapshot, ¤t_epoch, cellbase.clone(), vec![]) - .expect("calc_dao for BlockAssembler initial"); + let (dao, _checked, _txs_size) = + Self::calc_dao(&snapshot, ¤t_epoch, cellbase.clone(), vec![]) + .expect("calc_dao for BlockAssembler initial"); let work_id = AtomicU64::new(0); @@ -142,7 +143,7 @@ impl BlockAssembler { let current_template = ¤t.template; let uncles = ¤t_template.uncles; - let (proposals, txs, txs_size, basic_size) = { + let (proposals, txs, basic_size) = { let tx_pool_reader = tx_pool.read().await; if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() { return Ok(()); @@ -162,25 +163,25 @@ impl BlockAssembler { .ok_or(BlockAssemblerError::Overflow)?; let max_block_cycles = consensus.max_block_cycles(); - let (txs, txs_size, _cycles) = + let (txs, _txs_size, _cycles) = tx_pool_reader.package_txs(max_block_cycles, txs_size_limit); - (proposals, txs, txs_size, basic_size) + (proposals, txs, basic_size) }; let proposals_size = proposals.len() * ProposalShortId::serialized_size(); - let total_size = basic_size + txs_size; - - let dao = Self::calc_dao( + let (dao, checked, txs_size) = Self::calc_dao( ¤t.snapshot, ¤t.epoch, current_template.cellbase.clone(), - txs.clone(), + txs, )?; + let total_size = basic_size + txs_size; + let mut builder = BlockTemplateBuilder::from_template(¤t.template); builder .set_proposals(Vec::from_iter(proposals)) - .set_transactions(txs) + .set_transactions(checked) .work_id(self.work_id.fetch_add(1, Ordering::SeqCst)) .current_time(cmp::max( unix_time_as_millis(), @@ -221,7 +222,8 @@ impl BlockAssembler { let basic_block_size = Self::basic_block_size(cellbase.data(), &uncles, iter::empty(), extension.clone()); - let dao = Self::calc_dao(&snapshot, ¤t_epoch, cellbase.clone(), vec![])?; + let (dao, _checked, _txs_size) = + Self::calc_dao(&snapshot, ¤t_epoch, cellbase.clone(), vec![])?; builder .transactions(vec![]) @@ -353,7 +355,7 @@ impl BlockAssembler { let current_template = ¤t.template; let max_block_bytes = consensus.max_block_bytes() as usize; let extension = Self::build_extension(¤t.snapshot)?; - let (txs, new_txs_size) = { + let txs = { let tx_pool_reader = tx_pool.read().await; if current.snapshot.tip_hash() != tx_pool_reader.snapshot().tip_hash() { return Ok(()); @@ -373,23 +375,32 @@ impl BlockAssembler { } let max_block_cycles = consensus.max_block_cycles(); - let (txs, txs_size, _cycles) = tx_pool_reader + let (txs, _txs_size, _cycles) = tx_pool_reader .package_txs(max_block_cycles, txs_size_limit.expect("overflow checked")); - (txs, txs_size) + txs }; - let incr = new_txs_size.saturating_sub(current.size.txs); - let new_total_size = current.size.total + incr; - - if let Ok(dao) = Self::calc_dao( + if let Ok((dao, checked, new_txs_size)) = Self::calc_dao( ¤t.snapshot, ¤t.epoch, current_template.cellbase.clone(), - txs.clone(), + txs, ) { + let new_total_size = if new_txs_size > current.size.txs { + current + .size + .total + .saturating_add(new_txs_size - current.size.txs) + } else { + current + .size + .total + .saturating_sub(current.size.txs - new_txs_size) + }; + let mut builder = BlockTemplateBuilder::from_template(¤t.template); builder - .set_transactions(txs) + .set_transactions(checked) .work_id(self.work_id.fetch_add(1, Ordering::SeqCst)) .current_time(cmp::max( unix_time_as_millis(), @@ -549,20 +560,17 @@ impl BlockAssembler { current_epoch: &EpochExt, cellbase: TransactionView, entries: Vec, - ) -> Result { + ) -> Result<(Byte32, Vec, usize), AnyError> { let tip_header = snapshot.tip_header(); let consensus = snapshot.consensus(); - + let mut size: usize = 0; let mut seen_inputs = HashSet::new(); let mut transactions_checker = TransactionsChecker::new(iter::once(&cellbase)); - let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0); - let entries_iter = iter::once(dummy_cellbase_entry).chain(entries.into_iter()); - - let rtxs: Vec<_> = block_in_place(|| { - entries_iter - .enumerate() - .filter_map(|(index, entry)| { + let checked: Vec<_> = block_in_place(|| { + entries + .into_iter() + .filter_map(|entry| { let overlay_cell_checker = OverlayCellChecker::new(&transactions_checker, snapshot); if let Err(err) = @@ -580,20 +588,24 @@ impl BlockAssembler { ); None } else { - if index != 0 { - transactions_checker.insert(entry.transaction()); - } - Some(entry.rtx) + transactions_checker.insert(entry.transaction()); + size = size.saturating_add(entry.size); + Some(entry) } }) .collect() }); + let dummy_cellbase_entry = TxEntry::dummy_resolve(cellbase, 0, Capacity::zero(), 0); + let entries_iter = iter::once(&dummy_cellbase_entry) + .chain(checked.iter()) + .map(|entry| &entry.rtx); + // Generate DAO fields here let dao = DaoCalculator::new(consensus, &snapshot.as_data_provider()) - .dao_field_with_current_epoch(&rtxs, tip_header, current_epoch)?; + .dao_field_with_current_epoch(entries_iter, tip_header, current_epoch)?; - Ok(dao) + Ok((dao, checked, size)) } pub(crate) async fn notify(&self) { diff --git a/util/dao/src/lib.rs b/util/dao/src/lib.rs index 6dec914087..463cdb25ed 100644 --- a/util/dao/src/lib.rs +++ b/util/dao/src/lib.rs @@ -74,17 +74,17 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a /// Calculates the new dao field with specified [`EpochExt`]. pub fn dao_field_with_current_epoch( &self, - rtxs: &[ResolvedTransaction], + rtxs: impl Iterator + Clone, parent: &HeaderView, current_block_epoch: &EpochExt, ) -> Result { // Freed occupied capacities from consumed inputs let freed_occupied_capacities = - rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| { + rtxs.clone().try_fold(Capacity::zero(), |capacities, rtx| { self.input_occupied_capacities(rtx) .and_then(|c| capacities.safe_add(c)) })?; - let added_occupied_capacities = self.added_occupied_capacities(rtxs)?; + let added_occupied_capacities = self.added_occupied_capacities(rtxs.clone())?; let withdrawed_interests = self.withdrawed_interests(rtxs)?; let (parent_ar, parent_c, parent_s, parent_u) = extract_dao_data(parent.dao()); @@ -143,7 +143,7 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a .next_epoch_ext(parent, self.data_loader) .ok_or(DaoError::InvalidHeader)? .epoch(); - self.dao_field_with_current_epoch(rtxs, parent, ¤t_block_epoch) + self.dao_field_with_current_epoch(rtxs.iter(), parent, ¤t_block_epoch) } /// Returns the total transactions fee of `rtx`. @@ -155,20 +155,22 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a .map_err(Into::into) } - fn added_occupied_capacities(&self, rtxs: &[ResolvedTransaction]) -> CapacityResult { + fn added_occupied_capacities( + &self, + mut rtxs: impl Iterator, + ) -> CapacityResult { // Newly added occupied capacities from outputs - let added_occupied_capacities = - rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| { - rtx.transaction - .outputs_with_data_iter() - .enumerate() - .try_fold(Capacity::zero(), |tx_capacities, (_, (output, data))| { - Capacity::bytes(data.len()) - .and_then(|c| output.occupied_capacity(c)) - .and_then(|c| tx_capacities.safe_add(c)) - }) - .and_then(|c| capacities.safe_add(c)) - })?; + let added_occupied_capacities = rtxs.try_fold(Capacity::zero(), |capacities, rtx| { + rtx.transaction + .outputs_with_data_iter() + .enumerate() + .try_fold(Capacity::zero(), |tx_capacities, (_, (output, data))| { + Capacity::bytes(data.len()) + .and_then(|c| output.occupied_capacity(c)) + .and_then(|c| tx_capacities.safe_add(c)) + }) + .and_then(|c| capacities.safe_add(c)) + })?; Ok(added_occupied_capacities) } @@ -183,12 +185,15 @@ impl<'a, DL: CellDataProvider + EpochProvider + HeaderProvider> DaoCalculator<'a .map_err(Into::into) } - fn withdrawed_interests(&self, rtxs: &[ResolvedTransaction]) -> Result { - let maximum_withdraws = rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| { + fn withdrawed_interests( + &self, + mut rtxs: impl Iterator + Clone, + ) -> Result { + let maximum_withdraws = rtxs.clone().try_fold(Capacity::zero(), |capacities, rtx| { self.transaction_maximum_withdraw(rtx) .and_then(|c| capacities.safe_add(c).map_err(Into::into)) })?; - let input_capacities = rtxs.iter().try_fold(Capacity::zero(), |capacities, rtx| { + let input_capacities = rtxs.try_fold(Capacity::zero(), |capacities, rtx| { let tx_input_capacities = rtx.resolved_inputs.iter().try_fold( Capacity::zero(), |tx_capacities, cell_meta| {