Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly clear metadata when encoding/decoding payloads #343

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Temporalio/Worker/WorkflowCodecHelper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Linq;
using System.Threading.Tasks;
using Google.Protobuf;
using Google.Protobuf.Collections;
using Temporalio.Api.Common.V1;
using Temporalio.Bridge.Api.ActivityResult;
Expand Down Expand Up @@ -223,6 +224,7 @@ private static async Task EncodeAsync(IPayloadCodec codec, Payload payload)
var encodedList = await codec.EncodeAsync(new Payload[] { payload }).ConfigureAwait(false);
var encoded = encodedList.Single();
payload.Metadata.Clear();
payload.Data = ByteString.Empty;
payload.MergeFrom(encoded);
}

Expand Down Expand Up @@ -324,6 +326,7 @@ private static async Task DecodeAsync(IPayloadCodec codec, Payload payload)
// We are gonna require a single result here
var decoded = await codec.DecodeAsync(new Payload[] { payload }).ConfigureAwait(false);
payload.Metadata.Clear();
payload.Data = ByteString.Empty;
payload.MergeFrom(decoded.Single());
}
}
Expand Down
57 changes: 57 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Text;

Check failure on line 8 in tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs

View workflow job for this annotation

GitHub Actions / build-lint-test (windows-latest)

Using directive is unnecessary.
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;
using Temporalio.Activities;
using Temporalio.Api.Common.V1;
using Temporalio.Api.Enums.V1;
using Temporalio.Api.History.V1;
using Temporalio.Client;
using Temporalio.Client.Schedules;
using Temporalio.Common;
using Temporalio.Converters;
using Temporalio.Exceptions;
using Temporalio.Runtime;
using Temporalio.Tests.Converters;
using Temporalio.Worker;
using Temporalio.Workflows;
using Xunit;
Expand Down Expand Up @@ -5736,6 +5739,60 @@
});
}

[Workflow]
public class NullWithCodecWorkflow
{
[Activity]
public static string? ReturnsNull() => null;

[WorkflowRun]
public async Task<string?> RunAsync(string? input)
{
Assert.Null(input);
return await Workflow.ExecuteActivityAsync(
() => ReturnsNull(),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
}
}

[Fact]
public async Task ExecuteWorkflowAsync_NullWithCodec_EncodedProperly()
{
// Need client with codec
var newOptions = (TemporalClientOptions)Client.Options.Clone();
newOptions.DataConverter = DataConverter.Default with
{
PayloadCodec = new Base64PayloadCodec(),
};
var client = new TemporalClient(Client.Connection, newOptions);

// Run worker
await ExecuteWorkerAsync<NullWithCodecWorkflow>(
async worker =>
{
var handle = await client.StartWorkflowAsync(
(NullWithCodecWorkflow wf) => wf.RunAsync(null),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
var result = await handle.GetResultAsync();
Assert.Null(result);
// Confirm the codec is in use in history with the activity result
ActivityTaskCompletedEventAttributes? attrs = null;
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
if (evt.ActivityTaskCompletedEventAttributes is { } taskAttrs)
{
attrs = taskAttrs;
break;
}
}
Assert.Equal(
Base64PayloadCodec.EncodingName,
attrs?.Result?.Payloads_?.Single()?.Metadata?.GetValueOrDefault("encoding")?.ToStringUtf8());
},
new TemporalWorkerOptions().AddAllActivities<NullWithCodecWorkflow>(null),
client);
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down
Loading