Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Add internal raft library #1367 #1368

Merged
merged 14 commits into from
Dec 1, 2024
Prev Previous commit
Update raft.rs
michaelvlach committed Dec 1, 2024
commit 2e3e3c4576ede2677010e317b325815dbdfda06a
29 changes: 29 additions & 0 deletions agdb_server/src/raft.rs
Original file line number Diff line number Diff line change
@@ -274,6 +274,7 @@ impl<S: Storage> Cluster<S> {
self.validate_hash(request)?;
self.validate_term(request)?;
self.become_follower(request);
self.update_node(request.index, request);

for log in logs {
self.validate_log_commit(request, log)?;
@@ -369,6 +370,7 @@ impl<S: Storage> Cluster<S> {
self.validate_term(request)?;
self.become_follower(request);
self.validate_log(request)?;
self.update_node(request.index, request);

if self.local().log_commit < request.log_commit {
let available_commit = std::cmp::min(self.local().log_index, request.log_commit);
@@ -561,6 +563,12 @@ impl<S: Storage> Cluster<S> {
})
}

fn update_node(&mut self, index: u64, request: &Request) {
self.node_mut(index).log_index = request.log_index;
self.node_mut(index).log_term = request.log_term;
self.node_mut(index).log_commit = request.log_commit;
}

fn node(&self, index: u64) -> &Node {
&self.nodes[index as usize]
}
@@ -902,4 +910,25 @@ mod test {
cluster.expect_storage_synced(0, 1).await;
Ok(())
}

#[tokio::test]
async fn cluster_of_5() -> anyhow::Result<()> {
let mut cluster = TestCluster::new(5);
cluster.start().await;
cluster.expect_leader(0).await;
cluster.append(0, b"0".to_vec()).await?;
cluster.expect_storage_synced(0, 1).await;
cluster.expect_storage_synced(0, 2).await;
cluster.expect_storage_synced(0, 3).await;
cluster.expect_storage_synced(0, 4).await;
cluster.block(0).await;
cluster.expect_leader(1).await;
cluster.append(1, b"1".to_vec()).await?;
cluster.expect_storage_synced(1, 2).await;
cluster.expect_storage_synced(1, 3).await;
cluster.expect_storage_synced(1, 4).await;
cluster.unblock().await;
cluster.expect_storage_synced(0, 1).await;
Ok(())
}
}