Skip to content

Commit

Permalink
Merge pull request #583 from dora-rs/fix-575
Browse files Browse the repository at this point in the history
Don't wait for non-started dynamic nodes on stop
  • Loading branch information
phil-opp authored Jul 11, 2024
2 parents 01c4047 + 508311e commit 8d63e81
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
40 changes: 37 additions & 3 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ impl Daemon {
}
Event::CtrlC => {
for dataflow in self.running.values_mut() {
dataflow.stop_all(&self.clock, None).await;
dataflow
.stop_all(&mut self.coordinator_connection, &self.clock, None)
.await?;
}
}
}
Expand Down Expand Up @@ -496,7 +498,13 @@ impl Daemon {
.send(Some(reply))
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));

dataflow.stop_all(&self.clock, grace_duration).await;
dataflow
.stop_all(
&mut self.coordinator_connection,
&self.clock,
grace_duration,
)
.await?;
RunStatus::Continue
}
DaemonCoordinatorEvent::Destroy => {
Expand Down Expand Up @@ -640,6 +648,10 @@ impl Daemon {
if local {
dataflow.pending_nodes.insert(node.id.clone());

if node.kind.dynamic() {
dataflow.dynamic_nodes.insert(node.id.clone());
}

let node_id = node.id.clone();
let node_stderr_most_recent = dataflow
.node_stderr_most_recent
Expand Down Expand Up @@ -1464,6 +1476,12 @@ pub struct RunningDataflow {
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
running_nodes: BTreeMap<NodeId, RunningNode>,

/// Set of all dynamic node IDs.
///
/// We want to treat dynamic nodes differently in some cases, so we need
/// to know which nodes are dynamic.
dynamic_nodes: BTreeSet<NodeId>,

open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,

pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
Expand Down Expand Up @@ -1495,6 +1513,7 @@ impl RunningDataflow {
timers: BTreeMap::new(),
open_inputs: BTreeMap::new(),
running_nodes: BTreeMap::new(),
dynamic_nodes: BTreeSet::new(),
open_external_mappings: HashMap::new(),
pending_drop_tokens: HashMap::new(),
_timer_handles: Vec::new(),
Expand Down Expand Up @@ -1559,7 +1578,21 @@ impl RunningDataflow {
Ok(())
}

async fn stop_all(&mut self, clock: &HLC, grace_duration: Option<Duration>) {
async fn stop_all(
&mut self,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
grace_duration: Option<Duration>,
) -> eyre::Result<()> {
self.pending_nodes
.handle_dataflow_stop(
coordinator_connection,
clock,
&mut self.cascading_error_causes,
&self.dynamic_nodes,
)
.await?;

for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock);
}
Expand All @@ -1586,6 +1619,7 @@ impl RunningDataflow {
}
});
self.stop_sent = true;
Ok(())
}

fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
Expand Down
20 changes: 19 additions & 1 deletion binaries/daemon/src/pending.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};

use dora_core::{
config::NodeId,
Expand Down Expand Up @@ -97,6 +97,24 @@ impl PendingNodes {
Ok(log)
}

/// Handles a dataflow stop for local dynamic nodes that are not yet started.
///
/// Every ID in `dynamic_nodes` is removed from `self.local_nodes`. Each time a
/// removal actually took place, `update_dataflow_status` is awaited with the
/// given coordinator connection, clock, and cascading-error state.
///
/// # Errors
///
/// Propagates the first error returned by `update_dataflow_status`.
///
/// # Returns
///
/// On success, an empty list of log messages.
pub async fn handle_dataflow_stop(
    &mut self,
    coordinator_connection: &mut Option<TcpStream>,
    clock: &HLC,
    cascading_errors: &mut CascadingErrorCauses,
    dynamic_nodes: &BTreeSet<NodeId>,
) -> eyre::Result<Vec<LogMessage>> {
    for dynamic_id in dynamic_nodes.iter() {
        // `remove` reports whether the ID was present; IDs that were not
        // in `local_nodes` need no status update.
        let was_removed = self.local_nodes.remove(dynamic_id);
        if !was_removed {
            continue;
        }

        self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
            .await?;
    }

    // This handler itself produces no log messages.
    let log = Vec::new();
    Ok(log)
}

pub async fn handle_external_all_nodes_ready(
&mut self,
exited_before_subscribe: Vec<NodeId>,
Expand Down

0 comments on commit 8d63e81

Please sign in to comment.