diff --git a/src/raft.rs b/src/raft.rs index 6339921c1..2fe1888f0 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -484,19 +484,56 @@ impl Raft { m.set_entries(RepeatedField::from_vec(ents)); m.set_commit(self.raft_log.committed); if !m.get_entries().is_empty() { - match pr.state { - ProgressState::Replicate => { - let last = m.get_entries().last().unwrap().get_index(); - pr.optimistic_update(last); - pr.ins.add(last); + let last = m.get_entries().last().unwrap().get_index(); + self.update_progress_state(last, pr); + } + } + + fn update_progress_state(&mut self, last: u64, pr: &mut Progress) { + match pr.state { + ProgressState::Replicate => { + pr.optimistic_update(last); + pr.ins.add(last); + } + ProgressState::Probe => pr.pause(), + _ => panic!( + "{} is sending append in unhandled state {:?}", + self.tag, pr.state + ), + } + } + + fn try_batching( + &mut self, + to: u64, + pr: &mut Progress, + ents: &mut Vec, + term: u64, + ) -> (bool, bool, u64) { + // if MsgAppend for the reciver already exists, try_batching + // will append the entries to the existing MsgAppend + for msg in &mut self.msgs { + if msg.get_log_term() == term + && msg.get_msg_type() == MessageType::MsgAppend + && msg.get_to() == to + { + msg.set_index(pr.next_idx - 1); + msg.set_commit(self.raft_log.committed); + let mut batched_entries = msg.take_entries().into_vec(); + batched_entries.append(ents); + msg.set_entries(RepeatedField::from_vec(batched_entries)); + let is_empty = msg.get_entries().is_empty(); + if !is_empty { + return ( + true, + is_empty, + msg.get_entries().last().unwrap().get_index(), + ); } - ProgressState::Probe => pr.pause(), - _ => panic!( - "{} is sending append in unhandled state {:?}", - self.tag, pr.state - ), + return (true, true, 0); } } + return (false, true, 0); } /// Sends RPC, with entries to the given peer. @@ -514,7 +551,16 @@ impl Raft { return; } } else { - self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap()); + let mut ents = ents.unwrap(); + let term = term.unwrap(); + let (batched, is_ents_empty, last_index) = self.try_batching(to, pr, &mut ents, term); + if batched { + if !is_ents_empty { + self.update_progress_state(last_index, pr) + } + return; + } + self.prepare_send_entries(&mut m, pr, term, ents); } self.send(m); }