From 7fdb4bc5a8c17cffa54e0cf04a9075534f7af959 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 12:32:14 +0100 Subject: [PATCH 01/14] initial impl --- agdb_server/src/main.rs | 1 + agdb_server/src/raft.rs | 746 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 747 insertions(+) create mode 100644 agdb_server/src/raft.rs diff --git a/agdb_server/src/main.rs b/agdb_server/src/main.rs index 683d5fa33..f7dd3124a 100644 --- a/agdb_server/src/main.rs +++ b/agdb_server/src/main.rs @@ -6,6 +6,7 @@ mod db_pool; mod error_code; mod logger; mod password; +mod raft; mod routes; mod server_db; mod server_error; diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs new file mode 100644 index 000000000..5a7701dd0 --- /dev/null +++ b/agdb_server/src/raft.rs @@ -0,0 +1,746 @@ +use std::time::Duration; +use std::time::Instant; + +const VERSION: u64 = 1; + +#[derive(Debug, Clone)] +pub struct Log { + index: u64, + term: u64, + data: Vec, +} + +#[derive(Debug)] +pub struct MismatchedValues { + actual: Option, + expected: Option, +} + +#[derive(Debug)] +pub struct LogMismatch { + index: MismatchedValues, + term: MismatchedValues, + commit: MismatchedValues, +} + +#[derive(Debug)] +pub enum RequestType { + Append(Vec), + Commit(u64), + Heartbeat, + Vote, +} + +#[derive(Debug)] +pub enum ResponseType { + Ok, + ClusterMismatch(MismatchedValues), + LeaderMismatch(MismatchedValues), + TermMismatch(MismatchedValues), + LogMismatch(LogMismatch), + AlreadyVoted(MismatchedValues), +} + +#[derive(Debug)] +enum ClusterState { + Candidate, + Election, + Follower(u64), + Leader, + Voted(u64), +} + +#[derive(Debug)] +pub struct Request { + version: u64, + hash: u64, + index: u64, + target: u64, + term: u64, + log_index: u64, + log_term: u64, + log_commit: u64, + data: RequestType, +} + +#[derive(Debug)] +pub struct Response { + version: u64, + target: u64, + result: ResponseType, +} + +pub trait Storage { + fn append(&mut self, log: Log); + fn commit(&mut self, index: u64); + fn current_index(&self) -> u64; + fn current_term(&self) -> u64; + fn current_commit(&self) -> u64; + fn logs(&self, index: u64, term: u64) -> Vec; +} + +struct Node { + index: u64, + timer: Instant, +} + +pub struct Cluster { + storage: S, + nodes: Vec, + state: ClusterState, + hash: u64, + size: u64, + index: u64, + term: u64, + votes: u64, + log_index: u64, + log_term: u64, + log_commit: u64, + timer: Instant, + election_timeout: Duration, + heartbeat_timeout: Duration, + term_timeout: Duration, +} + +pub struct ClusterSettings { + pub index: u64, + pub size: u64, + pub hash: u64, + pub election_factor: u64, + pub heartbeat_timeout: Duration, + pub term_timeout: Duration, +} + +impl Cluster { + pub fn new(storage: S, settings: ClusterSettings) -> Self { + let nodes = (0..settings.size) + .map(|i| Node { + index: i, + timer: Instant::now(), + }) + .collect(); + + Self { + state: ClusterState::Election, + nodes, + hash: settings.hash, + size: settings.size, + index: settings.index, + term: 0, + votes: 0, + log_index: storage.current_index(), + log_term: storage.current_term(), + log_commit: storage.current_commit(), + timer: Instant::now(), + election_timeout: Duration::from_secs(settings.election_factor * settings.index), + heartbeat_timeout: settings.heartbeat_timeout, + term_timeout: settings.term_timeout, + storage, + } + } + + pub fn append(&mut self, log: Vec) -> Vec { + self.log_index += 1; + self.log_term = self.term; + let log = Log { + index: self.log_index, + term: self.log_term, + data: log, + }; + let requests = self + .nodes + .iter() + .filter_map(|node| { + if node.index != self.index { + Some(Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: node.index, + term: self.term, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Append(vec![log.clone()]), + }) + } else { + None + } + }) + .collect(); + self.storage.append(log); + requests + } + + pub fn process(&mut self) -> Option> { + if let ClusterState::Leader = self.state { + let requests = self.heartbeat(false); + self.timer = Instant::now(); + if requests.is_empty() { + return None; + } else { + return Some(requests); + } + } else { + if self.timer.elapsed() > self.term_timeout { + self.state = ClusterState::Election; + self.timer = Instant::now(); + } + + if let ClusterState::Election = self.state { + if self.timer.elapsed() >= self.election_timeout { + let requests = self.election(); + self.timer = Instant::now(); + return Some(requests); + } + } + } + + None + } + + pub fn request(&mut self, request: &Request) -> Response { + println!("[{}] {:?} {:?}", self.index, self.state, request); + + let response = match request.data { + RequestType::Append(ref logs) => self.append_request(request, logs), + RequestType::Commit(index) => self.commit_request(request, index), + RequestType::Heartbeat => self.heartbeat_request(request), + RequestType::Vote => self.vote_request(request), + }; + + self.timer = Instant::now(); + + match response { + Ok(response) => response, + Err(response) => response, + } + } + + pub fn response(&mut self, request: &Request, response: &Response) -> Option> { + use ClusterState::*; + use RequestType::*; + use ResponseType::*; + + println!( + "[{}] {:?} {:?} -> {:?}", + self.index, self.state, request, response + ); + + match (&self.state, &request.data, &response.result) { + (Candidate, Vote, Ok) => self.vote_ok(), + (Leader, Heartbeat, Ok) => { + self.nodes[request.target as usize].timer = Instant::now(); + None + } + (Leader, Heartbeat | Append(_) | Commit(_), LogMismatch(values)) => { + let logs = self.storage.logs( + values.index.actual.unwrap_or_default(), + values.term.actual.unwrap_or_default(), + ); + self.nodes[request.target as usize].timer = Instant::now(); + Some(vec![Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: request.target, + term: self.term, + log_index: self.index, + log_term: self.term, + log_commit: self.log_commit, + data: Append(logs), + }]) + } + _ => None, + } + } + + fn election(&mut self) -> Vec { + println!("[{}] Election", self.index); + self.state = ClusterState::Candidate; + self.votes = 1; + self.nodes + .iter() + .filter_map(|node| { + if node.index != self.index { + Some(Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: node.index, + term: self.term + 1, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Vote, + }) + } else { + None + } + }) + .collect() + } + + fn append_request(&mut self, request: &Request, logs: &[Log]) -> Result { + self.validate_hash(request)?; + self.validate_leader(request)?; + self.validate_current_term(request)?; + + for log in logs { + self.validate_log_commit(request, log)?; + self.append_storage(log); + } + + Self::ok(request) + } + fn append_storage(&mut self, log: &Log) { + self.log_index = log.index; + self.log_term = log.term; + self.storage.append(log.clone()); + } + + fn commit(&mut self, index: u64) { + self.log_commit = index; + self.storage.commit(index); + } + + fn commit_request(&mut self, request: &Request, index: u64) -> Result { + self.validate_hash(request)?; + self.validate_leader(request)?; + self.validate_current_term(request)?; + + if self.log_commit < index { + self.commit(index); + } + + Self::ok(request) + } + + fn heartbeat(&mut self, forced: bool) -> Vec { + self.nodes + .iter() + .filter_map(|node| { + if forced || node.timer.elapsed() > self.heartbeat_timeout { + Some(Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: node.index, + term: self.term, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Heartbeat, + }) + } else { + None + } + }) + .collect() + } + + fn heartbeat_request(&mut self, request: &Request) -> Result { + self.validate_hash(request)?; + + if request.term < self.term { + self.term = request.term; + self.state = ClusterState::Follower(request.index); + } + + self.validate_log(request)?; + + if request.log_commit > self.log_commit { + self.commit(request.log_commit); + } + + Self::ok(request) + } + + fn vote_ok(&mut self) -> Option> { + self.votes += 1; + + if self.votes > self.size / 2 { + self.state = ClusterState::Leader; + let requests = self.heartbeat(true); + if requests.is_empty() { + return None; + } else { + return Some(requests); + } + } + + None + } + + fn vote_request(&mut self, request: &Request) -> Result { + self.validate_hash(request)?; + self.validate_no_leader(request)?; + self.validate_term(request)?; + self.validate_log_for_vote(request)?; + self.state = ClusterState::Voted(request.index); + Self::ok(request) + } + + fn validate_hash(&self, request: &Request) -> Result<(), Response> { + if request.hash != self.hash { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::ClusterMismatch(MismatchedValues { + actual: Some(self.hash), + expected: Some(request.hash), + }), + }); + } + + Ok(()) + } + + fn validate_no_leader(&self, request: &Request) -> Result<(), Response> { + match self.state { + ClusterState::Leader | ClusterState::Candidate => Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LeaderMismatch(MismatchedValues { + actual: Some(self.index), + expected: None, + }), + }), + ClusterState::Follower(leader) => Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LeaderMismatch(MismatchedValues { + actual: Some(leader), + expected: None, + }), + }), + _ => Ok(()), + } + } + + fn validate_leader(&self, request: &Request) -> Result<(), Response> { + match self.state { + ClusterState::Follower(leader) if request.index == leader => Ok(()), + ClusterState::Follower(leader) => Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LeaderMismatch(MismatchedValues { + actual: Some(leader), + expected: Some(request.index), + }), + }), + _ => Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LeaderMismatch(MismatchedValues { + actual: None, + expected: Some(request.index), + }), + }), + } + } + + fn validate_log(&self, request: &Request) -> Result<(), Response> { + if request.log_index != self.log_index || request.log_term != self.log_term { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LogMismatch(LogMismatch { + index: MismatchedValues { + actual: Some(self.log_index), + expected: Some(request.log_index), + }, + term: MismatchedValues { + actual: Some(self.log_term), + expected: Some(request.log_term), + }, + commit: MismatchedValues { + actual: Some(self.log_commit), + expected: Some(request.log_commit), + }, + }), + }); + } + + Ok(()) + } + + fn validate_log_commit(&self, request: &Request, log: &Log) -> Result<(), Response> { + if log.index < self.log_commit { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LogMismatch(LogMismatch { + index: MismatchedValues { + actual: Some(self.log_index), + expected: Some(request.log_index), + }, + term: MismatchedValues { + actual: Some(self.log_term), + expected: Some(request.log_term), + }, + commit: MismatchedValues { + actual: Some(self.log_commit), + expected: Some(request.log_commit), + }, + }), + }); + } + + Ok(()) + } + + fn validate_log_for_vote(&self, request: &Request) -> Result<(), Response> { + if request.log_index < self.log_index + || request.log_term < self.log_term + || request.log_commit < self.log_commit + { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::LogMismatch(LogMismatch { + index: MismatchedValues { + actual: Some(self.log_index), + expected: Some(request.log_index), + }, + term: MismatchedValues { + actual: Some(self.log_term), + expected: Some(request.log_term), + }, + commit: MismatchedValues { + actual: Some(self.log_commit), + expected: Some(request.log_commit), + }, + }), + }); + } + + Ok(()) + } + + fn validate_current_term(&self, request: &Request) -> Result<(), Response> { + if request.term != self.term { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::TermMismatch(MismatchedValues { + actual: Some(self.term), + expected: Some(request.term), + }), + }); + } + + Ok(()) + } + + fn validate_term(&self, request: &Request) -> Result<(), Response> { + if request.term <= self.term { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::TermMismatch(MismatchedValues { + actual: Some(self.term), + expected: Some(request.term), + }), + }); + } + + Ok(()) + } + + fn ok(request: &Request) -> Result { + Ok(Response { + version: VERSION, + target: request.index, + result: ResponseType::Ok, + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering; + use std::sync::Arc; + use tokio::sync::RwLock; + use tokio::task::JoinHandle; + + struct TestStorage { + logs: Vec, + commit: u64, + } + + type TestNode = Arc>>; + + struct TestCluster { + nodes: Arc>>, + tasks: Vec>>, + shutdown: Arc, + messages: Arc>>, + } + + impl Storage for TestStorage { + fn append(&mut self, log: Log) { + self.logs.truncate(log.index as usize); + self.logs.push(log); + } + + fn commit(&mut self, index: u64) { + self.commit = index; + } + + fn current_index(&self) -> u64 { + self.logs.len() as u64 + } + + fn current_term(&self) -> u64 { + self.logs.last().map(|log| log.term).unwrap_or(0) + } + + fn current_commit(&self) -> u64 { + self.commit + } + + fn logs(&self, index: u64, _term: u64) -> Vec { + self.logs[index as usize..].to_vec() + } + } + + impl TestCluster { + fn new(size: u64) -> Self { + let nodes = (0..size) + .map(|index| { + let storage = TestStorage { + logs: Vec::new(), + commit: 0, + }; + let settings = ClusterSettings { + index, + size, + hash: 123, + election_factor: 1, + heartbeat_timeout: Duration::from_secs(1), + term_timeout: Duration::from_secs(3), + }; + Arc::new(RwLock::new(Cluster::new(storage, settings))) + }) + .collect(); + + Self { + nodes: Arc::new(RwLock::new(nodes)), + tasks: Vec::new(), + shutdown: Arc::new(AtomicBool::new(false)), + messages: Arc::new(RwLock::new(Vec::new())), + } + } + + async fn start(&mut self) { + let (requests_channel, mut requests_receiver) = + tokio::sync::mpsc::channel::(100); + let (responses_channel, mut responses_receiver) = tokio::sync::mpsc::channel(100); + let shutdown = self.shutdown.clone(); + let nodes = self.nodes.clone(); + self.tasks.push(tokio::spawn(async move { + while !shutdown.load(Ordering::Relaxed) { + if let Some(request) = requests_receiver.recv().await { + let target = nodes.read().await[request.target as usize].clone(); + let response = target.write().await.request(&request); + responses_channel.send((request, response)).await?; + } + } + + Ok(()) + })); + + let shutdown = self.shutdown.clone(); + let nodes = self.nodes.clone(); + let req_channel = requests_channel.clone(); + let messages = self.messages.clone(); + self.tasks.push(tokio::spawn(async move { + while !shutdown.load(Ordering::Relaxed) { + if let Some((request, response)) = responses_receiver.recv().await { + let origin = nodes.read().await[response.target as usize].clone(); + messages + .write() + .await + .push(format!("{:?} -> {:?}", request, response)); + let new_requests = origin.write().await.response(&request, &response); + if let Some(new_requests) = new_requests { + for req in new_requests { + req_channel.send(req).await?; + } + } + } + } + + Ok(()) + })); + + for node in self.nodes.read().await.iter() { + let node = node.clone(); + let shutdown = self.shutdown.clone(); + let req_channel = requests_channel.clone(); + self.tasks.push(tokio::spawn(async move { + while !shutdown.load(Ordering::Relaxed) { + if let Some(requests) = node.write().await.process() { + for request in requests { + req_channel.send(request).await?; + } + } else { + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + Ok(()) + })); + } + } + + async fn stop(&mut self) -> anyhow::Result<()> { + self.shutdown.store(true, Ordering::Relaxed); + for task in self.tasks.drain(..) { + let _ = task.await?; + } + Ok(()) + } + + async fn ensure_leader(&self, timeout: Duration) -> bool { + let timer = Instant::now(); + while timer.elapsed() < timeout { + tokio::time::sleep(Duration::from_millis(100)).await; + + for node in self.nodes.read().await.iter() { + if let ClusterState::Leader = node.read().await.state { + return true; + } + } + } + + self.messages.read().await.iter().for_each(|message| { + println!("{}", message); + }); + false + } + } + + impl Drop for TestCluster { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::Relaxed); + } + } + + #[tokio::test] + async fn election() -> anyhow::Result<()> { + let mut cluster = TestCluster::new(3); + cluster.start().await; + let leader = cluster.ensure_leader(Duration::from_secs(5)).await; + cluster.stop().await?; + assert!(leader); + + cluster.messages.read().await.iter().for_each(|message| { + println!("{}", message); + }); + + Ok(()) + } +} From c283307af6d639df8e6e01920c9377922d0e75fb Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 13:07:10 +0100 Subject: [PATCH 02/14] Update raft.rs --- agdb_server/src/raft.rs | 187 ++++++++++++---------------------------- 1 file changed, 57 insertions(+), 130 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 5a7701dd0..9ae546d0f 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -79,14 +79,9 @@ pub trait Storage { fn logs(&self, index: u64, term: u64) -> Vec; } -struct Node { - index: u64, - timer: Instant, -} - pub struct Cluster { storage: S, - nodes: Vec, + nodes: Vec, state: ClusterState, hash: u64, size: u64, @@ -113,16 +108,11 @@ pub struct ClusterSettings { impl Cluster { pub fn new(storage: S, settings: ClusterSettings) -> Self { - let nodes = (0..settings.size) - .map(|i| Node { - index: i, - timer: Instant::now(), - }) - .collect(); - Self { state: ClusterState::Election, - nodes, + nodes: (0..settings.size) + .filter(|i| *i != settings.index) + .collect(), hash: settings.hash, size: settings.size, index: settings.index, @@ -150,22 +140,16 @@ impl Cluster { let requests = self .nodes .iter() - .filter_map(|node| { - if node.index != self.index { - Some(Request { - version: VERSION, - hash: self.hash, - index: self.index, - target: node.index, - term: self.term, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, - data: RequestType::Append(vec![log.clone()]), - }) - } else { - None - } + .map(|node| Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: *node, + term: self.term, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Append(vec![log.clone()]), }) .collect(); self.storage.append(log); @@ -174,19 +158,10 @@ impl Cluster { pub fn process(&mut self) -> Option> { if let ClusterState::Leader = self.state { - let requests = self.heartbeat(false); - self.timer = Instant::now(); - if requests.is_empty() { - return None; - } else { - return Some(requests); + if self.timer.elapsed() >= self.heartbeat_timeout { + return Some(self.heartbeat()); } } else { - if self.timer.elapsed() > self.term_timeout { - self.state = ClusterState::Election; - self.timer = Instant::now(); - } - if let ClusterState::Election = self.state { if self.timer.elapsed() >= self.election_timeout { let requests = self.election(); @@ -194,14 +169,17 @@ impl Cluster { return Some(requests); } } + + if self.timer.elapsed() > self.term_timeout { + self.state = ClusterState::Election; + self.timer = Instant::now(); + } } None } pub fn request(&mut self, request: &Request) -> Response { - println!("[{}] {:?} {:?}", self.index, self.state, request); - let response = match request.data { RequestType::Append(ref logs) => self.append_request(request, logs), RequestType::Commit(index) => self.commit_request(request, index), @@ -222,23 +200,13 @@ impl Cluster { use RequestType::*; use ResponseType::*; - println!( - "[{}] {:?} {:?} -> {:?}", - self.index, self.state, request, response - ); - match (&self.state, &request.data, &response.result) { (Candidate, Vote, Ok) => self.vote_ok(), - (Leader, Heartbeat, Ok) => { - self.nodes[request.target as usize].timer = Instant::now(); - None - } (Leader, Heartbeat | Append(_) | Commit(_), LogMismatch(values)) => { let logs = self.storage.logs( values.index.actual.unwrap_or_default(), values.term.actual.unwrap_or_default(), ); - self.nodes[request.target as usize].timer = Instant::now(); Some(vec![Request { version: VERSION, hash: self.hash, @@ -256,27 +224,20 @@ impl Cluster { } fn election(&mut self) -> Vec { - println!("[{}] Election", self.index); self.state = ClusterState::Candidate; self.votes = 1; self.nodes .iter() - .filter_map(|node| { - if node.index != self.index { - Some(Request { - version: VERSION, - hash: self.hash, - index: self.index, - target: node.index, - term: self.term + 1, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, - data: RequestType::Vote, - }) - } else { - None - } + .map(|node| Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: *node, + term: self.term + 1, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Vote, }) .collect() } @@ -316,25 +277,19 @@ impl Cluster { Self::ok(request) } - fn heartbeat(&mut self, forced: bool) -> Vec { + fn heartbeat(&mut self) -> Vec { self.nodes .iter() - .filter_map(|node| { - if forced || node.timer.elapsed() > self.heartbeat_timeout { - Some(Request { - version: VERSION, - hash: self.hash, - index: self.index, - target: node.index, - term: self.term, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, - data: RequestType::Heartbeat, - }) - } else { - None - } + .map(|node| Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: *node, + term: self.term, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Heartbeat, }) .collect() } @@ -361,12 +316,7 @@ impl Cluster { if self.votes > self.size / 2 { self.state = ClusterState::Leader; - let requests = self.heartbeat(true); - if requests.is_empty() { - return None; - } else { - return Some(requests); - } + return Some(self.heartbeat()); } None @@ -564,7 +514,6 @@ mod test { use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::RwLock; - use tokio::task::JoinHandle; struct TestStorage { logs: Vec, @@ -575,9 +524,7 @@ mod test { struct TestCluster { nodes: Arc>>, - tasks: Vec>>, shutdown: Arc, - messages: Arc>>, } impl Storage for TestStorage { @@ -609,6 +556,8 @@ mod test { impl TestCluster { fn new(size: u64) -> Self { + tracing_subscriber::fmt().init(); + let nodes = (0..size) .map(|index| { let storage = TestStorage { @@ -629,9 +578,7 @@ mod test { Self { nodes: Arc::new(RwLock::new(nodes)), - tasks: Vec::new(), shutdown: Arc::new(AtomicBool::new(false)), - messages: Arc::new(RwLock::new(Vec::new())), } } @@ -641,7 +588,7 @@ mod test { let (responses_channel, mut responses_receiver) = tokio::sync::mpsc::channel(100); let shutdown = self.shutdown.clone(); let nodes = self.nodes.clone(); - self.tasks.push(tokio::spawn(async move { + tokio::spawn(async move { while !shutdown.load(Ordering::Relaxed) { if let Some(request) = requests_receiver.recv().await { let target = nodes.read().await[request.target as usize].clone(); @@ -650,21 +597,17 @@ mod test { } } - Ok(()) - })); + anyhow::Ok(()) + }); let shutdown = self.shutdown.clone(); let nodes = self.nodes.clone(); let req_channel = requests_channel.clone(); - let messages = self.messages.clone(); - self.tasks.push(tokio::spawn(async move { + tokio::spawn(async move { while !shutdown.load(Ordering::Relaxed) { if let Some((request, response)) = responses_receiver.recv().await { + tracing::info!("{:?} -> {:?}", request, response); let origin = nodes.read().await[response.target as usize].clone(); - messages - .write() - .await - .push(format!("{:?} -> {:?}", request, response)); let new_requests = origin.write().await.response(&request, &response); if let Some(new_requests) = new_requests { for req in new_requests { @@ -674,14 +617,14 @@ mod test { } } - Ok(()) - })); + anyhow::Ok(()) + }); for node in self.nodes.read().await.iter() { let node = node.clone(); let shutdown = self.shutdown.clone(); let req_channel = requests_channel.clone(); - self.tasks.push(tokio::spawn(async move { + tokio::spawn(async move { while !shutdown.load(Ordering::Relaxed) { if let Some(requests) = node.write().await.process() { for request in requests { @@ -691,23 +634,16 @@ mod test { tokio::time::sleep(Duration::from_millis(10)).await; } } - Ok(()) - })); - } - } - async fn stop(&mut self) -> anyhow::Result<()> { - self.shutdown.store(true, Ordering::Relaxed); - for task in self.tasks.drain(..) { - let _ = task.await?; + anyhow::Ok(()) + }); } - Ok(()) } async fn ensure_leader(&self, timeout: Duration) -> bool { let timer = Instant::now(); while timer.elapsed() < timeout { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(10)).await; for node in self.nodes.read().await.iter() { if let ClusterState::Leader = node.read().await.state { @@ -716,9 +652,6 @@ mod test { } } - self.messages.read().await.iter().for_each(|message| { - println!("{}", message); - }); false } } @@ -733,13 +666,7 @@ mod test { async fn election() -> anyhow::Result<()> { let mut cluster = TestCluster::new(3); cluster.start().await; - let leader = cluster.ensure_leader(Duration::from_secs(5)).await; - cluster.stop().await?; - assert!(leader); - - cluster.messages.read().await.iter().for_each(|message| { - println!("{}", message); - }); + assert!(cluster.ensure_leader(Duration::from_secs(5)).await); Ok(()) } From b4095aba3585fd24eb07fac0cba6faa6a84ea18a Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 14:26:25 +0100 Subject: [PATCH 03/14] Update raft.rs --- agdb_server/src/raft.rs | 164 +++++++++++++++++++++++++--------------- 1 file changed, 104 insertions(+), 60 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 9ae546d0f..cc4be6192 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -12,8 +12,8 @@ pub struct Log { #[derive(Debug)] pub struct MismatchedValues { - actual: Option, - expected: Option, + local: Option, + requested: Option, } #[derive(Debug)] @@ -159,6 +159,7 @@ impl Cluster { pub fn process(&mut self) -> Option> { if let ClusterState::Leader = self.state { if self.timer.elapsed() >= self.heartbeat_timeout { + self.timer = Instant::now(); return Some(self.heartbeat()); } } else { @@ -201,11 +202,11 @@ impl Cluster { use ResponseType::*; match (&self.state, &request.data, &response.result) { - (Candidate, Vote, Ok) => self.vote_ok(), + (Candidate, Vote, Ok) => self.vote_ok(request.term), (Leader, Heartbeat | Append(_) | Commit(_), LogMismatch(values)) => { let logs = self.storage.logs( - values.index.actual.unwrap_or_default(), - values.term.actual.unwrap_or_default(), + values.index.local.unwrap_or_default(), + values.term.local.unwrap_or_default(), ); Some(vec![Request { version: VERSION, @@ -296,8 +297,9 @@ impl Cluster { fn heartbeat_request(&mut self, request: &Request) -> Result { self.validate_hash(request)?; + self.validate_term(request)?; - if request.term < self.term { + if request.term > self.term { self.term = request.term; self.state = ClusterState::Follower(request.index); } @@ -311,11 +313,12 @@ impl Cluster { Self::ok(request) } - fn vote_ok(&mut self) -> Option> { + fn vote_ok(&mut self, term: u64) -> Option> { self.votes += 1; if self.votes > self.size / 2 { self.state = ClusterState::Leader; + self.term = term; return Some(self.heartbeat()); } @@ -325,7 +328,7 @@ impl Cluster { fn vote_request(&mut self, request: &Request) -> Result { self.validate_hash(request)?; self.validate_no_leader(request)?; - self.validate_term(request)?; + self.validate_term_for_vote(request)?; self.validate_log_for_vote(request)?; self.state = ClusterState::Voted(request.index); Self::ok(request) @@ -337,8 +340,8 @@ impl Cluster { version: VERSION, target: request.index, result: ResponseType::ClusterMismatch(MismatchedValues { - actual: Some(self.hash), - expected: Some(request.hash), + local: Some(self.hash), + requested: Some(request.hash), }), }); } @@ -352,16 +355,16 @@ impl Cluster { version: VERSION, target: request.index, result: ResponseType::LeaderMismatch(MismatchedValues { - actual: Some(self.index), - expected: None, + local: Some(self.index), + requested: None, }), }), ClusterState::Follower(leader) => Err(Response { version: VERSION, target: request.index, result: ResponseType::LeaderMismatch(MismatchedValues { - actual: Some(leader), - expected: None, + local: Some(leader), + requested: None, }), }), _ => Ok(()), @@ -375,16 +378,16 @@ impl Cluster { version: VERSION, target: request.index, result: ResponseType::LeaderMismatch(MismatchedValues { - actual: Some(leader), - expected: Some(request.index), + local: Some(leader), + requested: Some(request.index), }), }), _ => Err(Response { version: VERSION, target: request.index, result: ResponseType::LeaderMismatch(MismatchedValues { - actual: None, - expected: Some(request.index), + local: None, + requested: Some(request.index), }), }), } @@ -397,16 +400,16 @@ impl Cluster { target: request.index, result: ResponseType::LogMismatch(LogMismatch { index: MismatchedValues { - actual: Some(self.log_index), - expected: Some(request.log_index), + local: Some(self.log_index), + requested: Some(request.log_index), }, term: MismatchedValues { - actual: Some(self.log_term), - expected: Some(request.log_term), + local: Some(self.log_term), + requested: Some(request.log_term), }, commit: MismatchedValues { - actual: Some(self.log_commit), - expected: Some(request.log_commit), + local: Some(self.log_commit), + requested: Some(request.log_commit), }, }), }); @@ -422,16 +425,16 @@ impl Cluster { target: request.index, result: ResponseType::LogMismatch(LogMismatch { index: MismatchedValues { - actual: Some(self.log_index), - expected: Some(request.log_index), + local: Some(self.log_index), + requested: Some(request.log_index), }, term: MismatchedValues { - actual: Some(self.log_term), - expected: Some(request.log_term), + local: Some(self.log_term), + requested: Some(request.log_term), }, commit: MismatchedValues { - actual: Some(self.log_commit), - expected: Some(request.log_commit), + local: Some(self.log_commit), + requested: Some(request.log_commit), }, }), }); @@ -450,16 +453,16 @@ impl Cluster { target: request.index, result: ResponseType::LogMismatch(LogMismatch { index: MismatchedValues { - actual: Some(self.log_index), - expected: Some(request.log_index), + local: Some(self.log_index), + requested: Some(request.log_index), }, term: MismatchedValues { - actual: Some(self.log_term), - expected: Some(request.log_term), + local: Some(self.log_term), + requested: Some(request.log_term), }, commit: MismatchedValues { - actual: Some(self.log_commit), - expected: Some(request.log_commit), + local: Some(self.log_commit), + requested: Some(request.log_commit), }, }), }); @@ -474,8 +477,8 @@ impl Cluster { version: VERSION, target: request.index, result: ResponseType::TermMismatch(MismatchedValues { - actual: Some(self.term), - expected: Some(request.term), + local: Some(self.term), + requested: Some(request.term), }), }); } @@ -483,14 +486,29 @@ impl Cluster { Ok(()) } - fn validate_term(&self, request: &Request) -> Result<(), Response> { + fn validate_term_for_vote(&self, request: &Request) -> Result<(), Response> { if request.term <= self.term { return Err(Response { version: VERSION, target: request.index, result: ResponseType::TermMismatch(MismatchedValues { - actual: Some(self.term), - expected: Some(request.term), + local: Some(self.term), + requested: Some(request.term), + }), + }); + } + + Ok(()) + } + + fn validate_term(&self, request: &Request) -> Result<(), Response> { + if request.term < self.term { + return Err(Response { + version: VERSION, + target: request.index, + result: ResponseType::TermMismatch(MismatchedValues { + local: Some(self.term), + requested: Some(request.term), }), }); } @@ -511,6 +529,7 @@ impl Cluster { mod test { use super::*; use std::sync::atomic::AtomicBool; + use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::RwLock; @@ -520,11 +539,16 @@ mod test { commit: u64, } - type TestNode = Arc>>; + struct TestNodeImpl { + cluster: Cluster, + } + + type TestNode = Arc>; struct TestCluster { nodes: Arc>>, shutdown: Arc, + blocked: Arc, } impl Storage for TestStorage { @@ -570,15 +594,18 @@ mod test { hash: 123, election_factor: 1, heartbeat_timeout: Duration::from_secs(1), - term_timeout: Duration::from_secs(3), + term_timeout: Duration::from_secs(1), }; - Arc::new(RwLock::new(Cluster::new(storage, settings))) + Arc::new(RwLock::new(TestNodeImpl { + cluster: Cluster::new(storage, settings), + })) }) .collect(); Self { nodes: Arc::new(RwLock::new(nodes)), shutdown: Arc::new(AtomicBool::new(false)), + blocked: Arc::new(AtomicU64::new(99)), } } @@ -588,12 +615,18 @@ mod test { let (responses_channel, mut responses_receiver) = tokio::sync::mpsc::channel(100); let shutdown = self.shutdown.clone(); let nodes = self.nodes.clone(); + let blocked = self.blocked.clone(); tokio::spawn(async move { while !shutdown.load(Ordering::Relaxed) { if let Some(request) = requests_receiver.recv().await { - let target = nodes.read().await[request.target as usize].clone(); - let response = target.write().await.request(&request); - responses_channel.send((request, response)).await?; + let blocked_node = blocked.load(Ordering::Relaxed); + if request.target != blocked_node && request.index != blocked_node { + let target = nodes.read().await[request.target as usize].clone(); + let response = target.write().await.cluster.request(&request); + responses_channel.send((request, response)).await?; + } else { + tracing::info!("Blocked: {:?}", request); + } } } @@ -608,7 +641,8 @@ mod test { if let Some((request, response)) = responses_receiver.recv().await { tracing::info!("{:?} -> {:?}", request, response); let origin = nodes.read().await[response.target as usize].clone(); - let new_requests = origin.write().await.response(&request, &response); + let new_requests = + origin.write().await.cluster.response(&request, &response); if let Some(new_requests) = new_requests { for req in new_requests { req_channel.send(req).await?; @@ -626,13 +660,13 @@ mod test { let req_channel = requests_channel.clone(); tokio::spawn(async move { while !shutdown.load(Ordering::Relaxed) { - if let Some(requests) = node.write().await.process() { + if let Some(requests) = node.write().await.cluster.process() { for request in requests { req_channel.send(request).await?; } - } else { - tokio::time::sleep(Duration::from_millis(10)).await; } + + tokio::time::sleep(Duration::from_millis(10)).await; } anyhow::Ok(()) @@ -640,19 +674,26 @@ mod test { } } - async fn ensure_leader(&self, timeout: Duration) -> bool { + async fn block(&self, index: u64) { + self.blocked.store(index, Ordering::Relaxed); + } + + async fn expect_leader(&self, index: u64, timeout: Duration) { let timer = Instant::now(); while timer.elapsed() < timeout { tokio::time::sleep(Duration::from_millis(10)).await; - for node in self.nodes.read().await.iter() { - if let ClusterState::Leader = node.read().await.state { - return true; - } + if let ClusterState::Leader = self.nodes.read().await[index as usize] + .read() + .await + .cluster + .state + { + return; } } - false + panic!("Leader not found within {:?}", timeout); } } @@ -663,11 +704,14 @@ mod test { } #[tokio::test] - async fn election() -> anyhow::Result<()> { + async fn rebalance() -> anyhow::Result<()> { let mut cluster = TestCluster::new(3); cluster.start().await; - assert!(cluster.ensure_leader(Duration::from_secs(5)).await); - + cluster.expect_leader(0, Duration::from_secs(5)).await; + cluster.block(0).await; + cluster.expect_leader(1, Duration::from_secs(5)).await; + cluster.block(99).await; + tokio::time::sleep(Duration::from_millis(1000)).await; Ok(()) } } From c897c1ab58ba0fe7e16a4b1fe2c118aaa8c8c0eb Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 14:31:15 +0100 Subject: [PATCH 04/14] Update raft.rs --- agdb_server/src/raft.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index cc4be6192..0a3090664 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -695,6 +695,24 @@ mod test { panic!("Leader not found within {:?}", timeout); } + + async fn expect_follower(&self, index: u64, timeout: Duration) { + let timer = Instant::now(); + while timer.elapsed() < timeout { + tokio::time::sleep(Duration::from_millis(10)).await; + + if let ClusterState::Follower(_) = self.nodes.read().await[index as usize] + .read() + .await + .cluster + .state + { + return; + } + } + + panic!("{index} has not become a followerwithin {:?}", timeout); + } } impl Drop for TestCluster { @@ -705,13 +723,14 @@ mod test { #[tokio::test] async fn rebalance() -> anyhow::Result<()> { + const TIMEOUT: Duration = Duration::from_secs(5); let mut cluster = TestCluster::new(3); cluster.start().await; - cluster.expect_leader(0, Duration::from_secs(5)).await; + cluster.expect_leader(0, TIMEOUT).await; cluster.block(0).await; - cluster.expect_leader(1, Duration::from_secs(5)).await; + cluster.expect_leader(1, TIMEOUT).await; cluster.block(99).await; - tokio::time::sleep(Duration::from_millis(1000)).await; + cluster.expect_follower(0, TIMEOUT).await; Ok(()) } } From 2beff808ae6c29059e3a5c1cc856be8cd7a31eed Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 15:09:03 +0100 Subject: [PATCH 05/14] Update raft.rs --- agdb_server/src/raft.rs | 58 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 0a3090664..fb1c7b540 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -532,6 +532,7 @@ mod test { use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; + use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; struct TestStorage { @@ -549,6 +550,7 @@ mod test { nodes: Arc>>, shutdown: Arc, blocked: Arc, + requests_channel: Option>, } impl Storage for TestStorage { @@ -606,12 +608,14 @@ mod test { nodes: Arc::new(RwLock::new(nodes)), shutdown: Arc::new(AtomicBool::new(false)), blocked: Arc::new(AtomicU64::new(99)), + requests_channel: None, } } async fn start(&mut self) { let (requests_channel, mut requests_receiver) = tokio::sync::mpsc::channel::(100); + self.requests_channel = Some(requests_channel.clone()); let (responses_channel, mut responses_receiver) = tokio::sync::mpsc::channel(100); let shutdown = self.shutdown.clone(); let nodes = self.nodes.clone(); @@ -713,6 +717,43 @@ mod test { panic!("{index} has not become a followerwithin {:?}", timeout); } + + async fn expect_value(&self, index: u64, value: Vec, timeout: Duration) { + let timer = Instant::now(); + while timer.elapsed() < timeout { + tokio::time::sleep(Duration::from_millis(10)).await; + + if self.nodes.read().await[index as usize] + .read() + .await + .cluster + .storage + .logs + .iter() + .any(|log| log.data == value) + { + return; + } + } + + panic!("{index} has not received the value within {:?}", timeout); + } + + async fn append(&self, index: u64, log: Vec) -> anyhow::Result<()> { + let requests = self.nodes.read().await[index as usize] + .write() + .await + .cluster + .append(log); + + for request in requests { + if let Some(channel) = &self.requests_channel { + channel.send(request).await?; + } + } + + Ok(()) + } } impl Drop for TestCluster { @@ -721,9 +762,10 @@ mod test { } } + const TIMEOUT: Duration = Duration::from_secs(5); + #[tokio::test] async fn rebalance() -> anyhow::Result<()> { - const TIMEOUT: Duration = Duration::from_secs(5); let mut cluster = TestCluster::new(3); cluster.start().await; cluster.expect_leader(0, TIMEOUT).await; @@ -733,4 +775,18 @@ mod test { cluster.expect_follower(0, TIMEOUT).await; Ok(()) } + + #[tokio::test] + async fn replication() -> anyhow::Result<()> { + const TIMEOUT: Duration = Duration::from_secs(5); + let mut cluster = TestCluster::new(3); + cluster.start().await; + cluster.expect_leader(0, TIMEOUT).await; + let value = b"Hello, World!".to_vec(); + cluster.append(0, value.clone()).await?; + cluster.expect_value(0, value.clone(), TIMEOUT).await; + cluster.expect_value(1, value.clone(), TIMEOUT).await; + cluster.expect_value(2, value.clone(), TIMEOUT).await; + Ok(()) + } } From 4d001d83d8d6ee83feb9056c1fda6acbe9bc76a7 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 15:29:37 +0100 Subject: [PATCH 06/14] Update raft.rs --- agdb_server/src/raft.rs | 74 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index fb1c7b540..2865ea78a 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -82,6 +82,7 @@ pub trait Storage { pub struct Cluster { storage: S, nodes: Vec, + timers: Vec, state: ClusterState, hash: u64, size: u64, @@ -113,6 +114,7 @@ impl Cluster { nodes: (0..settings.size) .filter(|i| *i != settings.index) .collect(), + timers: vec![Instant::now(); settings.size as usize], hash: settings.hash, size: settings.size, index: settings.index, @@ -158,10 +160,17 @@ impl Cluster { pub fn process(&mut self) -> Option> { if let ClusterState::Leader = self.state { - if self.timer.elapsed() >= self.heartbeat_timeout { - self.timer = Instant::now(); - return Some(self.heartbeat()); + let requests = self.heartbeat(); + + if requests.is_empty() { + return None; } + + requests.iter().for_each(|request| { + self.timers[request.target as usize] = Instant::now(); + }); + + return Some(requests); } else { if let ClusterState::Election = self.state { if self.timer.elapsed() >= self.election_timeout { @@ -188,6 +197,7 @@ impl Cluster { RequestType::Vote => self.vote_request(request), }; + self.timers[request.index as usize] = Instant::now(); self.timer = Instant::now(); match response { @@ -208,6 +218,7 @@ impl Cluster { values.index.local.unwrap_or_default(), values.term.local.unwrap_or_default(), ); + self.timers[request.target as usize] = Instant::now(); Some(vec![Request { version: VERSION, hash: self.hash, @@ -245,8 +256,12 @@ impl Cluster { fn append_request(&mut self, request: &Request, logs: &[Log]) -> Result { self.validate_hash(request)?; - self.validate_leader(request)?; - self.validate_current_term(request)?; + self.validate_term(request)?; + + if request.term > self.term { + self.term = request.term; + self.state = ClusterState::Follower(request.index); + } for log in logs { self.validate_log_commit(request, log)?; @@ -268,8 +283,12 @@ impl Cluster { fn commit_request(&mut self, request: &Request, index: u64) -> Result { self.validate_hash(request)?; - self.validate_leader(request)?; - self.validate_current_term(request)?; + self.validate_term(request)?; + + if request.term > self.term { + self.term = request.term; + self.state = ClusterState::Follower(request.index); + } if self.log_commit < index { self.commit(index); @@ -279,6 +298,29 @@ impl Cluster { } fn heartbeat(&mut self) -> Vec { + self.nodes + .iter() + .filter_map(|node| { + if self.timers[*node as usize].elapsed() > self.heartbeat_timeout { + Some(Request { + version: VERSION, + hash: self.hash, + index: self.index, + target: *node, + term: self.term, + log_index: self.log_index, + log_term: self.log_term, + log_commit: self.log_commit, + data: RequestType::Heartbeat, + }) + } else { + None + } + }) + .collect() + } + + fn heartbeat_no_timer(&mut self) -> Vec { self.nodes .iter() .map(|node| Request { @@ -319,7 +361,7 @@ impl Cluster { if self.votes > self.size / 2 { self.state = ClusterState::Leader; self.term = term; - return Some(self.heartbeat()); + return Some(self.heartbeat_no_timer()); } None @@ -789,4 +831,20 @@ mod test { cluster.expect_value(2, value.clone(), TIMEOUT).await; Ok(()) } + + #[tokio::test] + async fn reconciliation() -> anyhow::Result<()> { + const TIMEOUT: Duration = Duration::from_secs(5); + let mut cluster = TestCluster::new(3); + cluster.start().await; + cluster.expect_leader(0, TIMEOUT).await; + cluster.block(1).await; + let value = b"Hello, World!".to_vec(); + cluster.append(0, value.clone()).await?; + cluster.expect_value(0, value.clone(), TIMEOUT).await; + cluster.expect_value(2, value.clone(), TIMEOUT).await; + cluster.block(99).await; + cluster.expect_value(1, value.clone(), TIMEOUT).await; + Ok(()) + } } From e3820f25d93895c8b06316eed1a436c0a7d012c5 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 15:38:08 +0100 Subject: [PATCH 07/14] Update raft.rs --- agdb_server/src/raft.rs | 42 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 2865ea78a..5c0bbf40e 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -574,6 +574,7 @@ mod test { use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; + use std::sync::OnceLock; use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; @@ -624,7 +625,8 @@ mod test { impl TestCluster { fn new(size: u64) -> Self { - tracing_subscriber::fmt().init(); + static LOGGER_INIT: OnceLock<()> = OnceLock::new(); + LOGGER_INIT.get_or_init(|| tracing_subscriber::fmt().init()); let nodes = (0..size) .map(|index| { @@ -724,6 +726,10 @@ mod test { self.blocked.store(index, Ordering::Relaxed); } + async fn unblock(&self) { + self.blocked.store(u64::MAX, Ordering::Relaxed); + } + async fn expect_leader(&self, index: u64, timeout: Duration) { let timer = Instant::now(); while timer.elapsed() < timeout { @@ -813,7 +819,7 @@ mod test { cluster.expect_leader(0, TIMEOUT).await; cluster.block(0).await; cluster.expect_leader(1, TIMEOUT).await; - cluster.block(99).await; + cluster.unblock().await; cluster.expect_follower(0, TIMEOUT).await; Ok(()) } @@ -824,7 +830,7 @@ mod test { let mut cluster = TestCluster::new(3); cluster.start().await; cluster.expect_leader(0, TIMEOUT).await; - let value = b"Hello, World!".to_vec(); + let value = b"0".to_vec(); cluster.append(0, value.clone()).await?; cluster.expect_value(0, value.clone(), TIMEOUT).await; cluster.expect_value(1, value.clone(), TIMEOUT).await; @@ -839,12 +845,38 @@ mod test { cluster.start().await; cluster.expect_leader(0, TIMEOUT).await; cluster.block(1).await; - let value = b"Hello, World!".to_vec(); + let value = b"0".to_vec(); cluster.append(0, value.clone()).await?; cluster.expect_value(0, value.clone(), TIMEOUT).await; cluster.expect_value(2, value.clone(), TIMEOUT).await; - cluster.block(99).await; + cluster.unblock().await; + cluster.expect_value(1, value.clone(), TIMEOUT).await; + Ok(()) + } + + #[tokio::test] + async fn reconciliation_multiple_values() -> anyhow::Result<()> { + const TIMEOUT: Duration = Duration::from_secs(5); + let mut cluster = TestCluster::new(3); + cluster.start().await; + cluster.expect_leader(0, TIMEOUT).await; + let value = b"0".to_vec(); + cluster.append(0, value.clone()).await?; + cluster.expect_value(0, value.clone(), TIMEOUT).await; cluster.expect_value(1, value.clone(), TIMEOUT).await; + cluster.expect_value(2, value.clone(), TIMEOUT).await; + let value2 = b"1".to_vec(); + let value3 = b"2".to_vec(); + cluster.block(2).await; + cluster.append(0, value2.clone()).await?; + cluster.append(0, value3.clone()).await?; + cluster.expect_value(0, value2.clone(), TIMEOUT).await; + cluster.expect_value(0, value3.clone(), TIMEOUT).await; + cluster.expect_value(1, value2.clone(), TIMEOUT).await; + cluster.expect_value(1, value3.clone(), TIMEOUT).await; + cluster.unblock().await; + cluster.expect_value(2, value2.clone(), TIMEOUT).await; + cluster.expect_value(2, value3.clone(), TIMEOUT).await; Ok(()) } } From 583c63e3bafa419f7e095746fbaa1b9f8d703925 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 16:01:51 +0100 Subject: [PATCH 08/14] Update raft.rs --- agdb_server/src/raft.rs | 113 ++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 50 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 5c0bbf40e..1c9ee3df4 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -3,7 +3,7 @@ use std::time::Instant; const VERSION: u64 = 1; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Log { index: u64, term: u64, @@ -132,13 +132,13 @@ impl Cluster { } pub fn append(&mut self, log: Vec) -> Vec { - self.log_index += 1; - self.log_term = self.term; let log = Log { index: self.log_index, term: self.log_term, data: log, }; + self.log_index += 1; + self.log_term = self.term; let requests = self .nodes .iter() @@ -730,9 +730,9 @@ mod test { self.blocked.store(u64::MAX, Ordering::Relaxed); } - async fn expect_leader(&self, index: u64, timeout: Duration) { + async fn expect_leader(&self, index: u64) { let timer = Instant::now(); - while timer.elapsed() < timeout { + while timer.elapsed() < TIMEOUT { tokio::time::sleep(Duration::from_millis(10)).await; if let ClusterState::Leader = self.nodes.read().await[index as usize] @@ -745,12 +745,12 @@ mod test { } } - panic!("Leader not found within {:?}", timeout); + panic!("Leader not found within {:?}", TIMEOUT); } - async fn expect_follower(&self, index: u64, timeout: Duration) { + async fn expect_follower(&self, index: u64) { let timer = Instant::now(); - while timer.elapsed() < timeout { + while timer.elapsed() < TIMEOUT { tokio::time::sleep(Duration::from_millis(10)).await; if let ClusterState::Follower(_) = self.nodes.read().await[index as usize] @@ -763,28 +763,41 @@ mod test { } } - panic!("{index} has not become a followerwithin {:?}", timeout); + panic!("{index} has not become a followerwithin {:?}", TIMEOUT); } - async fn expect_value(&self, index: u64, value: Vec, timeout: Duration) { + async fn expect_storage_synced(&self, left: u64, right: u64) { let timer = Instant::now(); - while timer.elapsed() < timeout { + let mut left_log = vec![]; + let mut right_log = vec![]; + + while timer.elapsed() < TIMEOUT { tokio::time::sleep(Duration::from_millis(10)).await; + left_log = self.nodes.read().await[left as usize] + .read() + .await + .cluster + .storage + .logs + .clone(); - if self.nodes.read().await[index as usize] + right_log = self.nodes.read().await[right as usize] .read() .await .cluster .storage .logs - .iter() - .any(|log| log.data == value) - { + .clone(); + + if left_log == right_log { return; } } - panic!("{index} has not received the value within {:?}", timeout); + panic!( + "{left} is not in sync with {right} in {:?}:\nLEFT\n{:?}\nRIGHT:\n{:?}", + TIMEOUT, left_log, right_log + ); } async fn append(&self, index: u64, log: Vec) -> anyhow::Result<()> { @@ -816,67 +829,67 @@ mod test { async fn rebalance() -> anyhow::Result<()> { let mut cluster = TestCluster::new(3); cluster.start().await; - cluster.expect_leader(0, TIMEOUT).await; + cluster.expect_leader(0).await; cluster.block(0).await; - cluster.expect_leader(1, TIMEOUT).await; + cluster.expect_leader(1).await; cluster.unblock().await; - cluster.expect_follower(0, TIMEOUT).await; + cluster.expect_follower(0).await; Ok(()) } #[tokio::test] async fn replication() -> anyhow::Result<()> { - const TIMEOUT: Duration = Duration::from_secs(5); let mut cluster = TestCluster::new(3); cluster.start().await; - cluster.expect_leader(0, TIMEOUT).await; - let value = b"0".to_vec(); - cluster.append(0, value.clone()).await?; - cluster.expect_value(0, value.clone(), TIMEOUT).await; - cluster.expect_value(1, value.clone(), TIMEOUT).await; - cluster.expect_value(2, value.clone(), TIMEOUT).await; + cluster.expect_leader(0).await; + cluster.append(0, b"0".to_vec()).await?; + cluster.expect_storage_synced(0, 1).await; + cluster.expect_storage_synced(0, 2).await; Ok(()) } #[tokio::test] async fn reconciliation() -> anyhow::Result<()> { - const TIMEOUT: Duration = Duration::from_secs(5); let mut cluster = TestCluster::new(3); cluster.start().await; - cluster.expect_leader(0, TIMEOUT).await; + cluster.expect_leader(0).await; cluster.block(1).await; - let value = b"0".to_vec(); - cluster.append(0, value.clone()).await?; - cluster.expect_value(0, value.clone(), TIMEOUT).await; - cluster.expect_value(2, value.clone(), TIMEOUT).await; + cluster.append(0, b"0".to_vec()).await?; cluster.unblock().await; - cluster.expect_value(1, value.clone(), TIMEOUT).await; + cluster.expect_storage_synced(0, 1).await; + cluster.expect_storage_synced(0, 2).await; Ok(()) } #[tokio::test] async fn reconciliation_multiple_values() -> anyhow::Result<()> { - const TIMEOUT: Duration = Duration::from_secs(5); let mut cluster = TestCluster::new(3); cluster.start().await; - cluster.expect_leader(0, TIMEOUT).await; - let value = b"0".to_vec(); - cluster.append(0, value.clone()).await?; - cluster.expect_value(0, value.clone(), TIMEOUT).await; - cluster.expect_value(1, value.clone(), TIMEOUT).await; - cluster.expect_value(2, value.clone(), TIMEOUT).await; - let value2 = b"1".to_vec(); - let value3 = b"2".to_vec(); + cluster.expect_leader(0).await; + cluster.append(0, b"0".to_vec().clone()).await?; + cluster.expect_storage_synced(0, 1).await; + cluster.expect_storage_synced(0, 2).await; cluster.block(2).await; - cluster.append(0, value2.clone()).await?; - cluster.append(0, value3.clone()).await?; - cluster.expect_value(0, value2.clone(), TIMEOUT).await; - cluster.expect_value(0, value3.clone(), TIMEOUT).await; - cluster.expect_value(1, value2.clone(), TIMEOUT).await; - cluster.expect_value(1, value3.clone(), TIMEOUT).await; + cluster.append(0, b"1".to_vec()).await?; + cluster.append(0, b"2".to_vec()).await?; + cluster.unblock().await; + cluster.expect_storage_synced(0, 1).await; + cluster.expect_storage_synced(0, 2).await; + Ok(()) + } + + #[tokio::test] + async fn drop_uncommited_value() -> anyhow::Result<()> { + let mut cluster = TestCluster::new(3); + cluster.start().await; + cluster.expect_leader(0).await; + cluster.block(0).await; + cluster.append(0, b"0".to_vec()).await?; + cluster.expect_leader(1).await; + cluster.append(1, b"1".to_vec()).await?; cluster.unblock().await; - cluster.expect_value(2, value2.clone(), TIMEOUT).await; - cluster.expect_value(2, value3.clone(), TIMEOUT).await; + cluster.expect_storage_synced(0, 1).await; + cluster.expect_storage_synced(0, 2).await; Ok(()) } } From 8eb307a1b9ba45d8a83a7a531787c5d0829f601a Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 19:23:19 +0100 Subject: [PATCH 09/14] Update raft.rs --- agdb_server/src/raft.rs | 390 ++++++++++++++++++++++------------------ 1 file changed, 213 insertions(+), 177 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 1c9ee3df4..bdd4862f2 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -1,8 +1,6 @@ use std::time::Duration; use std::time::Instant; -const VERSION: u64 = 1; - #[derive(Debug, Clone, PartialEq)] pub struct Log { index: u64, @@ -26,7 +24,6 @@ pub struct LogMismatch { #[derive(Debug)] pub enum RequestType { Append(Vec), - Commit(u64), Heartbeat, Vote, } @@ -52,7 +49,6 @@ enum ClusterState { #[derive(Debug)] pub struct Request { - version: u64, hash: u64, index: u64, target: u64, @@ -65,34 +61,36 @@ pub struct Request { #[derive(Debug)] pub struct Response { - version: u64, target: u64, result: ResponseType, } +struct Node { + index: u64, + log_index: u64, + log_term: u64, + log_commit: u64, + timer: Instant, + voted: bool, +} + pub trait Storage { fn append(&mut self, log: Log); fn commit(&mut self, index: u64); fn current_index(&self) -> u64; fn current_term(&self) -> u64; fn current_commit(&self) -> u64; - fn logs(&self, index: u64, term: u64) -> Vec; + fn logs(&self, since_index: u64, since_term: u64) -> Vec; } pub struct Cluster { storage: S, - nodes: Vec, - timers: Vec, + nodes: Vec, state: ClusterState, hash: u64, size: u64, index: u64, term: u64, - votes: u64, - log_index: u64, - log_term: u64, - log_commit: u64, - timer: Instant, election_timeout: Duration, heartbeat_timeout: Duration, term_timeout: Duration, @@ -112,18 +110,31 @@ impl Cluster { Self { state: ClusterState::Election, nodes: (0..settings.size) - .filter(|i| *i != settings.index) + .map(|i| Node { + index: i, + log_index: if i == settings.index { + storage.current_index() + } else { + 0 + }, + log_term: if i == settings.index { + storage.current_term() + } else { + 0 + }, + log_commit: if i == settings.index { + storage.current_commit() + } else { + 0 + }, + timer: Instant::now(), + voted: i == settings.index, + }) .collect(), - timers: vec![Instant::now(); settings.size as usize], hash: settings.hash, size: settings.size, index: settings.index, term: 0, - votes: 0, - log_index: storage.current_index(), - log_term: storage.current_term(), - log_commit: storage.current_commit(), - timer: Instant::now(), election_timeout: Duration::from_secs(settings.election_factor * settings.index), heartbeat_timeout: settings.heartbeat_timeout, term_timeout: settings.term_timeout, @@ -133,24 +144,24 @@ impl Cluster { pub fn append(&mut self, log: Vec) -> Vec { let log = Log { - index: self.log_index, - term: self.log_term, + index: self.local().log_index, + term: self.term, data: log, }; - self.log_index += 1; - self.log_term = self.term; + self.local_mut().log_index += 1; + self.local_mut().log_term = self.term; let requests = self .nodes .iter() + .filter(|node| self.index != node.index) .map(|node| Request { - version: VERSION, hash: self.hash, index: self.index, - target: *node, + target: node.index, term: self.term, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, + log_index: self.local().log_index, + log_term: self.local().log_term, + log_commit: self.local().log_commit, data: RequestType::Append(vec![log.clone()]), }) .collect(); @@ -167,22 +178,22 @@ impl Cluster { } requests.iter().for_each(|request| { - self.timers[request.target as usize] = Instant::now(); + self.node_mut(request.target).timer = Instant::now(); }); return Some(requests); } else { if let ClusterState::Election = self.state { - if self.timer.elapsed() >= self.election_timeout { + if self.local().timer.elapsed() >= self.election_timeout { let requests = self.election(); - self.timer = Instant::now(); + self.local_mut().timer = Instant::now(); return Some(requests); } } - if self.timer.elapsed() > self.term_timeout { + if self.local().timer.elapsed() > self.term_timeout { self.state = ClusterState::Election; - self.timer = Instant::now(); + self.local_mut().timer = Instant::now(); } } @@ -192,13 +203,11 @@ impl Cluster { pub fn request(&mut self, request: &Request) -> Response { let response = match request.data { RequestType::Append(ref logs) => self.append_request(request, logs), - RequestType::Commit(index) => self.commit_request(request, index), RequestType::Heartbeat => self.heartbeat_request(request), RequestType::Vote => self.vote_request(request), }; - self.timers[request.index as usize] = Instant::now(); - self.timer = Instant::now(); + self.node_mut(request.target).timer = Instant::now(); match response { Ok(response) => response, @@ -212,43 +221,50 @@ impl Cluster { use ResponseType::*; match (&self.state, &request.data, &response.result) { - (Candidate, Vote, Ok) => self.vote_ok(request.term), - (Leader, Heartbeat | Append(_) | Commit(_), LogMismatch(values)) => { - let logs = self.storage.logs( - values.index.local.unwrap_or_default(), - values.term.local.unwrap_or_default(), - ); - self.timers[request.target as usize] = Instant::now(); - Some(vec![Request { - version: VERSION, - hash: self.hash, - index: self.index, - target: request.target, - term: self.term, - log_index: self.index, - log_term: self.term, - log_commit: self.log_commit, - data: Append(logs), - }]) - } + (Candidate, Vote, Ok) => self.vote_received(request), + (Leader, Heartbeat | Append(_), Ok) => self.commit(request), + (Leader, Heartbeat | Append(_), LogMismatch(values)) => self.reconcile(request, values), _ => None, } } + fn reconcile(&mut self, request: &Request, values: &LogMismatch) -> Option> { + let logs = self.storage.logs( + values.index.local.unwrap_or_default(), + values.term.local.unwrap_or_default(), + ); + self.node_mut(request.target).timer = Instant::now(); + Some(vec![Request { + hash: self.hash, + index: self.index, + target: request.target, + term: self.term, + log_index: self.local().log_index, + log_term: self.local().log_term, + log_commit: self.local().log_commit, + data: RequestType::Append(logs), + }]) + } + fn election(&mut self) -> Vec { self.state = ClusterState::Candidate; - self.votes = 1; + self.nodes + .iter_mut() + .filter(|node| self.index != node.index) + .for_each(|node| { + node.voted = false; + }); self.nodes .iter() + .filter(|node| self.index != node.index) .map(|node| Request { - version: VERSION, hash: self.hash, index: self.index, - target: *node, + target: node.index, term: self.term + 1, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, + log_index: self.local().log_index, + log_term: self.local().log_term, + log_commit: self.local().log_commit, data: RequestType::Vote, }) .collect() @@ -257,110 +273,127 @@ impl Cluster { fn append_request(&mut self, request: &Request, logs: &[Log]) -> Result { self.validate_hash(request)?; self.validate_term(request)?; - - if request.term > self.term { - self.term = request.term; - self.state = ClusterState::Follower(request.index); - } + self.become_follower(request); for log in logs { self.validate_log_commit(request, log)?; self.append_storage(log); } + if self.local().log_commit < request.log_commit { + let available_commit = std::cmp::min(self.local().log_index, request.log_commit); + self.commit_storage(available_commit); + } + Self::ok(request) } fn append_storage(&mut self, log: &Log) { - self.log_index = log.index; - self.log_term = log.term; self.storage.append(log.clone()); + self.local_mut().log_index = log.index + 1; + self.local_mut().log_term = log.term; } - fn commit(&mut self, index: u64) { - self.log_commit = index; + fn commit_storage(&mut self, index: u64) { self.storage.commit(index); + self.local_mut().log_commit = index; } - fn commit_request(&mut self, request: &Request, index: u64) -> Result { - self.validate_hash(request)?; - self.validate_term(request)?; + fn commit(&mut self, request: &Request) -> Option> { + self.node_mut(request.target).log_index = request.log_index; + self.node_mut(request.target).log_term = request.log_term; + self.node_mut(request.target).log_commit = request.log_commit; - if request.term > self.term { - self.term = request.term; - self.state = ClusterState::Follower(request.index); - } + let quorum = self.size / 2 + 1; - if self.log_commit < index { - self.commit(index); + if self.local().log_commit < request.log_index + && self + .nodes + .iter() + .filter(|node| node.log_index >= request.log_index) + .count() as u64 + >= quorum + { + self.commit_storage(request.log_index); + return Some(self.heartbeat_no_timer()); } - Self::ok(request) + None } fn heartbeat(&mut self) -> Vec { self.nodes .iter() - .filter_map(|node| { - if self.timers[*node as usize].elapsed() > self.heartbeat_timeout { - Some(Request { - version: VERSION, - hash: self.hash, - index: self.index, - target: *node, - term: self.term, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, - data: RequestType::Heartbeat, - }) - } else { - None - } + .filter(|node| { + self.index != node.index + && self.node(node.index).timer.elapsed() > self.heartbeat_timeout + }) + .map(|node| Request { + hash: self.hash, + index: self.index, + target: node.index, + term: self.term, + log_index: self.local().log_index, + log_term: self.local().log_term, + log_commit: self.local().log_commit, + data: RequestType::Heartbeat, }) .collect() } fn heartbeat_no_timer(&mut self) -> Vec { - self.nodes + let requests: Vec = self + .nodes .iter() + .filter(|node| self.index != node.index) .map(|node| Request { - version: VERSION, hash: self.hash, index: self.index, - target: *node, + target: node.index, term: self.term, - log_index: self.log_index, - log_term: self.log_term, - log_commit: self.log_commit, + log_index: self.local().log_index, + log_term: self.local().log_term, + log_commit: self.local().log_commit, data: RequestType::Heartbeat, }) - .collect() + .collect(); + + requests.iter().for_each(|request| { + self.node_mut(request.target).timer = Instant::now(); + }); + + requests } fn heartbeat_request(&mut self, request: &Request) -> Result { self.validate_hash(request)?; self.validate_term(request)?; - - if request.term > self.term { - self.term = request.term; - self.state = ClusterState::Follower(request.index); - } - + self.become_follower(request); self.validate_log(request)?; - if request.log_commit > self.log_commit { - self.commit(request.log_commit); + if self.local().log_commit < request.log_commit { + let available_commit = std::cmp::min(self.local().log_index, request.log_commit); + self.commit_storage(available_commit); } Self::ok(request) } - fn vote_ok(&mut self, term: u64) -> Option> { - self.votes += 1; + fn become_follower(&mut self, request: &Request) { + if self.term < request.term { + self.term = request.term; + self.state = ClusterState::Follower(request.index); + } + } + + fn vote_received(&mut self, request: &Request) -> Option> { + self.node_mut(request.target).voted = true; - if self.votes > self.size / 2 { + let votes = self.nodes.iter().filter(|node| node.voted).count() as u64; + let quorum = self.size / 2; + + if votes > quorum { self.state = ClusterState::Leader; - self.term = term; + self.term = request.term; return Some(self.heartbeat_no_timer()); } @@ -369,17 +402,16 @@ impl Cluster { fn vote_request(&mut self, request: &Request) -> Result { self.validate_hash(request)?; - self.validate_no_leader(request)?; + self.validate_vote_state(request)?; self.validate_term_for_vote(request)?; self.validate_log_for_vote(request)?; - self.state = ClusterState::Voted(request.index); + self.state = ClusterState::Voted(request.term); Self::ok(request) } fn validate_hash(&self, request: &Request) -> Result<(), Response> { - if request.hash != self.hash { + if self.hash != request.hash { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::ClusterMismatch(MismatchedValues { local: Some(self.hash), @@ -391,10 +423,9 @@ impl Cluster { Ok(()) } - fn validate_no_leader(&self, request: &Request) -> Result<(), Response> { + fn validate_vote_state(&self, request: &Request) -> Result<(), Response> { match self.state { ClusterState::Leader | ClusterState::Candidate => Err(Response { - version: VERSION, target: request.index, result: ResponseType::LeaderMismatch(MismatchedValues { local: Some(self.index), @@ -402,55 +433,39 @@ impl Cluster { }), }), ClusterState::Follower(leader) => Err(Response { - version: VERSION, target: request.index, result: ResponseType::LeaderMismatch(MismatchedValues { local: Some(leader), requested: None, }), }), - _ => Ok(()), - } - } - - fn validate_leader(&self, request: &Request) -> Result<(), Response> { - match self.state { - ClusterState::Follower(leader) if request.index == leader => Ok(()), - ClusterState::Follower(leader) => Err(Response { - version: VERSION, - target: request.index, - result: ResponseType::LeaderMismatch(MismatchedValues { - local: Some(leader), - requested: Some(request.index), - }), - }), - _ => Err(Response { - version: VERSION, + ClusterState::Voted(term) if request.term <= term => Err(Response { target: request.index, - result: ResponseType::LeaderMismatch(MismatchedValues { - local: None, - requested: Some(request.index), + result: ResponseType::AlreadyVoted(MismatchedValues { + local: Some(term), + requested: Some(request.term), }), }), + _ => Ok(()), } } fn validate_log(&self, request: &Request) -> Result<(), Response> { - if request.log_index != self.log_index || request.log_term != self.log_term { + if self.local().log_index != request.log_index || self.local().log_term != request.log_term + { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::LogMismatch(LogMismatch { index: MismatchedValues { - local: Some(self.log_index), + local: Some(self.local().log_index), requested: Some(request.log_index), }, term: MismatchedValues { - local: Some(self.log_term), + local: Some(self.local().log_term), requested: Some(request.log_term), }, commit: MismatchedValues { - local: Some(self.log_commit), + local: Some(self.local().log_commit), requested: Some(request.log_commit), }, }), @@ -461,21 +476,20 @@ impl Cluster { } fn validate_log_commit(&self, request: &Request, log: &Log) -> Result<(), Response> { - if log.index < self.log_commit { + if self.local().log_commit > log.index { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::LogMismatch(LogMismatch { index: MismatchedValues { - local: Some(self.log_index), + local: Some(self.local().log_index), requested: Some(request.log_index), }, term: MismatchedValues { - local: Some(self.log_term), + local: Some(self.local().log_term), requested: Some(request.log_term), }, commit: MismatchedValues { - local: Some(self.log_commit), + local: Some(self.local().log_commit), requested: Some(request.log_commit), }, }), @@ -486,24 +500,23 @@ impl Cluster { } fn validate_log_for_vote(&self, request: &Request) -> Result<(), Response> { - if request.log_index < self.log_index - || request.log_term < self.log_term - || request.log_commit < self.log_commit + if self.local().log_index > request.log_index + || self.local().log_term > request.log_term + || self.local().log_commit > request.log_commit { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::LogMismatch(LogMismatch { index: MismatchedValues { - local: Some(self.log_index), + local: Some(self.local().log_index), requested: Some(request.log_index), }, term: MismatchedValues { - local: Some(self.log_term), + local: Some(self.local().log_term), requested: Some(request.log_term), }, commit: MismatchedValues { - local: Some(self.log_commit), + local: Some(self.local().log_commit), requested: Some(request.log_commit), }, }), @@ -514,9 +527,8 @@ impl Cluster { } fn validate_current_term(&self, request: &Request) -> Result<(), Response> { - if request.term != self.term { + if self.term != request.term { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::TermMismatch(MismatchedValues { local: Some(self.term), @@ -529,9 +541,8 @@ impl Cluster { } fn validate_term_for_vote(&self, request: &Request) -> Result<(), Response> { - if request.term <= self.term { + if self.term >= request.term { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::TermMismatch(MismatchedValues { local: Some(self.term), @@ -544,9 +555,8 @@ impl Cluster { } fn validate_term(&self, request: &Request) -> Result<(), Response> { - if request.term < self.term { + if self.term > request.term { return Err(Response { - version: VERSION, target: request.index, result: ResponseType::TermMismatch(MismatchedValues { local: Some(self.term), @@ -560,11 +570,26 @@ impl Cluster { fn ok(request: &Request) -> Result { Ok(Response { - version: VERSION, target: request.index, result: ResponseType::Ok, }) } + + fn node(&self, index: u64) -> &Node { + &self.nodes[index as usize] + } + + fn node_mut(&mut self, index: u64) -> &mut Node { + &mut self.nodes[index as usize] + } + + fn local(&self) -> &Node { + self.node(self.index) + } + + fn local_mut(&mut self) -> &mut Node { + self.node_mut(self.index) + } } #[cfg(test)] @@ -578,6 +603,7 @@ mod test { use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; + #[derive(Debug, Default, Clone, PartialEq)] struct TestStorage { logs: Vec, commit: u64, @@ -768,35 +794,33 @@ mod test { async fn expect_storage_synced(&self, left: u64, right: u64) { let timer = Instant::now(); - let mut left_log = vec![]; - let mut right_log = vec![]; + let mut left_storage = TestStorage::default(); + let mut right_storage = TestStorage::default(); while timer.elapsed() < TIMEOUT { tokio::time::sleep(Duration::from_millis(10)).await; - left_log = self.nodes.read().await[left as usize] + left_storage = self.nodes.read().await[left as usize] .read() .await .cluster .storage - .logs .clone(); - right_log = self.nodes.read().await[right as usize] + right_storage = self.nodes.read().await[right as usize] .read() .await .cluster .storage - .logs .clone(); - if left_log == right_log { + if left_storage == right_storage { return; } } panic!( "{left} is not in sync with {right} in {:?}:\nLEFT\n{:?}\nRIGHT:\n{:?}", - TIMEOUT, left_log, right_log + TIMEOUT, left_storage, right_storage ); } @@ -855,9 +879,9 @@ mod test { cluster.expect_leader(0).await; cluster.block(1).await; cluster.append(0, b"0".to_vec()).await?; + cluster.expect_storage_synced(0, 2).await; cluster.unblock().await; cluster.expect_storage_synced(0, 1).await; - cluster.expect_storage_synced(0, 2).await; Ok(()) } @@ -872,8 +896,8 @@ mod test { cluster.block(2).await; cluster.append(0, b"1".to_vec()).await?; cluster.append(0, b"2".to_vec()).await?; - cluster.unblock().await; cluster.expect_storage_synced(0, 1).await; + cluster.unblock().await; cluster.expect_storage_synced(0, 2).await; Ok(()) } @@ -887,8 +911,20 @@ mod test { cluster.append(0, b"0".to_vec()).await?; cluster.expect_leader(1).await; cluster.append(1, b"1".to_vec()).await?; + cluster.expect_storage_synced(1, 2).await; cluster.unblock().await; cluster.expect_storage_synced(0, 1).await; + Ok(()) + } + + #[tokio::test] + async fn commit() -> anyhow::Result<()> { + let mut cluster = TestCluster::new(3); + cluster.start().await; + cluster.expect_leader(0).await; + cluster.append(0, b"0".to_vec()).await?; + cluster.expect_leader(1).await; + cluster.expect_storage_synced(0, 1).await; cluster.expect_storage_synced(0, 2).await; Ok(()) } From 67960e9c6edee6a39523dc98b8b97581dce09697 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 19:32:06 +0100 Subject: [PATCH 10/14] Update raft.rs --- agdb_server/src/raft.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index bdd4862f2..2243deb3f 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -223,15 +223,15 @@ impl Cluster { match (&self.state, &request.data, &response.result) { (Candidate, Vote, Ok) => self.vote_received(request), (Leader, Heartbeat | Append(_), Ok) => self.commit(request), - (Leader, Heartbeat | Append(_), LogMismatch(values)) => self.reconcile(request, values), + (Leader, Heartbeat | Append(_), LogMismatch(_)) => self.reconcile(request), _ => None, } } - fn reconcile(&mut self, request: &Request, values: &LogMismatch) -> Option> { + fn reconcile(&mut self, request: &Request) -> Option> { let logs = self.storage.logs( - values.index.local.unwrap_or_default(), - values.term.local.unwrap_or_default(), + self.node(request.target).log_index, + self.node(request.target).log_term, ); self.node_mut(request.target).timer = Instant::now(); Some(vec![Request { @@ -916,16 +916,4 @@ mod test { cluster.expect_storage_synced(0, 1).await; Ok(()) } - - #[tokio::test] - async fn commit() -> anyhow::Result<()> { - let mut cluster = TestCluster::new(3); - cluster.start().await; - cluster.expect_leader(0).await; - cluster.append(0, b"0".to_vec()).await?; - cluster.expect_leader(1).await; - cluster.expect_storage_synced(0, 1).await; - cluster.expect_storage_synced(0, 2).await; - Ok(()) - } } From e879cb83039c2ab256c9f6ebdcae544e9b012808 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 20:09:13 +0100 Subject: [PATCH 11/14] Update main.rs --- agdb_server/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/agdb_server/src/main.rs b/agdb_server/src/main.rs index f7dd3124a..c484bd62f 100644 --- a/agdb_server/src/main.rs +++ b/agdb_server/src/main.rs @@ -6,6 +6,7 @@ mod db_pool; mod error_code; mod logger; mod password; +#[allow(dead_code)] mod raft; mod routes; mod server_db; From 37b398bb63ea2d9d8b84356e08ccc2c2ad1a0d04 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 20:09:16 +0100 Subject: [PATCH 12/14] Update raft.rs --- agdb_server/src/raft.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 2243deb3f..57ce7faf3 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -526,20 +526,6 @@ impl Cluster { Ok(()) } - fn validate_current_term(&self, request: &Request) -> Result<(), Response> { - if self.term != request.term { - return Err(Response { - target: request.index, - result: ResponseType::TermMismatch(MismatchedValues { - local: Some(self.term), - requested: Some(request.term), - }), - }); - } - - Ok(()) - } - fn validate_term_for_vote(&self, request: &Request) -> Result<(), Response> { if self.term >= request.term { return Err(Response { From 26dd9d26e1a17199947133489dfe7f2dfdc9a817 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 20:33:24 +0100 Subject: [PATCH 13/14] Update agdb_server.yaml --- .github/workflows/agdb_server.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/agdb_server.yaml b/.github/workflows/agdb_server.yaml index 7289057d3..b45ec9238 100644 --- a/.github/workflows/agdb_server.yaml +++ b/.github/workflows/agdb_server.yaml @@ -39,7 +39,7 @@ jobs: - run: cargo llvm-cov clean --workspace - run: cargo llvm-cov --package agdb_server --all-features --no-report - run: cargo llvm-cov --package agdb_server --all-features --no-report -- --ignored - - run: cargo llvm-cov report --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 0 --show-missing-lines + - run: cargo llvm-cov report --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 1 --show-missing-lines agdb_server_test: runs-on: ubuntu-latest From 2e3e3c4576ede2677010e317b325815dbdfda06a Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Sun, 1 Dec 2024 20:40:24 +0100 Subject: [PATCH 14/14] Update raft.rs --- agdb_server/src/raft.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index 57ce7faf3..f88b4c765 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -274,6 +274,7 @@ impl Cluster { self.validate_hash(request)?; self.validate_term(request)?; self.become_follower(request); + self.update_node(request.index, request); for log in logs { self.validate_log_commit(request, log)?; @@ -369,6 +370,7 @@ impl Cluster { self.validate_term(request)?; self.become_follower(request); self.validate_log(request)?; + self.update_node(request.index, request); if self.local().log_commit < request.log_commit { let available_commit = std::cmp::min(self.local().log_index, request.log_commit); @@ -561,6 +563,12 @@ impl Cluster { }) } + fn update_node(&mut self, index: u64, request: &Request) { + self.node_mut(index).log_index = request.log_index; + self.node_mut(index).log_term = request.log_term; + self.node_mut(index).log_commit = request.log_commit; + } + fn node(&self, index: u64) -> &Node { &self.nodes[index as usize] } @@ -902,4 +910,25 @@ mod test { cluster.expect_storage_synced(0, 1).await; Ok(()) } + + #[tokio::test] + async fn cluster_of_5() -> anyhow::Result<()> { + let mut cluster = TestCluster::new(5); + cluster.start().await; + cluster.expect_leader(0).await; + cluster.append(0, b"0".to_vec()).await?; + cluster.expect_storage_synced(0, 1).await; + cluster.expect_storage_synced(0, 2).await; + cluster.expect_storage_synced(0, 3).await; + cluster.expect_storage_synced(0, 4).await; + cluster.block(0).await; + cluster.expect_leader(1).await; + cluster.append(1, b"1".to_vec()).await?; + cluster.expect_storage_synced(1, 2).await; + cluster.expect_storage_synced(1, 3).await; + cluster.expect_storage_synced(1, 4).await; + cluster.unblock().await; + cluster.expect_storage_synced(0, 1).await; + Ok(()) + } }