Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log and drop signals when we can't decode the arguments #114

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -954,15 +954,30 @@ private void ApplySignalWorkflow(SignalWorkflow signal)
// Run the handler as a top-level function
_ = QueueNewTaskAsync(() => RunTopLevelAsync(async () =>
{
await inbound.Value.HandleSignalAsync(new(
Signal: signal.SignalName,
Definition: signalDefn,
Args: DecodeArgs(
// Drop the signal if we cannot decode the arguments
object?[] args;
try
{
args = DecodeArgs(
method: signalDefn.Method ?? signalDefn.Delegate!.Method,
payloads: signal.Input,
itemName: $"Signal {signal.SignalName}",
dynamic: signalDefn.Dynamic,
dynamicArgPrepend: signal.SignalName),
dynamicArgPrepend: signal.SignalName);
}
catch (Exception e)
{
logger.LogError(
e,
"Failed decoding signal args for {SignalName}, dropping the signal",
signal.SignalName);
return;
}

await inbound.Value.HandleSignalAsync(new(
Signal: signal.SignalName,
Definition: signalDefn,
Args: args,
Headers: signal.Headers)).ConfigureAwait(true);
}));
}
Expand Down
32 changes: 32 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,38 @@ await handle.SignalAsync(
});
}

[Workflow]
public class BadSignalArgsDroppedWorkflow
{
[WorkflowRun]
public Task RunAsync() => Workflow.DelayAsync(Timeout.Infinite);

[WorkflowSignal]
public async Task SomeSignalAsync(string arg) => SignalArgs.Add(arg);

[WorkflowQuery]
public IList<string> SignalArgs { get; } = new List<string>();
}

[Fact]
public async Task ExecuteWorkflowAsync_BadSignalArgs_ProperlyDropped()
{
await ExecuteWorkerAsync<BadSignalArgsDroppedWorkflow>(async worker =>
{
var handle = await Env.Client.StartWorkflowAsync(
(BadSignalArgsDroppedWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
// Send 4 signals, the first and third being bad
await handle.SignalAsync("SomeSignal", new object?[] { 123 });
await handle.SignalAsync("SomeSignal", new object?[] { "value1" });
await handle.SignalAsync("SomeSignal", new object?[] { false });
await handle.SignalAsync("SomeSignal", new object?[] { "value2" });
Assert.Equal(
new List<string> { "value1", "value2" },
await handle.QueryAsync(wf => wf.SignalArgs));
});
}

[Workflow]
public class QueryWorkflow
{
Expand Down