Skip to content

Commit

Permalink
Observer actor
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed May 8, 2024
1 parent 8410280 commit d7b05a2
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 23 deletions.
2 changes: 1 addition & 1 deletion messages/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Debug, Formatter};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
ops::{Deref, DerefMut},
};

Expand Down
10 changes: 6 additions & 4 deletions transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ subsquid-messages = { path = "../messages", features = ["signatures"] }

[features]
actors = []
proto = []
request-client = []
request-server = []
gateway = ["actors", "request-client"]
logs-collector = ["actors", "request-server"]
scheduler = ["actors", "request-client"]
worker = ["actors", "request-client", "request-server"]
gateway = ["actors", "request-client", "proto"]
logs-collector = ["actors", "request-server", "proto"]
observer = ["actors"]
scheduler = ["actors", "request-client", "proto"]
worker = ["actors", "request-client", "request-server", "proto"]
metrics = ["libp2p/metrics", "prometheus-client"]
2 changes: 2 additions & 0 deletions transport/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
pub mod gateway;
#[cfg(feature = "logs-collector")]
pub mod logs_collector;
#[cfg(feature = "observer")]
pub mod observer;
#[cfg(feature = "scheduler")]
pub mod scheduler;
#[cfg(feature = "worker")]
Expand Down
4 changes: 2 additions & 2 deletions transport/src/actors/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use subsquid_messages::{

use crate::{
behaviour::{
base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE},
base::{BaseBehaviour, BaseBehaviourEvent},
request_client::{ClientBehaviour, ClientConfig, ClientEvent},
wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
},
codec::ProtoCodec,
codec::{ProtoCodec, ACK_SIZE},
protocol::{
GATEWAY_LOGS_PROTOCOL, MAX_GATEWAY_LOG_SIZE, MAX_QUERY_RESULT_SIZE, MAX_QUERY_SIZE,
QUERY_PROTOCOL,
Expand Down
4 changes: 2 additions & 2 deletions transport/src/actors/logs_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use subsquid_messages::{

use crate::{
behaviour::{
base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE},
base::{BaseBehaviour, BaseBehaviourEvent},
request_server::{Request, ServerBehaviour},
wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
},
codec::ProtoCodec,
codec::{ProtoCodec, ACK_SIZE},
protocol::{
GATEWAY_LOGS_PROTOCOL, MAX_GATEWAY_LOG_SIZE, MAX_WORKER_LOGS_SIZE, WORKER_LOGS_PROTOCOL,
},
Expand Down
149 changes: 149 additions & 0 deletions transport/src/actors/observer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use futures_core::Stream;
use libp2p::{
swarm::{NetworkBehaviour, SwarmEvent, ToSwarm},
PeerId, Swarm,
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

use subsquid_messages::{broadcast_msg, BroadcastMsg, LogsCollected, Ping};

use crate::{
behaviour::{
base::{BaseBehaviour, BaseBehaviourEvent},
wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
},
record_event,
util::{TaskManager, DEFAULT_SHUTDOWN_TIMEOUT},
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ObserverEvent {
Ping { peer_id: PeerId, ping: Ping },
LogsCollected(LogsCollected),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObserverConfig {
pub logs_collector_id: PeerId,
pub events_queue_size: usize,
pub shutdown_timeout: Duration,
}

impl ObserverConfig {
pub fn new(logs_collector_id: PeerId) -> Self {
Self {
logs_collector_id,
events_queue_size: 100,
shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
}
}
}

pub struct ObserverBehaviour {
base: Wrapped<BaseBehaviour>,
logs_collector_id: PeerId,
}

impl ObserverBehaviour {
pub fn new(mut base: BaseBehaviour, logs_collector_id: PeerId) -> Wrapped<Self> {
base.subscribe_pings();
base.subscribe_logs_collected();
Self {
base: base.into(),
logs_collector_id,
}
.into()
}

fn on_base_event(&mut self, ev: BaseBehaviourEvent) -> Option<ObserverEvent> {
let (peer_id, msg) = match ev {
BaseBehaviourEvent::BroadcastMsg {
peer_id,
msg: BroadcastMsg { msg: Some(msg) },
} => (peer_id, msg),
_ => return None,
};
match msg {
broadcast_msg::Msg::Ping(ping) => Some(ObserverEvent::Ping { peer_id, ping }),
broadcast_msg::Msg::LogsCollected(logs) if peer_id == self.logs_collector_id => {
Some(ObserverEvent::LogsCollected(logs))
}
_ => None,
}
}
}

impl BehaviourWrapper for ObserverBehaviour {
type Inner = Wrapped<BaseBehaviour>;
type Event = ObserverEvent;

fn inner(&mut self) -> &mut Self::Inner {
&mut self.base
}

fn on_inner_event(
&mut self,
ev: <Self::Inner as NetworkBehaviour>::ToSwarm,
) -> impl IntoIterator<Item = TToSwarm<Self>> {
self.on_base_event(ev).map(ToSwarm::GenerateEvent)
}
}

struct ObserverTransport {
swarm: Swarm<Wrapped<ObserverBehaviour>>,
events_tx: mpsc::Sender<ObserverEvent>,
}

impl ObserverTransport {
pub async fn run(mut self, cancel_token: CancellationToken) {
log::info!("Starting observer P2P transport");
loop {
tokio::select! {
_ = cancel_token.cancelled() => break,
ev = self.swarm.select_next_some() => self.on_swarm_event(ev),
}
}
log::info!("Shutting down observer P2P transport");
}

fn on_swarm_event(&mut self, ev: SwarmEvent<ObserverEvent>) {
log::trace!("Swarm event: {ev:?}");
record_event(&ev);
if let SwarmEvent::Behaviour(ev) = ev {
self.events_tx
.try_send(ev)
.unwrap_or_else(|e| log::error!("Observer event queue full. Event dropped: {e:?}"))
}
}
}

#[derive(Clone)]
pub struct ObserverTransportHandle {
_task_manager: Arc<TaskManager>,
}

impl ObserverTransportHandle {
fn new(transport: ObserverTransport, shutdown_timeout: Duration) -> Self {
let mut task_manager = TaskManager::new(shutdown_timeout);
task_manager.spawn(|c| transport.run(c));
Self {
_task_manager: Arc::new(task_manager),
}
}
}

pub fn start_transport(
swarm: Swarm<Wrapped<ObserverBehaviour>>,
config: ObserverConfig,
) -> (impl Stream<Item = ObserverEvent>, ObserverTransportHandle) {
let (events_tx, events_rx) = mpsc::channel(config.events_queue_size);
let transport = ObserverTransport { swarm, events_tx };
let handle = ObserverTransportHandle::new(transport, config.shutdown_timeout);
(ReceiverStream::new(events_rx), handle)
}
4 changes: 2 additions & 2 deletions transport/src/actors/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use subsquid_messages::{broadcast_msg, envelope, BroadcastMsg, Envelope, Ping, P

use crate::{
behaviour::{
base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE},
base::{BaseBehaviour, BaseBehaviourEvent},
request_client::{ClientBehaviour, ClientConfig, ClientEvent},
wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
},
codec::ProtoCodec,
codec::{ProtoCodec, ACK_SIZE},
protocol::{MAX_PONG_SIZE, PONG_PROTOCOL},
record_event,
util::{TaskManager, DEFAULT_SHUTDOWN_TIMEOUT},
Expand Down
5 changes: 3 additions & 2 deletions transport/src/actors/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use subsquid_messages::{

use crate::{
behaviour::{
base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE},
base::{BaseBehaviour, BaseBehaviourEvent},
request_client::{ClientBehaviour, ClientConfig, ClientEvent},
request_server::{Request, ServerBehaviour},
wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
},
codec::ProtoCodec,
codec::{ProtoCodec, ACK_SIZE},
protocol::{
MAX_PONG_SIZE, MAX_QUERY_RESULT_SIZE, MAX_QUERY_SIZE, MAX_WORKER_LOGS_SIZE, PONG_PROTOCOL,
QUERY_PROTOCOL, WORKER_LOGS_PROTOCOL,
Expand Down Expand Up @@ -294,6 +294,7 @@ fn bundle_messages<T: prost::Message>(
.filter_map(move |msg| {
let msg_size = msg.encoded_len();
if msg_size > size_limit {
// TODO: Send oversized messages back as events, don't drop
log::warn!("Message too big ({msg_size} > {size_limit})");
return None;
}
Expand Down
8 changes: 3 additions & 5 deletions transport/src/behaviour/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use libp2p::{
StreamProtocol,
};
use libp2p_swarm_derive::NetworkBehaviour;
use prost::bytes::Buf;
use prost::Message;
use prost::{bytes::Buf, Message};
use serde::{Deserialize, Serialize};

use subsquid_messages::{
Expand All @@ -53,8 +52,6 @@ use crate::{
use crate::metrics::ONGOING_QUERIES;
use crate::protocol::{LEGACY_LOGS_TOPIC, LEGACY_PING_TOPIC};

pub const ACK_SIZE: u64 = 4;

#[derive(NetworkBehaviour)]
pub struct InnerBehaviour {
identify: identify::Behaviour,
Expand Down Expand Up @@ -98,6 +95,7 @@ pub struct BaseBehaviour {
probe_timeouts: FuturesMap<PeerId, ()>,
}

#[allow(dead_code)]
impl BaseBehaviour {
pub fn new(
keypair: &Keypair,
Expand Down Expand Up @@ -421,7 +419,6 @@ impl BaseBehaviour {
&mut self,
ev: request_response::Event<Vec<u8>, u8>,
) -> Option<TToSwarm<Self>> {
log::debug!("Request-Response event received: {ev:?}");
let (peer_id, msg_content, channel) = match ev {
request_response::Event::Message {
peer,
Expand All @@ -440,6 +437,7 @@ impl BaseBehaviour {
}
_ => return None,
};
log::debug!("Legacy message ({} bytes) received from {peer_id}", msg_content.len());

// Send minimal response to prevent errors being emitted on the sender side
_ = self.inner.legacy.send_response(channel, 1u8);
Expand Down
14 changes: 14 additions & 0 deletions transport/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ use crate::actors::logs_collector::{
self, LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent,
LogsCollectorTransportHandle,
};
#[cfg(feature = "observer")]
use crate::actors::observer::{
self, ObserverBehaviour, ObserverConfig, ObserverEvent, ObserverTransportHandle,
};
#[cfg(feature = "scheduler")]
use crate::actors::scheduler::{
self, SchedulerBehaviour, SchedulerConfig, SchedulerEvent, SchedulerTransportHandle,
Expand Down Expand Up @@ -203,6 +207,16 @@ impl P2PTransportBuilder {
Ok(logs_collector::start_transport(swarm, config))
}

#[cfg(feature = "observer")]
pub fn build_observer(
self,
config: ObserverConfig,
) -> Result<(impl Stream<Item = ObserverEvent>, ObserverTransportHandle), Error> {
let swarm =
self.build_swarm(|base| ObserverBehaviour::new(base, config.logs_collector_id))?;
Ok(observer::start_transport(swarm, config))
}

#[cfg(feature = "scheduler")]
pub fn build_scheduler(
self,
Expand Down
8 changes: 5 additions & 3 deletions transport/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod legacy;
mod proto;

pub use legacy::LegacyCodec;
pub use proto::ProtoCodec;

#[cfg(feature = "proto")]
mod proto;
#[cfg(feature = "proto")]
pub use proto::{ProtoCodec, ACK_SIZE};
1 change: 1 addition & 0 deletions transport/src/codec/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl<M: MsgContent> request_response::Codec for LegacyCodec<M> {
}
}

#[allow(dead_code)]
pub trait MsgContent: Sized + Send + Debug + 'static {
fn new(size: usize) -> Self;
fn as_slice(&self) -> &[u8];
Expand Down
2 changes: 2 additions & 0 deletions transport/src/codec/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use futures::{AsyncReadExt, AsyncWriteExt};
use libp2p::request_response;
use prost::Message;

pub const ACK_SIZE: u64 = 4;

pub struct ProtoCodec<Req, Res> {
_req: PhantomData<Req>,
_res: PhantomData<Res>,
Expand Down
4 changes: 4 additions & 0 deletions transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub use crate::actors::gateway::{
pub use crate::actors::logs_collector::{
LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent, LogsCollectorTransportHandle,
};
#[cfg(feature = "observer")]
pub use crate::actors::observer::{
ObserverBehaviour, ObserverConfig, ObserverEvent, ObserverTransportHandle,
};
#[cfg(feature = "scheduler")]
pub use crate::actors::scheduler::{
SchedulerBehaviour, SchedulerConfig, SchedulerEvent, SchedulerTransportHandle,
Expand Down
7 changes: 5 additions & 2 deletions transport/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use libp2p::identity::ed25519;
use libp2p::{identity::Keypair, multiaddr::Protocol, Multiaddr};
use libp2p::{
identity::{ed25519, Keypair},
multiaddr::Protocol,
Multiaddr,
};
use std::path::PathBuf;

mod task_manager;
Expand Down

0 comments on commit d7b05a2

Please sign in to comment.