diff --git a/messages/src/lib.rs b/messages/src/lib.rs index a1c8115..ef5f06c 100644 --- a/messages/src/lib.rs +++ b/messages/src/lib.rs @@ -1,6 +1,6 @@ -use std::fmt::{Debug, Formatter}; use std::{ collections::HashMap, + fmt::{Debug, Formatter}, ops::{Deref, DerefMut}, }; diff --git a/transport/Cargo.toml b/transport/Cargo.toml index 4aee63a..dc6f4de 100644 --- a/transport/Cargo.toml +++ b/transport/Cargo.toml @@ -38,10 +38,12 @@ subsquid-messages = { path = "../messages", features = ["signatures"] } [features] actors = [] +proto = [] request-client = [] request-server = [] -gateway = ["actors", "request-client"] -logs-collector = ["actors", "request-server"] -scheduler = ["actors", "request-client"] -worker = ["actors", "request-client", "request-server"] +gateway = ["actors", "request-client", "proto"] +logs-collector = ["actors", "request-server", "proto"] +observer = ["actors"] +scheduler = ["actors", "request-client", "proto"] +worker = ["actors", "request-client", "request-server", "proto"] metrics = ["libp2p/metrics", "prometheus-client"] diff --git a/transport/src/actors.rs b/transport/src/actors.rs index c153725..ec524dc 100644 --- a/transport/src/actors.rs +++ b/transport/src/actors.rs @@ -2,6 +2,8 @@ pub mod gateway; #[cfg(feature = "logs-collector")] pub mod logs_collector; +#[cfg(feature = "observer")] +pub mod observer; #[cfg(feature = "scheduler")] pub mod scheduler; #[cfg(feature = "worker")] diff --git a/transport/src/actors/gateway.rs b/transport/src/actors/gateway.rs index 0ffef12..37e9107 100644 --- a/transport/src/actors/gateway.rs +++ b/transport/src/actors/gateway.rs @@ -24,11 +24,11 @@ use subsquid_messages::{ use crate::{ behaviour::{ - base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE}, + base::{BaseBehaviour, BaseBehaviourEvent}, request_client::{ClientBehaviour, ClientConfig, ClientEvent}, wrapped::{BehaviourWrapper, TToSwarm, Wrapped}, }, - codec::ProtoCodec, + codec::{ProtoCodec, ACK_SIZE}, protocol::{ GATEWAY_LOGS_PROTOCOL, MAX_GATEWAY_LOG_SIZE, MAX_QUERY_RESULT_SIZE, MAX_QUERY_SIZE, QUERY_PROTOCOL, diff --git a/transport/src/actors/logs_collector.rs b/transport/src/actors/logs_collector.rs index b0c4483..2066b7a 100644 --- a/transport/src/actors/logs_collector.rs +++ b/transport/src/actors/logs_collector.rs @@ -19,11 +19,11 @@ use subsquid_messages::{ use crate::{ behaviour::{ - base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE}, + base::{BaseBehaviour, BaseBehaviourEvent}, request_server::{Request, ServerBehaviour}, wrapped::{BehaviourWrapper, TToSwarm, Wrapped}, }, - codec::ProtoCodec, + codec::{ProtoCodec, ACK_SIZE}, protocol::{ GATEWAY_LOGS_PROTOCOL, MAX_GATEWAY_LOG_SIZE, MAX_WORKER_LOGS_SIZE, WORKER_LOGS_PROTOCOL, }, diff --git a/transport/src/actors/observer.rs b/transport/src/actors/observer.rs new file mode 100644 index 0000000..032e70c --- /dev/null +++ b/transport/src/actors/observer.rs @@ -0,0 +1,149 @@ +use std::{sync::Arc, time::Duration}; + +use futures::StreamExt; +use futures_core::Stream; +use libp2p::{ + swarm::{NetworkBehaviour, SwarmEvent, ToSwarm}, + PeerId, Swarm, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::CancellationToken; + +use subsquid_messages::{broadcast_msg, BroadcastMsg, LogsCollected, Ping}; + +use crate::{ + behaviour::{ + base::{BaseBehaviour, BaseBehaviourEvent}, + wrapped::{BehaviourWrapper, TToSwarm, Wrapped}, + }, + record_event, + util::{TaskManager, DEFAULT_SHUTDOWN_TIMEOUT}, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ObserverEvent { + Ping { peer_id: PeerId, ping: Ping }, + LogsCollected(LogsCollected), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ObserverConfig { + pub logs_collector_id: PeerId, + pub events_queue_size: usize, + pub shutdown_timeout: Duration, +} + +impl ObserverConfig { + pub fn new(logs_collector_id: PeerId) -> Self { + Self { + logs_collector_id, + events_queue_size: 100, + shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT, + } + } +} + +pub struct ObserverBehaviour { + base: Wrapped, + logs_collector_id: PeerId, +} + +impl ObserverBehaviour { + pub fn new(mut base: BaseBehaviour, logs_collector_id: PeerId) -> Wrapped { + base.subscribe_pings(); + base.subscribe_logs_collected(); + Self { + base: base.into(), + logs_collector_id, + } + .into() + } + + fn on_base_event(&mut self, ev: BaseBehaviourEvent) -> Option { + let (peer_id, msg) = match ev { + BaseBehaviourEvent::BroadcastMsg { + peer_id, + msg: BroadcastMsg { msg: Some(msg) }, + } => (peer_id, msg), + _ => return None, + }; + match msg { + broadcast_msg::Msg::Ping(ping) => Some(ObserverEvent::Ping { peer_id, ping }), + broadcast_msg::Msg::LogsCollected(logs) if peer_id == self.logs_collector_id => { + Some(ObserverEvent::LogsCollected(logs)) + } + _ => None, + } + } +} + +impl BehaviourWrapper for ObserverBehaviour { + type Inner = Wrapped; + type Event = ObserverEvent; + + fn inner(&mut self) -> &mut Self::Inner { + &mut self.base + } + + fn on_inner_event( + &mut self, + ev: ::ToSwarm, + ) -> impl IntoIterator> { + self.on_base_event(ev).map(ToSwarm::GenerateEvent) + } +} + +struct ObserverTransport { + swarm: Swarm>, + events_tx: mpsc::Sender, +} + +impl ObserverTransport { + pub async fn run(mut self, cancel_token: CancellationToken) { + log::info!("Starting observer P2P transport"); + loop { + tokio::select! { + _ = cancel_token.cancelled() => break, + ev = self.swarm.select_next_some() => self.on_swarm_event(ev), + } + } + log::info!("Shutting down observer P2P transport"); + } + + fn on_swarm_event(&mut self, ev: SwarmEvent) { + log::trace!("Swarm event: {ev:?}"); + record_event(&ev); + if let SwarmEvent::Behaviour(ev) = ev { + self.events_tx + .try_send(ev) + .unwrap_or_else(|e| log::error!("Observer event queue full. Event dropped: {e:?}")) + } + } +} + +#[derive(Clone)] +pub struct ObserverTransportHandle { + _task_manager: Arc, +} + +impl ObserverTransportHandle { + fn new(transport: ObserverTransport, shutdown_timeout: Duration) -> Self { + let mut task_manager = TaskManager::new(shutdown_timeout); + task_manager.spawn(|c| transport.run(c)); + Self { + _task_manager: Arc::new(task_manager), + } + } +} + +pub fn start_transport( + swarm: Swarm>, + config: ObserverConfig, +) -> (impl Stream, ObserverTransportHandle) { + let (events_tx, events_rx) = mpsc::channel(config.events_queue_size); + let transport = ObserverTransport { swarm, events_tx }; + let handle = ObserverTransportHandle::new(transport, config.shutdown_timeout); + (ReceiverStream::new(events_rx), handle) +} diff --git a/transport/src/actors/scheduler.rs b/transport/src/actors/scheduler.rs index 34d06bb..a719367 100644 --- a/transport/src/actors/scheduler.rs +++ b/transport/src/actors/scheduler.rs @@ -17,11 +17,11 @@ use subsquid_messages::{broadcast_msg, envelope, BroadcastMsg, Envelope, Ping, P use crate::{ behaviour::{ - base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE}, + base::{BaseBehaviour, BaseBehaviourEvent}, request_client::{ClientBehaviour, ClientConfig, ClientEvent}, wrapped::{BehaviourWrapper, TToSwarm, Wrapped}, }, - codec::ProtoCodec, + codec::{ProtoCodec, ACK_SIZE}, protocol::{MAX_PONG_SIZE, PONG_PROTOCOL}, record_event, util::{TaskManager, DEFAULT_SHUTDOWN_TIMEOUT}, diff --git a/transport/src/actors/worker.rs b/transport/src/actors/worker.rs index 0492b81..63b92bc 100644 --- a/transport/src/actors/worker.rs +++ b/transport/src/actors/worker.rs @@ -25,12 +25,12 @@ use subsquid_messages::{ use crate::{ behaviour::{ - base::{BaseBehaviour, BaseBehaviourEvent, ACK_SIZE}, + base::{BaseBehaviour, BaseBehaviourEvent}, request_client::{ClientBehaviour, ClientConfig, ClientEvent}, request_server::{Request, ServerBehaviour}, wrapped::{BehaviourWrapper, TToSwarm, Wrapped}, }, - codec::ProtoCodec, + codec::{ProtoCodec, ACK_SIZE}, protocol::{ MAX_PONG_SIZE, MAX_QUERY_RESULT_SIZE, MAX_QUERY_SIZE, MAX_WORKER_LOGS_SIZE, PONG_PROTOCOL, QUERY_PROTOCOL, WORKER_LOGS_PROTOCOL, @@ -294,6 +294,7 @@ fn bundle_messages( .filter_map(move |msg| { let msg_size = msg.encoded_len(); if msg_size > size_limit { + // TODO: Send oversized messages back as events, don't drop log::warn!("Message too big ({msg_size} > {size_limit})"); return None; } diff --git a/transport/src/behaviour/base.rs b/transport/src/behaviour/base.rs index 18b7778..70e0d88 100644 --- a/transport/src/behaviour/base.rs +++ b/transport/src/behaviour/base.rs @@ -28,8 +28,7 @@ use libp2p::{ StreamProtocol, }; use libp2p_swarm_derive::NetworkBehaviour; -use prost::bytes::Buf; -use prost::Message; +use prost::{bytes::Buf, Message}; use serde::{Deserialize, Serialize}; use subsquid_messages::{ @@ -53,8 +52,6 @@ use crate::{ use crate::metrics::ONGOING_QUERIES; use crate::protocol::{LEGACY_LOGS_TOPIC, LEGACY_PING_TOPIC}; -pub const ACK_SIZE: u64 = 4; - #[derive(NetworkBehaviour)] pub struct InnerBehaviour { identify: identify::Behaviour, @@ -98,6 +95,7 @@ pub struct BaseBehaviour { probe_timeouts: FuturesMap, } +#[allow(dead_code)] impl BaseBehaviour { pub fn new( keypair: &Keypair, @@ -421,7 +419,6 @@ impl BaseBehaviour { &mut self, ev: request_response::Event, u8>, ) -> Option> { - log::debug!("Request-Response event received: {ev:?}"); let (peer_id, msg_content, channel) = match ev { request_response::Event::Message { peer, @@ -440,6 +437,7 @@ impl BaseBehaviour { } _ => return None, }; + log::debug!("Legacy message ({} bytes) received from {peer_id}", msg_content.len()); // Send minimal response to prevent errors being emitted on the sender side _ = self.inner.legacy.send_response(channel, 1u8); diff --git a/transport/src/builder.rs b/transport/src/builder.rs index e1454b4..1f736a4 100644 --- a/transport/src/builder.rs +++ b/transport/src/builder.rs @@ -25,6 +25,10 @@ use crate::actors::logs_collector::{ self, LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent, LogsCollectorTransportHandle, }; +#[cfg(feature = "observer")] +use crate::actors::observer::{ + self, ObserverBehaviour, ObserverConfig, ObserverEvent, ObserverTransportHandle, +}; #[cfg(feature = "scheduler")] use crate::actors::scheduler::{ self, SchedulerBehaviour, SchedulerConfig, SchedulerEvent, SchedulerTransportHandle, @@ -203,6 +207,16 @@ impl P2PTransportBuilder { Ok(logs_collector::start_transport(swarm, config)) } + #[cfg(feature = "observer")] + pub fn build_observer( + self, + config: ObserverConfig, + ) -> Result<(impl Stream, ObserverTransportHandle), Error> { + let swarm = + self.build_swarm(|base| ObserverBehaviour::new(base, config.logs_collector_id))?; + Ok(observer::start_transport(swarm, config)) + } + #[cfg(feature = "scheduler")] pub fn build_scheduler( self, diff --git a/transport/src/codec.rs b/transport/src/codec.rs index 612873e..285476d 100644 --- a/transport/src/codec.rs +++ b/transport/src/codec.rs @@ -1,5 +1,7 @@ mod legacy; -mod proto; - pub use legacy::LegacyCodec; -pub use proto::ProtoCodec; + +#[cfg(feature = "proto")] +mod proto; +#[cfg(feature = "proto")] +pub use proto::{ProtoCodec, ACK_SIZE}; diff --git a/transport/src/codec/legacy.rs b/transport/src/codec/legacy.rs index 29d36b0..9645f1d 100644 --- a/transport/src/codec/legacy.rs +++ b/transport/src/codec/legacy.rs @@ -98,6 +98,7 @@ impl request_response::Codec for LegacyCodec { } } +#[allow(dead_code)] pub trait MsgContent: Sized + Send + Debug + 'static { fn new(size: usize) -> Self; fn as_slice(&self) -> &[u8]; diff --git a/transport/src/codec/proto.rs b/transport/src/codec/proto.rs index 6694370..4ade9a2 100644 --- a/transport/src/codec/proto.rs +++ b/transport/src/codec/proto.rs @@ -5,6 +5,8 @@ use futures::{AsyncReadExt, AsyncWriteExt}; use libp2p::request_response; use prost::Message; +pub const ACK_SIZE: u64 = 4; + pub struct ProtoCodec { _req: PhantomData, _res: PhantomData, diff --git a/transport/src/lib.rs b/transport/src/lib.rs index 1653717..d49371a 100644 --- a/transport/src/lib.rs +++ b/transport/src/lib.rs @@ -35,6 +35,10 @@ pub use crate::actors::gateway::{ pub use crate::actors::logs_collector::{ LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent, LogsCollectorTransportHandle, }; +#[cfg(feature = "observer")] +pub use crate::actors::observer::{ + ObserverBehaviour, ObserverConfig, ObserverEvent, ObserverTransportHandle, +}; #[cfg(feature = "scheduler")] pub use crate::actors::scheduler::{ SchedulerBehaviour, SchedulerConfig, SchedulerEvent, SchedulerTransportHandle, diff --git a/transport/src/util.rs b/transport/src/util.rs index e5437f6..72fee18 100644 --- a/transport/src/util.rs +++ b/transport/src/util.rs @@ -1,5 +1,8 @@ -use libp2p::identity::ed25519; -use libp2p::{identity::Keypair, multiaddr::Protocol, Multiaddr}; +use libp2p::{ + identity::{ed25519, Keypair}, + multiaddr::Protocol, + Multiaddr, +}; use std::path::PathBuf; mod task_manager;