Ability for certain task failure types to fail workflow #205

Merged (2 commits), Mar 21, 2024
31 changes: 23 additions & 8 deletions README.md
@@ -619,16 +619,31 @@ can be used from workflows including:

#### Workflow Exceptions

* Workflows can throw exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow, retrying until
code update allows it to continue).
* Exceptions that are instances of `Temporalio.Exceptions.FailureException` will fail the workflow with that exception.
* For failing the workflow explicitly with a user exception, explicitly throw
* Workflows can throw exceptions to fail the workflow/update or the "workflow task" (i.e. suspend the workflow, retrying
until code update allows it to continue).
* By default, exceptions that are instances of `Temporalio.Exceptions.FailureException` will fail the workflow/update
with that exception.
* For failing the workflow/update explicitly with a user exception, explicitly throw
`Temporalio.Exceptions.ApplicationFailureException`. This can be marked non-retryable or include details as needed.
* Other exceptions that come from activity execution, child execution, cancellation, etc are already instances of
`FailureException` (or `TaskCanceledException`) and will fail the workflow if uncaught.
* All other exceptions fail the "workflow task" which means the workflow will continually retry until the workflow is
fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an
`ApplicationFailureException` as mentioned above.
`FailureException` (or `TaskCanceledException`) and will fail the workflow/update if uncaught.
* By default, all other exceptions fail the "workflow task" which means the workflow/update will continually retry until
the code is fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the
workflow/update, use an `ApplicationFailureException` as mentioned above.
* By default, all non-deterministic exceptions that are detected internally fail the "workflow task".

The default behavior can be customized at the worker level for all workflows via the
`TemporalWorkerOptions.WorkflowFailureExceptionTypes` property or per workflow via the `FailureExceptionTypes` property
on the `WorkflowAttribute`. When an exception would otherwise fail the "workflow task" (i.e. suspend the workflow), the
worker first checks these collections; if the exception is an instance of any of the listed types, it becomes a
workflow/update failure instead. As a special case, when a non-deterministic exception occurs and
`Temporalio.Exceptions.WorkflowNondeterminismException` is assignable to any of the types in the collection, that too
becomes a failure. However, a non-deterministic exception that matches during an update handler becomes a
workflow failure, not an update failure like other exceptions, because a non-deterministic exception is an
entire-workflow-failure situation.

⚠️ WARNING: Customizing the default behavior is currently experimental and the default behavior may change in the
future.
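
The rules in this section can be read as a small decision table. The Rust sketch below models that table outside the SDK; it is not the .NET implementation. `Context`, `Outcome`, `Raised`, and `classify` are hypothetical names, .NET types are represented as plain strings (a type followed by its base types), and the worker-level and per-workflow collections are assumed to be merged into one set for brevity.

```rust
use std::collections::HashSet;

/// Where the exception surfaced (hypothetical model type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Context {
    WorkflowRun,
    UpdateHandler,
}

/// What the worker does with the exception (hypothetical model type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    FailWorkflow,
    FailUpdate,
    /// Suspend the workflow and retry the task until the code is fixed.
    FailWorkflowTask,
}

/// A raised exception, described by its type name followed by its base types.
pub struct Raised<'a> {
    pub type_chain: &'a [&'a str],
    /// True when this is a non-determinism error detected internally.
    pub nondeterminism: bool,
}

/// Assumption: `failure_types` is the union of the worker-level and
/// per-workflow collections described above.
pub fn classify(raised: &Raised<'_>, ctx: Context, failure_types: &HashSet<&str>) -> Outcome {
    // Failure exceptions and task cancellation fail the workflow/update by default.
    let fails_by_default = raised
        .type_chain
        .iter()
        .any(|t| *t == "FailureException" || *t == "TaskCanceledException");
    // "Instance of any configured type": some type in the chain is configured.
    let configured = raised.type_chain.iter().any(|t| failure_types.contains(*t));
    if !fails_by_default && !configured {
        return Outcome::FailWorkflowTask;
    }
    match ctx {
        // Non-determinism is never scoped to a single update.
        Context::UpdateHandler if !raised.nondeterminism => Outcome::FailUpdate,
        _ => Outcome::FailWorkflow,
    }
}
```

Under this model, configuring a base type such as `InvalidWorkflowOperationException` also captures non-determinism, because `WorkflowNondeterminismException` derives from it in this PR.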

#### Workflow Logic Constraints

2 changes: 0 additions & 2 deletions src/Temporalio/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions src/Temporalio/Bridge/Interop/Interop.cs
@@ -486,6 +486,15 @@ internal unsafe partial struct WorkerOrFail
public ByteArray* fail;
}

internal unsafe partial struct ByteArrayRefArray
{
[NativeTypeName("const struct ByteArrayRef *")]
public ByteArrayRef* data;

[NativeTypeName("size_t")]
public UIntPtr size;
}

internal partial struct WorkerOptions
{
[NativeTypeName("struct ByteArrayRef")]
@@ -541,6 +550,12 @@ internal partial struct WorkerOptions

[NativeTypeName("uint32_t")]
public uint max_concurrent_activity_task_polls;

[NativeTypeName("bool")]
public byte nondeterminism_as_workflow_fail;

[NativeTypeName("struct ByteArrayRefArray")]
public ByteArrayRefArray nondeterminism_as_workflow_fail_for_types;
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
27 changes: 27 additions & 0 deletions src/Temporalio/Bridge/OptionsExtensions.cs
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Temporalio.Exceptions;

namespace Temporalio.Bridge
{
@@ -423,6 +426,10 @@ public static Interop.WorkerOptions ToInteropOptions(
max_concurrent_workflow_task_polls = (uint)options.MaxConcurrentWorkflowTaskPolls,
nonsticky_to_sticky_poll_ratio = options.NonStickyToStickyPollRatio,
max_concurrent_activity_task_polls = (uint)options.MaxConcurrentActivityTaskPolls,
nondeterminism_as_workflow_fail =
(byte)(AnyNonDeterminismFailureTypes(options.WorkflowFailureExceptionTypes) ? 1 : 0),
nondeterminism_as_workflow_fail_for_types = scope.ByteArrayArray(
AllNonDeterminismFailureTypeWorkflows(options.Workflows)),
};
}

@@ -442,6 +449,10 @@ public static Interop.WorkerOptions ToInteropOptions(
throw new ArgumentException("Unable to get assembly manifest ID for build ID");
buildId = entryAssembly.ManifestModule.ModuleVersionId.ToString();
}
var nonDetWorkflowFailForTypes = options.Workflows.Where(
w => w.FailureExceptionTypes?.Any(
t => t.IsAssignableFrom(typeof(WorkflowNondeterminismException))) ?? false).
Select(w => w.Name).ToArray();
return new()
{
namespace_ = scope.ByteArray(options.Namespace),
@@ -462,7 +473,23 @@ public static Interop.WorkerOptions ToInteropOptions(
max_concurrent_workflow_task_polls = 1,
nonsticky_to_sticky_poll_ratio = 1,
max_concurrent_activity_task_polls = 1,
nondeterminism_as_workflow_fail =
(byte)(AnyNonDeterminismFailureTypes(options.WorkflowFailureExceptionTypes) ? 1 : 0),
nondeterminism_as_workflow_fail_for_types = scope.ByteArrayArray(
AllNonDeterminismFailureTypeWorkflows(options.Workflows)),
};
}

private static bool AnyNonDeterminismFailureTypes(
IReadOnlyCollection<Type>? types) =>
types?.Any(t => t.IsAssignableFrom(typeof(WorkflowNondeterminismException))) ?? false;

private static string[] AllNonDeterminismFailureTypeWorkflows(
IList<Workflows.WorkflowDefinition> workflows) =>
workflows.
Where(w => AnyNonDeterminismFailureTypes(w.FailureExceptionTypes)).
Select(w =>
w.Name ?? throw new ArgumentException("Dynamic workflows cannot trap non-determinism")).
ToArray();
}
}
19 changes: 19 additions & 0 deletions src/Temporalio/Bridge/Scope.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;

namespace Temporalio.Bridge
@@ -75,6 +76,24 @@ public Interop.ByteArrayRef NewlineDelimited(IEnumerable<string>? values)
return val.Ref;
}

/// <summary>
/// Create an array of byte arrays from a collection of strings.
/// </summary>
/// <param name="strings">Strings.</param>
/// <returns>Created byte array array.</returns>
public Interop.ByteArrayRefArray ByteArrayArray(IEnumerable<string> strings)
{
var arr = strings.Select(ByteArray).ToArray();
unsafe
{
return new()
{
data = ArrayPointer(arr),
size = (UIntPtr)arr.Length,
};
}
}

/// <summary>
/// Create a cancellation token.
/// </summary>
7 changes: 7 additions & 0 deletions src/Temporalio/Bridge/include/temporal-sdk-bridge.h
@@ -338,6 +338,11 @@ typedef struct WorkerOrFail {
const struct ByteArray *fail;
} WorkerOrFail;

typedef struct ByteArrayRefArray {
const struct ByteArrayRef *data;
size_t size;
} ByteArrayRefArray;

typedef struct WorkerOptions {
struct ByteArrayRef namespace_;
struct ByteArrayRef task_queue;
@@ -358,6 +363,8 @@ typedef struct WorkerOptions {
uint32_t max_concurrent_workflow_task_polls;
float nonsticky_to_sticky_poll_ratio;
uint32_t max_concurrent_activity_task_polls;
bool nondeterminism_as_workflow_fail;
struct ByteArrayRefArray nondeterminism_as_workflow_fail_for_types;
} WorkerOptions;

/**
17 changes: 17 additions & 0 deletions src/Temporalio/Bridge/src/lib.rs
@@ -105,6 +105,23 @@ impl ByteArrayRef {
}
}

#[repr(C)]
pub struct ByteArrayRefArray {
data: *const ByteArrayRef,
size: libc::size_t,
}

impl ByteArrayRefArray {
fn to_str_vec(&self) -> Vec<&str> {
if self.size == 0 {
vec![]
} else {
let raw = unsafe { std::slice::from_raw_parts(self.data, self.size) };
raw.iter().map(ByteArrayRef::to_str).collect()
}
}
}

/// Metadata is <key1>\n<value1>\n<key2>\n<value2>. Metadata keys or
/// values cannot contain a newline within.
type MetadataRef = ByteArrayRef;
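
The `to_str_vec` helper in this hunk turns a C-style pointer/length pair into borrowed string slices. A minimal self-contained sketch of that pattern follows. The `ByteArrayRef` here is a simplified stand-in (the bridge's real definition is not shown in this hunk), and `borrow_str` and `round_trip` are hypothetical helpers added only so the example can be exercised without the C# side.

```rust
use std::slice;

/// Simplified stand-in for the bridge's `ByteArrayRef`: borrowed UTF-8 bytes.
#[repr(C)]
pub struct ByteArrayRef {
    data: *const u8,
    size: usize,
}

impl ByteArrayRef {
    /// Borrow a Rust string as a pointer/length pair (illustrative helper).
    pub fn borrow_str(s: &str) -> ByteArrayRef {
        ByteArrayRef { data: s.as_ptr(), size: s.len() }
    }

    pub fn to_str(&self) -> &str {
        if self.size == 0 {
            return "";
        }
        // Safety: the creator promises `data` points at `size` bytes of UTF-8
        // that outlive this reference.
        let bytes = unsafe { slice::from_raw_parts(self.data, self.size) };
        std::str::from_utf8(bytes).expect("ByteArrayRef was not valid UTF-8")
    }
}

#[repr(C)]
pub struct ByteArrayRefArray {
    data: *const ByteArrayRef,
    size: usize,
}

impl ByteArrayRefArray {
    pub fn to_str_vec(&self) -> Vec<&str> {
        if self.size == 0 {
            // An empty array may arrive with a null pointer, and
            // `from_raw_parts` requires a non-null pointer even for length 0.
            vec![]
        } else {
            let raw = unsafe { slice::from_raw_parts(self.data, self.size) };
            raw.iter().map(ByteArrayRef::to_str).collect()
        }
    }
}

/// Round-trip a list of names through the pointer/length representation.
pub fn round_trip(names: &[&str]) -> Vec<String> {
    // `refs` must stay alive for as long as `array` points into it.
    let refs: Vec<ByteArrayRef> = names.iter().map(|s| ByteArrayRef::borrow_str(s)).collect();
    let array = ByteArrayRefArray { data: refs.as_ptr(), size: refs.len() };
    array.to_str_vec().into_iter().map(String::from).collect()
}
```

The explicit `size == 0` branch mirrors the one in the diff: it keeps the unsafe slice construction away from the empty case, where the caller's pointer cannot be trusted.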
23 changes: 23 additions & 0 deletions src/Temporalio/Bridge/src/worker.rs
@@ -2,13 +2,15 @@ use crate::client::Client;
use crate::runtime::Runtime;
use crate::ByteArray;
use crate::ByteArrayRef;
use crate::ByteArrayRefArray;
use crate::UserDataHandle;
use prost::Message;
use temporal_sdk_core::replay::HistoryForReplay;
use temporal_sdk_core::replay::ReplayWorkerInput;
use temporal_sdk_core::WorkerConfigBuilder;
use temporal_sdk_core_api::errors::PollActivityError;
use temporal_sdk_core_api::errors::PollWfError;
use temporal_sdk_core_api::errors::WorkflowErrorType;
use temporal_sdk_core_api::Worker as CoreWorker;
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
@@ -17,6 +19,8 @@ use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

@@ -41,6 +45,8 @@ pub struct WorkerOptions {
max_concurrent_workflow_task_polls: u32,
nonsticky_to_sticky_poll_ratio: f32,
max_concurrent_activity_task_polls: u32,
nondeterminism_as_workflow_fail: bool,
nondeterminism_as_workflow_fail_for_types: ByteArrayRefArray,
}

#[derive(Clone)]
@@ -503,6 +509,23 @@ impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
.max_concurrent_wft_polls(opt.max_concurrent_workflow_task_polls as usize)
.nonsticky_to_sticky_poll_ratio(opt.nonsticky_to_sticky_poll_ratio)
.max_concurrent_at_polls(opt.max_concurrent_activity_task_polls as usize)
.workflow_failure_errors(if opt.nondeterminism_as_workflow_fail {
HashSet::from([WorkflowErrorType::Nondeterminism])
} else {
HashSet::new()
})
.workflow_types_to_failure_errors(
opt.nondeterminism_as_workflow_fail_for_types
.to_str_vec()
.into_iter()
.map(|s| {
(
s.to_owned(),
HashSet::from([WorkflowErrorType::Nondeterminism]),
)
})
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.build()
.map_err(|err| anyhow::anyhow!(err))
}
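
The two builder calls in this hunk pass core a worker-wide set of error types and a per-workflow-type map. The sketch below shows the shape of those two values using a local stand-in enum instead of `temporal_sdk_core_api::errors::WorkflowErrorType`. The `fails_workflow` lookup is an assumption about how the two values might be consulted together; it is not taken from core's source.

```rust
use std::collections::{HashMap, HashSet};

/// Local stand-in for `temporal_sdk_core_api::errors::WorkflowErrorType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkflowErrorType {
    Nondeterminism,
}

/// Worker-wide set: contains `Nondeterminism` only when the flag is set.
pub fn worker_level_errors(nondeterminism_as_workflow_fail: bool) -> HashSet<WorkflowErrorType> {
    if nondeterminism_as_workflow_fail {
        HashSet::from([WorkflowErrorType::Nondeterminism])
    } else {
        HashSet::new()
    }
}

/// Per-type map: every listed workflow type maps to `{Nondeterminism}`.
pub fn per_type_errors(workflow_types: &[&str]) -> HashMap<String, HashSet<WorkflowErrorType>> {
    workflow_types
        .iter()
        .map(|name| {
            (
                (*name).to_owned(),
                HashSet::from([WorkflowErrorType::Nondeterminism]),
            )
        })
        .collect()
}

/// Hypothetical lookup (an assumption, not core's code): an error fails the
/// workflow if it is enabled worker-wide or for this workflow type.
pub fn fails_workflow(
    worker_level: &HashSet<WorkflowErrorType>,
    per_type: &HashMap<String, HashSet<WorkflowErrorType>>,
    workflow_type: &str,
    error: WorkflowErrorType,
) -> bool {
    worker_level.contains(&error)
        || per_type
            .get(workflow_type)
            .map_or(false, |errors| errors.contains(&error))
}
```

Keeping the per-type map keyed by workflow name is why the C# side rejects dynamic workflows here: a dynamic workflow has no name to use as a key.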
15 changes: 14 additions & 1 deletion src/Temporalio/Common/ExpressionUtil.cs
@@ -78,6 +78,7 @@ private static (MethodInfo Method, IReadOnlyCollection<object?> Args) ExtractCal
{
throw new ArgumentException("Static call expression must not have a lambda parameter");
}
var method = call.Method;
if (call.Object != null)
{
if (expr.Parameters.Count != 1 ||
@@ -88,6 +89,18 @@ private static (MethodInfo Method, IReadOnlyCollection<object?> Args) ExtractCal
throw new ArgumentException(
"Instance call expression must have a single lambda parameter used for the call");
}
// In .NET method.ReflectedType will not properly be the call.Object type, but
// instead its declaration type. We need it to be the most-specific method in the
// hierarchy.
if (method.ReflectedType != call.Object.Type)
{
method = call.Object.Type.GetMethod(
method.Name,
BindingFlags.Public | BindingFlags.Instance,
null,
method.GetParameters().Select(p => p.ParameterType).ToArray(),
null) ?? throw new ArgumentException("Failed finding more-specific resolved method");
}
}
// Extract all arguments. If they are constant expressions we'll optimize and just pull
// them out, but if they are not, we will compile them live (accepting the performance
@@ -107,7 +120,7 @@ private static (MethodInfo Method, IReadOnlyCollection<object?> Args) ExtractCal
return expr.Compile()();
#endif
});
return (call.Method, args.ToArray());
return (method, args.ToArray());
}
}
}
13 changes: 4 additions & 9 deletions src/Temporalio/Exceptions/InvalidWorkflowOperationException.cs
@@ -5,18 +5,13 @@ namespace Temporalio.Exceptions
/// </summary>
public class InvalidWorkflowOperationException : TemporalException
{
private readonly string? stackTraceOverride;

/// <summary>
/// Initializes a new instance of the <see cref="InvalidWorkflowOperationException"/> class.
/// </summary>
/// <param name="message">Exception message.</param>
/// <param name="stackTraceOverride">Override of stack trace.</param>
internal InvalidWorkflowOperationException(string message, string? stackTraceOverride = null)
: base(message) =>
this.stackTraceOverride = stackTraceOverride;

/// <inheritdoc />
public override string? StackTrace => stackTraceOverride ?? base.StackTrace;
internal InvalidWorkflowOperationException(string message)
: base(message)
{
}
}
}
22 changes: 22 additions & 0 deletions src/Temporalio/Exceptions/InvalidWorkflowSchedulerException.cs
@@ -0,0 +1,22 @@
namespace Temporalio.Exceptions
{
/// <summary>
/// Occurs when the workflow does something outside of the workflow scheduler.
/// </summary>
public class InvalidWorkflowSchedulerException : InvalidWorkflowOperationException
{
private readonly string? stackTraceOverride;

/// <summary>
/// Initializes a new instance of the <see cref="InvalidWorkflowSchedulerException"/> class.
/// </summary>
/// <param name="message">Exception message.</param>
/// <param name="stackTraceOverride">Override of stack trace.</param>
internal InvalidWorkflowSchedulerException(string message, string? stackTraceOverride = null)
: base(message) =>
this.stackTraceOverride = stackTraceOverride;

/// <inheritdoc />
public override string? StackTrace => stackTraceOverride ?? base.StackTrace;
}
}
22 changes: 22 additions & 0 deletions src/Temporalio/Exceptions/WorkflowNondeterminismException.cs
@@ -0,0 +1,22 @@
namespace Temporalio.Exceptions
{
/// <summary>
/// Occurs when the workflow does something non-deterministic.
/// </summary>
/// <remarks>
/// This is usually only thrown in a replayer and therefore can't be caught in a workflow, but
/// this exception can be used as a failure exception type to have non-deterministic exceptions
/// fail workflows.
/// </remarks>
public class WorkflowNondeterminismException : InvalidWorkflowOperationException
Review comment from the PR author (member): So Java capitalizes the D here but core does not; open to suggestions.

{
/// <summary>
/// Initializes a new instance of the <see cref="WorkflowNondeterminismException"/> class.
/// </summary>
/// <param name="message">Exception message.</param>
internal WorkflowNondeterminismException(string message)
: base(message)
{
}
}
}
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
@@ -86,7 +86,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
WorkflowStackTrace: options.WorkflowStackTrace,
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: MetricMeter));
RuntimeMetricMeter: MetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes));
}
}
