Skip to content

Commit

Permalink
refactor!: remove internment dependency and simplify peer ID handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tqwewe committed Jan 20, 2025
1 parent 05d22e4 commit c1cfdc3
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 79 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ keywords = ["actor", "tokio"]
[features]
default = ["macros", "tracing"]
macros = ["dep:kameo_macros"]
remote = ["dep:internment", "dep:libp2p", "dep:libp2p-identity", "dep:linkme", "dep:rmp-serde"]
remote = ["dep:libp2p", "dep:libp2p-identity", "dep:linkme", "dep:rmp-serde"]
tracing = ["dep:tracing", "tokio/tracing"]

[dependencies]
kameo_macros = { version = "0.14.0", path = "./macros", optional = true }

dyn-clone = "1.0"
futures = "0.3"
internment = { version = "0.8.5", features = ["serde"], optional = true }
libp2p = { version = "0.55.0", features = ["cbor", "dns", "kad", "mdns", "macros", "quic", "request-response", "rsa", "serde", "tokio"], optional = true }
libp2p-identity = { version = "0.2.9", features = ["rand", "rsa"], optional = true }
linkme = { version= "0.3.28", optional = true }
Expand Down
10 changes: 2 additions & 8 deletions src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,7 @@ where
);
remote::ActorSwarm::get()
.ok_or(RemoteSendError::SwarmNotBootstrapped)?
.link::<A, B>(
self.id.with_hydrate_peer_id(),
sibbling_ref.id.with_hydrate_peer_id(),
)
.link::<A, B>(self.id, sibbling_ref.id)
.await
}

Expand Down Expand Up @@ -574,10 +571,7 @@ where
self.links.lock().await.remove(&sibbling_ref.id);
remote::ActorSwarm::get()
.ok_or(RemoteSendError::SwarmNotBootstrapped)?
.unlink::<B>(
self.id.with_hydrate_peer_id(),
sibbling_ref.id.with_hydrate_peer_id(),
)
.unlink::<B>(self.id, sibbling_ref.id)
.await
}

Expand Down
72 changes: 32 additions & 40 deletions src/actor/id.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::sync::atomic::Ordering;
use std::{fmt, sync::atomic::AtomicU64};

#[cfg(feature = "remote")]
use internment::Intern;
use serde::{Deserialize, Serialize};

use crate::error::ActorIDFromBytesError;
Expand All @@ -18,9 +16,8 @@ static ACTOR_COUNTER: AtomicU64 = AtomicU64::new(0);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ActorID {
sequence_id: u64,
/// None indicates a local actor; the local peer ID should be used in this case.
#[cfg(feature = "remote")]
peer_id: Option<Intern<libp2p::PeerId>>,
peer_id: PeerIdKind,
}

impl ActorID {
Expand All @@ -40,7 +37,7 @@ impl ActorID {
ActorID {
sequence_id,
#[cfg(feature = "remote")]
peer_id: ActorSwarm::get().map(|swarm| *swarm.local_peer_id_intern()),
peer_id: PeerIdKind::Local,
}
}

Expand All @@ -58,7 +55,7 @@ impl ActorID {
pub fn new_with_peer_id(sequence_id: u64, peer_id: libp2p::PeerId) -> Self {
ActorID {
sequence_id,
peer_id: Some(Intern::new(peer_id)),
peer_id: PeerIdKind::PeerId(peer_id),
}
}

Expand Down Expand Up @@ -89,10 +86,10 @@ impl ActorID {
///
/// # Returns
///
/// An `Option<PeerId>`. `None` indicates a local actor.
/// An `Option<PeerId>`. `None` is returned if the peer ID is local and no [`ActorSwarm`] has been bootstrapped.
#[cfg(feature = "remote")]
pub fn peer_id(&self) -> Option<&libp2p::PeerId> {
self.peer_id.as_deref()
self.peer_id.peer_id()
}

/// Serializes the `ActorID` into a byte vector.
Expand All @@ -109,6 +106,7 @@ impl ActorID {
#[cfg(feature = "remote")]
let peer_id_bytes = self
.peer_id
.peer_id()
.map(|peer_id| peer_id.to_bytes())
.or_else(|| ActorSwarm::get().map(|swarm| swarm.local_peer_id().to_bytes()));
#[cfg(feature = "remote")]
Expand Down Expand Up @@ -139,9 +137,9 @@ impl ActorID {
// Extract the peer id
#[cfg(feature = "remote")]
let peer_id = if bytes.len() > 8 {
Some(Intern::new(libp2p::PeerId::from_bytes(&bytes[8..])?))
PeerIdKind::PeerId(libp2p::PeerId::from_bytes(&bytes[8..])?)
} else {
None
PeerIdKind::Local
};

Ok(ActorID {
Expand All @@ -150,34 +148,6 @@ impl ActorID {
peer_id,
})
}

/// Returns the interned `PeerId` associated with this `ActorID`, if any.
///
/// This method is primarily for internal use.
///
/// # Returns
///
/// An `Option<&Intern<PeerId>>`. `None` indicates a local actor.
#[cfg(feature = "remote")]
pub(crate) fn peer_id_intern(&self) -> Option<&Intern<libp2p::PeerId>> {
self.peer_id.as_ref()
}

/// Ensures the `ActorID` has an associated `peer_id`.
///
/// If the `ActorID` doesn't have a `peer_id`, this method associates it with the local `PeerId`.
/// This method is primarily for internal use.
///
/// # Returns
///
/// An `ActorID` with a guaranteed `peer_id`.
#[cfg(feature = "remote")]
pub(crate) fn with_hydrate_peer_id(mut self) -> ActorID {
if self.peer_id.is_none() {
self.peer_id = Some(*ActorSwarm::get().unwrap().local_peer_id_intern());
}
self
}
}

impl fmt::Display for ActorID {
Expand All @@ -186,7 +156,7 @@ impl fmt::Display for ActorID {
return write!(f, "ActorID({})", self.sequence_id);

#[cfg(feature = "remote")]
match self.peer_id {
match self.peer_id.peer_id() {
Some(peer_id) => write!(f, "ActorID({}, {peer_id})", self.sequence_id),
None => write!(f, "ActorID({}, local)", self.sequence_id),
}
Expand All @@ -196,7 +166,12 @@ impl fmt::Display for ActorID {
impl fmt::Debug for ActorID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(feature = "remote")]
return write!(f, "ActorID({:?}, {:?})", self.sequence_id, self.peer_id);
return write!(
f,
"ActorID({:?}, {:?})",
self.sequence_id,
self.peer_id.peer_id()
);

#[cfg(not(feature = "remote"))]
return write!(f, "ActorID({:?})", self.sequence_id);
Expand Down Expand Up @@ -228,3 +203,20 @@ impl<'de> Deserialize<'de> for ActorID {
})
}
}

#[cfg(feature = "remote")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum PeerIdKind {
Local,
PeerId(libp2p::PeerId),
}

#[cfg(feature = "remote")]
impl PeerIdKind {
fn peer_id(&self) -> Option<&libp2p::PeerId> {
match self {
PeerIdKind::Local => ActorSwarm::get().map(ActorSwarm::local_peer_id),
PeerIdKind::PeerId(peer_id) => Some(peer_id),
}
}
}
4 changes: 0 additions & 4 deletions src/actor/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,6 @@ where
let mut actor = state.shutdown().await;

let mut link_notificication_futures = FuturesUnordered::new();
#[cfg(feature = "remote")]
{
id = id.with_hydrate_peer_id();
}
{
let mut links = links.lock().await;
#[allow(unused_variables)]
Expand Down
24 changes: 8 additions & 16 deletions src/remote/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use core::task;
use std::{borrow::Cow, collections::HashMap, io, pin, time::Duration};

use futures::{ready, stream::FuturesUnordered, Future, FutureExt};
use internment::Intern;
use libp2p::{
core::{transport::ListenerId, ConnectedPoint},
identity::Keypair,
Expand Down Expand Up @@ -67,7 +66,7 @@ static ACTOR_SWARM: OnceCell<ActorSwarm> = OnceCell::new();
#[derive(Clone, Debug)]
pub struct ActorSwarm {
swarm_tx: SwarmSender,
local_peer_id: Intern<PeerId>,
local_peer_id: PeerId,
}

impl ActorSwarm {
Expand Down Expand Up @@ -141,7 +140,7 @@ impl ActorSwarm {
pub fn bootstrap_with_swarm(
mut swarm: Swarm<ActorSwarmBehaviour>,
) -> Result<&'static Self, BootstrapError> {
let local_peer_id = Intern::new(*swarm.local_peer_id());
let local_peer_id = *swarm.local_peer_id();
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let swarm_tx = SwarmSender(cmd_tx);

Expand Down Expand Up @@ -172,7 +171,6 @@ impl ActorSwarm {
/// This is for advanced cases and provides full control, returning an `ActorSwarmBehaviour` instance which
/// should be used to process the swarm manually.
pub fn bootstrap_manual(local_peer_id: PeerId) -> Option<(&'static Self, ActorSwarmHandler)> {
let local_peer_id = Intern::new(local_peer_id);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let swarm_tx = SwarmSender(cmd_tx);

Expand Down Expand Up @@ -239,10 +237,6 @@ impl ActorSwarm {
&self.local_peer_id
}

pub(crate) fn local_peer_id_intern(&self) -> &Intern<PeerId> {
&self.local_peer_id
}

/// Dials a peer using the provided dialing options.
///
/// This method can be used to connect to a known or unknown peer, specified by the options
Expand Down Expand Up @@ -362,10 +356,8 @@ impl ActorSwarm {
actor_ref: ActorRef<A>,
name: String,
) -> impl Future<Output = Result<(), RegistryError>> {
let actor_registration = ActorRegistration::new(
actor_ref.id().with_hydrate_peer_id(),
Cow::Borrowed(A::REMOTE_ID),
);
let actor_registration =
ActorRegistration::new(actor_ref.id(), Cow::Borrowed(A::REMOTE_ID));
let reply_rx = self
.swarm_tx
.send_with_reply(|reply| SwarmCommand::Register {
Expand All @@ -379,7 +371,7 @@ impl ActorSwarm {
let signal_mailbox = actor_ref.weak_signal_mailbox();
let links = actor_ref.links.clone();
REMOTE_REGISTRY.lock().await.insert(
actor_ref.id().with_hydrate_peer_id(),
actor_ref.id(),
RemoteRegistryActorRef {
actor_ref: Box::new(actor_ref),
signal_mailbox,
Expand Down Expand Up @@ -1039,7 +1031,7 @@ pub enum SwarmCommand {
/// An actor ask request.
Ask {
/// Peer ID.
peer_id: Intern<PeerId>,
peer_id: PeerId,
/// Actor ID.
actor_id: ActorID,
/// Actor remote ID.
Expand All @@ -1060,7 +1052,7 @@ pub enum SwarmCommand {
/// An actor tell request.
Tell {
/// Peer ID.
peer_id: Intern<PeerId>,
peer_id: PeerId,
/// Actor ID.
actor_id: ActorID,
/// Actor remote ID.
Expand Down Expand Up @@ -1544,7 +1536,7 @@ impl SwarmBehaviour for ActorSwarmBehaviour {
sibbling_remote_id: Cow<'static, str>,
) -> OutboundRequestId {
self.request_response.send_request(
actor_id.peer_id().unwrap(),
actor_id.peer_id().expect("swarm should be bootstrapped"),
SwarmRequest::Link {
actor_id,
actor_remote_id,
Expand Down
9 changes: 4 additions & 5 deletions src/request/ask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{future::IntoFuture, marker::PhantomData, time::Duration};
use tokio::{sync::oneshot, time::timeout};

#[cfg(feature = "remote")]
use crate::remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand, SwarmResponse};
use crate::remote::{RemoteActor, RemoteMessage, SwarmCommand, SwarmResponse};

use crate::{
actor,
Expand Down Expand Up @@ -955,10 +955,9 @@ where
let actor_id = actor_ref.id();
let (reply_tx, reply_rx) = oneshot::channel();
actor_ref.send_to_swarm(SwarmCommand::Ask {
peer_id: actor_id
.peer_id_intern()
.cloned()
.unwrap_or_else(|| *ActorSwarm::get().unwrap().local_peer_id_intern()),
peer_id: *actor_id
.peer_id()
.expect("actor swarm should be bootstrapped"),
actor_id,
actor_remote_id: Cow::Borrowed(<A as RemoteActor>::REMOTE_ID),
message_remote_id: Cow::Borrowed(<A as RemoteMessage<M>>::REMOTE_ID),
Expand Down
7 changes: 3 additions & 4 deletions src/request/tell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,9 @@ where
let actor_id = actor_ref.id();
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
actor_ref.send_to_swarm(SwarmCommand::Tell {
peer_id: actor_id
.peer_id_intern()
.cloned()
.unwrap_or_else(|| *ActorSwarm::get().unwrap().local_peer_id_intern()),
peer_id: *actor_id
.peer_id()
.expect("actor swarm should be bootstrapped"),
actor_id,
actor_remote_id: Cow::Borrowed(<A as RemoteActor>::REMOTE_ID),
message_remote_id: Cow::Borrowed(<A as RemoteMessage<M>>::REMOTE_ID),
Expand Down

0 comments on commit c1cfdc3

Please sign in to comment.