Skip to content

Commit

Permalink
feat!: add support for remote actor links (tqwewe#116)
Browse files Browse the repository at this point in the history
* feat!: add support for remote linking and signal_link_died events

* feat!: signal links died when peer id disconnected

* feat: add `unlink_remote` method to `ActorRef`

* feat: add `link_remote` and `unlink_remote` methods to `RemoteActorRef`

* docs: add codedocs and examples to remote linking types

* fix: feature flags compilation

* fix: missing tracing feature gate

* docs: fix RemoteActor derive example
  • Loading branch information
tqwewe authored and hirschenberger committed Jan 20, 2025
1 parent 3b4567e commit dba23e9
Show file tree
Hide file tree
Showing 13 changed files with 1,233 additions and 151 deletions.
124 changes: 112 additions & 12 deletions examples/manual_swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use kameo::{
actor::ActorID,
error::RemoteSendError,
remote::{
ActorSwarm, ActorSwarmBehaviourEvent, ActorSwarmHandler, SwarmBehaviour, SwarmRequest,
SwarmResponse,
ActorSwarm, ActorSwarmBehaviourEvent, ActorSwarmEvent, ActorSwarmHandler, SwarmBehaviour,
SwarmRequest, SwarmResponse,
},
};
use libp2p::{
Expand Down Expand Up @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
let (actor_swarm, mut swarm_handler) =
ActorSwarm::bootstrap_manual(swarm.local_peer_id().clone()).unwrap();
ActorSwarm::bootstrap_manual(*swarm.local_peer_id()).unwrap();

actor_swarm.listen_on(format!("/ip4/0.0.0.0/udp/{port}/quic-v1").parse()?);

Expand All @@ -83,16 +83,37 @@ fn handle_event(
.custom_request_response
.send_request(&peer_id, CustomRequest::Greet { name });
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
} => {
swarm_handler.handle_event(
swarm,
ActorSwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
},
);
}
SwarmEvent::Behaviour(event) => match event {
CustomBehaviourEvent::Kademlia(event) => {
swarm_handler.handle_event(swarm, ActorSwarmBehaviourEvent::Kademlia(event))
}
CustomBehaviourEvent::ActorRequestResponse(event) => {
swarm_handler.handle_event(swarm, ActorSwarmBehaviourEvent::RequestResponse(event))
}
CustomBehaviourEvent::Mdns(event) => {
swarm_handler.handle_event(swarm, ActorSwarmBehaviourEvent::Mdns(event))
}
CustomBehaviourEvent::Kademlia(event) => swarm_handler.handle_event(
swarm,
ActorSwarmEvent::Behaviour(ActorSwarmBehaviourEvent::Kademlia(event)),
),
CustomBehaviourEvent::ActorRequestResponse(event) => swarm_handler.handle_event(
swarm,
ActorSwarmEvent::Behaviour(ActorSwarmBehaviourEvent::RequestResponse(event)),
),
CustomBehaviourEvent::Mdns(event) => swarm_handler.handle_event(
swarm,
ActorSwarmEvent::Behaviour(ActorSwarmBehaviourEvent::Mdns(event)),
),
CustomBehaviourEvent::CustomRequestResponse(request_response::Event::Message {
message,
..
Expand Down Expand Up @@ -192,6 +213,58 @@ impl SwarmBehaviour for CustomBehaviour {
)
}

fn link(
&mut self,
actor_id: ActorID,
actor_remote_id: Cow<'static, str>,
sibbling_id: ActorID,
sibbling_remote_id: Cow<'static, str>,
) -> OutboundRequestId {
self.actor_request_response.send_request(
actor_id.peer_id().unwrap(),
SwarmRequest::Link {
actor_id,
actor_remote_id,
sibbling_id,
sibbling_remote_id,
},
)
}

fn unlink(
&mut self,
actor_id: ActorID,
actor_remote_id: Cow<'static, str>,
sibbling_id: ActorID,
) -> OutboundRequestId {
self.actor_request_response.send_request(
actor_id.peer_id().unwrap(),
SwarmRequest::Unlink {
actor_id,
actor_remote_id,
sibbling_id,
},
)
}

fn signal_link_died(
&mut self,
dead_actor_id: ActorID,
notified_actor_id: ActorID,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: kameo::error::ActorStopReason,
) -> OutboundRequestId {
self.actor_request_response.send_request(
notified_actor_id.peer_id().unwrap(),
SwarmRequest::SignalLinkDied {
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
},
)
}

fn send_ask_response(
&mut self,
channel: ResponseChannel<kameo::remote::SwarmResponse>,
Expand All @@ -210,6 +283,33 @@ impl SwarmBehaviour for CustomBehaviour {
.send_response(channel, SwarmResponse::Tell(result))
}

fn send_link_response(
&mut self,
channel: ResponseChannel<SwarmResponse>,
result: Result<(), RemoteSendError<kameo::error::Infallible>>,
) -> Result<(), SwarmResponse> {
self.actor_request_response
.send_response(channel, SwarmResponse::Link(result))
}

fn send_unlink_response(
&mut self,
channel: ResponseChannel<SwarmResponse>,
result: Result<(), RemoteSendError<kameo::error::Infallible>>,
) -> Result<(), SwarmResponse> {
self.actor_request_response
.send_response(channel, SwarmResponse::Unlink(result))
}

fn send_signal_link_died_response(
&mut self,
channel: ResponseChannel<SwarmResponse>,
result: Result<(), RemoteSendError<kameo::error::Infallible>>,
) -> Result<(), SwarmResponse> {
self.actor_request_response
.send_response(channel, SwarmResponse::SignalLinkDied(result))
}

fn kademlia_add_address(&mut self, peer: &PeerId, address: Multiaddr) -> kad::RoutingUpdate {
self.kademlia.add_address(peer, address)
}
Expand Down
68 changes: 65 additions & 3 deletions macros/src/derive_remote_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ use quote::{quote, ToTokens};
use syn::{
parse::{Parse, ParseStream},
spanned::Spanned,
DeriveInput, Expr, ExprAssign, ExprLit, Ident, Lit, LitStr,
DeriveInput, Expr, ExprAssign, ExprLit, Generics, Ident, Lit, LitStr,
};

pub struct DeriveRemoteActor {
attrs: DeriveRemoteActorAttrs,
generics: Generics,
ident: Ident,
}

impl ToTokens for DeriveRemoteActor {
fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) {
let Self { attrs, ident } = self;
let Self {
attrs,
generics,
ident,
} = self;
let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();

let id = match &attrs.id {
Some(id) => quote! { #id },
Expand All @@ -23,9 +29,64 @@ impl ToTokens for DeriveRemoteActor {

tokens.extend(quote! {
#[automatically_derived]
impl ::kameo::remote::RemoteActor for #ident {
impl #impl_generics ::kameo::remote::RemoteActor for #ident #ty_generics #where_clause {
const REMOTE_ID: &'static str = #id;
}

const _: () = {
#[::kameo::remote::_internal::linkme::distributed_slice(
::kameo::remote::_internal::REMOTE_ACTORS
)]
#[linkme(crate = ::kameo::remote::_internal::linkme)]
static REG: (
&'static str,
::kameo::remote::_internal::RemoteActorFns,
) = (
<#ident #ty_generics as ::kameo::remote::RemoteActor>::REMOTE_ID,
::kameo::remote::_internal::RemoteActorFns {
link: (
|
actor_id: ::kameo::actor::ActorID,
sibbling_id: ::kameo::actor::ActorID,
sibbling_remote_id: ::std::borrow::Cow<'static, str>,
| {
::std::boxed::Box::pin(::kameo::remote::_internal::link::<
#ident #ty_generics,
>(
actor_id,
sibbling_id,
sibbling_remote_id,
))
}) as ::kameo::remote::_internal::RemoteLinkFn,
unlink: (
|
actor_id: ::kameo::actor::ActorID,
sibbling_id: ::kameo::actor::ActorID,
| {
::std::boxed::Box::pin(::kameo::remote::_internal::unlink::<
#ident #ty_generics,
>(
actor_id,
sibbling_id,
))
}) as ::kameo::remote::_internal::RemoteUnlinkFn,
signal_link_died: (
|
dead_actor_id: ::kameo::actor::ActorID,
notified_actor_id: ::kameo::actor::ActorID,
stop_reason: kameo::error::ActorStopReason,
| {
::std::boxed::Box::pin(::kameo::remote::_internal::signal_link_died::<
#ident #ty_generics,
>(
dead_actor_id,
notified_actor_id,
stop_reason,
))
}) as ::kameo::remote::_internal::RemoteSignalLinkDiedFn,
},
);
};
});
}
}
Expand All @@ -49,6 +110,7 @@ impl Parse for DeriveRemoteActor {

Ok(DeriveRemoteActor {
attrs: attrs.unwrap_or_default(),
generics: input.generics,
ident,
})
}
Expand Down
4 changes: 0 additions & 4 deletions macros/src/remote_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ impl Parse for RemoteMessage {
"expected angle bracket arguments",
));
};
// let mut trait_path_segments_iter = trait_path.segments.into_iter();
// let trait_ident = trait_path_segments_iter.next().ok_or_else(|| {
// syn::Error::new(trait_path_span, "expected trait");
// });

Ok(RemoteMessage {
item_impl,
Expand Down
2 changes: 2 additions & 0 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ pub trait Actor: Sized + Send + 'static {
id,
reason: Box::new(reason),
})),
#[cfg(feature = "remote")]
ActorStopReason::PeerDisconnected => Ok(Some(ActorStopReason::PeerDisconnected)),
}
}
}
Expand Down
Loading

0 comments on commit dba23e9

Please sign in to comment.