Skip to content

Commit

Permalink
batch MsgAppend entries
Browse files Browse the repository at this point in the history
Signed-off-by: balaji <[email protected]>
  • Loading branch information
poonai authored and balaji committed Feb 3, 2019
1 parent 120b5bd commit a3e3776
Showing 1 changed file with 57 additions and 11 deletions.
68 changes: 57 additions & 11 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,19 +484,56 @@ impl<T: Storage> Raft<T> {
m.set_entries(RepeatedField::from_vec(ents));
m.set_commit(self.raft_log.committed);
if !m.get_entries().is_empty() {
match pr.state {
ProgressState::Replicate => {
let last = m.get_entries().last().unwrap().get_index();
pr.optimistic_update(last);
pr.ins.add(last);
let last = m.get_entries().last().unwrap().get_index();
self.update_progress_state(last, pr);
}
}

fn update_progress_state(&mut self, last: u64, pr: &mut Progress) {
match pr.state {
ProgressState::Replicate => {
pr.optimistic_update(last);
pr.ins.add(last);
}
ProgressState::Probe => pr.pause(),
_ => panic!(
"{} is sending append in unhandled state {:?}",
self.tag, pr.state
),
}
}

fn try_batching(
&mut self,
to: u64,
pr: &mut Progress,
ents: &mut Vec<Entry>,
term: u64,
) -> (bool, bool, u64) {
// if MsgAppend for the reciver already exists, try_batching
// will append the entries to the existing MsgAppend
for msg in &mut self.msgs {
if msg.get_log_term() == term
&& msg.get_msg_type() == MessageType::MsgAppend
&& msg.get_to() == to
{
msg.set_index(pr.next_idx - 1);
msg.set_commit(self.raft_log.committed);
let mut batched_entries = msg.take_entries().into_vec();
batched_entries.append(ents);
msg.set_entries(RepeatedField::from_vec(batched_entries));
let is_empty = msg.get_entries().is_empty();
if !is_empty {
return (
true,
is_empty,
msg.get_entries().last().unwrap().get_index(),
);
}
ProgressState::Probe => pr.pause(),
_ => panic!(
"{} is sending append in unhandled state {:?}",
self.tag, pr.state
),
return (true, true, 0);
}
}
return (false, true, 0);
}

/// Sends RPC, with entries to the given peer.
Expand All @@ -514,7 +551,16 @@ impl<T: Storage> Raft<T> {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
let mut ents = ents.unwrap();
let term = term.unwrap();
let (batched, is_ents_empty, last_index) = self.try_batching(to, pr, &mut ents, term);
if batched {
if !is_ents_empty {
self.update_progress_state(last_index, pr)
}
return;
}
self.prepare_send_entries(&mut m, pr, term, ents);
}
self.send(m);
}
Expand Down

0 comments on commit a3e3776

Please sign in to comment.