Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Dec 20, 2024
1 parent 87246eb commit 0101137
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
23 changes: 14 additions & 9 deletions core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl ManagedRun {
mut commands: Vec<WFCommand>,
used_flags: Vec<u32>,
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
is_forced_failure: bool,
) -> Result<RunUpdateAct, Box<NextPageReq>> {
let activation_was_only_eviction = self.activation_is_eviction();
let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() {
Expand Down Expand Up @@ -446,6 +447,7 @@ impl ManagedRun {
query_responses,
used_flags,
resp_chan,
is_forced_failure,
};

// Verify we can actually apply the next workflow task, which will happen as part of
Expand Down Expand Up @@ -617,6 +619,7 @@ impl ManagedRun {
}],
vec![],
resp_chan,
true,
)
.unwrap_or_else(|e| {
dbg_panic!("Got next page request when auto-failing workflow: {e:?}");
Expand Down Expand Up @@ -686,6 +689,7 @@ impl ManagedRun {
query_responses: completion.query_responses,
has_pending_query: completion.has_pending_query,
activation_was_eviction: completion.activation_was_eviction,
is_forced_failure: completion.is_forced_failure,
};

self.wfm.machines.add_lang_used_flags(completion.used_flags);
Expand All @@ -708,7 +712,8 @@ impl ManagedRun {
self.wfm.feed_history_from_new_page(update)?;
}
// Don't bother applying the next task if we're evicting at the end of this activation
if !completion.activation_was_eviction {
// or are otherwise broken.
if !completion.activation_was_eviction && !self.am_broken {
self.wfm.apply_next_task_if_ready()?;
}
let new_local_acts = self.wfm.drain_queued_local_activities();
Expand Down Expand Up @@ -1083,7 +1088,7 @@ impl ManagedRun {
// fulfilling a query. If the activation we sent was *only* an eviction, don't send that
// either.
let should_respond = !(machines_wft_response.has_pending_jobs
|| machines_wft_response.replaying
|| (machines_wft_response.replaying && !data.is_forced_failure)
|| is_query_playback
|| data.activation_was_eviction
|| machines_wft_response.have_seen_terminal_event);
Expand Down Expand Up @@ -1331,6 +1336,7 @@ struct CompletionDataForWFT {
query_responses: Vec<QueryResult>,
has_pending_query: bool,
activation_was_eviction: bool,
is_forced_failure: bool,
}

/// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data
Expand Down Expand Up @@ -1405,13 +1411,11 @@ impl WorkflowManager {
self.machines.ready_to_apply_next_wft()
}

/// If there are no pending jobs for the workflow, apply the next workflow task and check
/// again if there are any jobs. Importantly, does not *drain* jobs.
///
/// Returns true if there are jobs (before or after applying the next WFT).
fn apply_next_task_if_ready(&mut self) -> Result<bool> {
/// If there are no pending jobs for the workflow apply the next workflow task and check again
/// if there are any jobs. Importantly, does not *drain* jobs.
fn apply_next_task_if_ready(&mut self) -> Result<()> {
if self.machines.has_pending_jobs() {
return Ok(true);
return Ok(());
}
loop {
let consumed_events = self.machines.apply_next_wft_from_history()?;
Expand All @@ -1423,7 +1427,7 @@ impl WorkflowManager {
break;
}
}
Ok(self.machines.has_pending_jobs())
Ok(())
}

/// Must be called when we're ready to respond to a WFT after handling catching up on replay
Expand Down Expand Up @@ -1473,6 +1477,7 @@ struct RunActivationCompletion {
has_pending_query: bool,
query_responses: Vec<QueryResult>,
used_flags: Vec<u32>,
is_forced_failure: bool,
/// Used to notify the worker when the completion is done processing and the completion can
/// unblock. Must always be `Some` when initialized.
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
Expand Down
7 changes: 6 additions & 1 deletion core/src/worker/workflow/workflow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,12 @@ impl WFStream {
commands,
used_flags,
..
} => match rh.successful_completion(commands, used_flags, complete.response_tx) {
} => match rh.successful_completion(
commands,
used_flags,
complete.response_tx,
false,
) {
Ok(acts) => acts,
Err(npr) => {
self.runs_needing_fetching
Expand Down
23 changes: 17 additions & 6 deletions tests/integ_tests/workflow_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
time::Duration,
};
use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService};
use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions};
use temporal_sdk::{
interceptors::WorkerInterceptor, ActivityOptions, LocalActivityOptions, WfContext,
WorkflowResult,
Expand Down Expand Up @@ -780,7 +780,6 @@ async fn history_out_of_order_on_restart() {
.worker_config
.workflow_failure_errors([WorkflowErrorType::Nondeterminism]);
let mut worker = starter.worker().await;
let client = starter.get_client().await;
let mut starter2 = starter.clone_no_worker();
let mut worker2 = starter2.worker().await;

Expand Down Expand Up @@ -825,16 +824,19 @@ async fn history_out_of_order_on_restart() {
..Default::default()
})
.await;
ctx.timer(Duration::from_secs(5)).await;
ctx.timer(Duration::from_secs(3)).await;
Ok(().into())
});
worker2.register_activity("echo", echo);
let run_id = worker
worker
.submit_wf(
wf_name.to_owned(),
wf_name.to_owned(),
vec![],
WorkflowOptions::default(),
WorkflowOptions {
execution_timeout: Some(Duration::from_secs(7)),
..Default::default()
},
)
.await
.unwrap();
Expand All @@ -853,5 +855,14 @@ async fn history_out_of_order_on_restart() {
worker2.run_until_done().await.unwrap();
};
join!(w1, w2);
// The workflow should complete because the nondeterminism error should fail the workflow
// The workflow should fail with the nondeterminism error
let handle = starter
.get_client()
.await
.get_untyped_workflow_handle(wf_name, "");
let res = handle
.get_workflow_result(Default::default())
.await
.unwrap();
assert_matches!(res, WorkflowExecutionResult::Failed(_));
}

0 comments on commit 0101137

Please sign in to comment.