Skip to content

Commit

Permalink
remove control handling while send is in flight
Browse files Browse the repository at this point in the history
  • Loading branch information
Zettroke committed Oct 13, 2022
1 parent 29ad3a7 commit 04988cc
Showing 1 changed file with 4 additions and 104 deletions.
108 changes: 4 additions & 104 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,59 +186,11 @@ impl Fanout {
return Ok(());
}

// Keep track of whether the control channel has returned `Ready(None)`, and stop polling
// it once it has. If we don't do this check, it will continue to return `Ready(None)` any
// time it is polled, which can lead to a busy loop below.
//
// In real life this is likely a non-issue, but it can lead to strange behavior in tests if
// left unhandled.
let mut control_channel_open = true;

// Create our send group which arms all senders to send the given events, and handles
// adding/removing/replacing senders while the send is in-flight.
let mut send_group = SendGroup::new(&mut self.senders, events);

loop {
tokio::select! {
// Semantically, it's not hugely important that this select is biased. It does,
// however, make testing simpler when you can count on control messages being
// processed first.
biased;

maybe_msg = self.control_channel.recv(), if control_channel_open => {
trace!("Processing control message inside of send: {:?}", maybe_msg);

// During a send operation, control messages must be applied via the
// `SendGroup`, since it has exclusive access to the senders.
match maybe_msg {
Some(ControlMessage::Add(id, sink)) => {
send_group.add(id, sink);
},
Some(ControlMessage::Remove(id)) => {
send_group.remove(&id);
},
Some(ControlMessage::Replace(id, Some(sink))) => {
send_group.replace(&id, Sender::new(sink));
},
Some(ControlMessage::Replace(id, None)) => {
send_group.pause(&id);
},
None => {
// Control channel is closed, which means Vector is shutting down.
control_channel_open = false;
}
}
}
// Create our send group which arms all senders to send the given events,
// there is no point in handling adding/removing/replacing senders while the send is in-flight
SendGroup::new(&mut self.senders, events).send().await?;

result = send_group.send() => match result {
Ok(()) => {
trace!("Sent item to fanout.");
break;
},
Err(e) => return Err(e),
}
}
}
trace!("Sent item to fanout.");

Ok(())
}
Expand Down Expand Up @@ -285,44 +237,6 @@ impl<'a> SendGroup<'a> {
Self { senders, sends }
}

fn try_detach_send(&mut self, id: &ComponentKey) {
if let Some(send) = self.sends.remove(id) {
tokio::spawn(async move {
if let Err(e) = send.await {
warn!(
cause = %e,
message = "Encountered error writing to component after detaching from topology.",
);
}
});
}
}

#[allow(clippy::needless_pass_by_value)]
fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
// When we're in the middle of a send, we can only keep track of the new sink, but can't
// actually send to it, as we don't have the item to send... so only add it to `senders`.
assert!(
self.senders
.insert(id.clone(), Some(Sender::new(sink)))
.is_none(),
"Adding duplicate output id to fanout: {id}"
);
}

fn remove(&mut self, id: &ComponentKey) {
// We may or may not be removing a sender that we're try to drive a send against, so we have
// to also detach the send future for the sender if it exists, otherwise we'd be hanging
// around still trying to send to it.
assert!(
self.senders.remove(id).is_some(),
"Removing non-existent sink from fanout: {id}"
);

// Now try and detach the in-flight send, if it exists.
self.try_detach_send(id);
}

fn replace(&mut self, id: &ComponentKey, sink: Sender) {
match self.senders.get_mut(id) {
Some(sender) => {
Expand All @@ -338,20 +252,6 @@ impl<'a> SendGroup<'a> {
}
}

fn pause(&mut self, id: &ComponentKey) {
match self.senders.get_mut(id) {
Some(sender) => {
// A sink must be known and present to be replaced, otherwise an invalid sequence of
// control operations has been applied.
assert!(
sender.take().is_some(),
"Pausing non-existent sink is not valid: {id}"
);
}
None => panic!("Pausing unknown sink from fanout: {id}"),
}
}

async fn send(&mut self) -> crate::Result<()> {
// Right now, we do a linear scan of all sends, polling each send once in order to avoid
// waiting forever, such that we can let our control messages get picked up while sends are
Expand Down

0 comments on commit 04988cc

Please sign in to comment.