Skip to content

Commit

Permalink
dispatch to available children only. (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
o0Ignition0o authored Aug 20, 2020
1 parent 3cbda62 commit 6a9f11e
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 42 deletions.
32 changes: 28 additions & 4 deletions src/bastion/examples/round_robin_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use bastion::prelude::*;
use futures_timer::Delay;
use std::sync::Arc;
use std::time::Duration;
use tracing::Level;

///
/// Prologue:
Expand All @@ -22,7 +25,28 @@ use std::sync::Arc;
/// 3. We want to use a dispatcher on the second group because we don't want to
/// target a particular child in the first to process the message.
///
/// The output looks like:
/// ```
/// Running `target\debug\examples\round_robin_dispatcher.exe`
/// Aug 20 16:52:19.925 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:19.926 WARN round_robin_dispatcher: Received data_1
/// Aug 20 16:52:20.932 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:20.933 WARN round_robin_dispatcher: Received data_2
/// Aug 20 16:52:21.939 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:21.941 WARN round_robin_dispatcher: Received data_3
/// Aug 20 16:52:22.947 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:22.948 WARN round_robin_dispatcher: Received data_4
/// Aug 20 16:52:23.954 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:23.955 WARN round_robin_dispatcher: Received data_5
/// ```
fn main() {
// Initialize tracing logger
// so we get nice output on the console.
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();

// We need bastion to run our program
Bastion::init();
// We create the supervisor and we add both groups on it
Expand Down Expand Up @@ -57,8 +81,10 @@ fn caller_group(children: Children) -> Children {
let target = BroadcastTarget::Group("Receiver".to_string());
// We iterate on each data
for data in data_to_send {
Delay::new(Duration::from_secs(1)).await;
tracing::warn!("sending message");
// We broadcast the message containing the data to the defined target
ctx.broadcast_message(target.clone(), data)
ctx.broadcast_message(target.clone(), data);
}
// We stop bastion here, because we don't have more data to send
Bastion::stop();
Expand All @@ -70,8 +96,6 @@ fn caller_group(children: Children) -> Children {
fn receiver_group(children: Children) -> Children {
// We create the second group of children
children
// We want to have 5 children in this group
.with_redundancy(5)
// We want to have a disptacher named `Receiver`
.with_dispatcher(Dispatcher::with_type(DispatcherType::Named(
"Receiver".to_string(),
Expand All @@ -93,7 +117,7 @@ fn receiver_group(children: Children) -> Children {
// Because it's a broadcasted message we can use directly the ref
ref data: &str => {
// And we print it
println!("Received {}", data);
tracing::warn!("Received {}", data);
};
_: _ => ();
}
Expand Down
52 changes: 52 additions & 0 deletions src/bastion/src/child_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,28 @@ pub struct ChildRef {
sender: Sender,
name: String,
path: Arc<BastionPath>,
// True if the ChildRef references a child that will receive user defined messages.
// use `ChildRef::new_internal` to set it to false, for internal use children,
// such as the heartbeat children for example
is_public: bool,
}

impl ChildRef {
pub(crate) fn new_internal(
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
) -> ChildRef {
ChildRef {
id,
sender,
name,
path,
is_public: false,
}
}

pub(crate) fn new(
id: BastionId,
sender: Sender,
Expand All @@ -33,6 +52,7 @@ impl ChildRef {
sender,
name,
path,
is_public: true,
}
}

Expand Down Expand Up @@ -67,6 +87,38 @@ impl ChildRef {
&self.id
}

/// Returns true if the child this `ChildRef` is referencing is public,
/// Which means it can receive messages. private `ChildRef`s
/// reference bastion internal children, such as the heartbeat child for example.
/// This function comes in handy when implementing your own dispatchers.
///
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx| {
/// async move {
/// if ctx.current().is_public() {
/// // ...
/// }
/// # Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
pub fn is_public(&self) -> bool {
self.is_public
}

/// Sends a message to the child this `ChildRef` is referencing.
/// This message is intended to be used outside of Bastion context when
/// there is no way for receiver to identify message sender
Expand Down
36 changes: 17 additions & 19 deletions src/bastion/src/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,26 +453,24 @@ impl Children {
/// # use bastion::prelude::*;
/// # use std::time::Duration;
/// #
/// # fn main() {
/// # Bastion::init();
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children
/// .with_heartbeat_tick(Duration::from_secs(5))
/// .with_exec(|ctx| {
/// // -- Children group started.
/// async move {
/// // ...
/// # Ok(())
/// }
/// // -- Children group stopped.
/// })
/// children
/// .with_heartbeat_tick(Duration::from_secs(5))
/// .with_exec(|ctx| {
/// // -- Children group started.
/// async move {
/// // ...
/// # Ok(())
/// }
/// // -- Children group stopped.
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
/// [`std::time::Duration`]: https://doc.rust-lang.org/nightly/core/time/struct.Duration.html
pub fn with_heartbeat_tick(mut self, interval: Duration) -> Self {
Expand Down Expand Up @@ -936,7 +934,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), name, path);
let child_ref = ChildRef::new_internal(id.clone(), sender.clone(), name, path);

let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
Expand Down
40 changes: 21 additions & 19 deletions src/bastion/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use lever::prelude::*;
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
};
use tracing::{trace, warn};
use tracing::{debug, trace, warn};

/// Type alias for the concurrency hashmap. Each key-value pair stores
/// the Bastion identifier as the key and the module name as the value.
Expand Down Expand Up @@ -66,7 +66,7 @@ pub type DefaultDispatcherHandler = RoundRobinHandler;
/// Dispatcher that will do simple round-robin distribution
#[derive(Default, Debug)]
pub struct RoundRobinHandler {
index: AtomicU64,
index: AtomicUsize,
}

impl DispatcherHandler for RoundRobinHandler {
Expand All @@ -80,25 +80,27 @@ impl DispatcherHandler for RoundRobinHandler {
}
// Each child in turn will receive a message.
fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>) {
if entries.len() == 0 {
return;
}

let current_index = self.index.load(Ordering::SeqCst) % entries.len() as u64;

let mut skipped = 0;
for pair in entries.iter() {
if skipped != current_index {
skipped += 1;
continue;
}
let entries = entries
.iter()
.filter(|entry| entry.0.is_public())
.collect::<Vec<_>>();

let entry = pair.0;
entry.tell_anonymously(message.clone()).unwrap();
break;
if entries.is_empty() {
debug!("no public children to broadcast message to");
return;
}
let current_index = self.index.load(Ordering::SeqCst) % entries.len();

self.index.store(current_index + 1, Ordering::SeqCst);
if let Some(entry) = entries.get(current_index) {
warn!(
"sending message to child {}/{} - {}",
current_index + 1,
entries.len(),
entry.0.path()
);
entry.0.tell_anonymously(message.clone()).unwrap();
self.index.store(current_index + 1, Ordering::SeqCst);
};
}
}
/// Generic trait which any custom dispatcher handler must implement for
Expand Down

0 comments on commit 6a9f11e

Please sign in to comment.