Skip to content

Commit

Permalink
refactor: use Box<dyn FnMut> for PubSub filter functions to allow…
Browse files Browse the repository at this point in the history
… dynamic filtering behavior
  • Loading branch information
tqwewe committed Jan 20, 2025
1 parent d09439a commit 96d6a36
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/actor/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use crate::{

use super::{ActorID, ActorRef};

type SubscriberMap<M> =
HashMap<ActorID, (Box<dyn MessageSubscriber<M> + Send + Sync>, fn(&M) -> bool)>;
type Subscriber<M> = Box<dyn MessageSubscriber<M> + Send>;
type FilterFn<M> = Box<dyn FnMut(&M) -> bool + Send>;

/// A publish-subscribe (pubsub) actor that allows message broadcasting to multiple subscribers.
///
Expand All @@ -70,7 +70,7 @@ type SubscriberMap<M> =
/// to manage it directly or interact with it via messages.
#[allow(missing_debug_implementations)]
pub struct PubSub<M> {
subscribers: SubscriberMap<M>,
subscribers: HashMap<ActorID, (Subscriber<M>, FilterFn<M>)>,
}

impl<M> PubSub<M> {
Expand Down Expand Up @@ -108,16 +108,14 @@ impl<M> PubSub<M> {
where
M: Clone + Send + 'static,
{
let results = join_all(
self.subscribers
.iter()
.filter_map(|(id, (subscriber, filter))| {
filter(&msg).then_some({
let msg = msg.clone();
async move { (*id, subscriber.tell(msg).await) }
})
}),
)
let results = join_all(self.subscribers.iter_mut().filter_map(
|(id, (subscriber, filter))| {
filter(&msg).then_some({
let msg = msg.clone();
async move { (*id, subscriber.tell(msg).await) }
})
},
))
.await;
for (id, result) in results.into_iter() {
match result {
Expand Down Expand Up @@ -171,7 +169,7 @@ impl<M> PubSub<M> {
MessageSend<Ok = (), Error = SendError<M, <A::Reply as Reply>::Error>>,
{
self.subscribers
.insert(actor_ref.id(), (Box::new(actor_ref), |_| true));
.insert(actor_ref.id(), (Box::new(actor_ref), Box::new(|_| true)));
}

/// Subscribes an actor to receive only messages published by the pubsub actor that pass the given
Expand Down Expand Up @@ -207,15 +205,18 @@ impl<M> PubSub<M> {
/// # })
/// ```
#[inline]
pub fn subscribe_filter<A>(&mut self, actor_ref: ActorRef<A>, filter: fn(&M) -> bool)
where
pub fn subscribe_filter<A>(
&mut self,
actor_ref: ActorRef<A>,
filter: impl FnMut(&M) -> bool + Send + 'static,
) where
A: Actor + Message<M>,
M: Send + 'static,
for<'a> TellRequest<LocalTellRequest<'a, A, A::Mailbox>, A::Mailbox, M, WithoutRequestTimeout>:
MessageSend<Ok = (), Error = SendError<M, <A::Reply as Reply>::Error>>,
{
self.subscribers
.insert(actor_ref.id(), (Box::new(actor_ref), filter));
.insert(actor_ref.id(), (Box::new(actor_ref), Box::new(filter)));
}
}

Expand Down

0 comments on commit 96d6a36

Please sign in to comment.