
Integrate replication manager with networking stack #387

Merged · 126 commits · Jun 19, 2023

Changes from 21 commits

126 commits
d3bc9e7
Use SyncMessage in replication network behaviour
sandreae May 22, 2023
2d2c05c
Use target set in sync request
sandreae May 22, 2023
d812c06
Convert integer to Mode
sandreae May 22, 2023
3c2d696
Add replication to main behaviour struct
adzialocha May 22, 2023
9e522a2
Add SyncManager to replication behaviour
adzialocha May 23, 2023
e73c3a7
Add schema provider to behaviour
adzialocha May 23, 2023
3bd31c4
Move manager again out of network behaviour, add replication service
adzialocha May 23, 2023
abdaa74
Introduce event loop to handle swarm and channel events
adzialocha May 23, 2023
3c11663
Add new service message types to enum
adzialocha May 24, 2023
9a67a46
Better method name and structure for event loop
adzialocha May 24, 2023
26d6445
Send and receive service messages on new or closed connections and re…
adzialocha May 24, 2023
b0e44eb
Have peer id on network config struct
adzialocha May 24, 2023
e7a9bab
Introduce connection manager in replication service
adzialocha May 24, 2023
93ae241
Prepare methods for finished or failing sessions
adzialocha May 24, 2023
ffd2d09
Add and remove peers in connection manager
adzialocha May 24, 2023
a113ac4
Count failed and successful sessions
adzialocha May 24, 2023
448e8f1
Initiate replication with peers
adzialocha May 24, 2023
2e12f3a
Add some basic logging
adzialocha May 24, 2023
00a32ba
Do not override with default when building config in cli
adzialocha May 24, 2023
f89be7e
Fix checking only for certain messages in async loop
adzialocha May 24, 2023
2f255e4
Clippy happy, developer happy
adzialocha May 24, 2023
414b866
Make Domain error in IngestError transparent
sandreae May 25, 2023
f60b1b2
Add logging for replication entry exchange
sandreae May 25, 2023
d561290
Sort system schema to the front of TargetSet
sandreae May 25, 2023
3d7bd86
Refactor log height diff logic
sandreae May 25, 2023
df90bbb
Don't diff over schema sub-range of target set
sandreae May 25, 2023
7214083
Introduce DuplicateSessionRequestError
sandreae May 26, 2023
f96865a
More logging and use new error type
sandreae May 26, 2023
34de908
Logging for dropping and re-initiating duplicate session requests
sandreae May 26, 2023
18f7e48
Log when re-initiating session with peer
sandreae May 26, 2023
1b350b6
Fix issue when calculating local log heights
sandreae May 26, 2023
99ad0a8
More logging in manager
sandreae May 26, 2023
4e0aacf
Improve logging message
sandreae May 26, 2023
c0b6816
Fix diff test
sandreae May 26, 2023
20e2112
Correct expect error message
sandreae May 26, 2023
22e391e
Ignore duplicate inbound sync requests
sandreae May 27, 2023
5671844
Add messaging diagram to lifetime test
sandreae May 28, 2023
c482b18
Logging in behaviour
sandreae May 28, 2023
7db4f1c
Remove re-initiating dropped duplicate sessions if they had a differe…
sandreae May 29, 2023
b035f9a
Diagram for sync lifetime test
sandreae May 29, 2023
af82212
Test for concurrent sync request handling
sandreae May 29, 2023
6270743
Remove duplicate diagram
sandreae May 29, 2023
d7e4c22
Make random target set include more
sandreae May 29, 2023
4fe8592
Small logging and improved comments
sandreae May 29, 2023
7ac01ee
Elegantly handle concurrent session requests with duplicate target set
sandreae May 29, 2023
a2b9493
Correct validation of TargetSet
sandreae May 29, 2023
ff8668f
Better naming in TargetSet fixture
sandreae May 29, 2023
897bb67
Update tests
sandreae May 29, 2023
461e676
Order log heights in Have message
sandreae May 29, 2023
2f842f3
Implement Human on Message and SyncMessage
sandreae May 29, 2023
9a6b37d
Some work on logging
sandreae May 29, 2023
2194cd6
Fix remote log height logging
sandreae May 29, 2023
848dd5e
fmt
sandreae May 29, 2023
c3d4c0e
Remove all sessions for a peer on replication error
sandreae May 30, 2023
b93efd5
Add error logging to handler
sandreae May 30, 2023
c632fd1
Add ConnectionId to peer identifier in replication service
sandreae May 30, 2023
9364ccb
Doc string for PeerConnectionIdentifier
sandreae May 30, 2023
286bad9
Add comment to PeerConnectionId defaults
sandreae May 30, 2023
f87af83
Add (very) basic replication scheduler
sandreae May 30, 2023
e5bf152
Refactor replication behaviour event triggering
sandreae May 31, 2023
b6d139b
Temp fix for UNIQUE
sandreae May 31, 2023
f7c7cda
Send SyncMessages to one handler by ConnectionId
sandreae May 31, 2023
afd24a5
Maintain list of peers and all their connections on ConnectionManager
sandreae May 31, 2023
6183ca7
Remove connection from ConnectionManager when swarm issues Connection…
sandreae May 31, 2023
f3c3f6c
Refactor ConnectionEstablished messaging in replication behaviour
sandreae May 31, 2023
86222a4
Improve error handling and logging
sandreae May 31, 2023
cd7629f
Update api in behaviour network tests
sandreae May 31, 2023
5152ff6
Error logging in replication connection handler
sandreae May 31, 2023
d620053
Cargo clippy
sandreae May 31, 2023
91ceb9e
fmt
sandreae May 31, 2023
c5373ef
More tests for TargetSet validation
sandreae May 31, 2023
8510d15
Only identify peers by their PeerId (not ConnectionId) in replication…
sandreae Jun 2, 2023
8512680
Rename ConnectionEstablished to PeerConnected etc..
sandreae Jun 2, 2023
8dad289
Poll ticking stream for scheduling replication
adzialocha Jun 3, 2023
58a024e
Dynamically retrieve target set when starting replication
adzialocha Jun 3, 2023
fc5d45b
Add some more doc strings
adzialocha Jun 3, 2023
7bb4cb2
Fix formatting
adzialocha Jun 3, 2023
e550d53
Fix missing peer id in e2e test
adzialocha Jun 3, 2023
bbc6140
Remove unnecessary type casting in entry SQL
adzialocha Jun 3, 2023
996f95e
Give error logging more context
adzialocha Jun 3, 2023
2969a91
Fix SQL query by making seq_num IN values a string
adzialocha Jun 3, 2023
a48bce7
Try different string literal
adzialocha Jun 3, 2023
19bf865
Use IntervalStream from tokio for scheduler
adzialocha Jun 3, 2023
722a4b3
Add doc strings
adzialocha Jun 3, 2023
ba05484
Fix filtering active sessions logic
adzialocha Jun 3, 2023
84587f2
Update comments
adzialocha Jun 4, 2023
8d24268
Remove repeating debug log
adzialocha Jun 4, 2023
5042593
Re-initiate dropped session if its concerning a different target set
adzialocha Jun 4, 2023
79ceacc
Allow max 3 sessions per peer and max one for the same target set
adzialocha Jun 4, 2023
c7655f4
Update test and fix bug in re-initiating session logic
adzialocha Jun 4, 2023
a3f4b89
Correct diagram
adzialocha Jun 4, 2023
ec8bdd2
Inform connection handler about replication errors, introduce timeout
adzialocha Jun 4, 2023
a5fc932
Close all connection handlers on critical errors
adzialocha Jun 4, 2023
f7b894e
Fix import style
adzialocha Jun 5, 2023
7f24e41
Fix import style
adzialocha Jun 5, 2023
dbb6546
Remove no longer relevant log message
sandreae Jun 5, 2023
445d1aa
Stop dialing peer after one address dialed successfully
sandreae Jun 8, 2023
2b904f5
Only accept one inbound and one outbound connection per peer
sandreae Jun 8, 2023
58275db
fmt x clippy
sandreae Jun 8, 2023
759aaee
Use libp2p from git main
sandreae Jun 9, 2023
24d67ed
Add network info logging on incoming connection errors
sandreae Jun 9, 2023
e4a3e75
Revert
adzialocha Jun 12, 2023
2965230
Make clippy happy
adzialocha Jun 12, 2023
7d95942
Do never actively close connections
adzialocha Jun 12, 2023
c47cfee
Remove dead code
adzialocha Jun 12, 2023
4b2885c
Check more often when using ping and mDNS discovery
adzialocha Jun 16, 2023
fdf4f4a
Close replication session on all errors
adzialocha Jun 16, 2023
4682e18
Better error logging
adzialocha Jun 16, 2023
0c3dde9
Fix issue where outbound streams could not be re-established after error
adzialocha Jun 16, 2023
bfd9def
Add behaviour logic which always uses latest healthy connection
adzialocha Jun 16, 2023
891892f
Rename to peers behaviour
adzialocha Jun 16, 2023
94378ee
Make clippy happy
adzialocha Jun 16, 2023
45a5976
Add entry to CHANGELOG.md
adzialocha Jun 16, 2023
a2c28da
Use connection ids to identify peers
adzialocha Jun 18, 2023
158ca76
Clean up logging a little bit
adzialocha Jun 18, 2023
bca3e16
A little bit less verbose logging
adzialocha Jun 18, 2023
c8e6f04
Fix tests
adzialocha Jun 18, 2023
b4a77cc
Add a test for connection manager
adzialocha Jun 18, 2023
908e4fe
Write some more doc-strings
adzialocha Jun 18, 2023
4461a19
Add more docs
adzialocha Jun 18, 2023
e3daade
Disconnect from all peers before shutdown
adzialocha Jun 18, 2023
0215c62
Dial peers by multiaddr on mdns discovery
sandreae Jun 19, 2023
1bcba23
Rename Naive -> LogHeight strategy
sandreae Jun 19, 2023
9091145
Naming improvement
sandreae Jun 19, 2023
d6ea4f4
Doc strings
sandreae Jun 19, 2023
462c4ff
fmt
sandreae Jun 19, 2023
2 changes: 2 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions aquadoggo/Cargo.toml
@@ -70,6 +70,7 @@ tokio = { version = "1.25.0", features = [
"sync",
"time",
] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tower-http = { version = "0.3.4", default-features = false, features = [
"cors",
] }
14 changes: 14 additions & 0 deletions aquadoggo/src/bus.rs
@@ -1,8 +1,10 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use libp2p::PeerId;
use p2panda_rs::operation::OperationId;

use crate::manager::Sender;
use crate::replication::SyncMessage;

/// Sender for cross-service communication bus.
pub type ServiceSender = Sender<ServiceMessage>;
@@ -12,4 +14,16 @@ pub type ServiceSender = Sender<ServiceMessage>;
pub enum ServiceMessage {
/// A new operation arrived at the node.
NewOperation(OperationId),

/// Node established a bi-directional connection to another node.
ConnectionEstablished(PeerId),

/// Node closed a connection to another node.
ConnectionClosed(PeerId),

/// Node sent a message to remote node for replication.
SentReplicationMessage(PeerId, SyncMessage),

/// Node received a message from remote node for replication.
ReceivedReplicationMessage(PeerId, SyncMessage),
}
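The new variants turn the service bus into a general event channel: services subscribe once and pick out only the variants they handle. Below is a minimal stdlib sketch of that filtering pattern, with `String` standing in for `libp2p::PeerId` and `SyncMessage`, and `std::sync::mpsc` standing in for the crate's broadcast-based `Sender` — all of those substitutions are assumptions for illustration, not the real types.

```rust
use std::sync::mpsc;

// Simplified stand-ins: the real types are libp2p::PeerId and
// crate::replication::SyncMessage, and the real bus is a broadcast
// channel shared between services.
type PeerId = String;
type SyncMessage = String;

#[derive(Debug, Clone, PartialEq)]
enum ServiceMessage {
    NewOperation(String),
    ConnectionEstablished(PeerId),
    ConnectionClosed(PeerId),
    SentReplicationMessage(PeerId, SyncMessage),
    ReceivedReplicationMessage(PeerId, SyncMessage),
}

// A replication service only acts on the variants it cares about and
// ignores the rest.
fn handle(message: &ServiceMessage) -> Option<&PeerId> {
    match message {
        ServiceMessage::ConnectionEstablished(peer) => Some(peer),
        ServiceMessage::ReceivedReplicationMessage(peer, _) => Some(peer),
        _ => None,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(ServiceMessage::NewOperation("op-1".into())).unwrap();
    tx.send(ServiceMessage::ConnectionEstablished("peer-a".into())).unwrap();
    drop(tx);

    // Only the connection event survives the filter.
    let handled: Vec<PeerId> = rx.iter().filter_map(|m| handle(&m).cloned()).collect();
    assert_eq!(handled, vec!["peer-a".to_string()]);
}
```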
4 changes: 4 additions & 0 deletions aquadoggo/src/config.rs
@@ -98,6 +98,10 @@ impl Configuration {
}
};

// Derive peer id from key pair
let key_pair = NetworkConfiguration::load_or_generate_key_pair(config.base_path.clone())?;
config.network.set_peer_id(&key_pair.public());

Ok(config)
}
}
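Deriving the peer id once while loading the configuration works because the id is a pure function of the public key. A rough stdlib sketch of that property, with a plain hash standing in for libp2p's actual derivation (a multihash of the encoded public key) — the hashing scheme here is an assumption for illustration only:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: libp2p derives a PeerId as a multihash of the
// encoded public key; here we hash raw bytes just to show the id is
// deterministic in the key.
fn derive_peer_id(public_key_bytes: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    public_key_bytes.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let key = b"example public key";
    // Deriving twice from the same key yields the same id, which is why
    // it is safe to compute it once at configuration time and store it.
    assert_eq!(derive_peer_id(key), derive_peer_id(key));
    assert_ne!(derive_peer_id(key), derive_peer_id(b"another key"));
}
```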
46 changes: 24 additions & 22 deletions aquadoggo/src/materializer/service.rs
@@ -88,28 +88,30 @@ pub async fn materializer_service(

// Listen to incoming new entries and operations and move them into task queue
let handle = task::spawn(async move {
while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
operation_id
)
}) {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
operation_id
)
}) {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
}
}
}
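The point of wrapping the receive in an explicit `loop` is subtle: `while let Ok(ServiceMessage::NewOperation(..)) = rx.recv()` exits as soon as any *other* variant arrives, not only when the channel closes. With the new connection and replication variants on the bus, that would silently kill the materializer. A stdlib sketch of the fixed shape, using `std::sync::mpsc` in place of the real broadcast channel (an assumption for illustration):

```rust
use std::sync::mpsc::{channel, Receiver};

#[derive(Debug)]
enum ServiceMessage {
    NewOperation(u64),
    ConnectionEstablished(&'static str),
}

// The explicit loop skips non-matching messages and only stops when the
// channel closes, unlike the old `while let Ok(NewOperation(..))` form.
fn collect_operations(rx: Receiver<ServiceMessage>) -> Vec<u64> {
    let mut handled = Vec::new();
    loop {
        match rx.recv() {
            Ok(ServiceMessage::NewOperation(id)) => handled.push(id),
            Ok(_) => continue, // e.g. connection events: ignored here
            Err(_) => break,   // all senders dropped: shut down
        }
    }
    handled
}

fn main() {
    let (tx, rx) = channel();
    tx.send(ServiceMessage::NewOperation(1)).unwrap();
    // With the old `while let` form this message would have ended the loop.
    tx.send(ServiceMessage::ConnectionEstablished("peer-a")).unwrap();
    tx.send(ServiceMessage::NewOperation(2)).unwrap();
    drop(tx);
    assert_eq!(collect_operations(rx), vec![1, 2]);
}
```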
11 changes: 9 additions & 2 deletions aquadoggo/src/network/behaviour.rs
@@ -8,6 +8,7 @@ use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous};
use log::debug;

use crate::network::config::NODE_NAMESPACE;
use crate::network::replication;
use crate::network::NetworkConfiguration;

/// Network behaviour for the aquadoggo node.
@@ -42,9 +43,11 @@ pub struct Behaviour {
/// Register with a rendezvous server and query remote peer addresses.
pub rendezvous_client: Toggle<rendezvous::client::Behaviour>,

/// Serve as a rendezvous point for remote peers to register their external addresses
/// and query the addresses of other peers.
/// Serve as a rendezvous point for remote peers to register their external addresses and query
/// the addresses of other peers.
pub rendezvous_server: Toggle<rendezvous::server::Behaviour>,

pub replication: replication::Behaviour,
}

impl Behaviour {
@@ -103,6 +106,7 @@ impl Behaviour {
// address has been provided
let rendezvous_client = if network_config.rendezvous_address.is_some() {
debug!("Rendezvous client network behaviour enabled");
// @TODO: Why does this need the whole key pair?!
Some(rendezvous::client::Behaviour::new(key_pair))
} else {
None
@@ -132,6 +136,8 @@ impl Behaviour {
None
};

let replication = replication::Behaviour::new();

Ok(Self {
autonat: autonat.into(),
identify: identify.into(),
@@ -142,6 +148,7 @@
rendezvous_server: rendezvous_server.into(),
relay_client: relay_client.into(),
relay_server: relay_server.into(),
replication,
})
}
}
19 changes: 14 additions & 5 deletions aquadoggo/src/network/config.rs
@@ -4,7 +4,7 @@ use std::path::PathBuf;

use anyhow::Result;
use libp2p::connection_limits::ConnectionLimits;
use libp2p::identity::Keypair;
use libp2p::identity::{Keypair, PublicKey};
use libp2p::{Multiaddr, PeerId};
use log::info;
use serde::{Deserialize, Serialize};
@@ -72,8 +72,8 @@ pub struct NetworkConfiguration {

/// Ping behaviour enabled.
///
/// Send outbound pings to connected peers every 15 seconds and respond to inbound pings.
/// Every sent ping must yield a response within 20 seconds in order to be successful.
/// Send outbound pings to connected peers every 15 seconds and respond to inbound pings. Every
/// sent ping must yield a response within 20 seconds in order to be successful.
pub ping: bool,

/// QUIC transport port.
@@ -103,6 +103,9 @@ pub struct NetworkConfiguration {
///
/// Serve as a rendezvous point for peer discovery, allowing peer registration and queries.
pub rendezvous_server_enabled: bool,

/// Our local peer id.
pub peer_id: Option<PeerId>,
}

impl Default for NetworkConfiguration {
@@ -127,11 +130,17 @@
rendezvous_address: None,
rendezvous_peer_id: None,
rendezvous_server_enabled: false,
peer_id: None,
}
}
}

impl NetworkConfiguration {
/// Derive peer id from a given public key.
pub fn set_peer_id(&mut self, public_key: &PublicKey) {
self.peer_id = Some(PeerId::from_public_key(public_key));
}

/// Define the connection limits of the swarm.
pub fn connection_limits(&self) -> ConnectionLimits {
ConnectionLimits::default()
@@ -144,8 +153,8 @@

/// Load the key pair from the file at the specified path.
///
/// If the file does not exist, a random key pair is generated and saved.
/// If no path is specified, a random key pair is generated.
/// If the file does not exist, a random key pair is generated and saved. If no path is
/// specified, a random key pair is generated.
pub fn load_or_generate_key_pair(path: Option<PathBuf>) -> Result<Keypair> {
let key_pair = match path {
Some(mut path) => {
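The `load_or_generate_key_pair` behaviour described in the doc string above (load from disk if present, otherwise generate and persist; generate ephemerally if no path is given) can be sketched with plain stdlib file I/O. The file name and the stand-in "generated" bytes below are made up for illustration; the real implementation produces a libp2p `Keypair`:

```rust
use std::fs;
use std::path::PathBuf;

// Sketch of the load-or-generate pattern, with raw bytes in place of a
// real libp2p Keypair (an assumption for this example).
fn load_or_generate_key(path: Option<PathBuf>) -> std::io::Result<Vec<u8>> {
    match path {
        Some(path) => {
            if path.is_file() {
                // Key material already on disk: load it.
                fs::read(&path)
            } else {
                // First run: generate, persist, then return it so the
                // same node identity is reused on the next start.
                let key = b"generated-key".to_vec();
                fs::write(&path, &key)?;
                Ok(key)
            }
        }
        // No path configured: ephemeral key, never persisted.
        None => Ok(b"generated-key".to_vec()),
    }
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("key-demo");
    fs::create_dir_all(&dir)?;
    let path = dir.join("private-key");
    let _ = fs::remove_file(&path);

    let first = load_or_generate_key(Some(path.clone()))?;
    let second = load_or_generate_key(Some(path.clone()))?;
    // The second call loads the persisted key instead of regenerating.
    assert_eq!(first, second);
    fs::remove_file(&path)?;
    Ok(())
}
```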
4 changes: 1 addition & 3 deletions aquadoggo/src/network/mod.rs
@@ -3,12 +3,10 @@
mod behaviour;
mod config;
mod identity;
mod replication;
mod service;
mod swarm;
mod transport;
// @TODO: Remove this as soon as we integrated it into the libp2p swarm
#[allow(dead_code)]
mod replication;

pub use config::NetworkConfiguration;
pub use service::network_service;
60 changes: 34 additions & 26 deletions aquadoggo/src/network/replication/behaviour.rs
@@ -11,17 +11,16 @@ use libp2p::swarm::{
use libp2p::{Multiaddr, PeerId};

use crate::network::replication::handler::{Handler, HandlerInEvent, HandlerOutEvent};
use crate::network::replication::protocol::Message;
use crate::replication::SyncMessage;

#[derive(Debug)]
pub enum BehaviourOutEvent {
MessageReceived(PeerId, Message),
Error,
pub enum Event {
MessageReceived(PeerId, SyncMessage),
}

#[derive(Debug)]
pub struct Behaviour {
events: VecDeque<ToSwarm<BehaviourOutEvent, HandlerInEvent>>,
events: VecDeque<ToSwarm<Event, HandlerInEvent>>,
}

impl Behaviour {
@@ -33,18 +32,17 @@ impl Behaviour {
}

impl Behaviour {
fn send_message(&mut self, peer_id: PeerId, message: Message) {
pub fn send_message(&mut self, peer_id: PeerId, message: SyncMessage) {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerInEvent::Message(message),
handler: NotifyHandler::Any,
});
}

fn handle_received_message(&mut self, peer_id: &PeerId, message: Message) {
// @TODO: Handle incoming messages
fn handle_received_message(&mut self, peer_id: &PeerId, message: SyncMessage) {
self.events
.push_back(ToSwarm::GenerateEvent(BehaviourOutEvent::MessageReceived(
.push_back(ToSwarm::GenerateEvent(Event::MessageReceived(
*peer_id, message,
)));
}
@@ -53,7 +51,7 @@ impl Behaviour {
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;

type OutEvent = BehaviourOutEvent;
type OutEvent = Event;

fn handle_established_inbound_connection(
&mut self,
@@ -124,17 +122,18 @@ mod tests {
use libp2p::swarm::{keep_alive, Swarm};
use libp2p_swarm_test::SwarmExt;

use crate::network::replication::Message;
use crate::replication::{Message, SyncMessage, TargetSet};

use super::{Behaviour as ReplicationBehaviour, BehaviourOutEvent};
use super::{Behaviour as ReplicationBehaviour, Event};

#[tokio::test]
async fn peers_connect() {
// Create two swarms
let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new());
let mut swarm2 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new());

// Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection.
// Listen on swarm1 and connect from swarm2, this should establish a bi-directional
// connection.
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;

@@ -189,9 +188,10 @@
assert_eq!(info2.connection_counters().num_established(), 1);

// Send a message from to swarm1 local peer from swarm2 local peer.
swarm1
.behaviour_mut()
.send_message(swarm2_peer_id, Message::Dummy(0));
swarm1.behaviour_mut().send_message(
swarm2_peer_id,
SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))),
);

// Await a swarm event on swarm2.
//
@@ -220,20 +220,22 @@
let swarm2_peer_id = *swarm2.local_peer_id();

// Send a message from to swarm1 local peer from swarm2 local peer.
swarm1
.behaviour_mut()
.send_message(swarm2_peer_id, Message::Dummy(0));
swarm1.behaviour_mut().send_message(
swarm2_peer_id,
SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))),
);

// Send a message from to swarm2 local peer from swarm1 local peer.
swarm2
.behaviour_mut()
.send_message(swarm1_peer_id, Message::Dummy(1));
swarm2.behaviour_mut().send_message(
swarm1_peer_id,
SyncMessage::new(1, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))),
);

// Collect the next 2 behaviour events which occur in either swarms.
for _ in 0..2 {
tokio::select! {
BehaviourOutEvent::MessageReceived(peer_id, message) = swarm1.next_behaviour_event() => res1.push((peer_id, message)),
BehaviourOutEvent::MessageReceived(peer_id, message) = swarm2.next_behaviour_event() => res2.push((peer_id, message)),
Event::MessageReceived(peer_id, message) = swarm1.next_behaviour_event() => res1.push((peer_id, message)),
Event::MessageReceived(peer_id, message) = swarm2.next_behaviour_event() => res2.push((peer_id, message)),
}
}

@@ -244,11 +246,17 @@
// swarm1 should have received the message from swarm2 peer.
let (peer_id, message) = &res1[0];
assert_eq!(peer_id, &swarm2_peer_id);
assert_eq!(message, &Message::Dummy(1));
assert_eq!(
message,
&SyncMessage::new(1, Message::SyncRequest(0.into(), TargetSet::new(&vec![])))
);

// swarm2 should have received the message from swarm1 peer.
let (peer_id, message) = &res2[0];
assert_eq!(peer_id, &swarm1_peer_id);
assert_eq!(message, &Message::Dummy(0));
assert_eq!(
message,
&SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![])))
);
}
}
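The replication behaviour above follows libp2p's standard event-queue pattern: swarm callbacks such as `handle_received_message` only push `ToSwarm` values onto a `VecDeque`, and the swarm drains them later through `poll`, oldest first. A self-contained stdlib sketch of that queue discipline, with simple stand-in types replacing `ToSwarm`, `PeerId`, and `SyncMessage` (assumptions for illustration):

```rust
use std::collections::VecDeque;

// Stand-in for the events the real behaviour emits to the swarm
// (`ToSwarm<Event, HandlerInEvent>` in the actual code).
#[derive(Debug, PartialEq)]
enum Event {
    MessageReceived(&'static str, &'static str),
}

struct Behaviour {
    events: VecDeque<Event>,
}

impl Behaviour {
    fn new() -> Self {
        Self { events: VecDeque::new() }
    }

    // Called from swarm callbacks: events are only queued here ...
    fn handle_received_message(&mut self, peer: &'static str, msg: &'static str) {
        self.events.push_back(Event::MessageReceived(peer, msg));
    }

    // ... and handed to the swarm when it polls the behaviour, oldest
    // first, so delivery order matches arrival order.
    fn poll(&mut self) -> Option<Event> {
        self.events.pop_front()
    }
}

fn main() {
    let mut behaviour = Behaviour::new();
    behaviour.handle_received_message("peer-a", "sync-request");
    behaviour.handle_received_message("peer-b", "have");

    assert_eq!(behaviour.poll(), Some(Event::MessageReceived("peer-a", "sync-request")));
    assert_eq!(behaviour.poll(), Some(Event::MessageReceived("peer-b", "have")));
    assert_eq!(behaviour.poll(), None);
}
```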