diff --git a/zebra-rpc/src/lib.rs b/zebra-rpc/src/lib.rs index ce6072e2589..0bb4b072e2e 100644 --- a/zebra-rpc/src/lib.rs +++ b/zebra-rpc/src/lib.rs @@ -6,6 +6,7 @@ pub mod config; pub mod methods; +pub mod queue; pub mod server; #[cfg(test)] mod tests; diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index d6ba68e8796..80f36d8449c 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -14,18 +14,22 @@ use hex::{FromHex, ToHex}; use indexmap::IndexMap; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; +use tokio::{sync::broadcast::Sender, task::JoinHandle}; use tower::{buffer::Buffer, Service, ServiceExt}; +use tracing::Instrument; use zebra_chain::{ block::{self, Height, SerializedBlock}, chain_tip::ChainTip, parameters::{ConsensusBranchId, Network, NetworkUpgrade}, serialization::{SerializationError, ZcashDeserialize}, - transaction::{self, SerializedTransaction, Transaction}, + transaction::{self, SerializedTransaction, Transaction, UnminedTx}, }; use zebra_network::constants::USER_AGENT; use zebra_node_services::{mempool, BoxError}; +use crate::queue::Queue; + #[cfg(test)] mod tests; @@ -168,17 +172,23 @@ where /// The configured network for this RPC service. #[allow(dead_code)] network: Network, + + /// A sender component of a channel used to send transactions to the queue. + queue_sender: Sender>, } impl RpcImpl where - Mempool: Service, + Mempool: Service + 'static, State: Service< - zebra_state::ReadRequest, - Response = zebra_state::ReadResponse, - Error = zebra_state::BoxError, - >, - Tip: ChainTip + Send + Sync, + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + Tip: ChainTip + Clone + Send + Sync + 'static, { /// Create a new instance of the RPC handler. pub fn new( @@ -187,17 +197,31 @@ where state: State, latest_chain_tip: Tip, network: Network, - ) -> Self + ) -> (Self, JoinHandle<()>) where Version: ToString, + >::Future: Send, + >::Future: Send, { - RpcImpl { + let runner = Queue::start(); + + let rpc_impl = RpcImpl { app_version: app_version.to_string(), - mempool, - state, - latest_chain_tip, + mempool: mempool.clone(), + state: state.clone(), + latest_chain_tip: latest_chain_tip.clone(), network, - } + queue_sender: runner.sender(), + }; + + // run the process queue + let rpc_tx_queue_task_handle = tokio::spawn( + runner + .run(mempool, state, latest_chain_tip, network) + .in_current_span(), + ); + + (rpc_impl, rpc_tx_queue_task_handle) } } @@ -327,6 +351,7 @@ where raw_transaction_hex: String, ) -> BoxFuture> { let mempool = self.mempool.clone(); + let queue_sender = self.queue_sender.clone(); async move { let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| { @@ -337,6 +362,10 @@ where let transaction_hash = raw_transaction.hash(); + // send transaction to the rpc queue, ignore any error. 
+ let unmined_transaction = UnminedTx::from(raw_transaction.clone()); + let _ = queue_sender.send(Some(unmined_transaction)); + let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); let request = mempool::Request::Queue(vec![transaction_parameter]); diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index aac81f0ccb3..2288ff9abfb 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; +use futures::FutureExt; use hex::ToHex; use jsonrpc_core::{Error, ErrorCode}; use proptest::prelude::*; @@ -34,7 +35,7 @@ proptest! { runtime.block_on(async move { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -67,6 +68,10 @@ proptest! { prop_assert_eq!(result, Ok(hash)); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -82,7 +87,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -122,6 +127,10 @@ proptest! { "Result is not a server error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -135,7 +144,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -176,6 +185,10 @@ proptest! { "Result is not a server error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -196,7 +209,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -224,6 +237,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -246,7 +263,7 @@ proptest! 
{ let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -274,6 +291,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -295,7 +316,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -323,6 +344,10 @@ proptest! { prop_assert_eq!(result, Ok(expected_response)); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -343,7 +368,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -371,6 +396,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -393,7 +422,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -420,6 +449,11 @@ proptest! { ), "Result is not an invalid parameters error: {result:?}" ); + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -431,16 +465,23 @@ proptest! { let _guard = runtime.enter(); let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + // look for an error with a `NoChainTip` - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, network, ); + let response = rpc.get_blockchain_info(); prop_assert_eq!(&response.err().unwrap().message, "No Chain tip available yet"); + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + runtime.block_on(async move { mempool.expect_no_requests().await?; state.expect_no_requests().await?; @@ -471,7 +512,7 @@ proptest! 
{ mock_chain_tip_sender.send_best_tip_block_time(block_time); // Start RPC with the mocked `ChainTip` - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -504,6 +545,10 @@ proptest! { }, }; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + // check no requests were made during this test runtime.block_on(async move { mempool.expect_no_requests().await?; @@ -512,6 +557,191 @@ proptest! { Ok::<_, TestCaseError>(()) })?; } + + /// Test the queue functionality using `send_raw_transaction` + #[test] + fn rpc_queue_main_loop(tx in any::()) + { + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + let transaction_hash = tx.hash(); + + runtime.block_on(async move { + tokio::time::pause(); + + let mut mempool = MockService::build().for_prop_tests(); + let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( + "RPC test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + NoChainTip, + Mainnet, + ); + + // send a transaction + let tx_bytes = tx + .zcash_serialize_to_vec() + .expect("Transaction serializes successfully"); + let tx_hex = hex::encode(&tx_bytes); + let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); + + let tx_unmined = UnminedTx::from(tx); + let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); + + // fail the mempool insertion + mempool + .expect_request(expected_request) + .await + .unwrap() + .respond(Err(DummyError)); + + let _ = send_task + .await + .expect("Sending raw transactions should not panic"); + + // advance enough time to have a new runner iteration + let spacing = chrono::Duration::seconds(150); + tokio::time::advance(spacing.to_std().unwrap()).await; + + // the runner will made a new call to TransactionsById + let mut transactions_hash_set = HashSet::new(); + transactions_hash_set.insert(tx_unmined.id); + let expected_request = mempool::Request::TransactionsById(transactions_hash_set); + let response = mempool::Response::Transactions(vec![]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + // the runner will also query the state again for the transaction + let expected_request = zebra_state::ReadRequest::Transaction(transaction_hash); + let response = zebra_state::ReadResponse::Transaction(None); + + state + .expect_request(expected_request) + .await? + .respond(response); + + // now a retry will be sent to the mempool + let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]); + let response = mempool::Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? 
+ .respond(response); + + // no more requets are done + mempool.expect_no_requests().await?; + state.expect_no_requests().await?; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + + Ok::<_, TestCaseError>(()) + })?; + } + + /// Test we receive all transactions that are sent in a channel + #[test] + fn rpc_queue_receives_all_transactions_from_channel(txs in any::<[Transaction; 2]>()) + { + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + runtime.block_on(async move { + tokio::time::pause(); + + let mut mempool = MockService::build().for_prop_tests(); + let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( + "RPC test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + NoChainTip, + Mainnet, + ); + + let mut transactions_hash_set = HashSet::new(); + for tx in txs.clone() { + // send a transaction + let tx_bytes = tx + .zcash_serialize_to_vec() + .expect("Transaction serializes successfully"); + let tx_hex = hex::encode(&tx_bytes); + let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex)); + + let tx_unmined = UnminedTx::from(tx.clone()); + let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]); + + // inser to hs we will use later + transactions_hash_set.insert(tx_unmined.id); + + // fail the mempool insertion + mempool + .clone() + .expect_request(expected_request) + .await + .unwrap() + .respond(Err(DummyError)); + + let _ = send_task + .await + .expect("Sending raw transactions should not panic"); + } + + // advance enough time to have a new runner iteration + let spacing = chrono::Duration::seconds(150); + tokio::time::advance(spacing.to_std().unwrap()).await; + + // the runner will made a new call to TransactionsById quering with both transactions + let expected_request = mempool::Request::TransactionsById(transactions_hash_set); + let response = mempool::Response::Transactions(vec![]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + // the runner will also query the state again for each transaction + for _tx in txs.clone() { + let response = zebra_state::ReadResponse::Transaction(None); + + // we use `expect_request_that` because we can't guarantee the state request order + state + .expect_request_that(|request| matches!(request, zebra_state::ReadRequest::Transaction(_))) + .await? + .respond(response); + } + + // each transaction will be retried + for tx in txs.clone() { + let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]); + let response = mempool::Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? 
+ .respond(response); + } + + // no more requets are done + mempool.expect_no_requests().await?; + state.expect_no_requests().await?; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + + Ok::<_, TestCaseError>(()) + })?; + } } #[derive(Clone, Copy, Debug, Error)] diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 9dddd5a1c2d..99143f371c1 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -26,7 +26,7 @@ async fn rpc_getinfo() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -46,6 +46,10 @@ async fn rpc_getinfo() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -64,7 +68,7 @@ async fn rpc_getblock() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -83,6 +87,10 @@ async fn rpc_getblock() { } mempool.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -93,7 +101,7 @@ async fn rpc_getblock_parse_error() { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -109,6 +117,10 @@ async fn rpc_getblock_parse_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -119,7 +131,7 @@ async fn rpc_getblock_missing_error() { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -157,6 +169,10 @@ async fn rpc_getblock_missing_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -181,7 +197,7 @@ async fn rpc_getbestblockhash() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -199,6 +215,10 @@ async fn rpc_getbestblockhash() { assert_eq!(response_hash, tip_block_hash); 
 mempool.expect_no_requests().await;
+
+    // The queue task should continue without errors or panics
+    let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
+    assert!(matches!(rpc_tx_queue_task_result, None));
 }
 
 #[tokio::test]
@@ -217,7 +237,7 @@ async fn rpc_getrawtransaction() {
         zebra_state::populated_state(blocks.clone(), Mainnet).await;
 
     // Init RPC
-    let rpc = RpcImpl::new(
+    let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
         "RPC test",
         Buffer::new(mempool.clone(), 1),
         read_state,
@@ -280,4 +300,8 @@ async fn rpc_getrawtransaction() {
             }
         }
     }
+
+    // The queue task should continue without errors or panics
+    let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
+    assert!(matches!(rpc_tx_queue_task_result, None));
 }
diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs
new file mode 100644
index 00000000000..6b960360fe7
--- /dev/null
+++ b/zebra-rpc/src/queue.rs
@@ -0,0 +1,323 @@
+//! Transaction Queue.
+//!
+//! All transactions that are sent from RPC methods should be added to this queue for retries.
+//! Transactions can fail to be inserted into the mempool immediately for various reasons,
+//! such as spending outputs that are not mined yet.
+//!
+//! The [`Queue`] is just an `IndexMap` of transactions with their insertion time.
+//! We use this data type because we want the transactions in the queue to stay in insertion order.
+//! The [`Runner`] component does the processing in its [`Runner::run()`] method.
+
+use std::{collections::HashSet, sync::Arc};
+
+use chrono::Duration;
+use indexmap::IndexMap;
+use tokio::{
+    sync::broadcast::{channel, Receiver, Sender},
+    time::Instant,
+};
+
+use tower::{Service, ServiceExt};
+
+use zebra_chain::{
+    block::Height,
+    chain_tip::ChainTip,
+    parameters::{Network, NetworkUpgrade},
+    transaction::{Transaction, UnminedTx, UnminedTxId},
+};
+use zebra_node_services::{
+    mempool::{Gossip, Request, Response},
+    BoxError,
+};
+
+use zebra_state::{ReadRequest, ReadResponse};
+
+#[cfg(test)]
+mod tests;
+
+/// The approximate target number of blocks a transaction can stay in the queue.
+const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5;
+
+/// Size of the queue and channel.
+const CHANNEL_AND_QUEUE_CAPACITY: usize = 20;
+
+/// The height to use in the spacing calculation if we don't have a chain tip.
+const NO_CHAIN_TIP_HEIGHT: Height = Height(1);
+
+#[derive(Clone, Debug)]
+/// The queue is a container of transactions that are going to be
+/// sent to the mempool again.
+pub struct Queue {
+    transactions: IndexMap<UnminedTxId, (Arc<Transaction>, Instant)>,
+}
+
+#[derive(Debug)]
+/// The runner processes the transactions in the queue.
+pub struct Runner {
+    queue: Queue,
+    sender: Sender<Option<UnminedTx>>,
+    tip_height: Height,
+}
+
+impl Queue {
+    /// Start a new queue.
+    pub fn start() -> Runner {
+        let (sender, _receiver) = channel(CHANNEL_AND_QUEUE_CAPACITY);
+
+        let queue = Queue {
+            transactions: IndexMap::new(),
+        };
+
+        Runner {
+            queue,
+            sender,
+            tip_height: Height(0),
+        }
+    }
+
+    /// Get the transactions in the queue.
+    pub fn transactions(&self) -> IndexMap<UnminedTxId, (Arc<Transaction>, Instant)> {
+        self.transactions.clone()
+    }
+
+    /// Insert a transaction into the queue.
+    pub fn insert(&mut self, unmined_tx: UnminedTx) {
+        self.transactions
+            .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now()));
+
+        // remove the oldest transaction if the queue is over capacity
+        if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY {
+            self.remove_first();
+        }
+    }
+
+    /// Remove a transaction from the queue.
+    pub fn remove(&mut self, unmined_id: UnminedTxId) {
+        self.transactions.remove(&unmined_id);
+    }
+
+    /// Remove the oldest transaction from the queue.
+    pub fn remove_first(&mut self) {
+        self.transactions.shift_remove_index(0);
+    }
+}
+
+impl Runner {
+    /// Create a new sender for this runner.
+    pub fn sender(&self) -> Sender<Option<UnminedTx>> {
+        self.sender.clone()
+    }
+
+    /// Create a new receiver.
+    pub fn receiver(&self) -> Receiver<Option<UnminedTx>> {
+        self.sender.subscribe()
+    }
+
+    /// Get the queue transactions as a `HashSet` of unmined ids.
+    fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
+        let transactions = self.queue.transactions();
+        transactions.iter().map(|t| *t.0).collect()
+    }
+
+    /// Get the queue transactions as a `Vec` of transactions.
+    fn transactions_as_vec(&self) -> Vec<Arc<Transaction>> {
+        let transactions = self.queue.transactions();
+        transactions.iter().map(|t| t.1 .0.clone()).collect()
+    }
+
+    /// Update the `tip_height` field with a new height.
+    pub fn update_tip_height(&mut self, height: Height) {
+        self.tip_height = height;
+    }
+
+    /// Retry sending the queued transactions to the mempool if needed.
+    ///
+    /// Creates a loop that runs approximately each time a new block is mined.
+    /// In this loop, get the transactions that are in the queue and:
+    /// - Check if they are now in the mempool and, if so, delete them from the queue.
+    /// - Check if they are now part of a block in the state and, if so,
+    ///   delete them from the queue.
+    /// - With the transactions left in the queue, retry sending them to the mempool,
+    ///   ignoring the result of this operation.
+    ///
+    /// Additionally, each iteration of the above loop receives and inserts into the queue
+    /// any transactions that are pending in the channel.
+    pub async fn run<Mempool, State, Tip>(
+        mut self,
+        mempool: Mempool,
+        state: State,
+        tip: Tip,
+        network: Network,
+    ) where
+        Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
+        State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
+            + Clone
+            + Send
+            + Sync
+            + 'static,
+        Tip: ChainTip + Clone + Send + Sync + 'static,
+    {
+        let mut receiver = self.sender.subscribe();
+
+        loop {
+            // if we don't have a chain tip, use `NO_CHAIN_TIP_HEIGHT` to get the block spacing
+            let tip_height = match tip.best_tip_height() {
+                Some(height) => height,
+                _ => NO_CHAIN_TIP_HEIGHT,
+            };
+
+            // get the spacing between blocks
+            let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height);
+
+            // sleep until the next block
+            tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
+
+            // get transactions from the channel
+            while let Ok(Some(tx)) = receiver.try_recv() {
+                let _ = &self.queue.insert(tx.clone());
+            }
+
+            // skip some work if the stored tip height is the same as the one arriving
+            // TODO: check tip block hashes instead, so we always retry when there is a chain fork (these are rare)
+            if tip_height != self.tip_height {
+                // update the chain tip
+                self.update_tip_height(tip_height);
+
+                if !self.queue.transactions().is_empty() {
+                    // remove what is expired
+                    self.remove_expired(spacing);
+
+                    // remove any queued transactions that are now in the mempool
+                    let in_mempool =
+                        Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await;
+                    self.remove_committed(in_mempool);
+
+                    // remove any queued transactions that are now in the state
+                    let in_state =
+                        Self::check_state(state.clone(), self.transactions_as_hash_set()).await;
+                    self.remove_committed(in_state);
+
+                    // retry what is left in the queue
+                    let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await;
+                }
+            }
+        }
+    }
+
+    /// Remove transactions that are expired according to the number of blocks to expire
+    /// and the current spacing between blocks.
+    fn remove_expired(&mut self, spacing: Duration) {
+        // Allow some extra time to make sure we re-submit each transaction
+        // `NUMBER_OF_BLOCKS_TO_EXPIRE` times, as the main loop also takes some time to run.
+        let extra_time = Duration::seconds(5);
+
+        let duration_to_expire =
+            Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time;
+        let transactions = self.queue.transactions();
+        let now = Instant::now();
+
+        for tx in transactions.iter() {
+            let tx_time =
+                tx.1 .1
+                    .checked_add(
+                        duration_to_expire
+                            .to_std()
+                            .expect("should never be less than zero"),
+                    )
+                    .expect("these are low numbers, so they should always be inside bounds");
+
+            if now > tx_time {
+                self.queue.remove(*tx.0);
+            }
+        }
+    }
+
+    /// Remove transactions from the queue that have been inserted into the state or the mempool.
+    fn remove_committed(&mut self, to_remove: HashSet<UnminedTxId>) {
+        for r in to_remove {
+            self.queue.remove(r);
+        }
+    }
+
+    /// Check the mempool for the given transactions.
+    ///
+    /// Returns the transactions that are in the mempool.
+    async fn check_mempool<Mempool>(
+        mempool: Mempool,
+        transactions: HashSet<UnminedTxId>,
+    ) -> HashSet<UnminedTxId>
+    where
+        Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
+    {
+        let mut response = HashSet::new();
+
+        if !transactions.is_empty() {
+            let request = Request::TransactionsById(transactions);
+
+            // ignore any error coming from the mempool
+            let mempool_response = mempool.oneshot(request).await;
+            if let Ok(Response::Transactions(txs)) = mempool_response {
+                for tx in txs {
+                    response.insert(tx.id);
+                }
+            }
+        }
+
+        response
+    }
+
+    /// Check the state for the given transactions.
+    ///
+    /// Returns the transactions that are in the state.
+    async fn check_state<State>(
+        state: State,
+        transactions: HashSet<UnminedTxId>,
+    ) -> HashSet<UnminedTxId>
+    where
+        State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
+            + Clone
+            + Send
+            + Sync
+            + 'static,
+    {
+        let mut response = HashSet::new();
+
+        for t in transactions {
+            let request = ReadRequest::Transaction(t.mined_id());
+
+            // ignore any error coming from the state
+            let state_response = state.clone().oneshot(request).await;
+            if let Ok(ReadResponse::Transaction(Some(tx))) = state_response {
+                response.insert(tx.0.unmined_id());
+            }
+        }
+
+        response
+    }
+
+    /// Retry sending the given transactions to the mempool.
+    ///
+    /// Returns the transaction ids that were retried.
+    async fn retry<Mempool>(
+        mempool: Mempool,
+        transactions: Vec<Arc<Transaction>>,
+    ) -> HashSet<UnminedTxId>
+    where
+        Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
+    {
+        let mut retried = HashSet::new();
+
+        for tx in transactions {
+            let unmined = UnminedTx::from(tx);
+            let gossip = Gossip::Tx(unmined.clone());
+            let request = Request::Queue(vec![gossip]);
+
+            // send to the mempool and ignore any error
+            let _ = mempool.clone().oneshot(request).await;
+
+            // return what we retried, but don't delete from the queue;
+            // we might retry again in a later call.
+            retried.insert(unmined.id);
+        }
+        retried
+    }
+}
diff --git a/zebra-rpc/src/queue/tests.rs b/zebra-rpc/src/queue/tests.rs
new file mode 100644
index 00000000000..300832fd142
--- /dev/null
+++ b/zebra-rpc/src/queue/tests.rs
@@ -0,0 +1,3 @@
+//! Test code for the RPC queue
+
+mod prop;
diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs
new file mode 100644
index 00000000000..e4851d75627
--- /dev/null
+++ b/zebra-rpc/src/queue/tests/prop.rs
@@ -0,0 +1,355 @@
+//! Randomised property tests for the RPC Queue.
+ +use std::{collections::HashSet, env, sync::Arc}; + +use proptest::prelude::*; + +use chrono::Duration; +use tokio::time; +use tower::ServiceExt; + +use zebra_chain::{ + block::{Block, Height}, + serialization::ZcashDeserializeInto, + transaction::{Transaction, UnminedTx}, +}; +use zebra_node_services::mempool::{Gossip, Request, Response}; +use zebra_state::{BoxError, ReadRequest, ReadResponse}; +use zebra_test::mock_service::MockService; + +use crate::queue::{Queue, Runner, CHANNEL_AND_QUEUE_CAPACITY}; + +/// The default number of proptest cases for these tests. +const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 2; + +proptest! { + #![proptest_config( + proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_BLOCK_VEC_PROPTEST_CASES)) + )] + + /// Test insert to the queue and remove from it. + #[test] + fn insert_remove_to_from_queue(transaction in any::()) { + // create a queue + let mut runner = Queue::start(); + + // insert transaction + runner.queue.insert(transaction.clone()); + + // transaction was inserted to queue + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(1, queue_transactions.len()); + + // remove transaction from the queue + runner.queue.remove(transaction.id); + + // transaction was removed from queue + prop_assert_eq!(runner.queue.transactions().len(), 0); + } + + /// Test queue never grows above limit. + #[test] + fn queue_size_limit(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY + 1]>()) { + // create a queue + let mut runner = Queue::start(); + + // insert all transactions we have + transactions.iter().for_each(|t| runner.queue.insert(t.clone())); + + // transaction queue is never above limit + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len()); + } + + /// Test queue order. + #[test] + fn queue_order(transactions in any::<[UnminedTx; 32]>()) { + // create a queue + let mut runner = Queue::start(); + // fill the queue and check insertion order + for i in 0..CHANNEL_AND_QUEUE_CAPACITY { + let transaction = transactions[i].clone(); + runner.queue.insert(transaction.clone()); + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(i + 1, queue_transactions.len()); + prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction); + } + + // queue is full + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len()); + + // keep adding transaction, new transactions will always be on top of the queue + for transaction in transactions.iter().skip(CHANNEL_AND_QUEUE_CAPACITY) { + runner.queue.insert(transaction.clone()); + let queue_transactions = runner.queue.transactions(); + prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len()); + prop_assert_eq!(UnminedTx::from(queue_transactions.last().unwrap().1.0.clone()), transaction.clone()); + } + + // check the order of the final queue + let queue_transactions = runner.queue.transactions(); + for i in 0..CHANNEL_AND_QUEUE_CAPACITY { + let transaction = transactions[(CHANNEL_AND_QUEUE_CAPACITY - 8) + i].clone(); + prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction); + } + } + + /// Test transactions are removed from the queue after time elapses. 
+ #[test] + fn remove_expired_transactions_from_queue(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + // pause the clock + time::pause(); + + // create a queue + let mut runner = Queue::start(); + + // insert a transaction to the queue + runner.queue.insert(transaction); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // have a block interval value equal to the one at Height(1) + let spacing = Duration::seconds(150); + + // apply expiration inmediatly, transaction will not be removed from queue + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 1 block elapsed, transaction will not be removed from queue + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 2 blocks elapsed, transaction will not be removed from queue + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 3 blocks elapsed, transaction will not be removed from queue + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 4 blocks elapsed, transaction will not be removed from queue + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply expiration after 5 block elapsed, transaction will not be removed from queue + // as it needs the extra time of 5 seconds + time::advance(spacing.to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // apply 6 seconds more, transaction will be removed from the queue + time::advance(chrono::Duration::seconds(6).to_std().unwrap()).await; + runner.remove_expired(spacing); + prop_assert_eq!(runner.queue.transactions().len(), 0); + + Ok::<_, TestCaseError>(()) + })?; + } + + /// Test transactions are removed from queue after they get in the mempool + #[test] + fn queue_runner_mempool(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + let mut mempool = MockService::build().for_prop_tests(); + + // create a queue + let mut runner = Queue::start(); + + // insert a transaction to the queue + let unmined_transaction = UnminedTx::from(transaction); + runner.queue.insert(unmined_transaction.clone()); + let transactions = runner.queue.transactions(); + prop_assert_eq!(transactions.len(), 1); + + // get a `HashSet` of transactions to call mempool with + let transactions_hash_set = runner.transactions_as_hash_set(); + + // run the mempool checker + let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone())); + + // mempool checker will call the mempool looking for the transaction + let expected_request = Request::TransactionsById(transactions_hash_set.clone()); + let response = Response::Transactions(vec![]); + + mempool + .expect_request(expected_request) + .await? 
+ .respond(response); + let result = send_task.await.expect("Requesting transactions should not panic"); + + // empty results, transaction is not in the mempool + prop_assert_eq!(result, HashSet::new()); + + // insert transaction to the mempool + let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); + let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); + let send_task = tokio::spawn(mempool.clone().oneshot(request)); + let response = Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let _ = send_task.await.expect("Inserting to mempool should not panic"); + + // check the mempool again + let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone())); + + // mempool checker will call the mempool looking for the transaction + let expected_request = Request::TransactionsById(transactions_hash_set); + let response = Response::Transactions(vec![unmined_transaction]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transactions should not panic"); + + // transaction is in the mempool + prop_assert_eq!(result.len(), 1); + + // but it is not deleted from the queue yet + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // delete by calling remove_committed + runner.remove_committed(result); + prop_assert_eq!(runner.queue.transactions().len(), 0); + + // no more requets expected + mempool.expect_no_requests().await?; + + Ok::<_, TestCaseError>(()) + })?; + } + + /// Test transactions are removed from queue after they get in the state + #[test] + fn queue_runner_state(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + let mut write_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + + // create a queue + let mut runner = Queue::start(); + + // insert a transaction to the queue + let unmined_transaction = UnminedTx::from(&transaction); + runner.queue.insert(unmined_transaction.clone()); + prop_assert_eq!(runner.queue.transactions().len(), 1); + + // get a `HashSet` of transactions to call state with + let transactions_hash_set = runner.transactions_as_hash_set(); + + let send_task = tokio::spawn(Runner::check_state(read_state.clone(), transactions_hash_set.clone())); + + let expected_request = ReadRequest::Transaction(transaction.hash()); + let response = ReadResponse::Transaction(None); + + read_state + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transaction should not panic"); + // transaction is not in the state + prop_assert_eq!(HashSet::new(), result); + + // get a block and push our transaction to it + let block = + zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into::>()?; + let mut block = Arc::try_unwrap(block).expect("block should unwrap"); + block.transactions.push(Arc::new(transaction.clone())); + + // commit the created block + let request = zebra_state::Request::CommitFinalizedBlock(zebra_state::FinalizedBlock::from(Arc::new(block.clone()))); + let send_task = tokio::spawn(write_state.clone().oneshot(request.clone())); + let response = zebra_state::Response::Committed(block.hash()); + + write_state + .expect_request(request) + .await? 
+ .respond(response); + + let _ = send_task.await.expect("Inserting block to state should not panic"); + + // check the state again + let send_task = tokio::spawn(Runner::check_state(read_state.clone(), transactions_hash_set)); + + let expected_request = ReadRequest::Transaction(transaction.hash()); + let response = ReadResponse::Transaction(Some((Arc::new(transaction), Height(1)))); + + read_state + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transaction should not panic"); + + // transaction was found in the state + prop_assert_eq!(result.len(), 1); + + read_state.expect_no_requests().await?; + write_state.expect_no_requests().await?; + + Ok::<_, TestCaseError>(()) + })?; + } + + // Test any given transaction can be mempool retried. + #[test] + fn queue_mempool_retry(transaction in any::()) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + let mut mempool = MockService::build().for_prop_tests(); + + // create a queue + let mut runner = Queue::start(); + + // insert a transaction to the queue + let unmined_transaction = UnminedTx::from(transaction.clone()); + runner.queue.insert(unmined_transaction.clone()); + let transactions = runner.queue.transactions(); + prop_assert_eq!(transactions.len(), 1); + + // get a `Vec` of transactions to do retries + let transactions_vec = runner.transactions_as_vec(); + + // run retry + let send_task = tokio::spawn(Runner::retry(mempool.clone(), transactions_vec.clone())); + + // retry will queue the transaction to mempool + let gossip = Gossip::Tx(UnminedTx::from(transaction.clone())); + let expected_request = Request::Queue(vec![gossip]); + let response = Response::Queued(vec![Ok(())]); + + mempool + .expect_request(expected_request) + .await? + .respond(response); + + let result = send_task.await.expect("Requesting transactions should not panic"); + + // retry was done + prop_assert_eq!(result.len(), 1); + + Ok::<_, TestCaseError>(()) + })?; + } +} diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 523924cc5d4..c759e32cb6d 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -9,6 +9,7 @@ use jsonrpc_core; use jsonrpc_http_server::ServerBuilder; +use tokio::task::JoinHandle; use tower::{buffer::Buffer, Service}; use tracing::*; use tracing_futures::Instrument; @@ -37,7 +38,7 @@ impl RpcServer { state: State, latest_chain_tip: Tip, network: Network, - ) -> tokio::task::JoinHandle<()> + ) -> (JoinHandle<()>, JoinHandle<()>) where Version: ToString, Mempool: tower::Service @@ -52,13 +53,14 @@ impl RpcServer { + Sync + 'static, State::Future: Send, - Tip: ChainTip + Send + Sync + 'static, + Tip: ChainTip + Clone + Send + Sync + 'static, { if let Some(listen_addr) = config.listen_addr { info!("Trying to open RPC endpoint at {}...", listen_addr,); // Initialize the rpc methods with the zebra version - let rpc_impl = RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); + let (rpc_impl, rpc_tx_queue_task_handle) = + RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); // Create handler compatible with V1 and V2 RPC protocols let mut io = @@ -87,10 +89,16 @@ impl RpcServer { }) }; - tokio::task::spawn_blocking(server) + ( + tokio::task::spawn_blocking(server), + rpc_tx_queue_task_handle, + ) } else { - // There is no RPC port, so the RPC task does nothing. - tokio::task::spawn(futures::future::pending().in_current_span()) + // There is no RPC port, so the RPC tasks do nothing. 
+ ( + tokio::task::spawn(futures::future::pending().in_current_span()), + tokio::task::spawn(futures::future::pending().in_current_span()), + ) } } } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 01e3d093775..05b4d726fcb 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -160,7 +160,7 @@ impl StartCmd { .service(mempool); // Launch RPC server - let rpc_task_handle = RpcServer::spawn( + let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn( config.rpc, app_version(), mempool.clone(), @@ -183,7 +183,7 @@ impl StartCmd { let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); - let mut block_gossip_task_handle = tokio::spawn( + let block_gossip_task_handle = tokio::spawn( sync::gossip_best_tip_block_hashes( sync_status.clone(), chain_tip_change.clone(), @@ -218,7 +218,9 @@ impl StartCmd { // ongoing tasks pin!(rpc_task_handle); + pin!(rpc_tx_queue_task_handle); pin!(syncer_task_handle); + pin!(block_gossip_task_handle); pin!(mempool_crawler_task_handle); pin!(mempool_queue_checker_task_handle); pin!(tx_gossip_task_handle); @@ -240,6 +242,13 @@ impl StartCmd { Ok(()) } + rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => { + rpc_tx_queue_result + .expect("unexpected panic in the rpc transaction queue task"); + info!("rpc transaction queue task exited"); + Ok(()) + } + sync_result = &mut syncer_task_handle => sync_result .expect("unexpected panic in the syncer task") .map(|_| info!("syncer task exited")), @@ -298,12 +307,14 @@ impl StartCmd { info!("exiting Zebra because an ongoing task exited: stopping other tasks"); // ongoing tasks + rpc_task_handle.abort(); + rpc_tx_queue_task_handle.abort(); syncer_task_handle.abort(); block_gossip_task_handle.abort(); mempool_crawler_task_handle.abort(); mempool_queue_checker_task_handle.abort(); tx_gossip_task_handle.abort(); - rpc_task_handle.abort(); + progress_task_handle.abort(); // startup tasks groth16_download_handle.abort();
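The handoff between `send_raw_transaction` and the queue runner in this diff goes through a `tokio::sync::broadcast` channel, with send errors deliberately ignored and the runner draining the channel with `try_recv()` once per iteration. The following standalone sketch shows only that pattern; it uses `Option<String>` in place of `Option<UnminedTx>`, and the message text is invented for illustration, so none of its names come from the PR itself.

use tokio::sync::broadcast::channel;

#[tokio::main]
async fn main() {
    // A bounded broadcast channel; 20 mirrors CHANNEL_AND_QUEUE_CAPACITY in the diff.
    let (sender, mut receiver) = channel::<Option<String>>(20);

    // RPC side: publish the transaction and ignore the error,
    // just as `send_raw_transaction` does when nothing is listening yet.
    let _ = sender.send(Some("raw transaction hex".to_string()));

    // Runner side: drain everything that is pending without blocking,
    // the same `while let Ok(Some(..)) = try_recv()` shape used in `Runner::run`.
    while let Ok(Some(tx)) = receiver.try_recv() {
        println!("inserting {tx} into the retry queue");
    }
}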
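The expiration rule in `Runner::remove_expired` keeps a transaction for roughly `NUMBER_OF_BLOCKS_TO_EXPIRE` target block intervals plus a small buffer. Below is a minimal sketch of that arithmetic using `std::time` types instead of the `chrono`/`tokio` types in the diff; the 75-second spacing is the post-Blossom Zcash target, and the helper name `is_expired` is invented for illustration.

use std::time::{Duration, Instant};

// Constants mirroring the ones in the diff (assumed here, not imported from it).
const NUMBER_OF_BLOCKS_TO_EXPIRE: u64 = 5;
const EXTRA_TIME_SECS: u64 = 5;

// A queued transaction expires once `blocks * spacing + extra` has elapsed
// since it was inserted.
fn is_expired(inserted_at: Instant, target_spacing: Duration, now: Instant) -> bool {
    let duration_to_expire =
        target_spacing * NUMBER_OF_BLOCKS_TO_EXPIRE as u32 + Duration::from_secs(EXTRA_TIME_SECS);
    now > inserted_at + duration_to_expire
}

fn main() {
    let spacing = Duration::from_secs(75); // post-Blossom Zcash target spacing
    let inserted_at = Instant::now();

    // Immediately after insertion the transaction is kept...
    assert!(!is_expired(inserted_at, spacing, Instant::now()));

    // ...and it only expires once 5 * 75s + 5s have elapsed.
    let later = inserted_at + Duration::from_secs(5 * 75 + 6);
    assert!(is_expired(inserted_at, spacing, later));
}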
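Every test in this diff ends by polling the queue task's `JoinHandle` once with `now_or_never()` and asserting it returns `None`, meaning the task is still running and has neither exited nor panicked. A generic sketch of that check, with a `pending()` future standing in for the queue runner:

use futures::FutureExt;

#[tokio::main]
async fn main() {
    // A stand-in for the queue runner: a task that never finishes on its own.
    let task_handle = tokio::spawn(futures::future::pending::<()>());

    // Polling the handle once without awaiting returns `None` while the task is alive.
    assert!(task_handle.now_or_never().is_none());
}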