From db75beb0e24cd9e335f29f22ffd174f592bd7871 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Wed, 30 Mar 2022 20:44:58 -0300 Subject: [PATCH 01/26] Add a rpc queue --- zebra-rpc/src/lib.rs | 1 + zebra-rpc/src/queue.rs | 297 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 298 insertions(+) create mode 100644 zebra-rpc/src/queue.rs diff --git a/zebra-rpc/src/lib.rs b/zebra-rpc/src/lib.rs index ce6072e2589..0bb4b072e2e 100644 --- a/zebra-rpc/src/lib.rs +++ b/zebra-rpc/src/lib.rs @@ -6,6 +6,7 @@ pub mod config; pub mod methods; +pub mod queue; pub mod server; #[cfg(test)] mod tests; diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs new file mode 100644 index 00000000000..73f4c2df861 --- /dev/null +++ b/zebra-rpc/src/queue.rs @@ -0,0 +1,297 @@ +//! Transaction Queue. +//! +//! All transactions that are sent from RPC methods should be added to this queue for retries. +//! Transactions can fail to be inserted to the mempool inmediatly to different reasons, +//! like having not mined utxos. +//! +//! The queue is a `HashMap` which can be shared by a `Listener` and a `Runner` component. + +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, Mutex}, +}; + +use chrono::Duration; +use tokio::{ + sync::mpsc::{channel, Receiver, Sender}, + time::Instant, +}; + +use tower::{Service, ServiceExt}; + +use zebra_chain::{ + chain_tip::ChainTip, + parameters::NetworkUpgrade, + transaction::{Transaction, UnminedTx, UnminedTxId}, +}; +use zebra_node_services::{ + mempool::{Gossip, Request, Response}, + BoxError, +}; + +use zebra_state::{ReadRequest, ReadResponse}; + +//#[cfg(test)] +//mod tests; + +/// The number of blocks a transaction can be in the queue. +const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; + +#[derive(Clone, Debug)] +/// The queue itself +pub struct Queue { + transactions: HashMap, Instant)>, +} + +#[derive(Clone, Debug)] +/// The runner +pub struct Runner { + queue: Arc>, + sender: Sender>, +} + +/// The listener +pub struct Listener { + queue: Arc>, + receiver: Receiver>, +} + +impl Queue { + /// Start a new queue + pub fn start() -> (Listener, Runner) { + let (sender, receiver) = channel(10); + + let queue = Arc::new(Mutex::new(Queue { + transactions: HashMap::new(), + })); + + let runner = Runner { + queue: queue.clone(), + sender, + }; + + let listener = Listener { queue, receiver }; + + (listener, runner) + } + + /// Get the transactions in the queue + pub fn transactions(&self) -> HashMap, Instant)> { + self.transactions.clone() + } + + /// Insert a transaction to the queue + pub fn insert(&mut self, unmined_tx: UnminedTx) { + self.transactions + .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now())); + } + + /// Remove a transaction from the queue + pub fn remove(&mut self, unmined_id: UnminedTxId) { + self.transactions.remove(&unmined_id); + } +} + +impl Listener { + /// Listen for transaction and insert them to the queue + pub async fn listen(&mut self) { + loop { + if let Some(Some(tx)) = self.receiver.recv().await { + self.queue + .lock() + .expect("queue mutex should be unpoisoned") + .insert(tx); + } + } + } +} + +impl Runner { + /// Access the sender field of the runner. + pub fn sender(&self) -> Sender> { + self.sender.clone() + } + + /// Access the mutable queue. + pub fn queue(&self) -> Arc> { + self.queue.clone() + } + + /// Get the queue transactions as a `HashSet` of unmined ids. + fn transactions_as_hash_set(&self) -> HashSet { + let transactions = self + .queue + .lock() + .expect("queue mutex should be unpoisoned") + .transactions(); + transactions.iter().map(|t| *t.0).collect() + } + + /// Get the queue transactions as a `Vec` of transactions. + fn transactions_as_vec(&self) -> Vec> { + let transactions = self + .queue + .lock() + .expect("queue mutex should be unpoisoned") + .transactions(); + transactions.iter().map(|t| t.1 .0.clone()).collect() + } + + /// Retry sending to memempool if needed. + pub async fn run(self, mempool: Mempool, state: State, _tip: Tip) + where + Mempool: Service + Clone + 'static, + State: Service + + Clone + + Send + + Sync + + 'static, + Tip: ChainTip + Clone + Send + Sync + 'static, + { + // TODO: this should be an argument + let network = zebra_chain::parameters::Network::Mainnet; + + // TODO: Use tip.best_tip_height() + let tip_height = zebra_chain::block::Height(1); + + // get spacing between blocks + let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height); + + loop { + // sleep until the next block + tokio::time::sleep(spacing.to_std().unwrap()).await; + + // first, remove what is expired + self.remove_expired(spacing); + + // remove if any of the queued transactions is now in the mempool + let in_mempool = + Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await; + self.remove_committed(in_mempool); + + // remove if any of the queued transactions is now in the state + let in_state = Self::check_state(state.clone(), self.transactions_as_hash_set()).await; + self.remove_committed(in_state); + + // retry what is left in the queue + let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await; + } + } + + /// Remove transactions that are expired according to number of blocks and current spacing between blocks. + fn remove_expired(&self, spacing: Duration) { + let duration_to_expire = + Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()); + let transactions = self + .queue + .lock() + .expect("queue mutex should be unpoisoned") + .transactions(); + let now = Instant::now(); + + for tx in transactions.iter() { + let tx_time = + tx.1 .1 + .checked_add(duration_to_expire.to_std().unwrap()) + .unwrap(); + + if now > tx_time { + self.queue + .lock() + .expect("queue mutex should be unpoisoned") + .remove(*tx.0); + } + } + } + + /// Remove transactions from the queue that had been inserted to the state or the mempool. + fn remove_committed(&self, to_remove: HashSet) { + for r in to_remove { + self.queue + .lock() + .expect("queue mutex should be unpoisoned") + .remove(r); + } + } + + /// Check the mempool for given transactions. + async fn check_mempool( + mempool: Mempool, + transactions: HashSet, + ) -> HashSet + where + Mempool: Service + Clone + 'static, + { + let mut response = HashSet::new(); + let request = Request::TransactionsById(transactions); + + // TODO: ignore errors + let mempool_response = mempool + .oneshot(request) + .await + .expect("Requesting transactions should not panic"); + + match mempool_response { + Response::Transactions(txs) => { + for tx in txs { + response.insert(tx.id); + } + } + _ => unreachable!("TransactionsById always respond with at least an empty vector"), + } + + response + } + + /// Check the state for given transactions. + async fn check_state( + state: State, + transactions: HashSet, + ) -> HashSet + where + State: Service + + Clone + + Send + + Sync + + 'static, + { + let mut response = HashSet::new(); + + for t in transactions { + let request = ReadRequest::Transaction(t.mined_id()); + + let state_response = state + .clone() + .oneshot(request) + .await + .expect("Requesting transactions should not panic"); + + match state_response { + ReadResponse::Transaction(Some(tx)) => { + response.insert(tx.0.unmined_id()); + } + ReadResponse::Transaction(None) => {} + _ => unreachable!("ReadResponse::Transaction is always some or none"), + } + } + + response + } + + /// Retry sending given transactions to mempool. + async fn retry(mempool: Mempool, transactions: Vec>) + where + Mempool: Service + Clone + 'static, + { + for tx in transactions { + let transaction_parameter = Gossip::Tx(UnminedTx::from(tx.clone())); + let request = Request::Queue(vec![transaction_parameter]); + + let _ = mempool + .clone() + .oneshot(request) + .await + .expect("Sending to memmpool should not panic"); + } + } +} From 706c89d84d19cac26628245df2cd6794c028419d Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Wed, 30 Mar 2022 21:33:12 -0300 Subject: [PATCH 02/26] Implement the rpc queue --- zebra-rpc/src/methods.rs | 54 +++++++++++++++++++++++++++++++--------- zebra-rpc/src/server.rs | 2 +- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 6e119afaa66..ef16935db82 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -21,11 +21,13 @@ use zebra_chain::{ chain_tip::ChainTip, parameters::{ConsensusBranchId, Network, NetworkUpgrade}, serialization::{SerializationError, ZcashDeserialize}, - transaction::{self, SerializedTransaction, Transaction}, + transaction::{self, SerializedTransaction, Transaction, UnminedTx}, }; use zebra_network::constants::USER_AGENT; use zebra_node_services::{mempool, BoxError}; +use crate::queue::{Queue, Runner}; + #[cfg(test)] mod tests; @@ -160,17 +162,23 @@ where /// The configured network for this RPC service. #[allow(dead_code)] network: Network, + + /// An instance of the RPC transaction queue + queue_runner: Runner, } impl RpcImpl where - Mempool: Service, + Mempool: Service + 'static, State: Service< - zebra_state::ReadRequest, - Response = zebra_state::ReadResponse, - Error = zebra_state::BoxError, - >, - Tip: ChainTip + Send + Sync, + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + Tip: ChainTip + Clone + Send + Sync + 'static, { /// Create a new instance of the RPC handler. pub fn new( @@ -182,14 +190,31 @@ where ) -> Self where Version: ToString, + >::Future: Send, + >::Future: Send, { - RpcImpl { + let (mut listener, runner) = Queue::start(); + + let rpc_impl = RpcImpl { app_version: app_version.to_string(), - mempool, - state, - latest_chain_tip, + mempool: mempool.clone(), + state: state.clone(), + latest_chain_tip: latest_chain_tip.clone(), network, - } + queue_runner: runner.clone(), + }; + + // run the listener + tokio::spawn(async move { + listener.listen().await; + }); + + // run the process queue + tokio::spawn(async move { + runner.run(mempool, state, latest_chain_tip).await; + }); + + rpc_impl } } @@ -319,6 +344,7 @@ where raw_transaction_hex: String, ) -> BoxFuture> { let mempool = self.mempool.clone(); + let queue_sender = self.queue_runner.sender(); async move { let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| { @@ -329,6 +355,10 @@ where let transaction_hash = raw_transaction.hash(); + // send transaction to the rpc queue + let unmined_transaction = UnminedTx::from(raw_transaction.clone()); + let _ = queue_sender.send(Some(unmined_transaction)).await; + let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); let request = mempool::Request::Queue(vec![transaction_parameter]); diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 523924cc5d4..75ce5402d48 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -52,7 +52,7 @@ impl RpcServer { + Sync + 'static, State::Future: Send, - Tip: ChainTip + Send + Sync + 'static, + Tip: ChainTip + Clone + Send + Sync + 'static, { if let Some(listen_addr) = config.listen_addr { info!("Trying to open RPC endpoint at {}...", listen_addr,); From e3e26e942b6fa7c5d54c4e285366ab58c0114c7c Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Wed, 30 Mar 2022 22:09:11 -0300 Subject: [PATCH 03/26] Add rpc queue tests --- zebra-rpc/src/methods/tests/prop.rs | 88 ++++++++++++ zebra-rpc/src/queue.rs | 4 +- zebra-rpc/src/queue/tests.rs | 3 + zebra-rpc/src/queue/tests/prop.rs | 204 ++++++++++++++++++++++++++++ 4 files changed, 297 insertions(+), 2 deletions(-) create mode 100644 zebra-rpc/src/queue/tests.rs create mode 100644 zebra-rpc/src/queue/tests/prop.rs diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index aac81f0ccb3..4854e126474 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -512,6 +512,94 @@ proptest! { Ok::<_, TestCaseError>(()) })?; } + + /// Test the queue functionality using `send_raw_transaction` + #[test] + fn rpc_queue_main_loop(tx in any::()) + { + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + let transaction_hash = tx.hash(); + + runtime.block_on(async move { + tokio::time::pause(); + + let mut mempool = MockService::build().for_prop_tests(); + let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + + let rpc = RpcImpl::new( + "RPC test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + NoChainTip, + Mainnet, + ); + + // send a transaction + let tx_bytes = tx + .zcash_serialize_to_vec() + .expect("Transaction serializes successfully"); + let tx_hex = hex::encode(&tx_bytes); + let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); + + let tx_unmined = UnminedTx::from(tx); + let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); + + // fail the mempool insertion + mempool + .expect_request(expected_request) + .await + .unwrap() + .respond(Err(DummyError)); + + let _ = send_task + .await + .expect("Sending raw transactions should not panic"); + + // make sure the transaction was inserted to the queue + prop_assert_eq!(rpc.queue_runner.queue().lock().unwrap().transactions().len(), 1); + + // advance enough time to have a new runner iteration + let spacing = chrono::Duration::seconds(150); + tokio::time::advance(spacing.to_std().unwrap()).await; + + // the runner will made a new call to TransactionsById + let mut transactions_hash_set = HashSet::new(); + transactions_hash_set.insert(tx_unmined.id); + let expected_request = mempool::Request::TransactionsById(transactions_hash_set); + let response = mempool::Response::Transactions(vec![]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + // the runner will also query the state again for the transaction + let expected_request = zebra_state::ReadRequest::Transaction(transaction_hash); + let response = zebra_state::ReadResponse::Transaction(None); + + state + .expect_request(expected_request) + .await? + .respond(response); + + // now a retry will be sent to the mempool + let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]); + let response = mempool::Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + // no more requets are done + mempool.expect_no_requests().await?; + state.expect_no_requests().await?; + + Ok::<_, TestCaseError>(()) + })?; + } } #[derive(Clone, Copy, Debug, Error)] diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 73f4c2df861..5d38ae9ca80 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -31,8 +31,8 @@ use zebra_node_services::{ use zebra_state::{ReadRequest, ReadResponse}; -//#[cfg(test)] -//mod tests; +#[cfg(test)] +mod tests; /// The number of blocks a transaction can be in the queue. const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; diff --git a/zebra-rpc/src/queue/tests.rs b/zebra-rpc/src/queue/tests.rs new file mode 100644 index 00000000000..300832fd142 --- /dev/null +++ b/zebra-rpc/src/queue/tests.rs @@ -0,0 +1,3 @@ +//! Test code for the RPC queue + +mod prop; diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs new file mode 100644 index 00000000000..dba24cf1b45 --- /dev/null +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -0,0 +1,204 @@ +//! Randomised property tests for the RPC Queue. + +use std::{collections::HashSet, ops::Mul}; + +use proptest::prelude::*; + +use tower::ServiceExt; + +use zebra_chain::transaction::{Transaction, UnminedTx, UnminedTxId}; +use zebra_node_services::mempool::{Gossip, Request, Response}; +use zebra_state::{BoxError, ReadRequest, ReadResponse}; +use zebra_test::mock_service::MockService; + +use crate::queue::{Queue, Runner}; + +proptest! { + /// Test insert to the queue and remove from it. + #[test] + fn insert_remove_to_from_queue(transaction in any::()) { + // create a queue + let (_listener, runner) = Queue::start(); + + // insert transaction + runner.queue.lock().unwrap().insert(transaction.clone()); + + // transaction was inserted to queue + let queue_transactions = runner.queue.lock().unwrap().transactions(); + prop_assert_eq!(1, queue_transactions.len()); + + // remove transaction from the queue + runner.queue.lock().unwrap().remove(transaction.id); + + // transaction was removed from queue + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 0); + } + + /// Test transactions are removed from the queue after time elapses. + #[test] + fn remove_expired_transactions_from_queue(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + // pause the clock + tokio::time::pause(); + + // create a queue + let (_listener, runner) = Queue::start(); + + // insert a transaction to the queue + runner.queue.lock().unwrap().insert(transaction); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + + // have a block interval value + let spacing = chrono::Duration::seconds(150); + + // apply expiration inmediatly, transaction will not be removed from queue + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + + // apply expiration after 1 block elapsed, transaction will not be removed from queue + tokio::time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + + // apply expiration after 2 blocks elapsed, transaction will not be removed from queue + tokio::time::advance(spacing.mul(2).to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + + // apply expiration after 3 block elapsed, transaction will be removed from queue + tokio::time::advance(spacing.mul(3).to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 0); + + Ok::<_, TestCaseError>(()) + })?; + } + + /// Test transactions are removed from queue after they get in the mempool + #[test] + fn queue_runner_mempool(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + let mut mempool = MockService::build().for_prop_tests(); + + // create a queue + let (_listener, runner) = Queue::start(); + + // insert a transaction to the queue + let unmined_transaction = UnminedTx::from(transaction); + runner.queue.lock().unwrap().insert(unmined_transaction.clone()); + + let transactions = runner.queue.lock().unwrap().transactions(); + prop_assert_eq!(transactions.len(), 1); + + // convert to hashset + let transactions_hash_set: HashSet = transactions.iter().map(|t| *t.0).collect(); + + // run the mempool checker + let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone())); + + let expected_request = Request::TransactionsById(transactions_hash_set.clone()); + let response = Response::Transactions(vec![]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transactions should not panic"); + + // empty results, transaction is not in the mempool + prop_assert_eq!(result, HashSet::new()); + + // now lets insert it to the mempool + let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); + let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); + + let send_task = tokio::spawn(mempool.clone().oneshot(request)); + + let response = Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let _ = send_task.await.expect("Inserting to mempool should not panic"); + + // check the mempool again + let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone())); + + let expected_request = Request::TransactionsById(transactions_hash_set); + let response = Response::Transactions(vec![unmined_transaction]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transactions should not panic"); + + prop_assert_eq!(result.len(), 1); + // not deleted yet + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + // delete + runner.remove_committed(result); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 0); + + // no more + mempool.expect_no_requests().await?; + + Ok::<_, TestCaseError>(()) + })?; + } + + /// Test transactions are removed from queue after they get in the state + #[test] + fn queue_runner_state(transaction in any::()) { + let runtime = zebra_test::init_async(); + + let transaction_hash = transaction.hash(); + + runtime.block_on(async move { + + let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + + // create a queue + let (_listener, runner) = Queue::start(); + + // insert a transaction to the queue + let unmined_transaction = UnminedTx::from(&transaction); + runner.queue.lock().unwrap().insert(unmined_transaction.clone()); + prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + + // run the runner + let mut hs = HashSet::new(); + hs.insert(unmined_transaction.id); + + let send_task = tokio::spawn(Runner::check_state(state.clone(), hs)); + + let expected_request = ReadRequest::Transaction(transaction_hash); + let response = ReadResponse::Transaction(None); + + state + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transaction should not panic"); + + prop_assert_eq!(HashSet::new(), result); + + // TODO: finish this test + + state.expect_no_requests().await?; + + Ok::<_, TestCaseError>(()) + })?; + } + + // TODO: add a Runner::retry test +} From 3c3ef7b9aefd73c473d81cb97b4317bc28eeafee Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 31 Mar 2022 13:01:21 -0300 Subject: [PATCH 04/26] Remove mutex, use broadcast channel --- zebra-rpc/src/methods.rs | 22 +++--- zebra-rpc/src/methods/tests/prop.rs | 3 - zebra-rpc/src/queue.rs | 104 ++++++++++------------------ zebra-rpc/src/queue/tests/prop.rs | 40 +++++------ 4 files changed, 66 insertions(+), 103 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index ef16935db82..9a5ba611478 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -14,6 +14,7 @@ use hex::{FromHex, ToHex}; use indexmap::IndexMap; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; +use tokio::sync::broadcast::Sender; use tower::{buffer::Buffer, Service, ServiceExt}; use zebra_chain::{ @@ -26,7 +27,7 @@ use zebra_chain::{ use zebra_network::constants::USER_AGENT; use zebra_node_services::{mempool, BoxError}; -use crate::queue::{Queue, Runner}; +use crate::queue::Queue; #[cfg(test)] mod tests; @@ -163,8 +164,8 @@ where #[allow(dead_code)] network: Network, - /// An instance of the RPC transaction queue - queue_runner: Runner, + /// A sender component of a channel used to send transactions to the queue. + queue_sender: Sender>, } impl RpcImpl @@ -193,7 +194,7 @@ where >::Future: Send, >::Future: Send, { - let (mut listener, runner) = Queue::start(); + let runner = Queue::start(); let rpc_impl = RpcImpl { app_version: app_version.to_string(), @@ -201,14 +202,9 @@ where state: state.clone(), latest_chain_tip: latest_chain_tip.clone(), network, - queue_runner: runner.clone(), + queue_sender: runner.sender(), }; - // run the listener - tokio::spawn(async move { - listener.listen().await; - }); - // run the process queue tokio::spawn(async move { runner.run(mempool, state, latest_chain_tip).await; @@ -344,7 +340,7 @@ where raw_transaction_hex: String, ) -> BoxFuture> { let mempool = self.mempool.clone(); - let queue_sender = self.queue_runner.sender(); + let queue_sender = self.queue_sender.clone(); async move { let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| { @@ -357,7 +353,9 @@ where // send transaction to the rpc queue let unmined_transaction = UnminedTx::from(raw_transaction.clone()); - let _ = queue_sender.send(Some(unmined_transaction)).await; + queue_sender + .send(Some(unmined_transaction)) + .expect("there is always at least one active receiver"); let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); let request = mempool::Request::Queue(vec![transaction_parameter]); diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 4854e126474..6e53bd570f9 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -557,9 +557,6 @@ proptest! { .await .expect("Sending raw transactions should not panic"); - // make sure the transaction was inserted to the queue - prop_assert_eq!(rpc.queue_runner.queue().lock().unwrap().transactions().len(), 1); - // advance enough time to have a new runner iteration let spacing = chrono::Duration::seconds(150); tokio::time::advance(spacing.to_std().unwrap()).await; diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 5d38ae9ca80..d3f09cec5ea 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -1,19 +1,20 @@ //! Transaction Queue. //! //! All transactions that are sent from RPC methods should be added to this queue for retries. -//! Transactions can fail to be inserted to the mempool inmediatly to different reasons, +//! Transactions can fail to be inserted to the mempool inmediatly by different reasons, //! like having not mined utxos. //! -//! The queue is a `HashMap` which can be shared by a `Listener` and a `Runner` component. +//! The [`Queue`] is just a `HashMap` of transactions with insertion date. +//! The [`Runner`] component will do the processing in it's [`Runner::run()`] method. use std::{ collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, + sync::Arc, }; use chrono::Duration; use tokio::{ - sync::mpsc::{channel, Receiver, Sender}, + sync::broadcast::{channel, Receiver, Sender}, time::Instant, }; @@ -43,102 +44,72 @@ pub struct Queue { transactions: HashMap, Instant)>, } -#[derive(Clone, Debug)] +#[derive(Debug)] /// The runner pub struct Runner { - queue: Arc>, + queue: Queue, sender: Sender>, } -/// The listener -pub struct Listener { - queue: Arc>, - receiver: Receiver>, -} - impl Queue { /// Start a new queue - pub fn start() -> (Listener, Runner) { - let (sender, receiver) = channel(10); + pub fn start() -> Runner { + let (sender, _receiver) = channel(10); - let queue = Arc::new(Mutex::new(Queue { + let queue = Queue { transactions: HashMap::new(), - })); - - let runner = Runner { - queue: queue.clone(), - sender, }; - let listener = Listener { queue, receiver }; - - (listener, runner) + Runner { queue, sender } } - /// Get the transactions in the queue + /// Get the transactions in the queue. pub fn transactions(&self) -> HashMap, Instant)> { self.transactions.clone() } - /// Insert a transaction to the queue + /// Insert a transaction to the queue. pub fn insert(&mut self, unmined_tx: UnminedTx) { self.transactions .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now())); } - /// Remove a transaction from the queue + /// Remove a transaction from the queue. pub fn remove(&mut self, unmined_id: UnminedTxId) { self.transactions.remove(&unmined_id); } } -impl Listener { - /// Listen for transaction and insert them to the queue - pub async fn listen(&mut self) { - loop { - if let Some(Some(tx)) = self.receiver.recv().await { - self.queue - .lock() - .expect("queue mutex should be unpoisoned") - .insert(tx); - } - } - } -} - impl Runner { /// Access the sender field of the runner. pub fn sender(&self) -> Sender> { self.sender.clone() } - /// Access the mutable queue. - pub fn queue(&self) -> Arc> { + /// Create a new receiver. + pub fn receiver(&self) -> Receiver> { + self.sender.subscribe() + } + + /// Access the queue. + pub fn queue(&self) -> Queue { self.queue.clone() } /// Get the queue transactions as a `HashSet` of unmined ids. fn transactions_as_hash_set(&self) -> HashSet { - let transactions = self - .queue - .lock() - .expect("queue mutex should be unpoisoned") - .transactions(); + let transactions = self.queue.transactions(); transactions.iter().map(|t| *t.0).collect() } /// Get the queue transactions as a `Vec` of transactions. fn transactions_as_vec(&self) -> Vec> { - let transactions = self - .queue - .lock() - .expect("queue mutex should be unpoisoned") - .transactions(); + let transactions = self.queue.transactions(); transactions.iter().map(|t| t.1 .0.clone()).collect() } /// Retry sending to memempool if needed. - pub async fn run(self, mempool: Mempool, state: State, _tip: Tip) + pub async fn run(mut self, mempool: Mempool, state: State, _tip: Tip) where Mempool: Service + Clone + 'static, State: Service @@ -157,7 +128,14 @@ impl Runner { // get spacing between blocks let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height); + let mut receiver = self.sender.subscribe(); + loop { + // check the channel for new transactions + if let Ok(Some(tx)) = &receiver.recv().await { + let _ = &self.queue.insert(tx.clone()); + } + // sleep until the next block tokio::time::sleep(spacing.to_std().unwrap()).await; @@ -179,14 +157,10 @@ impl Runner { } /// Remove transactions that are expired according to number of blocks and current spacing between blocks. - fn remove_expired(&self, spacing: Duration) { + fn remove_expired(&mut self, spacing: Duration) { let duration_to_expire = Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()); - let transactions = self - .queue - .lock() - .expect("queue mutex should be unpoisoned") - .transactions(); + let transactions = self.queue.transactions(); let now = Instant::now(); for tx in transactions.iter() { @@ -196,21 +170,15 @@ impl Runner { .unwrap(); if now > tx_time { - self.queue - .lock() - .expect("queue mutex should be unpoisoned") - .remove(*tx.0); + self.queue.remove(*tx.0); } } } /// Remove transactions from the queue that had been inserted to the state or the mempool. - fn remove_committed(&self, to_remove: HashSet) { + fn remove_committed(&mut self, to_remove: HashSet) { for r in to_remove { - self.queue - .lock() - .expect("queue mutex should be unpoisoned") - .remove(r); + self.queue.remove(r); } } diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index dba24cf1b45..151530592ca 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -18,20 +18,20 @@ proptest! { #[test] fn insert_remove_to_from_queue(transaction in any::()) { // create a queue - let (_listener, runner) = Queue::start(); + let mut runner = Queue::start(); // insert transaction - runner.queue.lock().unwrap().insert(transaction.clone()); + runner.queue.insert(transaction.clone()); // transaction was inserted to queue - let queue_transactions = runner.queue.lock().unwrap().transactions(); + let queue_transactions = runner.queue.transactions(); prop_assert_eq!(1, queue_transactions.len()); // remove transaction from the queue - runner.queue.lock().unwrap().remove(transaction.id); + runner.queue.remove(transaction.id); // transaction was removed from queue - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 0); + prop_assert_eq!(runner.queue.transactions().len(), 0); } /// Test transactions are removed from the queue after time elapses. @@ -44,33 +44,33 @@ proptest! { tokio::time::pause(); // create a queue - let (_listener, runner) = Queue::start(); + let mut runner = Queue::start(); // insert a transaction to the queue - runner.queue.lock().unwrap().insert(transaction); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + runner.queue.insert(transaction); + prop_assert_eq!(runner.queue.transactions().len(), 1); // have a block interval value let spacing = chrono::Duration::seconds(150); // apply expiration inmediatly, transaction will not be removed from queue runner.remove_expired(spacing); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + prop_assert_eq!(runner.queue.transactions().len(), 1); // apply expiration after 1 block elapsed, transaction will not be removed from queue tokio::time::advance(spacing.to_std().unwrap()).await; runner.remove_expired(spacing); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + prop_assert_eq!(runner.queue.transactions().len(), 1); // apply expiration after 2 blocks elapsed, transaction will not be removed from queue tokio::time::advance(spacing.mul(2).to_std().unwrap()).await; runner.remove_expired(spacing); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + prop_assert_eq!(runner.queue.transactions().len(), 1); // apply expiration after 3 block elapsed, transaction will be removed from queue tokio::time::advance(spacing.mul(3).to_std().unwrap()).await; runner.remove_expired(spacing); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 0); + prop_assert_eq!(runner.queue.transactions().len(), 0); Ok::<_, TestCaseError>(()) })?; @@ -85,13 +85,13 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); // create a queue - let (_listener, runner) = Queue::start(); + let mut runner = Queue::start(); // insert a transaction to the queue let unmined_transaction = UnminedTx::from(transaction); - runner.queue.lock().unwrap().insert(unmined_transaction.clone()); + runner.queue.insert(unmined_transaction.clone()); - let transactions = runner.queue.lock().unwrap().transactions(); + let transactions = runner.queue.transactions(); prop_assert_eq!(transactions.len(), 1); // convert to hashset @@ -143,10 +143,10 @@ proptest! { prop_assert_eq!(result.len(), 1); // not deleted yet - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + prop_assert_eq!(runner.queue.transactions().len(), 1); // delete runner.remove_committed(result); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 0); + prop_assert_eq!(runner.queue.transactions().len(), 0); // no more mempool.expect_no_requests().await?; @@ -167,12 +167,12 @@ proptest! { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); // create a queue - let (_listener, runner) = Queue::start(); + let mut runner = Queue::start(); // insert a transaction to the queue let unmined_transaction = UnminedTx::from(&transaction); - runner.queue.lock().unwrap().insert(unmined_transaction.clone()); - prop_assert_eq!(runner.queue.lock().unwrap().transactions().len(), 1); + runner.queue.insert(unmined_transaction.clone()); + prop_assert_eq!(runner.queue.transactions().len(), 1); // run the runner let mut hs = HashSet::new(); From a7547aef1f8dacf4887c1086b137ec27dbf57f45 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Fri, 1 Apr 2022 15:30:57 -0300 Subject: [PATCH 05/26] Have order and limit in the queue --- zebra-rpc/src/queue.rs | 28 ++++++++++++----- zebra-rpc/src/queue/tests/prop.rs | 50 ++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 9 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index d3f09cec5ea..bb424817b03 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -7,12 +7,10 @@ //! The [`Queue`] is just a `HashMap` of transactions with insertion date. //! The [`Runner`] component will do the processing in it's [`Runner::run()`] method. -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::{collections::HashSet, sync::Arc}; use chrono::Duration; +use indexmap::IndexMap; use tokio::{ sync::broadcast::{channel, Receiver, Sender}, time::Instant, @@ -38,10 +36,14 @@ mod tests; /// The number of blocks a transaction can be in the queue. const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; +/// Size of the queue and channel. Suggested valus is equal to +/// `mempool::downloads::MAX_INBOUND_CONCURRENCY` +const CHANNEL_AND_QUEUE_CAPACITY: usize = 10; + #[derive(Clone, Debug)] /// The queue itself pub struct Queue { - transactions: HashMap, Instant)>, + transactions: IndexMap, Instant)>, } #[derive(Debug)] @@ -54,17 +56,17 @@ pub struct Runner { impl Queue { /// Start a new queue pub fn start() -> Runner { - let (sender, _receiver) = channel(10); + let (sender, _receiver) = channel(CHANNEL_AND_QUEUE_CAPACITY); let queue = Queue { - transactions: HashMap::new(), + transactions: IndexMap::new(), }; Runner { queue, sender } } /// Get the transactions in the queue. - pub fn transactions(&self) -> HashMap, Instant)> { + pub fn transactions(&self) -> IndexMap, Instant)> { self.transactions.clone() } @@ -72,12 +74,22 @@ impl Queue { pub fn insert(&mut self, unmined_tx: UnminedTx) { self.transactions .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now())); + + // remove if queue is over capacity + if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY { + self.remove_first(); + } } /// Remove a transaction from the queue. pub fn remove(&mut self, unmined_id: UnminedTxId) { self.transactions.remove(&unmined_id); } + + /// Remove the oldest transaction from the queue. + pub fn remove_first(&mut self) { + self.transactions.shift_remove_index(0); + } } impl Runner { diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 151530592ca..3a1b5889053 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -11,7 +11,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response}; use zebra_state::{BoxError, ReadRequest, ReadResponse}; use zebra_test::mock_service::MockService; -use crate::queue::{Queue, Runner}; +use crate::queue::{Queue, Runner, CHANNEL_AND_QUEUE_CAPACITY}; proptest! { /// Test insert to the queue and remove from it. @@ -34,6 +34,54 @@ proptest! { prop_assert_eq!(runner.queue.transactions().len(), 0); } + /// Test queue never grows above limit. + #[test] + fn queue_size_limit(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY + 1]>()) { + // create a queue + let mut runner = Queue::start(); + + // insert all transactions we have + transactions.iter().for_each(|t| runner.queue.insert(t.clone())); + + // transaction queue is never above limit + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len()); + } + + /// Test queue order. + #[test] + fn queue_order(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY * 2]>()) { + // create a queue + let mut runner = Queue::start(); + // fill the queue and check insertion order + for i in 0..CHANNEL_AND_QUEUE_CAPACITY { + let transaction = transactions[i].clone(); + runner.queue.insert(transaction.clone()); + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(i + 1, queue_transactions.len()); + prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction); + } + + // queue is full + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len()); + + // keep adding transaction, new transactions will always be on top of the queue + for transaction in transactions.iter().skip(CHANNEL_AND_QUEUE_CAPACITY) { + runner.queue.insert(transaction.clone()); + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len()); + prop_assert_eq!(UnminedTx::from(queue_transactions.last().unwrap().1.0.clone()), transaction.clone()); + } + + // check the order of the final queue + let queue_transactions = runner.queue.transactions(); + for i in 0..CHANNEL_AND_QUEUE_CAPACITY { + let transaction = transactions[CHANNEL_AND_QUEUE_CAPACITY + i].clone(); + prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction); + } + } + /// Test transactions are removed from the queue after time elapses. #[test] fn remove_expired_transactions_from_queue(transaction in any::()) { From d85a6b662f84e531e654925e04efd856010d3852 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Fri, 1 Apr 2022 21:26:41 -0300 Subject: [PATCH 06/26] fix multiple transactions channel --- zebra-rpc/src/methods/tests/prop.rs | 92 +++++++++++++++++++++++++++++ zebra-rpc/src/queue.rs | 41 +++++++------ 2 files changed, 114 insertions(+), 19 deletions(-) diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 6e53bd570f9..8544add0adf 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -597,6 +597,98 @@ proptest! { Ok::<_, TestCaseError>(()) })?; } + + /// Test we receive all transactions that are sent in a channel + #[test] + fn rpc_queue_receives_all_transactions_from_channel(txs in any::<[Transaction; 2]>()) + { + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + runtime.block_on(async move { + tokio::time::pause(); + + let mut mempool = MockService::build().for_prop_tests(); + let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + + let rpc = RpcImpl::new( + "RPC test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + NoChainTip, + Mainnet, + ); + + let mut transactions_hash_set = HashSet::new(); + for tx in txs.clone() { + // send a transaction + let tx_bytes = tx + .zcash_serialize_to_vec() + .expect("Transaction serializes successfully"); + let tx_hex = hex::encode(&tx_bytes); + let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); + + let tx_unmined = UnminedTx::from(tx.clone()); + let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); + + // inser to hs we will use later + transactions_hash_set.insert(tx_unmined.id); + + // fail the mempool insertion + mempool + .clone() + .expect_request(expected_request) + .await + .unwrap() + .respond(Err(DummyError)); + + let _ = send_task + .await + .expect("Sending raw transactions should not panic"); + } + + // advance enough time to have a new runner iteration + let spacing = chrono::Duration::seconds(150); + tokio::time::advance(spacing.to_std().unwrap()).await; + + // the runner will made a new call to TransactionsById quering with both transactions + let expected_request = mempool::Request::TransactionsById(transactions_hash_set); + let response = mempool::Response::Transactions(vec![]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + // the runner will also query the state again for each transaction + for _tx in txs.clone() { + let response = zebra_state::ReadResponse::Transaction(None); + + // we use `expect_request_that` because we can't guarantee the state request order + state + .expect_request_that(|request| matches!(request, zebra_state::ReadRequest::Transaction(_))) + .await? + .respond(response); + } + + // each transaction will be retried + for tx in txs.clone() { + let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]); + let response = mempool::Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + } + + // no more requets are done + mempool.expect_no_requests().await?; + state.expect_no_requests().await?; + + Ok::<_, TestCaseError>(()) + })?; + } } #[derive(Clone, Copy, Debug, Error)] diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index bb424817b03..d02f70a58fb 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -143,15 +143,15 @@ impl Runner { let mut receiver = self.sender.subscribe(); loop { - // check the channel for new transactions - if let Ok(Some(tx)) = &receiver.recv().await { - let _ = &self.queue.insert(tx.clone()); - } - // sleep until the next block tokio::time::sleep(spacing.to_std().unwrap()).await; - // first, remove what is expired + // get transactions from the channel + while let Ok(Some(tx)) = receiver.try_recv() { + let _ = &self.queue.insert(tx.clone()); + } + + // remove what is expired self.remove_expired(spacing); // remove if any of the queued transactions is now in the mempool @@ -203,21 +203,24 @@ impl Runner { Mempool: Service + Clone + 'static, { let mut response = HashSet::new(); - let request = Request::TransactionsById(transactions); - - // TODO: ignore errors - let mempool_response = mempool - .oneshot(request) - .await - .expect("Requesting transactions should not panic"); - - match mempool_response { - Response::Transactions(txs) => { - for tx in txs { - response.insert(tx.id); + + if !transactions.is_empty() { + let request = Request::TransactionsById(transactions); + + // TODO: ignore errors + let mempool_response = mempool + .oneshot(request) + .await + .expect("Requesting transactions should not panic"); + + match mempool_response { + Response::Transactions(txs) => { + for tx in txs { + response.insert(tx.id); + } } + _ => unreachable!("TransactionsById always respond with at least an empty vector"), } - _ => unreachable!("TransactionsById always respond with at least an empty vector"), } response From 414d5cea26c58c81509061b48ec7ba99d5e6ec37 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sat, 2 Apr 2022 08:49:12 -0300 Subject: [PATCH 07/26] Use a network argument --- zebra-rpc/src/methods.rs | 2 +- zebra-rpc/src/queue.rs | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 9a5ba611478..c7514dd64a4 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -207,7 +207,7 @@ where // run the process queue tokio::spawn(async move { - runner.run(mempool, state, latest_chain_tip).await; + runner.run(mempool, state, latest_chain_tip, network).await; }); rpc_impl diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index d02f70a58fb..f859824763d 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -20,7 +20,7 @@ use tower::{Service, ServiceExt}; use zebra_chain::{ chain_tip::ChainTip, - parameters::NetworkUpgrade, + parameters::{Network, NetworkUpgrade}, transaction::{Transaction, UnminedTx, UnminedTxId}, }; use zebra_node_services::{ @@ -121,8 +121,13 @@ impl Runner { } /// Retry sending to memempool if needed. - pub async fn run(mut self, mempool: Mempool, state: State, _tip: Tip) - where + pub async fn run( + mut self, + mempool: Mempool, + state: State, + _tip: Tip, + network: Network, + ) where Mempool: Service + Clone + 'static, State: Service + Clone @@ -131,9 +136,6 @@ impl Runner { + 'static, Tip: ChainTip + Clone + Send + Sync + 'static, { - // TODO: this should be an argument - let network = zebra_chain::parameters::Network::Mainnet; - // TODO: Use tip.best_tip_height() let tip_height = zebra_chain::block::Height(1); From d17329fed089aff2a01ccb458ff3bb2bdba4f8a0 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sat, 2 Apr 2022 09:18:12 -0300 Subject: [PATCH 08/26] Use chain tip to calculate block spacing --- zebra-rpc/src/queue.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index f859824763d..0ca60584381 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -19,6 +19,7 @@ use tokio::{ use tower::{Service, ServiceExt}; use zebra_chain::{ + block::Height, chain_tip::ChainTip, parameters::{Network, NetworkUpgrade}, transaction::{Transaction, UnminedTx, UnminedTxId}, @@ -125,7 +126,7 @@ impl Runner { mut self, mempool: Mempool, state: State, - _tip: Tip, + tip: Tip, network: Network, ) where Mempool: Service + Clone + 'static, @@ -136,8 +137,11 @@ impl Runner { + 'static, Tip: ChainTip + Clone + Send + Sync + 'static, { - // TODO: Use tip.best_tip_height() - let tip_height = zebra_chain::block::Height(1); + // If we don't have a chain use height 1 to get block spacing. + let tip_height = match tip.best_tip_height() { + Some(height) => height, + _ => Height(1), + }; // get spacing between blocks let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height); From 2baff36f9dd0d52c96893e7deed05f796f6df744 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sat, 2 Apr 2022 11:11:53 -0300 Subject: [PATCH 09/26] Add extra time --- zebra-rpc/src/queue.rs | 14 +++++++++++--- zebra-rpc/src/queue/tests/prop.rs | 22 +++++++++++++++------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 0ca60584381..afea7180ae2 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -176,16 +176,24 @@ impl Runner { /// Remove transactions that are expired according to number of blocks and current spacing between blocks. fn remove_expired(&mut self, spacing: Duration) { + // To make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE` times, + // as the main loop also takes some time to run. + let extra_time = Duration::seconds(5); + let duration_to_expire = - Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()); + Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time; let transactions = self.queue.transactions(); let now = Instant::now(); for tx in transactions.iter() { let tx_time = tx.1 .1 - .checked_add(duration_to_expire.to_std().unwrap()) - .unwrap(); + .checked_add( + duration_to_expire + .to_std() + .expect("should never be less than zero"), + ) + .expect("this is low numbers, should always be inside bounds"); if now > tx_time { self.queue.remove(*tx.0); diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 3a1b5889053..6ccce45ab4a 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -1,9 +1,11 @@ //! Randomised property tests for the RPC Queue. -use std::{collections::HashSet, ops::Mul}; +use std::collections::HashSet; use proptest::prelude::*; +use chrono::Duration; +use tokio::time; use tower::ServiceExt; use zebra_chain::transaction::{Transaction, UnminedTx, UnminedTxId}; @@ -89,7 +91,7 @@ proptest! { runtime.block_on(async move { // pause the clock - tokio::time::pause(); + time::pause(); // create a queue let mut runner = Queue::start(); @@ -99,24 +101,30 @@ proptest! { prop_assert_eq!(runner.queue.transactions().len(), 1); // have a block interval value - let spacing = chrono::Duration::seconds(150); + let spacing = Duration::seconds(150); // apply expiration inmediatly, transaction will not be removed from queue runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 1); // apply expiration after 1 block elapsed, transaction will not be removed from queue - tokio::time::advance(spacing.to_std().unwrap()).await; + time::advance(spacing.to_std().unwrap()).await; runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 1); // apply expiration after 2 blocks elapsed, transaction will not be removed from queue - tokio::time::advance(spacing.mul(2).to_std().unwrap()).await; + time::advance(spacing.to_std().unwrap()).await; runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 1); - // apply expiration after 3 block elapsed, transaction will be removed from queue - tokio::time::advance(spacing.mul(3).to_std().unwrap()).await; + // apply expiration after 3 block elapsed, transaction will not be removed from queue + // as it needs the extra time of 5 seconds + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply 5 more seconcs, transaction will be removed from the queue + time::advance(chrono::Duration::seconds(6).to_std().unwrap()).await; runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 0); From 1bb7279adbf07761c8cd2d278676f42594e0a1a0 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sat, 2 Apr 2022 13:30:06 -0300 Subject: [PATCH 10/26] Finalize the state check test --- zebra-rpc/src/queue.rs | 2 +- zebra-rpc/src/queue/tests/prop.rs | 89 +++++++++++++++++++++---------- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index afea7180ae2..2f4444fffd2 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -110,7 +110,7 @@ impl Runner { } /// Get the queue transactions as a `HashSet` of unmined ids. - fn transactions_as_hash_set(&self) -> HashSet { + pub(crate) fn transactions_as_hash_set(&self) -> HashSet { let transactions = self.queue.transactions(); transactions.iter().map(|t| *t.0).collect() } diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 6ccce45ab4a..fc236b23f33 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -1,6 +1,6 @@ //! Randomised property tests for the RPC Queue. -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; use proptest::prelude::*; @@ -8,7 +8,11 @@ use chrono::Duration; use tokio::time; use tower::ServiceExt; -use zebra_chain::transaction::{Transaction, UnminedTx, UnminedTxId}; +use zebra_chain::{ + block::{Block, Height}, + serialization::ZcashDeserializeInto, + transaction::{Transaction, UnminedTx}, +}; use zebra_node_services::mempool::{Gossip, Request, Response}; use zebra_state::{BoxError, ReadRequest, ReadResponse}; use zebra_test::mock_service::MockService; @@ -100,7 +104,7 @@ proptest! { runner.queue.insert(transaction); prop_assert_eq!(runner.queue.transactions().len(), 1); - // have a block interval value + // have a block interval value equal to the one at Height(1) let spacing = Duration::seconds(150); // apply expiration inmediatly, transaction will not be removed from queue @@ -123,7 +127,7 @@ proptest! { runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 1); - // apply 5 more seconcs, transaction will be removed from the queue + // apply 6 seconds more, transaction will be removed from the queue time::advance(chrono::Duration::seconds(6).to_std().unwrap()).await; runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 0); @@ -146,16 +150,16 @@ proptest! { // insert a transaction to the queue let unmined_transaction = UnminedTx::from(transaction); runner.queue.insert(unmined_transaction.clone()); - let transactions = runner.queue.transactions(); prop_assert_eq!(transactions.len(), 1); - // convert to hashset - let transactions_hash_set: HashSet = transactions.iter().map(|t| *t.0).collect(); + // get a `HashSet` of transactions to call mempool with + let transactions_hash_set = runner.transactions_as_hash_set(); // run the mempool checker let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone())); + // mempool checker will call the mempool looking for the transaction let expected_request = Request::TransactionsById(transactions_hash_set.clone()); let response = Response::Transactions(vec![]); @@ -163,18 +167,15 @@ proptest! { .expect_request(expected_request) .await? .respond(response); - let result = send_task.await.expect("Requesting transactions should not panic"); // empty results, transaction is not in the mempool prop_assert_eq!(result, HashSet::new()); - // now lets insert it to the mempool + // insert transaction to the mempool let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); - let send_task = tokio::spawn(mempool.clone().oneshot(request)); - let response = Response::Queued(vec![Ok(())]); mempool @@ -187,6 +188,7 @@ proptest! { // check the mempool again let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone())); + // mempool checker will call the mempool looking for the transaction let expected_request = Request::TransactionsById(transactions_hash_set); let response = Response::Transactions(vec![unmined_transaction]); @@ -197,14 +199,17 @@ proptest! { let result = send_task.await.expect("Requesting transactions should not panic"); + // transaction is in the mempool prop_assert_eq!(result.len(), 1); - // not deleted yet + + // but it is not deleted from the queue yet prop_assert_eq!(runner.queue.transactions().len(), 1); - // delete + + // delete by calling remove_committed runner.remove_committed(result); prop_assert_eq!(runner.queue.transactions().len(), 0); - // no more + // no more requets expected mempool.expect_no_requests().await?; Ok::<_, TestCaseError>(()) @@ -216,11 +221,9 @@ proptest! { fn queue_runner_state(transaction in any::()) { let runtime = zebra_test::init_async(); - let transaction_hash = transaction.hash(); - runtime.block_on(async move { - - let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + let mut write_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); // create a queue let mut runner = Queue::start(); @@ -230,27 +233,59 @@ proptest! { runner.queue.insert(unmined_transaction.clone()); prop_assert_eq!(runner.queue.transactions().len(), 1); - // run the runner - let mut hs = HashSet::new(); - hs.insert(unmined_transaction.id); + // get a `HashSet` of transactions to call state with + let transactions_hash_set = runner.transactions_as_hash_set(); - let send_task = tokio::spawn(Runner::check_state(state.clone(), hs)); + let send_task = tokio::spawn(Runner::check_state(read_state.clone(), transactions_hash_set.clone())); - let expected_request = ReadRequest::Transaction(transaction_hash); + let expected_request = ReadRequest::Transaction(transaction.hash()); let response = ReadResponse::Transaction(None); - state + read_state .expect_request(expected_request) .await? .respond(response); let result = send_task.await.expect("Requesting transaction should not panic"); - + // transaction is not in the state prop_assert_eq!(HashSet::new(), result); - // TODO: finish this test + // get a block and push our transaction to it + let block = + zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into::>()?; + let mut block = Arc::try_unwrap(block).expect("block should unwrap"); + block.transactions.push(Arc::new(transaction.clone())); + + // commit the created block + let request = zebra_state::Request::CommitFinalizedBlock(zebra_state::FinalizedBlock::from(Arc::new(block.clone()))); + let send_task = tokio::spawn(write_state.clone().oneshot(request.clone())); + let response = zebra_state::Response::Committed(block.hash()); + + write_state + .expect_request(request) + .await? + .respond(response); + + let _ = send_task.await.expect("Inserting block to state should not panic"); + + // check the state again + let send_task = tokio::spawn(Runner::check_state(read_state.clone(), transactions_hash_set)); + + let expected_request = ReadRequest::Transaction(transaction.hash()); + let response = ReadResponse::Transaction(Some((Arc::new(transaction), Height(1)))); + + read_state + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transaction should not panic"); + + // transaction was found in the state + prop_assert_eq!(result.len(), 1); - state.expect_no_requests().await?; + read_state.expect_no_requests().await?; + write_state.expect_no_requests().await?; Ok::<_, TestCaseError>(()) })?; From 473fd02b01345b8e2aa47d7378acdbd48a70bcde Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sat, 2 Apr 2022 18:13:31 -0300 Subject: [PATCH 11/26] Add a retry test --- zebra-rpc/src/queue.rs | 35 ++++++++++++++++++-------- zebra-rpc/src/queue/tests/prop.rs | 42 ++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 2f4444fffd2..a3e2942a83b 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -110,7 +110,7 @@ impl Runner { } /// Get the queue transactions as a `HashSet` of unmined ids. - pub(crate) fn transactions_as_hash_set(&self) -> HashSet { + fn transactions_as_hash_set(&self) -> HashSet { let transactions = self.queue.transactions(); transactions.iter().map(|t| *t.0).collect() } @@ -150,7 +150,7 @@ impl Runner { loop { // sleep until the next block - tokio::time::sleep(spacing.to_std().unwrap()).await; + tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await; // get transactions from the channel while let Ok(Some(tx)) = receiver.try_recv() { @@ -209,6 +209,8 @@ impl Runner { } /// Check the mempool for given transactions. + /// + /// Returns transactions that are in the mempool. async fn check_mempool( mempool: Mempool, transactions: HashSet, @@ -221,7 +223,6 @@ impl Runner { if !transactions.is_empty() { let request = Request::TransactionsById(transactions); - // TODO: ignore errors let mempool_response = mempool .oneshot(request) .await @@ -241,6 +242,8 @@ impl Runner { } /// Check the state for given transactions. + /// + /// Returns transactions that are in the state. async fn check_state( state: State, transactions: HashSet, @@ -276,19 +279,29 @@ impl Runner { } /// Retry sending given transactions to mempool. - async fn retry(mempool: Mempool, transactions: Vec>) + /// + /// Returns the transaction ids what were retried. + async fn retry( + mempool: Mempool, + transactions: Vec>, + ) -> HashSet where Mempool: Service + Clone + 'static, { + let mut retried = HashSet::new(); + for tx in transactions { - let transaction_parameter = Gossip::Tx(UnminedTx::from(tx.clone())); - let request = Request::Queue(vec![transaction_parameter]); + let unmined = UnminedTx::from(tx); + let gossip = Gossip::Tx(unmined.clone()); + let request = Request::Queue(vec![gossip]); - let _ = mempool - .clone() - .oneshot(request) - .await - .expect("Sending to memmpool should not panic"); + // Send to memmpool and ignore any error + let _ = mempool.clone().oneshot(request).await; + + // retrurn what we retried but don't delete from the queue, + // we might retry again in a next call. + retried.insert(unmined.id); } + retried } } diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index fc236b23f33..218042409c4 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -291,5 +291,45 @@ proptest! { })?; } - // TODO: add a Runner::retry test + // Test any given transaction can be mempool retried. + #[test] + fn queue_mempool_retry(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + let mut mempool = MockService::build().for_prop_tests(); + + // create a queue + let mut runner = Queue::start(); + + // insert a transaction to the queue + let unmined_transaction = UnminedTx::from(transaction.clone()); + runner.queue.insert(unmined_transaction.clone()); + let transactions = runner.queue.transactions(); + prop_assert_eq!(transactions.len(), 1); + + // get a `Vec` of transactions to do retries + let transactions_vec = runner.transactions_as_vec(); + + // run retry + let send_task = tokio::spawn(Runner::retry(mempool.clone(), transactions_vec.clone())); + + // retry will queue the transaction to mempool + let gossip = Gossip::Tx(UnminedTx::from(transaction.clone())); + let expected_request = Request::Queue(vec![gossip]); + let response = Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transactions should not panic"); + + // retry was done + prop_assert_eq!(result.len(), 1); + + Ok::<_, TestCaseError>(()) + })?; + } } From 1d2879dfab8666ac796ea959c0e3966443dfcd44 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sat, 2 Apr 2022 18:32:07 -0300 Subject: [PATCH 12/26] Fix description --- zebra-rpc/src/queue.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index a3e2942a83b..27a4151babc 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -4,7 +4,8 @@ //! Transactions can fail to be inserted to the mempool inmediatly by different reasons, //! like having not mined utxos. //! -//! The [`Queue`] is just a `HashMap` of transactions with insertion date. +//! The [`Queue`] is just an `IndexMap` of transactions with insertion date. +//! We use this data type because we want the transactions in the queue to be sorted. //! The [`Runner`] component will do the processing in it's [`Runner::run()`] method. use std::{collections::HashSet, sync::Arc}; @@ -176,8 +177,8 @@ impl Runner { /// Remove transactions that are expired according to number of blocks and current spacing between blocks. fn remove_expired(&mut self, spacing: Duration) { - // To make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE` times, - // as the main loop also takes some time to run. + // Have some extra time to to make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE` + // times, as the main loop also takes some time to run. let extra_time = Duration::seconds(5); let duration_to_expire = From a88561dd1512733a73d20cde44f4e89718357e65 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sun, 3 Apr 2022 09:43:36 -0300 Subject: [PATCH 13/26] fix some docs --- zebra-rpc/src/queue.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 27a4151babc..57751fb9efe 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -5,7 +5,7 @@ //! like having not mined utxos. //! //! The [`Queue`] is just an `IndexMap` of transactions with insertion date. -//! We use this data type because we want the transactions in the queue to be sorted. +//! We use this data type because we want the transactions in the queue to be in order. //! The [`Runner`] component will do the processing in it's [`Runner::run()`] method. use std::{collections::HashSet, sync::Arc}; @@ -38,18 +38,19 @@ mod tests; /// The number of blocks a transaction can be in the queue. const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; -/// Size of the queue and channel. Suggested valus is equal to +/// Size of the queue and channel. Suggested value is equal to /// `mempool::downloads::MAX_INBOUND_CONCURRENCY` const CHANNEL_AND_QUEUE_CAPACITY: usize = 10; #[derive(Clone, Debug)] -/// The queue itself +/// The queue is a container of transactions that are going to be +/// sent to the mempool again. pub struct Queue { transactions: IndexMap, Instant)>, } #[derive(Debug)] -/// The runner +/// The runner will make the processing of the transactions in the queue. pub struct Runner { queue: Queue, sender: Sender>, @@ -281,7 +282,7 @@ impl Runner { /// Retry sending given transactions to mempool. /// - /// Returns the transaction ids what were retried. + /// Returns the transaction ids that were retried. async fn retry( mempool: Mempool, transactions: Vec>, From e83fe4fe9cc16de6e305a5d65b51e780fef16e1e Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Sun, 3 Apr 2022 10:37:30 -0300 Subject: [PATCH 14/26] add additional empty check to `Runner::run` --- zebra-rpc/src/queue.rs | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 57751fb9efe..8ec86825e4f 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -124,6 +124,17 @@ impl Runner { } /// Retry sending to memempool if needed. + /// + /// Creates a loop that will run each time a new block is mined. + /// In this loop, get the transactions that are in the queue and: + /// - Check if they are now in the mempool and if so, delete the transaction from the queue. + /// - Check if the transaction is now part of a block in the state and if so, + /// delete the transaction from the queue. + /// - With the transactions left in the queue, retry sending them to the mempool ignoring + /// the result of this operation. + /// + /// Addtionally, each iteration of the above loop, will receive and insert to the queue + /// transactions that are pending in the channel. pub async fn run( mut self, mempool: Mempool, @@ -159,20 +170,23 @@ impl Runner { let _ = &self.queue.insert(tx.clone()); } - // remove what is expired - self.remove_expired(spacing); + if !self.queue.transactions().is_empty() { + // remove what is expired + self.remove_expired(spacing); - // remove if any of the queued transactions is now in the mempool - let in_mempool = - Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await; - self.remove_committed(in_mempool); + // remove if any of the queued transactions is now in the mempool + let in_mempool = + Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await; + self.remove_committed(in_mempool); - // remove if any of the queued transactions is now in the state - let in_state = Self::check_state(state.clone(), self.transactions_as_hash_set()).await; - self.remove_committed(in_state); + // remove if any of the queued transactions is now in the state + let in_state = + Self::check_state(state.clone(), self.transactions_as_hash_set()).await; + self.remove_committed(in_state); - // retry what is left in the queue - let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await; + // retry what is left in the queue + let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await; + } } } From 9dc2531b00230c1004f061c0e4af5df451726f51 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 08:51:55 -0300 Subject: [PATCH 15/26] remove non used method --- zebra-rpc/src/queue.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 8ec86825e4f..49d3deabd2b 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -106,11 +106,6 @@ impl Runner { self.sender.subscribe() } - /// Access the queue. - pub fn queue(&self) -> Queue { - self.queue.clone() - } - /// Get the queue transactions as a `HashSet` of unmined ids. fn transactions_as_hash_set(&self) -> HashSet { let transactions = self.queue.transactions(); From f82528104bd9b0c8573f196958f61cdd43a6652b Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 09:29:40 -0300 Subject: [PATCH 16/26] ignore some errors --- zebra-rpc/src/methods.rs | 6 ++---- zebra-rpc/src/queue.rs | 32 +++++++++----------------------- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index c7514dd64a4..561f66152d2 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -351,11 +351,9 @@ where let transaction_hash = raw_transaction.hash(); - // send transaction to the rpc queue + // send transaction to the rpc queue, ignore any error. let unmined_transaction = UnminedTx::from(raw_transaction.clone()); - queue_sender - .send(Some(unmined_transaction)) - .expect("there is always at least one active receiver"); + let _ = queue_sender.send(Some(unmined_transaction)); let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); let request = mempool::Request::Queue(vec![transaction_parameter]); diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 49d3deabd2b..a211647de67 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -234,18 +234,12 @@ impl Runner { if !transactions.is_empty() { let request = Request::TransactionsById(transactions); - let mempool_response = mempool - .oneshot(request) - .await - .expect("Requesting transactions should not panic"); - - match mempool_response { - Response::Transactions(txs) => { - for tx in txs { - response.insert(tx.id); - } + // ignore any error coming from the mempool + let mempool_response = mempool.oneshot(request).await; + if let Ok(Response::Transactions(txs)) = mempool_response { + for tx in txs { + response.insert(tx.id); } - _ => unreachable!("TransactionsById always respond with at least an empty vector"), } } @@ -271,18 +265,10 @@ impl Runner { for t in transactions { let request = ReadRequest::Transaction(t.mined_id()); - let state_response = state - .clone() - .oneshot(request) - .await - .expect("Requesting transactions should not panic"); - - match state_response { - ReadResponse::Transaction(Some(tx)) => { - response.insert(tx.0.unmined_id()); - } - ReadResponse::Transaction(None) => {} - _ => unreachable!("ReadResponse::Transaction is always some or none"), + // ignore any error coming from the state + let state_response = state.clone().oneshot(request).await; + if let Ok(ReadResponse::Transaction(Some(tx))) = state_response { + response.insert(tx.0.unmined_id()); } } From b2923c388a508a78ec7c5320d5010a9117817ffc Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 09:42:34 -0300 Subject: [PATCH 17/26] fix some docs --- zebra-rpc/src/queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index a211647de67..a85eba64658 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -35,7 +35,7 @@ use zebra_state::{ReadRequest, ReadResponse}; #[cfg(test)] mod tests; -/// The number of blocks a transaction can be in the queue. +/// The approximate target number of blocks a transaction can be in the queue. const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; /// Size of the queue and channel. Suggested value is equal to @@ -96,7 +96,7 @@ impl Queue { } impl Runner { - /// Access the sender field of the runner. + /// Create a new sender for this runner. pub fn sender(&self) -> Sender> { self.sender.clone() } From 12ce44932988ec5d4236c000ede04a3b7899f316 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 12:06:11 -0300 Subject: [PATCH 18/26] add a panic checker to the queue --- zebra-rpc/src/methods.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 561f66152d2..05773542393 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -206,10 +206,17 @@ where }; // run the process queue - tokio::spawn(async move { + let queue_task_handler = tokio::spawn(async move { runner.run(mempool, state, latest_chain_tip, network).await; }); + // queue panic checker + tokio::spawn(async move { + if queue_task_handler.await.is_err() { + panic!("Unexpected panic in the RPC queue"); + } + }); + rpc_impl } } From 05c8797a2468f5d25d946c60d2415cb559bdb895 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 12:27:02 -0300 Subject: [PATCH 19/26] add missing file changes for panic checker --- zebra-rpc/src/methods.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 05773542393..e4477cc04eb 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -206,14 +206,17 @@ where }; // run the process queue - let queue_task_handler = tokio::spawn(async move { + let mut queue_task_handler = tokio::spawn(async move { runner.run(mempool, state, latest_chain_tip, network).await; }); // queue panic checker tokio::spawn(async move { - if queue_task_handler.await.is_err() { - panic!("Unexpected panic in the RPC queue"); + loop { + let queue_task_handler = &mut queue_task_handler; + if queue_task_handler.await.is_err() { + panic!("Unexpected panic in the RPC queue"); + } } }); From e01859011a097debf995bf3c60cb98b68b973852 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 15:26:32 -0300 Subject: [PATCH 20/26] skip checks and retries if height has not changed --- zebra-rpc/src/queue.rs | 65 +++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index a85eba64658..727457bf39e 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -42,6 +42,9 @@ const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; /// `mempool::downloads::MAX_INBOUND_CONCURRENCY` const CHANNEL_AND_QUEUE_CAPACITY: usize = 10; +/// The height to use in spacing calculation if we don't have a chain tip. +const NO_CHAIN_TIP_HEIGHT: Height = Height(1); + #[derive(Clone, Debug)] /// The queue is a container of transactions that are going to be /// sent to the mempool again. @@ -54,6 +57,7 @@ pub struct Queue { pub struct Runner { queue: Queue, sender: Sender>, + tip_height: Height, } impl Queue { @@ -65,7 +69,11 @@ impl Queue { transactions: IndexMap::new(), }; - Runner { queue, sender } + Runner { + queue, + sender, + tip_height: Height(0), + } } /// Get the transactions in the queue. @@ -118,6 +126,11 @@ impl Runner { transactions.iter().map(|t| t.1 .0.clone()).collect() } + /// Update the `tip_height` field with a new height. + pub fn update_tip_height(&mut self, height: Height) { + self.tip_height = height; + } + /// Retry sending to memempool if needed. /// /// Creates a loop that will run each time a new block is mined. @@ -145,18 +158,18 @@ impl Runner { + 'static, Tip: ChainTip + Clone + Send + Sync + 'static, { - // If we don't have a chain use height 1 to get block spacing. - let tip_height = match tip.best_tip_height() { - Some(height) => height, - _ => Height(1), - }; - - // get spacing between blocks - let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height); - let mut receiver = self.sender.subscribe(); loop { + // if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing + let tip_height = match tip.best_tip_height() { + Some(height) => height, + _ => NO_CHAIN_TIP_HEIGHT, + }; + + // get spacing between blocks + let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height); + // sleep until the next block tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await; @@ -165,22 +178,28 @@ impl Runner { let _ = &self.queue.insert(tx.clone()); } - if !self.queue.transactions().is_empty() { - // remove what is expired - self.remove_expired(spacing); + // skip some work if stored tip height is the same as the one arriving + if tip_height > self.tip_height { + // update the chain tip + self.update_tip_height(tip_height); - // remove if any of the queued transactions is now in the mempool - let in_mempool = - Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await; - self.remove_committed(in_mempool); + if !self.queue.transactions().is_empty() { + // remove what is expired + self.remove_expired(spacing); - // remove if any of the queued transactions is now in the state - let in_state = - Self::check_state(state.clone(), self.transactions_as_hash_set()).await; - self.remove_committed(in_state); + // remove if any of the queued transactions is now in the mempool + let in_mempool = + Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await; + self.remove_committed(in_mempool); - // retry what is left in the queue - let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await; + // remove if any of the queued transactions is now in the state + let in_state = + Self::check_state(state.clone(), self.transactions_as_hash_set()).await; + self.remove_committed(in_state); + + // retry what is left in the queue + let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await; + } } } } From 4afcfba4c28cfb83407ede3c19779fe52a49700a Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 16:53:26 -0300 Subject: [PATCH 21/26] change constants --- zebra-rpc/src/queue.rs | 4 ++-- zebra-rpc/src/queue/tests/prop.rs | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 727457bf39e..85bd336267a 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -36,11 +36,11 @@ use zebra_state::{ReadRequest, ReadResponse}; mod tests; /// The approximate target number of blocks a transaction can be in the queue. -const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 3; +const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5; /// Size of the queue and channel. Suggested value is equal to /// `mempool::downloads::MAX_INBOUND_CONCURRENCY` -const CHANNEL_AND_QUEUE_CAPACITY: usize = 10; +const CHANNEL_AND_QUEUE_CAPACITY: usize = 20; /// The height to use in spacing calculation if we don't have a chain tip. const NO_CHAIN_TIP_HEIGHT: Height = Height(1); diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 218042409c4..89a16bb0e92 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -56,7 +56,7 @@ proptest! { /// Test queue order. #[test] - fn queue_order(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY * 2]>()) { + fn queue_order(transactions in any::<[UnminedTx; 32]>()) { // create a queue let mut runner = Queue::start(); // fill the queue and check insertion order @@ -83,7 +83,7 @@ proptest! { // check the order of the final queue let queue_transactions = runner.queue.transactions(); for i in 0..CHANNEL_AND_QUEUE_CAPACITY { - let transaction = transactions[CHANNEL_AND_QUEUE_CAPACITY + i].clone(); + let transaction = transactions[(CHANNEL_AND_QUEUE_CAPACITY - 8) + i].clone(); prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction); } } @@ -121,7 +121,17 @@ proptest! { runner.remove_expired(spacing); prop_assert_eq!(runner.queue.transactions().len(), 1); - // apply expiration after 3 block elapsed, transaction will not be removed from queue + // apply expiration after 3 blocks elapsed, transaction will not be removed from queue + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 4 blocks elapsed, transaction will not be removed from queue + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 5 block elapsed, transaction will not be removed from queue // as it needs the extra time of 5 seconds time::advance(spacing.to_std().unwrap()).await; runner.remove_expired(spacing); From bba48a2bfb75983874c5262ce372f93375a89845 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 17:04:52 -0300 Subject: [PATCH 22/26] reduce the number of queue test cases --- zebra-rpc/src/queue/tests/prop.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 89a16bb0e92..e4851d75627 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -1,6 +1,6 @@ //! Randomised property tests for the RPC Queue. -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, env, sync::Arc}; use proptest::prelude::*; @@ -19,7 +19,17 @@ use zebra_test::mock_service::MockService; use crate::queue::{Queue, Runner, CHANNEL_AND_QUEUE_CAPACITY}; +/// The default number of proptest cases for these tests. +const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 2; + proptest! { + #![proptest_config( + proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_BLOCK_VEC_PROPTEST_CASES)) + )] + /// Test insert to the queue and remove from it. #[test] fn insert_remove_to_from_queue(transaction in any::()) { From bfd4a04ae0f31eca5b3b09f09a1f61b52278607d Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 17:08:52 -0300 Subject: [PATCH 23/26] remove suggestion --- zebra-rpc/src/queue.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index 85bd336267a..cf6e4bd3aac 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -38,8 +38,7 @@ mod tests; /// The approximate target number of blocks a transaction can be in the queue. const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5; -/// Size of the queue and channel. Suggested value is equal to -/// `mempool::downloads::MAX_INBOUND_CONCURRENCY` +/// Size of the queue and channel. const CHANNEL_AND_QUEUE_CAPACITY: usize = 20; /// The height to use in spacing calculation if we don't have a chain tip. From 4e3229bf3f1f972fa7fbc90170fe639ad0ba65d8 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 5 Apr 2022 18:18:54 -0300 Subject: [PATCH 24/26] change best tip check --- zebra-rpc/src/queue.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs index cf6e4bd3aac..6b960360fe7 100644 --- a/zebra-rpc/src/queue.rs +++ b/zebra-rpc/src/queue.rs @@ -178,7 +178,8 @@ impl Runner { } // skip some work if stored tip height is the same as the one arriving - if tip_height > self.tip_height { + // TODO: check tip block hashes instead, so we always retry when there is a chain fork (these are rare) + if tip_height != self.tip_height { // update the chain tip self.update_tip_height(tip_height); From ecfb7edb05922ad2d77a13bc32a3d55f16b7377b Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 6 Apr 2022 23:19:13 +1000 Subject: [PATCH 25/26] fix(rpc): Check for panics in the transaction queue (#4046) * Check for panics in the RPC transaction queue * Add missing pin! and abort in the start task * Check for transaction queue panics in tests --- zebra-rpc/src/methods.rs | 25 +++------ zebra-rpc/src/methods/tests/prop.rs | 77 ++++++++++++++++++++++---- zebra-rpc/src/methods/tests/vectors.rs | 30 ++++++++-- zebra-rpc/src/server.rs | 18 ++++-- zebrad/src/commands/start.rs | 17 +++++- 5 files changed, 126 insertions(+), 41 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index e4477cc04eb..6959fcfc057 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -14,8 +14,9 @@ use hex::{FromHex, ToHex}; use indexmap::IndexMap; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; -use tokio::sync::broadcast::Sender; +use tokio::{sync::broadcast::Sender, task::JoinHandle}; use tower::{buffer::Buffer, Service, ServiceExt}; +use tracing::Instrument; use zebra_chain::{ block::{self, Height, SerializedBlock}, @@ -188,7 +189,7 @@ where state: State, latest_chain_tip: Tip, network: Network, - ) -> Self + ) -> (Self, JoinHandle<()>) where Version: ToString, >::Future: Send, @@ -206,21 +207,13 @@ where }; // run the process queue - let mut queue_task_handler = tokio::spawn(async move { - runner.run(mempool, state, latest_chain_tip, network).await; - }); - - // queue panic checker - tokio::spawn(async move { - loop { - let queue_task_handler = &mut queue_task_handler; - if queue_task_handler.await.is_err() { - panic!("Unexpected panic in the RPC queue"); - } - } - }); + let rpc_tx_queue_task_handle = tokio::spawn( + runner + .run(mempool, state, latest_chain_tip, network) + .in_current_span(), + ); - rpc_impl + (rpc_impl, rpc_tx_queue_task_handle) } } diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 8544add0adf..2288ff9abfb 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; +use futures::FutureExt; use hex::ToHex; use jsonrpc_core::{Error, ErrorCode}; use proptest::prelude::*; @@ -34,7 +35,7 @@ proptest! { runtime.block_on(async move { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -67,6 +68,10 @@ proptest! { prop_assert_eq!(result, Ok(hash)); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -82,7 +87,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -122,6 +127,10 @@ proptest! { "Result is not a server error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -135,7 +144,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -176,6 +185,10 @@ proptest! { "Result is not a server error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -196,7 +209,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -224,6 +237,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -246,7 +263,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -274,6 +291,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -295,7 +316,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -323,6 +344,10 @@ proptest! { prop_assert_eq!(result, Ok(expected_response)); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -343,7 +368,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -371,6 +396,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -393,7 +422,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -420,6 +449,11 @@ proptest! { ), "Result is not an invalid parameters error: {result:?}" ); + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -431,16 +465,23 @@ proptest! { let _guard = runtime.enter(); let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + // look for an error with a `NoChainTip` - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, network, ); + let response = rpc.get_blockchain_info(); prop_assert_eq!(&response.err().unwrap().message, "No Chain tip available yet"); + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + runtime.block_on(async move { mempool.expect_no_requests().await?; state.expect_no_requests().await?; @@ -471,7 +512,7 @@ proptest! { mock_chain_tip_sender.send_best_tip_block_time(block_time); // Start RPC with the mocked `ChainTip` - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -504,6 +545,10 @@ proptest! { }, }; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + // check no requests were made during this test runtime.block_on(async move { mempool.expect_no_requests().await?; @@ -528,7 +573,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -594,6 +639,10 @@ proptest! { mempool.expect_no_requests().await?; state.expect_no_requests().await?; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -611,7 +660,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -686,6 +735,10 @@ proptest! { mempool.expect_no_requests().await?; state.expect_no_requests().await?; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 8301fa5fd5f..2e5a49e17e7 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -25,7 +25,7 @@ async fn rpc_getinfo() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -45,6 +45,10 @@ async fn rpc_getinfo() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -63,7 +67,7 @@ async fn rpc_getblock() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -82,6 +86,10 @@ async fn rpc_getblock() { } mempool.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -92,7 +100,7 @@ async fn rpc_getblock_error() { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -108,6 +116,10 @@ async fn rpc_getblock_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -132,7 +144,7 @@ async fn rpc_getbestblockhash() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -150,6 +162,10 @@ async fn rpc_getbestblockhash() { assert_eq!(response_hash, tip_block_hash); mempool.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -168,7 +184,7 @@ async fn rpc_getrawtransaction() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -231,4 +247,8 @@ async fn rpc_getrawtransaction() { } } } + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 75ce5402d48..c759e32cb6d 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -9,6 +9,7 @@ use jsonrpc_core; use jsonrpc_http_server::ServerBuilder; +use tokio::task::JoinHandle; use tower::{buffer::Buffer, Service}; use tracing::*; use tracing_futures::Instrument; @@ -37,7 +38,7 @@ impl RpcServer { state: State, latest_chain_tip: Tip, network: Network, - ) -> tokio::task::JoinHandle<()> + ) -> (JoinHandle<()>, JoinHandle<()>) where Version: ToString, Mempool: tower::Service @@ -58,7 +59,8 @@ impl RpcServer { info!("Trying to open RPC endpoint at {}...", listen_addr,); // Initialize the rpc methods with the zebra version - let rpc_impl = RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); + let (rpc_impl, rpc_tx_queue_task_handle) = + RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); // Create handler compatible with V1 and V2 RPC protocols let mut io = @@ -87,10 +89,16 @@ impl RpcServer { }) }; - tokio::task::spawn_blocking(server) + ( + tokio::task::spawn_blocking(server), + rpc_tx_queue_task_handle, + ) } else { - // There is no RPC port, so the RPC task does nothing. - tokio::task::spawn(futures::future::pending().in_current_span()) + // There is no RPC port, so the RPC tasks do nothing. + ( + tokio::task::spawn(futures::future::pending().in_current_span()), + tokio::task::spawn(futures::future::pending().in_current_span()), + ) } } } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 01e3d093775..05b4d726fcb 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -160,7 +160,7 @@ impl StartCmd { .service(mempool); // Launch RPC server - let rpc_task_handle = RpcServer::spawn( + let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn( config.rpc, app_version(), mempool.clone(), @@ -183,7 +183,7 @@ impl StartCmd { let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); - let mut block_gossip_task_handle = tokio::spawn( + let block_gossip_task_handle = tokio::spawn( sync::gossip_best_tip_block_hashes( sync_status.clone(), chain_tip_change.clone(), @@ -218,7 +218,9 @@ impl StartCmd { // ongoing tasks pin!(rpc_task_handle); + pin!(rpc_tx_queue_task_handle); pin!(syncer_task_handle); + pin!(block_gossip_task_handle); pin!(mempool_crawler_task_handle); pin!(mempool_queue_checker_task_handle); pin!(tx_gossip_task_handle); @@ -240,6 +242,13 @@ impl StartCmd { Ok(()) } + rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => { + rpc_tx_queue_result + .expect("unexpected panic in the rpc transaction queue task"); + info!("rpc transaction queue task exited"); + Ok(()) + } + sync_result = &mut syncer_task_handle => sync_result .expect("unexpected panic in the syncer task") .map(|_| info!("syncer task exited")), @@ -298,12 +307,14 @@ impl StartCmd { info!("exiting Zebra because an ongoing task exited: stopping other tasks"); // ongoing tasks + rpc_task_handle.abort(); + rpc_tx_queue_task_handle.abort(); syncer_task_handle.abort(); block_gossip_task_handle.abort(); mempool_crawler_task_handle.abort(); mempool_queue_checker_task_handle.abort(); tx_gossip_task_handle.abort(); - rpc_task_handle.abort(); + progress_task_handle.abort(); // startup tasks groth16_download_handle.abort(); From 2890b85b0f92c895a608f85a3dfc270584888210 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 7 Apr 2022 16:20:25 +1000 Subject: [PATCH 26/26] Fixup a new RPC test from the main branch --- zebra-rpc/src/methods/tests/vectors.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 0817ac28c99..99143f371c1 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -131,7 +131,7 @@ async fn rpc_getblock_missing_error() { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -169,6 +169,10 @@ async fn rpc_getblock_missing_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test]