Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): Implement an RPC transaction queue #4015

Merged
merged 28 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db75beb
Add a rpc queue
oxarbitrage Mar 30, 2022
706c89d
Implement the rpc queue
oxarbitrage Mar 31, 2022
e3e26e9
Add rpc queue tests
oxarbitrage Mar 31, 2022
3c3ef7b
Remove mutex, use broadcast channel
oxarbitrage Mar 31, 2022
a7547ae
Have order and limit in the queue
oxarbitrage Apr 1, 2022
d85a6b6
fix multiple transactions channel
oxarbitrage Apr 2, 2022
414d5ce
Use a network argument
oxarbitrage Apr 2, 2022
d17329f
Use chain tip to calculate block spacing
oxarbitrage Apr 2, 2022
2baff36
Add extra time
oxarbitrage Apr 2, 2022
1bb7279
Finalize the state check test
oxarbitrage Apr 2, 2022
473fd02
Add a retry test
oxarbitrage Apr 2, 2022
1d2879d
Fix description
oxarbitrage Apr 2, 2022
a88561d
fix some docs
oxarbitrage Apr 3, 2022
e83fe4f
add additional empty check to `Runner::run`
oxarbitrage Apr 3, 2022
9dc2531
remove non used method
oxarbitrage Apr 5, 2022
f825281
ignore some errors
oxarbitrage Apr 5, 2022
b2923c3
fix some docs
oxarbitrage Apr 5, 2022
12ce449
add a panic checker to the queue
oxarbitrage Apr 5, 2022
05c8797
add missing file changes for panic checker
oxarbitrage Apr 5, 2022
e018590
skip checks and retries if height has not changed
oxarbitrage Apr 5, 2022
4afcfba
change constants
oxarbitrage Apr 5, 2022
bba48a2
reduce the number of queue test cases
oxarbitrage Apr 5, 2022
bfd4a04
remove suggestion
oxarbitrage Apr 5, 2022
4e3229b
change best tip check
oxarbitrage Apr 5, 2022
ecfb7ed
fix(rpc): Check for panics in the transaction queue (#4046)
teor2345 Apr 6, 2022
31de051
Merge remote-tracking branch 'origin/main' into issue3654
teor2345 Apr 7, 2022
2890b85
Fixup a new RPC test from the main branch
teor2345 Apr 7, 2022
79b5937
Merge branch 'main' into issue3654
mergify[bot] Apr 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zebra-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

pub mod config;
pub mod methods;
pub mod queue;
pub mod server;
#[cfg(test)]
mod tests;
54 changes: 42 additions & 12 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use zebra_chain::{
chain_tip::ChainTip,
parameters::{ConsensusBranchId, Network, NetworkUpgrade},
serialization::{SerializationError, ZcashDeserialize},
transaction::{self, SerializedTransaction, Transaction},
transaction::{self, SerializedTransaction, Transaction, UnminedTx},
};
use zebra_network::constants::USER_AGENT;
use zebra_node_services::{mempool, BoxError};

use crate::queue::{Queue, Runner};

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -160,17 +162,23 @@ where
/// The configured network for this RPC service.
#[allow(dead_code)]
network: Network,

/// An instance of the RPC transaction queue
queue_runner: Runner,
}

impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
where
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>,
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError> + 'static,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
>,
Tip: ChainTip + Send + Sync,
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
/// Create a new instance of the RPC handler.
pub fn new<Version>(
Expand All @@ -182,14 +190,31 @@ where
) -> Self
where
Version: ToString,
<Mempool as Service<mempool::Request>>::Future: Send,
<State as Service<zebra_state::ReadRequest>>::Future: Send,
{
RpcImpl {
let (mut listener, runner) = Queue::start();

let rpc_impl = RpcImpl {
app_version: app_version.to_string(),
mempool,
state,
latest_chain_tip,
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
network,
}
queue_runner: runner.clone(),
};

// run the listener
tokio::spawn(async move {
listener.listen().await;
});

// run the process queue
tokio::spawn(async move {
runner.run(mempool, state, latest_chain_tip).await;
});

rpc_impl
}
}

Expand Down Expand Up @@ -319,6 +344,7 @@ where
raw_transaction_hex: String,
) -> BoxFuture<Result<SentTransactionHash>> {
let mempool = self.mempool.clone();
let queue_sender = self.queue_runner.sender();

async move {
let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| {
Expand All @@ -329,6 +355,10 @@ where

let transaction_hash = raw_transaction.hash();

// send transaction to the rpc queue
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(Some(unmined_transaction)).await;

let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);

Expand Down
88 changes: 88 additions & 0 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,94 @@ proptest! {
Ok::<_, TestCaseError>(())
})?;
}

/// Test the queue functionality using `send_raw_transaction`
#[test]
fn rpc_queue_main_loop(tx in any::<Transaction>())
{
let runtime = zebra_test::init_async();
let _guard = runtime.enter();

let transaction_hash = tx.hash();

runtime.block_on(async move {
tokio::time::pause();

let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
Mainnet,
);

// send a transaction
let tx_bytes = tx
.zcash_serialize_to_vec()
.expect("Transaction serializes successfully");
let tx_hex = hex::encode(&tx_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex));

let tx_unmined = UnminedTx::from(tx);
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);

// fail the mempool insertion
mempool
.expect_request(expected_request)
.await
.unwrap()
.respond(Err(DummyError));

let _ = send_task
.await
.expect("Sending raw transactions should not panic");

// make sure the transaction was inserted to the queue
prop_assert_eq!(rpc.queue_runner.queue().lock().unwrap().transactions().len(), 1);

// advance enough time to have a new runner iteration
let spacing = chrono::Duration::seconds(150);
tokio::time::advance(spacing.to_std().unwrap()).await;

// the runner will made a new call to TransactionsById
let mut transactions_hash_set = HashSet::new();
transactions_hash_set.insert(tx_unmined.id);
let expected_request = mempool::Request::TransactionsById(transactions_hash_set);
let response = mempool::Response::Transactions(vec![]);

mempool
.expect_request(expected_request)
.await?
.respond(response);

// the runner will also query the state again for the transaction
let expected_request = zebra_state::ReadRequest::Transaction(transaction_hash);
let response = zebra_state::ReadResponse::Transaction(None);

state
.expect_request(expected_request)
.await?
.respond(response);

// now a retry will be sent to the mempool
let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]);
let response = mempool::Response::Queued(vec![Ok(())]);

mempool
.expect_request(expected_request)
.await?
.respond(response);

// no more requets are done
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;

Ok::<_, TestCaseError>(())
})?;
}
}

#[derive(Clone, Copy, Debug, Error)]
Expand Down
Loading