From 2d4da258fe93e894a2fec625659c7d19310a5dff Mon Sep 17 00:00:00 2001 From: ar Date: Tue, 20 Aug 2024 17:20:16 -0400 Subject: [PATCH 1/8] Adds a mempool request to wait for a transaction verification result and uses it in `sendrawtransaction` RPC method --- zebra-node-services/src/mempool.rs | 5 ++ zebra-rpc/src/methods.rs | 5 +- zebra-rpc/src/methods/tests/prop.rs | 10 ++-- zebrad/src/components/mempool.rs | 60 ++++++++++++++++++++-- zebrad/src/components/mempool/downloads.rs | 29 ++++++++--- 5 files changed, 88 insertions(+), 21 deletions(-) diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index 98c1969bbad..7f273b5953a 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -58,6 +58,11 @@ pub enum Request { /// The transaction downloader checks for duplicates across IDs and transactions. Queue(Vec), + /// Queue a list of unmined transactions submitted via the `sendrawtransaction` RPC method, + /// and wait for them to be verified, rejected, or for their verification task to be cancelled if + /// the transaction id is mined into the best chain. + QueueRpc(Vec), + /// Check for newly verified transactions. /// /// The transaction downloader does not push transactions into the mempool. diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index ae5deb7a5b9..a4014c9f53a 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -657,10 +657,9 @@ where // send transaction to the rpc queue, ignore any error. let unmined_transaction = UnminedTx::from(raw_transaction.clone()); - let _ = queue_sender.send(unmined_transaction); + let _ = queue_sender.send(unmined_transaction.clone()); - let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); - let request = mempool::Request::Queue(vec![transaction_parameter]); + let request = mempool::Request::QueueRpc(vec![unmined_transaction]); let response = mempool.oneshot(request).await.map_server_error()?; diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index c2a9c70a348..e503c885c47 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -60,7 +60,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction.into()]); let response = mempool::Response::Queued(vec![Ok(())]); mempool @@ -114,7 +114,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction.into()]); mempool .expect_request(expected_request) @@ -174,7 +174,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction.into()]); let response = mempool::Response::Queued(vec![Err(DummyError.into())]); mempool @@ -857,7 +857,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); let tx_unmined = UnminedTx::from(tx); - let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); + let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone().into()]); // fail the mempool insertion mempool @@ -949,7 +949,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); let tx_unmined = UnminedTx::from(tx.clone()); - let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); + let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone().into()]); // insert to hs we will use later transactions_hash_set.insert(tx_unmined.id); diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 2d9b2b3e0c5..1e0daa7f62d 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -27,7 +27,7 @@ use std::{ }; use futures::{future::FutureExt, stream::Stream}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, oneshot}; use tokio_stream::StreamExt; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; @@ -560,7 +560,7 @@ impl Service for Mempool { for tx in tx_retries { // This is just an efficiency optimisation, so we don't care if queueing // transaction requests fails. - let _result = tx_downloads.download_if_needed_and_verify(tx); + let _result = tx_downloads.download_if_needed_and_verify(tx, None); } } @@ -608,8 +608,8 @@ impl Service for Mempool { tracing::trace!("chain grew during tx verification, retrying ..",); // We don't care if re-queueing the transaction request fails. - let _result = - tx_downloads.download_if_needed_and_verify(tx.transaction.into()); + let _result = tx_downloads + .download_if_needed_and_verify(tx.transaction.into(), None); } } Ok(Err((txid, error))) => { @@ -762,7 +762,7 @@ impl Service for Mempool { .into_iter() .map(|gossiped_tx| -> Result<(), MempoolError> { storage.should_download_or_verify(gossiped_tx.id())?; - tx_downloads.download_if_needed_and_verify(gossiped_tx)?; + tx_downloads.download_if_needed_and_verify(gossiped_tx, None)?; Ok(()) }) @@ -775,6 +775,47 @@ impl Service for Mempool { async move { Ok(Response::Queued(rsp)) }.boxed() } + // Queue mempool candidates + Request::QueueRpc(unmined_txs) => { + trace!(req_count = ?unmined_txs.len(), "got mempool QueueRpc request"); + + let results: Vec>, BoxError>> = + unmined_txs + .into_iter() + .map(Gossip::Tx) + .map( + |unmined_tx| -> Result< + oneshot::Receiver>, + MempoolError, + > { + let (rsp_tx, rsp_rx) = oneshot::channel(); + storage.should_download_or_verify(unmined_tx.id())?; + tx_downloads + .download_if_needed_and_verify(unmined_tx, Some(rsp_tx))?; + Ok(rsp_rx) + }, + ) + .map(|result| result.map_err(BoxError::from)) + .collect(); + + // We've added transactions to the queue + self.update_metrics(); + + async move { + let mut rsp = vec![]; + + for result in results { + rsp.push(match result { + Ok(rsp_rx) => rsp_rx.await?, + Err(err) => Err(err), + }) + } + + Ok(Response::Queued(rsp)) + } + .boxed() + } + // Store successfully downloaded and verified transactions in the mempool Request::CheckForVerifiedTransactions => { trace!(?req, "got mempool request"); @@ -821,6 +862,15 @@ impl Service for Mempool { .collect(), ), + // Don't queue mempool candidates, because there is no queue. + Request::QueueRpc(unmined_txs) => Response::Queued( + iter::repeat(MempoolError::Disabled) + .take(unmined_txs.len()) + .map(BoxError::from) + .map(Err) + .collect(), + ), + // Check if the mempool should be enabled. // This request makes sure mempools are debug-enabled in the acceptance tests. Request::CheckForVerifiedTransactions => { diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index d3f62b4087b..bce2e647494 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -51,7 +51,7 @@ use zebra_chain::{ use zebra_consensus::transaction as tx; use zebra_network as zn; use zebra_node_services::mempool::Gossip; -use zebra_state as zs; +use zebra_state::{self as zs, CloneError}; use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; @@ -105,17 +105,17 @@ pub const MAX_INBOUND_CONCURRENCY: usize = 25; struct CancelDownloadAndVerify; /// Errors that can occur while downloading and verifying a transaction. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[allow(dead_code)] pub enum TransactionDownloadVerifyError { #[error("transaction is already in state")] InState, #[error("error in state service")] - StateError(#[source] BoxError), + StateError(#[source] CloneError), #[error("error downloading transaction")] - DownloadFailed(#[source] BoxError), + DownloadFailed(#[source] CloneError), #[error("transaction download / verification was cancelled")] Cancelled, @@ -243,6 +243,7 @@ where pub fn download_if_needed_and_verify( &mut self, gossiped_tx: Gossip, + rsp_tx: Option>>, ) -> Result<(), MempoolError> { let txid = gossiped_tx.id(); @@ -295,7 +296,7 @@ where Ok((Some(height), next_height)) } Ok(_) => unreachable!("wrong response"), - Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), + Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())), }?; trace!(?txid, ?next_height, "got next height"); @@ -307,11 +308,12 @@ where let tx = match network .oneshot(req) .await + .map_err(CloneError::from) .map_err(TransactionDownloadVerifyError::DownloadFailed)? { zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| { TransactionDownloadVerifyError::DownloadFailed( - "no transactions returned".into(), + BoxError::from("no transactions returned").into(), ) })?, _ => unreachable!("wrong response to transaction request"), @@ -373,7 +375,7 @@ where let task = tokio::spawn(async move { // Prefer the cancel handle if both are ready. - tokio::select! { + let result = tokio::select! { biased; _ = &mut cancel_rx => { trace!("task cancelled prior to completion"); @@ -381,7 +383,17 @@ where Err((TransactionDownloadVerifyError::Cancelled, txid)) } verification = fut => verification, + }; + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = + rsp_tx.send(result.as_ref().map(|_| ()).map_err(|(err, id)| { + format!("failed to verify tx {id}, err: {err}").into() + })); } + + result }); self.pending.push(task); @@ -458,6 +470,7 @@ where match state .ready() .await + .map_err(CloneError::from) .map_err(TransactionDownloadVerifyError::StateError)? .call(zs::Request::Transaction(txid.mined_id())) .await @@ -465,7 +478,7 @@ where Ok(zs::Response::Transaction(None)) => Ok(()), Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState), Ok(_) => unreachable!("wrong response"), - Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), + Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())), }?; Ok(()) From a41b7b82c4c6da0b9b19391dfc6f98b86c57556d Mon Sep 17 00:00:00 2001 From: ar Date: Tue, 20 Aug 2024 17:54:14 -0400 Subject: [PATCH 2/8] removes unnecessary clone --- zebra-rpc/src/methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index a4014c9f53a..bd86c56ec69 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -656,7 +656,7 @@ where let transaction_hash = raw_transaction.hash(); // send transaction to the rpc queue, ignore any error. - let unmined_transaction = UnminedTx::from(raw_transaction.clone()); + let unmined_transaction = UnminedTx::from(raw_transaction); let _ = queue_sender.send(unmined_transaction.clone()); let request = mempool::Request::QueueRpc(vec![unmined_transaction]); From 352f45842bb042f5558fd8998e856b98449778b8 Mon Sep 17 00:00:00 2001 From: ar Date: Tue, 20 Aug 2024 18:06:28 -0400 Subject: [PATCH 3/8] fix clippy warnings --- zebra-rpc/src/methods/tests/prop.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index e503c885c47..0e8789fdd3b 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -60,7 +60,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction]); let response = mempool::Response::Queued(vec![Ok(())]); mempool @@ -114,7 +114,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction]); mempool .expect_request(expected_request) @@ -174,7 +174,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction]); let response = mempool::Response::Queued(vec![Err(DummyError.into())]); mempool @@ -857,7 +857,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); let tx_unmined = UnminedTx::from(tx); - let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone().into()]); + let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone()]); // fail the mempool insertion mempool @@ -949,7 +949,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); let tx_unmined = UnminedTx::from(tx.clone()); - let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone().into()]); + let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone()]); // insert to hs we will use later transactions_hash_set.insert(tx_unmined.id); From acdd8dcb0b2927f8212b03d8186db9bf472b68ec Mon Sep 17 00:00:00 2001 From: Arya Date: Thu, 29 Aug 2024 21:55:14 -0400 Subject: [PATCH 4/8] returns verification errors for all `mempool::Queue` requests, removes `QueueRpc` request variant --- zebra-node-services/src/mempool.rs | 5 --- zebra-rpc/src/methods.rs | 7 ++-- zebra-rpc/src/methods/tests/prop.rs | 10 +++--- zebrad/src/components/mempool.rs | 40 +++------------------- zebrad/src/components/mempool/downloads.rs | 10 +++--- 5 files changed, 20 insertions(+), 52 deletions(-) diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index 7f273b5953a..98c1969bbad 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -58,11 +58,6 @@ pub enum Request { /// The transaction downloader checks for duplicates across IDs and transactions. Queue(Vec), - /// Queue a list of unmined transactions submitted via the `sendrawtransaction` RPC method, - /// and wait for them to be verified, rejected, or for their verification task to be cancelled if - /// the transaction id is mined into the best chain. - QueueRpc(Vec), - /// Check for newly verified transactions. /// /// The transaction downloader does not push transactions into the mempool. diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index bd86c56ec69..ae5deb7a5b9 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -656,10 +656,11 @@ where let transaction_hash = raw_transaction.hash(); // send transaction to the rpc queue, ignore any error. - let unmined_transaction = UnminedTx::from(raw_transaction); - let _ = queue_sender.send(unmined_transaction.clone()); + let unmined_transaction = UnminedTx::from(raw_transaction.clone()); + let _ = queue_sender.send(unmined_transaction); - let request = mempool::Request::QueueRpc(vec![unmined_transaction]); + let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); + let request = mempool::Request::Queue(vec![transaction_parameter]); let response = mempool.oneshot(request).await.map_server_error()?; diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 0e8789fdd3b..c2a9c70a348 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -60,7 +60,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction]); + let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); let response = mempool::Response::Queued(vec![Ok(())]); mempool @@ -114,7 +114,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction]); + let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); mempool .expect_request(expected_request) @@ -174,7 +174,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::QueueRpc(vec![unmined_transaction]); + let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); let response = mempool::Response::Queued(vec![Err(DummyError.into())]); mempool @@ -857,7 +857,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); let tx_unmined = UnminedTx::from(tx); - let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone()]); + let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); // fail the mempool insertion mempool @@ -949,7 +949,7 @@ proptest! { let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); let tx_unmined = UnminedTx::from(tx.clone()); - let expected_request = mempool::Request::QueueRpc(vec![tx_unmined.clone()]); + let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); // insert to hs we will use later transactions_hash_set.insert(tx_unmined.id); diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 1e0daa7f62d..5aa98332df1 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -758,40 +758,19 @@ impl Service for Mempool { Request::Queue(gossiped_txs) => { trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request"); - let rsp: Vec> = gossiped_txs - .into_iter() - .map(|gossiped_tx| -> Result<(), MempoolError> { - storage.should_download_or_verify(gossiped_tx.id())?; - tx_downloads.download_if_needed_and_verify(gossiped_tx, None)?; - - Ok(()) - }) - .map(|result| result.map_err(BoxError::from)) - .collect(); - - // We've added transactions to the queue - self.update_metrics(); - - async move { Ok(Response::Queued(rsp)) }.boxed() - } - - // Queue mempool candidates - Request::QueueRpc(unmined_txs) => { - trace!(req_count = ?unmined_txs.len(), "got mempool QueueRpc request"); - let results: Vec>, BoxError>> = - unmined_txs + gossiped_txs .into_iter() - .map(Gossip::Tx) .map( - |unmined_tx| -> Result< + |gossiped_tx| -> Result< oneshot::Receiver>, MempoolError, > { let (rsp_tx, rsp_rx) = oneshot::channel(); - storage.should_download_or_verify(unmined_tx.id())?; + storage.should_download_or_verify(gossiped_tx.id())?; tx_downloads - .download_if_needed_and_verify(unmined_tx, Some(rsp_tx))?; + .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?; + Ok(rsp_rx) }, ) @@ -862,15 +841,6 @@ impl Service for Mempool { .collect(), ), - // Don't queue mempool candidates, because there is no queue. - Request::QueueRpc(unmined_txs) => Response::Queued( - iter::repeat(MempoolError::Disabled) - .take(unmined_txs.len()) - .map(BoxError::from) - .map(Err) - .collect(), - ), - // Check if the mempool should be enabled. // This request makes sure mempools are debug-enabled in the acceptance tests. Request::CheckForVerifiedTransactions => { diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index bce2e647494..b37f988dcc8 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -387,10 +387,12 @@ where // Send the result to responder channel if one was provided. if let Some(rsp_tx) = rsp_tx { - let _ = - rsp_tx.send(result.as_ref().map(|_| ()).map_err(|(err, id)| { - format!("failed to verify tx {id}, err: {err}").into() - })); + let _ = rsp_tx.send( + result + .as_ref() + .map(|_| ()) + .map_err(|(err, _)| err.clone().into()), + ); } result From 4bbac10ef5992bc039a2867be2bbacf539456f1d Mon Sep 17 00:00:00 2001 From: Arya Date: Thu, 29 Aug 2024 22:18:34 -0400 Subject: [PATCH 5/8] returns oneshot channel in mempool::Response::Queue --- zebra-node-services/src/mempool.rs | 9 ++++----- zebra-rpc/src/methods.rs | 14 ++++++++++---- zebra-rpc/src/methods/tests/prop.rs | 13 ++++++++++--- zebra-rpc/src/queue/tests/prop.rs | 10 +++++++--- zebrad/src/components/mempool.rs | 16 ++-------------- .../src/components/mempool/crawler/tests/prop.rs | 14 +++++++++++--- 6 files changed, 44 insertions(+), 32 deletions(-) diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index 98c1969bbad..fbaaf029c75 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; +use tokio::sync::oneshot; use zebra_chain::transaction::{self, UnminedTx, UnminedTxId}; #[cfg(feature = "getblocktemplate-rpcs")] @@ -114,13 +115,11 @@ pub enum Response { /// Returns matching cached rejected [`UnminedTxId`]s from the mempool, RejectedTransactionIds(HashSet), - /// Returns a list of queue results. - /// - /// These are the results of the initial queue checks. - /// The transaction may also fail download or verification later. + /// Returns a list of initial queue checks results and a oneshot receiver + /// for awaiting download and/or verification results. /// /// Each result matches the request at the corresponding vector index. - Queued(Vec>), + Queued(Vec>, BoxError>>), /// Confirms that the mempool has checked for recently verified transactions. CheckedForVerifiedTransactions, diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index ae5deb7a5b9..a30ddbdf017 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -664,7 +664,7 @@ where let response = mempool.oneshot(request).await.map_server_error()?; - let queue_results = match response { + let mut queue_results = match response { mempool::Response::Queued(results) => results, _ => unreachable!("incorrect response variant from mempool service"), }; @@ -675,10 +675,16 @@ where "mempool service returned more results than expected" ); - tracing::debug!("sent transaction to mempool: {:?}", &queue_results[0]); + let queue_result = queue_results + .pop() + .expect("there should be exactly one item in Vec") + .inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err)) + .map_server_error()? + .await; + + tracing::debug!("sent transaction to mempool: {:?}", &queue_result); - queue_results[0] - .as_ref() + queue_result .map(|_| SentTransactionHash(transaction_hash)) .map_server_error() } diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index c2a9c70a348..1ca73dfdc14 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -7,6 +7,7 @@ use hex::ToHex; use jsonrpc_core::{Error, ErrorCode}; use proptest::{collection::vec, prelude::*}; use thiserror::Error; +use tokio::sync::oneshot; use tower::buffer::Buffer; use zebra_chain::{ @@ -61,7 +62,9 @@ proptest! { let unmined_transaction = UnminedTx::from(transaction); let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); - let response = mempool::Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = mempool::Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) @@ -897,7 +900,9 @@ proptest! { // now a retry will be sent to the mempool let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]); - let response = mempool::Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = mempool::Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) @@ -997,7 +1002,9 @@ proptest! { for tx in txs.clone() { let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]); - let response = mempool::Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = mempool::Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 1db9a340f2e..9f63ecce24d 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -5,7 +5,7 @@ use std::{collections::HashSet, env, sync::Arc}; use proptest::prelude::*; use chrono::Duration; -use tokio::time; +use tokio::{sync::oneshot, time}; use tower::ServiceExt; use zebra_chain::{ @@ -196,7 +196,9 @@ proptest! { let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let send_task = tokio::spawn(mempool.clone().oneshot(request)); - let response = Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) @@ -337,7 +339,9 @@ proptest! { // retry will queue the transaction to mempool let gossip = Gossip::Tx(UnminedTx::from(transaction.clone())); let expected_request = Request::Queue(vec![gossip]); - let response = Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 5aa98332df1..05732ddaac2 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -758,7 +758,7 @@ impl Service for Mempool { Request::Queue(gossiped_txs) => { trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request"); - let results: Vec>, BoxError>> = + let rsp: Vec>, BoxError>> = gossiped_txs .into_iter() .map( @@ -780,19 +780,7 @@ impl Service for Mempool { // We've added transactions to the queue self.update_metrics(); - async move { - let mut rsp = vec![]; - - for result in results { - rsp.push(match result { - Ok(rsp_rx) => rsp_rx.await?, - Err(err) => Err(err), - }) - } - - Ok(Response::Queued(rsp)) - } - .boxed() + async move { Ok(Response::Queued(rsp)) }.boxed() } // Store successfully downloaded and verified transactions in the mempool diff --git a/zebrad/src/components/mempool/crawler/tests/prop.rs b/zebrad/src/components/mempool/crawler/tests/prop.rs index fa1e3ef5785..524d754cfdc 100644 --- a/zebrad/src/components/mempool/crawler/tests/prop.rs +++ b/zebrad/src/components/mempool/crawler/tests/prop.rs @@ -6,7 +6,7 @@ use proptest::{ collection::{hash_set, vec}, prelude::*, }; -use tokio::time; +use tokio::{sync::oneshot, time}; use zebra_chain::{ chain_sync_status::ChainSyncStatus, parameters::Network, transaction::UnminedTxId, @@ -317,9 +317,17 @@ async fn respond_to_queue_request( expected_transaction_ids: HashSet, response: impl IntoIterator>, ) -> Result<(), TestCaseError> { - let response = response + let response: Vec>, BoxError>> = response .into_iter() - .map(|result| result.map_err(BoxError::from)) + .map(|result| { + result + .map(|_| { + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + rsp_rx + }) + .map_err(BoxError::from) + }) .collect(); mempool From a2a1cbb3a72e22c642c698cae615224f4bd2b0b0 Mon Sep 17 00:00:00 2001 From: Arya Date: Thu, 29 Aug 2024 22:38:04 -0400 Subject: [PATCH 6/8] updates a test vector to check for download or verification error in mempool::response::Queued result receiver --- zebrad/src/components/mempool/tests/vector.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 2868fef2e65..c285923fa7d 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -445,12 +445,17 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .call(Request::Queue(vec![txid.into()])) .await .unwrap(); - let queued_responses = match response { + let mut queued_responses = match response { Response::Queued(queue_responses) => queue_responses, _ => unreachable!("will never happen in this test"), }; assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); + + let queued_response = queued_responses + .pop() + .expect("already checked that there is exactly 1 item in Vec") + .expect("initial queue checks result should be Ok"); + assert_eq!(mempool.tx_downloads().in_flight(), 1); // Push block 2 to the state @@ -489,6 +494,14 @@ async fn mempool_cancel_mined() -> Result<(), Report> { // Check if download was cancelled. assert_eq!(mempool.tx_downloads().in_flight(), 0); + assert!( + queued_response + .await + .expect("channel should not be closed") + .is_err(), + "queued tx should fail to download and verify due to chain tip change" + ); + Ok(()) } From 7e5359dea76224b6f66f1f6e6074e0b2b34ba15d Mon Sep 17 00:00:00 2001 From: Arya Date: Thu, 29 Aug 2024 22:57:05 -0400 Subject: [PATCH 7/8] Always require tokio as a dependency in zebra-node-services --- zebra-node-services/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zebra-node-services/Cargo.toml b/zebra-node-services/Cargo.toml index 5c188b34b40..8d0992dcf5a 100644 --- a/zebra-node-services/Cargo.toml +++ b/zebra-node-services/Cargo.toml @@ -34,7 +34,7 @@ rpc-client = [ "serde_json", ] -shielded-scan = ["tokio"] +shielded-scan = [] [dependencies] zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.39" } @@ -48,7 +48,7 @@ jsonrpc-core = { version = "18.0.0", optional = true } reqwest = { version = "0.11.26", default-features = false, features = ["rustls-tls"], optional = true } serde = { version = "1.0.204", optional = true } serde_json = { version = "1.0.122", optional = true } -tokio = { version = "1.39.2", features = ["time"], optional = true } +tokio = { version = "1.39.2", features = ["time", "sync"] } [dev-dependencies] From 33b7767ee1b1ad2df57a9a0bfc530e9f1a187ff7 Mon Sep 17 00:00:00 2001 From: Arya Date: Thu, 29 Aug 2024 23:27:45 -0400 Subject: [PATCH 8/8] checks for closed channel errors in sendrawtransaction and updates a prop test to check that verification errors are propagated correctly --- zebra-rpc/src/methods.rs | 1 + zebra-rpc/src/methods/tests/prop.rs | 30 +++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index a30ddbdf017..471d542922c 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -685,6 +685,7 @@ where tracing::debug!("sent transaction to mempool: {:?}", &queue_result); queue_result + .map_server_error()? .map(|_| SentTransactionHash(transaction_hash)) .map_server_error() } diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 1ca73dfdc14..409a6aefe52 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -114,10 +114,10 @@ proptest! { .expect("Transaction serializes successfully"); let transaction_hex = hex::encode(&transaction_bytes); - let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); + let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex.clone())); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]); mempool .expect_request(expected_request) @@ -141,6 +141,32 @@ proptest! { "Result is not a server error: {result:?}" ); + let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); + + let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]); + + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Err("any verification error".into())); + mempool + .expect_request(expected_request) + .await? + .respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)]))); + + let result = send_task + .await + .expect("Sending raw transactions should not panic"); + + prop_assert!( + matches!( + result, + Err(Error { + code: ErrorCode::ServerError(_), + .. + }) + ), + "Result is not a server error: {result:?}" + ); + // The queue task should continue without errors or panics let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); prop_assert!(rpc_tx_queue_task_result.is_none());