
add bound parameter to Batch
oxarbitrage committed Feb 4, 2021
1 parent d7c40af commit ba63e1a
Showing 8 changed files with 98 additions and 19 deletions.
22 changes: 20 additions & 2 deletions tower-batch/src/layer.rs
@@ -12,6 +12,7 @@ use tower::Service;
 pub struct BatchLayer<Request> {
     max_items: usize,
     max_latency: std::time::Duration,
+    bound: usize,
     _p: PhantomData<fn(Request)>,
 }

@@ -23,10 +24,27 @@ impl<Request> BatchLayer<Request> {
     ///
     /// * `max_items` gives the maximum number of items per batch.
     /// * `max_latency` gives the maximum latency for a batch item.
-    pub fn new(max_items: usize, max_latency: std::time::Duration) -> Self {
+    /// * `bound` gives the maximal number of requests that can be queued for the service before
+    ///   backpressure is applied to callers.
+    ///
+    /// # A note on choosing a `bound`
+    ///
+    /// When [`Batch`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
+    /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
+    /// this reserved slot may be held up for a long time. As a result, it's advisable to set
+    /// `bound` to be at least the maximum number of concurrent requests the [`Batch`] will see.
+    /// If you do not, all the slots in the buffer may be held up by futures that have just called
+    /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
+    /// requests.
+    ///
+    /// [`Poll::Ready`]: std::task::Poll::Ready
+    /// [`call`]: crate::Service::call
+    /// [`poll_ready`]: crate::Service::poll_ready
+    pub fn new(max_items: usize, max_latency: std::time::Duration, bound: usize) -> Self {
         BatchLayer {
             max_items,
             max_latency,
+            bound,
             _p: PhantomData,
         }
     }
@@ -42,7 +60,7 @@ where
     type Service = Batch<S, Request>;
 
     fn layer(&self, service: S) -> Self::Service {
-        Batch::new(service, self.max_items, self.max_latency)
+        Batch::new(service, self.max_items, self.max_latency, self.bound)
     }
 }
 
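For reference, a minimal sketch of applying the updated layer, assuming `BatchLayer` is exported from the `tower_batch` crate root; `Ed25519Verifier` is borrowed from this commit's tests, and the numbers are illustrative only:

    use std::time::Duration;
    use tower::Layer;
    use tower_batch::BatchLayer;

    // Flush after 10 items or 100ms, and allow at most 32 queued requests
    // before `poll_ready` applies backpressure to callers.
    let layer = BatchLayer::new(10, Duration::from_millis(100), 32);
    let verifier = layer.layer(Ed25519Verifier::new());
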
20 changes: 19 additions & 1 deletion tower-batch/src/semaphore.rs
@@ -9,7 +9,7 @@ use std::{
     future::Future,
     mem,
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, Weak},
     task::{Context, Poll},
 };
 use tokio::sync;
@@ -20,13 +20,31 @@ pub(crate) struct Semaphore {
     state: State,
 }
 
+#[derive(Debug)]
+pub(crate) struct Close {
+    semaphore: Weak<sync::Semaphore>,
+}
+
 enum State {
     Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync + 'static>>),
     Ready(Permit),
     Empty,
 }
 
 impl Semaphore {
+    pub(crate) fn new_with_close(permits: usize) -> (Self, Close) {
+        let semaphore = Arc::new(sync::Semaphore::new(permits));
+        let close = Close {
+            semaphore: Arc::downgrade(&semaphore),
+        };
+        let semaphore = Self {
+            semaphore,
+            state: State::Empty,
+        };
+        (semaphore, close)
+    }
+
+    #[allow(dead_code)]
     pub(crate) fn new(permits: usize) -> Self {
         Self {
             semaphore: Arc::new(sync::Semaphore::new(permits)),
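`Close` deliberately holds only a `Weak` reference, so it can shut the semaphore down without keeping it alive. Its close method is outside this hunk; a plausible sketch, following the pattern of `tower::buffer`'s semaphore:

    impl Close {
        /// Close the semaphore, waking any tasks still waiting on a permit.
        pub(crate) fn close(self) {
            // The upgrade fails if every `Batch` clone is already gone,
            // in which case there is no one left to wake.
            if let Some(semaphore) = self.semaphore.upgrade() {
                semaphore.close();
            }
        }
    }
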
59 changes: 48 additions & 11 deletions tower-batch/src/service.rs
@@ -47,27 +47,64 @@
     ///
     /// * `max_items` gives the maximum number of items per batch.
     /// * `max_latency` gives the maximum latency for a batch item.
+    /// * `bound` gives the maximal number of requests that can be queued for the service before
+    ///   backpressure is applied to callers.
     ///
     /// The default Tokio executor is used to run the given service, which means
     /// that this method must be called while on the Tokio runtime.
-    pub fn new(service: T, max_items: usize, max_latency: std::time::Duration) -> Self
+    ///
+    /// # A note on choosing a `bound`
+    ///
+    /// When [`Batch`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
+    /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
+    /// this reserved slot may be held up for a long time. As a result, it's advisable to set
+    /// `bound` to be at least the maximum number of concurrent requests the [`Batch`] will see.
+    /// If you do not, all the slots in the buffer may be held up by futures that have just called
+    /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
+    /// requests.
+    ///
+    /// [`Poll::Ready`]: std::task::Poll::Ready
+    /// [`call`]: crate::Service::call
+    /// [`poll_ready`]: crate::Service::poll_ready
+    pub fn new(service: T, max_items: usize, max_latency: std::time::Duration, bound: usize) -> Self
     where
         T: Send + 'static,
         T::Future: Send,
         T::Error: Send + Sync,
         Request: Send + 'static,
     {
-        // XXX(hdevalence): is this bound good
-        let bound = 1;
-        let (tx, rx) = mpsc::unbounded_channel();
-        let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
+        let (service, worker) = Self::pair(service, max_items, max_latency, bound);
         tokio::spawn(worker.run());
-        let semaphore = Semaphore::new(bound);
-        Batch {
-            tx,
-            handle,
-            semaphore,
-        }
+        service
     }
 
+    /// Creates a new [`Batch`] wrapping `service`, but returns the background worker.
+    ///
+    /// This is useful if you do not want to spawn directly onto the tokio runtime
+    /// but instead want to use your own executor. This will return the [`Batch`] and
+    /// the background `Worker` that you can then spawn.
+    pub fn pair(
+        service: T,
+        max_items: usize,
+        max_latency: std::time::Duration,
+        bound: usize,
+    ) -> (Batch<T, Request>, Worker<T, Request>)
+    where
+        T: Send + 'static,
+        T::Error: Send + Sync,
+        Request: Send + 'static,
+    {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let (semaphore, wake_waiters) = Semaphore::new_with_close(bound);
+        let (handle, worker) = Worker::new(service, rx, max_items, max_latency, wake_waiters);
+        (
+            Batch {
+                tx,
+                handle,
+                semaphore,
+            },
+            worker,
+        )
+    }
+
     fn get_worker_error(&self) -> crate::BoxError {
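As a usage sketch, `pair` lets the caller pick the executor for the background worker. Here it is spawned onto an explicitly constructed runtime rather than the ambient one that `new` assumes; `Ed25519Verifier` is borrowed from this commit's tests and the numbers are illustrative:

    use std::time::Duration;
    use tower_batch::Batch;

    // Sketch: run the batching worker on a runtime we manage ourselves.
    let runtime = tokio::runtime::Runtime::new().expect("failed to build runtime");
    let (verifier, worker) = Batch::pair(
        Ed25519Verifier::new(),
        10,                          // max_items per batch
        Duration::from_millis(100),  // max_latency before a flush
        32,                          // bound: queued requests before backpressure
    );
    runtime.spawn(worker.run());
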
3 changes: 3 additions & 0 deletions tower-batch/src/worker.rs
@@ -36,6 +36,7 @@ where
     handle: Handle,
     max_items: usize,
     max_latency: std::time::Duration,
+    close: Option<crate::semaphore::Close>,
 }
 
 /// Get the error out
@@ -54,6 +55,7 @@
         rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
         max_items: usize,
         max_latency: std::time::Duration,
+        close: crate::semaphore::Close,
     ) -> (Handle, Worker<T, Request>) {
         let handle = Handle {
             inner: Arc::new(Mutex::new(None)),
@@ -66,6 +68,7 @@
             failed: None,
             max_items,
             max_latency,
+            close: Some(close),
         };
 
         (handle, worker)
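The worker keeps the `Close` in an `Option` so it can be consumed exactly once. A hedged sketch of the shutdown path (not part of this diff), using the hypothetical `close` method sketched above:

    // On service failure or shutdown, wake every caller still parked in
    // `poll_ready` so they observe the error instead of waiting forever.
    if let Some(close) = self.close.take() {
        close.close();
    }
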
6 changes: 3 additions & 3 deletions tower-batch/tests/ed25519.rs
@@ -129,7 +129,7 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> {
 
     // Use a very long max_latency and a short timeout to check that
     // flushing is happening based on hitting max_items.
-    let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000));
+    let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000), 1);
     timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
         .await
         .map_err(|e| eyre!(e))?
@@ -145,7 +145,7 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> {
 
     // Use a very high max_items and a short timeout to check that
     // flushing is happening based on hitting max_latency.
-    let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500));
+    let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500), 1);
     timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
         .await
         .map_err(|e| eyre!(e))?
@@ -159,7 +159,7 @@ async fn fallback_verification() -> Result<(), Report> {
     zebra_test::init();
 
     let verifier = Fallback::new(
-        Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100)),
+        Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100), 1),
         tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
     );
 
2 changes: 2 additions & 0 deletions zebra-consensus/src/primitives.rs
@@ -7,3 +7,5 @@ pub mod redjubjub;
 const MAX_BATCH_SIZE: usize = 64;
 /// The maximum latency bound for any of the batch verifiers.
 const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100);
+/// The maximum number of queued requests before callers experience backpressure.
+pub const MAX_REQUESTS: usize = 1;
1 change: 1 addition & 0 deletions zebra-consensus/src/primitives/redjubjub.rs
@@ -36,6 +36,7 @@ pub static VERIFIER: Lazy<
         Verifier::default(),
         super::MAX_BATCH_SIZE,
         super::MAX_BATCH_LATENCY,
+        super::MAX_REQUESTS,
     ),
     // We want to fallback to individual verification if batch verification
     // fails, so we need a Service to use. The obvious way to do this would
4 changes: 2 additions & 2 deletions zebra-consensus/src/primitives/redjubjub/tests.rs
@@ -56,7 +56,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
 
     // Use a very long max_latency and a short timeout to check that
     // flushing is happening based on hitting max_items.
-    let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
+    let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000), 1);
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
         .await?
         .map_err(|e| eyre!(e))?;
@@ -75,7 +75,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
 
     // Use a very high max_items and a short timeout to check that
     // flushing is happening based on hitting max_latency.
-    let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
+    let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500), 1);
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
         .await?
         .map_err(|e| eyre!(e))?;
