Skip to content

Commit

Permalink
[server] Structured cluster log messages #1231 (#1301)
Browse files Browse the repository at this point in the history
* Update server.en-US.md

* add cluster id

* cluster messages
  • Loading branch information
michaelvlach authored Oct 9, 2024
1 parent 8952377 commit 84903ed
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 66 deletions.
231 changes: 186 additions & 45 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use agdb::StableHash;
use agdb_api::HttpClient;
use agdb_api::ReqwestClient;
use axum::http::StatusCode;
use serde::Serialize;
use std::fmt::Display;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -55,6 +57,40 @@ pub(crate) struct ClusterImpl {
pub(crate) data: RwLock<ClusterData>,
}

#[derive(Serialize)]
pub(crate) enum ClusterOperation {
Heartbeat,
Election,
Leader,
Follower,
Vote,
}

#[derive(Serialize)]
pub(crate) enum ClusterResult {
Success,
Reject,
Failure,
}

#[derive(Serialize)]
pub(crate) struct ClusterLog {
pub(crate) node: usize,
pub(crate) operation: ClusterOperation,
pub(crate) target_node: usize,
pub(crate) term: u64,
pub(crate) result: ClusterResult,
pub(crate) status: u16,
pub(crate) time: u128,
pub(crate) message: String,
}

impl Display for ClusterLog {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&serde_json::to_string(self).unwrap_or_default())
}
}

impl ClusterImpl {
pub(crate) async fn leader(&self) -> Option<usize> {
match self.data.read().await.state {
Expand Down Expand Up @@ -215,27 +251,44 @@ async fn heartbeat(cluster: &Cluster, shutdown_signal: Arc<AtomicBool>) -> Serve

tokio::spawn(async move {
while is_leader.load(Ordering::Relaxed) && !shutdown_signal.load(Ordering::Relaxed) {
let timer = Instant::now();

match node.heartbeat(cluster_hash, term, leader).await {
Ok((status, message)) => {
if status != 200 {
tracing::warn!(
"[{cluster_index}] Heartbeat rejected by {}: ({}) {}",
node.index,
let log = ClusterLog {
node: cluster_index,
operation: ClusterOperation::Heartbeat,
target_node: node.index,
term,
result: ClusterResult::Reject,
status,
message
);
time: timer.elapsed().as_micros(),
message,
};
tracing::warn!("{log}");
}
}
Err(e) => {
let message = format!(
"[{cluster_index}] Heartbeat error on node {}: ({}) {}",
node.index, e.status, e.description
);
let log = ClusterLog {
node: cluster_index,
operation: ClusterOperation::Heartbeat,
target_node: node.index,
term,
result: if e.status.is_client_error() {
ClusterResult::Reject
} else {
ClusterResult::Failure
},
status: e.status.as_u16(),
time: timer.elapsed().as_micros(),
message: e.description,
};

if e.status.is_client_error() {
tracing::warn!(message);
tracing::warn!("{log}");
} else {
tracing::error!(message);
tracing::error!("{log}");
}
}
}
Expand All @@ -261,9 +314,22 @@ async fn election(cluster: &Cluster) -> ServerResult<()> {

let cluster_hash = cluster.cluster_hash;
let index = cluster.index;
let quorum = (cluster.nodes.len() + 1) / 2 + 1;

tracing::info!("[{index}] Starting election (cluster: {cluster_hash}, term: {election_term}, quorum: {quorum}/{})", cluster.nodes.len() + 1);
let cluster_len = cluster.nodes.len() + 1;
let quorum = cluster_len / 2 + 1;

let log = ClusterLog {
node: index,
operation: ClusterOperation::Election,
target_node: index,
term: election_term,
result: ClusterResult::Success,
status: 0,
time: timer.elapsed().as_micros(),
message: format!(
"Starting election (cluster: {cluster_hash}, quorum: {quorum}/{cluster_len})"
),
};
tracing::info!("{log}");

let votes = Arc::new(AtomicUsize::new(1));
let voted = Arc::new(AtomicUsize::new(1));
Expand All @@ -276,18 +342,31 @@ async fn election(cluster: &Cluster) -> ServerResult<()> {

tokio::spawn(async move {
match node.vote(cluster_hash, election_term, index).await {
Ok(_) => {
tracing::info!(
"[{}] Vote for term {election_term} ACCEPTED by {}",
cluster.index,
node.index
);
Ok((status, message)) => {
let log = ClusterLog {
node: cluster.index,
operation: ClusterOperation::Vote,
target_node: node.index,
term: election_term,
result: ClusterResult::Success,
time: timer.elapsed().as_micros(),
status,
message,
};
tracing::info!("{log}");

if (votes.fetch_add(1, Ordering::Relaxed) + 1) == quorum {
tracing::info!(
"[{index}] Elected as leader for term {election_term} ({}ms)",
timer.elapsed().as_millis()
);
let log = ClusterLog {
node: cluster.index,
operation: ClusterOperation::Leader,
target_node: cluster.index,
term: election_term,
result: ClusterResult::Success,
status: 0,
time: timer.elapsed().as_micros(),
message: "Elected as leader".to_string(),
};
tracing::info!("{log}");

let mut data = cluster.data.write().await;
data.state = ClusterState::LeaderElect;
Expand All @@ -297,22 +376,25 @@ async fn election(cluster: &Cluster) -> ServerResult<()> {
}
}
Err(e) => {
let log = ClusterLog {
node: cluster.index,
operation: ClusterOperation::Vote,
target_node: node.index,
term: election_term,
result: if e.status.is_client_error() {
ClusterResult::Reject
} else {
ClusterResult::Failure
},
time: timer.elapsed().as_micros(),
status: e.status.as_u16(),
message: e.description,
};

if e.status.is_client_error() {
tracing::warn!(
"[{}] Vote for term {election_term} REJECTED by {}: ({}) {}",
cluster.index,
node.index,
e.status,
e.description
);
tracing::warn!("{log}");
} else {
tracing::error!(
"[{}] Vote for term {election_term} FAILED on {}: ({}) {}",
cluster.index,
node.index,
e.status,
e.description
);
tracing::error!("{log}");
}
}
}
Expand All @@ -321,13 +403,20 @@ async fn election(cluster: &Cluster) -> ServerResult<()> {
let is_leader = cluster.data.read().await.leader.load(Ordering::Relaxed);

if !is_leader {
tracing::warn!(
"[{index}] Election for term {election_term} failed - {}/{} (quorum: {quorum}/{}) ({}ms)",
votes.load(Ordering::Relaxed),
cluster.nodes.len() + 1,
cluster.nodes.len() + 1,
timer.elapsed().as_millis(),
);
let log = ClusterLog {
node: cluster.index,
operation: ClusterOperation::Election,
target_node: cluster.index,
term: election_term,
result: ClusterResult::Reject,
status: 0,
time: timer.elapsed().as_micros(),
message: format!(
"Election for term {election_term} failed - {}/{cluster_len} (quorum: {quorum}/{cluster_len})",
votes.load(Ordering::Relaxed)
),
};
tracing::warn!("{log}");

let mut data = cluster.data.write().await;
data.state = ClusterState::Election;
Expand All @@ -340,6 +429,58 @@ async fn election(cluster: &Cluster) -> ServerResult<()> {
Ok(())
}

pub(crate) async fn become_follower(
cluster: &Cluster,
term: u64,
leader: usize,
) -> ServerResult<()> {
let mut data = cluster.data.write().await;
data.term = term;
data.state = ClusterState::Follower(leader);
data.leader.store(false, Ordering::Relaxed);

let time = data.timer;
data.timer = Instant::now();

let log = ClusterLog {
node: cluster.index,
operation: ClusterOperation::Follower,
target_node: leader,
term,
result: ClusterResult::Success,
status: StatusCode::OK.as_u16(),
time: time.elapsed().as_micros(),
message: "Becoming follower".to_string(),
};
tracing::info!("{}", log);

Ok(())
}

pub(crate) async fn vote(cluster: &Cluster, term: u64, leader: usize) -> ServerResult<()> {
let mut data = cluster.data.write().await;
data.state = ClusterState::Voted;
data.term = term;
data.voted = term;

let time = data.timer;
data.timer = Instant::now();

let log = ClusterLog {
node: cluster.index,
operation: ClusterOperation::Vote,
target_node: leader,
term,
result: ClusterResult::Success,
status: StatusCode::OK.as_u16(),
time: time.elapsed().as_micros(),
message: "Vote cast".to_string(),
};
tracing::info!("{}", log);

Ok(())
}

pub(crate) async fn start_with_shutdown(
cluster: Cluster,
mut shutdown_receiver: broadcast::Receiver<()>,
Expand Down
11 changes: 10 additions & 1 deletion agdb_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@ pub(crate) struct ConfigImpl {
pub(crate) data_dir: String,
pub(crate) cluster_token: String,
pub(crate) cluster: Vec<Url>,
#[serde(skip)]
pub(crate) cluster_node_id: usize,
}

pub(crate) fn new() -> ServerResult<Config> {
if let Ok(content) = std::fs::read_to_string(CONFIG_FILE) {
let config = Config::new(serde_yaml::from_str(&content)?);
let mut config_impl: ConfigImpl = serde_yaml::from_str(&content)?;
config_impl.cluster_node_id = config_impl
.cluster
.iter()
.position(|x| x == &config_impl.address)
.unwrap_or(0);
let config = Config::new(config_impl);

if !config.cluster.is_empty() && !config.cluster.contains(&config.address) {
return Err(ServerError::from(format!(
Expand All @@ -47,6 +55,7 @@ pub(crate) fn new() -> ServerResult<Config> {
data_dir: "agdb_server_data".to_string(),
cluster_token: "cluster".to_string(),
cluster: vec![],
cluster_node_id: 0,
};

std::fs::write(CONFIG_FILE, serde_yaml::to_string(&config)?)?;
Expand Down
4 changes: 4 additions & 0 deletions agdb_server/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Instant;

#[derive(Default, Serialize)]
struct LogRecord {
node: usize,
method: String,
version: String,
user: String,
Expand Down Expand Up @@ -66,6 +67,7 @@ async fn request_log(
log_record: &mut LogRecord,
skip_body: bool,
) -> Result<Request, Response> {
log_record.node = state.config.cluster_node_id;
log_record.method = request.method().to_string();
log_record.uri = request.uri().to_string();
log_record.version = format!("{:?}", request.version());
Expand Down Expand Up @@ -166,6 +168,7 @@ mod tests {

fn log_record(uri: &str, request_body: &str) -> LogRecord {
LogRecord {
node: 0,
method: "GET".to_string(),
uri: uri.to_string(),
version: "HTTP/1.1".to_string(),
Expand All @@ -192,6 +195,7 @@ mod tests {
#[test]
fn log_error_test() {
let log_record = LogRecord {
node: 0,
method: "GET".to_string(),
uri: "/".to_string(),
version: "HTTP/1.1".to_string(),
Expand Down
Loading

0 comments on commit 84903ed

Please sign in to comment.