Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): Implement an RPC transaction queue #4015

Merged
merged 28 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db75beb
Add a rpc queue
oxarbitrage Mar 30, 2022
706c89d
Implement the rpc queue
oxarbitrage Mar 31, 2022
e3e26e9
Add rpc queue tests
oxarbitrage Mar 31, 2022
3c3ef7b
Remove mutex, use broadcast channel
oxarbitrage Mar 31, 2022
a7547ae
Have order and limit in the queue
oxarbitrage Apr 1, 2022
d85a6b6
fix multiple transactions channel
oxarbitrage Apr 2, 2022
414d5ce
Use a network argument
oxarbitrage Apr 2, 2022
d17329f
Use chain tip to calculate block spacing
oxarbitrage Apr 2, 2022
2baff36
Add extra time
oxarbitrage Apr 2, 2022
1bb7279
Finalize the state check test
oxarbitrage Apr 2, 2022
473fd02
Add a retry test
oxarbitrage Apr 2, 2022
1d2879d
Fix description
oxarbitrage Apr 2, 2022
a88561d
fix some docs
oxarbitrage Apr 3, 2022
e83fe4f
add additional empty check to `Runner::run`
oxarbitrage Apr 3, 2022
9dc2531
remove non used method
oxarbitrage Apr 5, 2022
f825281
ignore some errors
oxarbitrage Apr 5, 2022
b2923c3
fix some docs
oxarbitrage Apr 5, 2022
12ce449
add a panic checker to the queue
oxarbitrage Apr 5, 2022
05c8797
add missing file changes for panic checker
oxarbitrage Apr 5, 2022
e018590
skip checks and retries if height has not changed
oxarbitrage Apr 5, 2022
4afcfba
change constants
oxarbitrage Apr 5, 2022
bba48a2
reduce the number of queue test cases
oxarbitrage Apr 5, 2022
bfd4a04
remove suggestion
oxarbitrage Apr 5, 2022
4e3229b
change best tip check
oxarbitrage Apr 5, 2022
ecfb7ed
fix(rpc): Check for panics in the transaction queue (#4046)
teor2345 Apr 6, 2022
31de051
Merge remote-tracking branch 'origin/main' into issue3654
teor2345 Apr 7, 2022
2890b85
Fixup a new RPC test from the main branch
teor2345 Apr 7, 2022
79b5937
Merge branch 'main' into issue3654
mergify[bot] Apr 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zebra-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

pub mod config;
pub mod methods;
pub mod queue;
pub mod server;
#[cfg(test)]
mod tests;
55 changes: 42 additions & 13 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;

use zebra_chain::{
block::{self, Height, SerializedBlock},
chain_tip::ChainTip,
parameters::{ConsensusBranchId, Network, NetworkUpgrade},
serialization::{SerializationError, ZcashDeserialize},
transaction::{self, SerializedTransaction, Transaction},
transaction::{self, SerializedTransaction, Transaction, UnminedTx},
};
use zebra_network::constants::USER_AGENT;
use zebra_node_services::{mempool, BoxError};

use crate::queue::Queue;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -168,17 +172,23 @@ where
/// The configured network for this RPC service.
#[allow(dead_code)]
network: Network,

/// A sender component of a channel used to send transactions to the queue.
queue_sender: Sender<Option<UnminedTx>>,
}

impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
where
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>,
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError> + 'static,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
>,
Tip: ChainTip + Send + Sync,
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
/// Create a new instance of the RPC handler.
pub fn new<Version>(
Expand All @@ -187,17 +197,31 @@ where
state: State,
latest_chain_tip: Tip,
network: Network,
) -> Self
) -> (Self, JoinHandle<()>)
where
Version: ToString,
<Mempool as Service<mempool::Request>>::Future: Send,
<State as Service<zebra_state::ReadRequest>>::Future: Send,
{
RpcImpl {
let runner = Queue::start();

let rpc_impl = RpcImpl {
app_version: app_version.to_string(),
mempool,
state,
latest_chain_tip,
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
network,
}
queue_sender: runner.sender(),
};

// run the process queue
let rpc_tx_queue_task_handle = tokio::spawn(
runner
.run(mempool, state, latest_chain_tip, network)
.in_current_span(),
);

(rpc_impl, rpc_tx_queue_task_handle)
}
}

Expand Down Expand Up @@ -327,6 +351,7 @@ where
raw_transaction_hex: String,
) -> BoxFuture<Result<SentTransactionHash>> {
let mempool = self.mempool.clone();
let queue_sender = self.queue_sender.clone();

async move {
let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| {
Expand All @@ -337,6 +362,10 @@ where

let transaction_hash = raw_transaction.hash();

// send transaction to the rpc queue, ignore any error.
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(Some(unmined_transaction));

let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);

Expand Down
Loading