Skip to content

Commit

Permalink
Merge of #4015
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 7, 2022
2 parents 6aba60d + 2890b85 commit 970dd63
Show file tree
Hide file tree
Showing 9 changed files with 1,022 additions and 38 deletions.
1 change: 1 addition & 0 deletions zebra-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

pub mod config;
pub mod methods;
pub mod queue;
pub mod server;
#[cfg(test)]
mod tests;
55 changes: 42 additions & 13 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;

use zebra_chain::{
block::{self, Height, SerializedBlock},
chain_tip::ChainTip,
parameters::{ConsensusBranchId, Network, NetworkUpgrade},
serialization::{SerializationError, ZcashDeserialize},
transaction::{self, SerializedTransaction, Transaction},
transaction::{self, SerializedTransaction, Transaction, UnminedTx},
};
use zebra_network::constants::USER_AGENT;
use zebra_node_services::{mempool, BoxError};

use crate::queue::Queue;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -168,17 +172,23 @@ where
/// The configured network for this RPC service.
#[allow(dead_code)]
network: Network,

/// A sender component of a channel used to send transactions to the queue.
queue_sender: Sender<Option<UnminedTx>>,
}

impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
where
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>,
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError> + 'static,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
>,
Tip: ChainTip + Send + Sync,
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
/// Create a new instance of the RPC handler.
pub fn new<Version>(
Expand All @@ -187,17 +197,31 @@ where
state: State,
latest_chain_tip: Tip,
network: Network,
) -> Self
) -> (Self, JoinHandle<()>)
where
Version: ToString,
<Mempool as Service<mempool::Request>>::Future: Send,
<State as Service<zebra_state::ReadRequest>>::Future: Send,
{
RpcImpl {
let runner = Queue::start();

let rpc_impl = RpcImpl {
app_version: app_version.to_string(),
mempool,
state,
latest_chain_tip,
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
network,
}
queue_sender: runner.sender(),
};

// run the process queue
let rpc_tx_queue_task_handle = tokio::spawn(
runner
.run(mempool, state, latest_chain_tip, network)
.in_current_span(),
);

(rpc_impl, rpc_tx_queue_task_handle)
}
}

Expand Down Expand Up @@ -327,6 +351,7 @@ where
raw_transaction_hex: String,
) -> BoxFuture<Result<SentTransactionHash>> {
let mempool = self.mempool.clone();
let queue_sender = self.queue_sender.clone();

async move {
let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| {
Expand All @@ -337,6 +362,10 @@ where

let transaction_hash = raw_transaction.hash();

// send transaction to the rpc queue, ignore any error.
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(Some(unmined_transaction));

let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);

Expand Down
Loading

0 comments on commit 970dd63

Please sign in to comment.