Skip to content

Commit

Permalink
Use build ID in the unique DI identifier for workers (#216)
Browse files Browse the repository at this point in the history
Fixes #212
  • Loading branch information
cretz authored Apr 16, 2024
1 parent 070faac commit 740dfa2
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public interface ITemporalWorkerServiceOptionsBuilder
/// </summary>
string TaskQueue { get; }

/// <summary>
/// Gets the build ID for this worker service. If unset, versioning is disabled.
/// </summary>
string? BuildId { get; }

/// <summary>
/// Gets the service collection being configured.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,69 @@ public static class TemporalHostingServiceCollectionExtensions
/// Add a hosted Temporal worker service as a <see cref="IHostedService" /> that contains
/// its own client that connects with the given target and namespace. To use an injected
/// <see cref="ITemporalClient" />, use
/// <see cref="AddHostedTemporalWorker(IServiceCollection, string)" />. The worker service
/// will be registered as a singleton. The result is an options builder that can be used to
/// configure the service.
/// <see cref="AddHostedTemporalWorker(IServiceCollection, string, string?)" />. The worker
/// service will be registered as a singleton. The result is an options builder that can be
/// used to configure the service.
/// </summary>
/// <param name="services">Service collection to create hosted worker on.</param>
/// <param name="clientTargetHost">Client target host to connect to when starting the
/// worker.</param>
/// <param name="clientNamespace">Client namespace to connect to when starting the worker.
/// </param>
/// <param name="taskQueue">Task queue for the worker.</param>
/// <param name="buildId">
/// Build ID for the worker. Set to non-null to opt in to versioning. If versioning is
/// wanted, this must be set here and not later via configure options. This is because the
/// combination of task queue and build ID make up the unique identifier for a worker in the
/// service collection.
/// </param>
/// <returns>Options builder to configure the service.</returns>
public static ITemporalWorkerServiceOptionsBuilder AddHostedTemporalWorker(
this IServiceCollection services,
string clientTargetHost,
string clientNamespace,
string taskQueue) =>
services.AddHostedTemporalWorker(taskQueue).ConfigureOptions(options =>
string taskQueue,
string? buildId = null) =>
services.AddHostedTemporalWorker(taskQueue, buildId).ConfigureOptions(options =>
options.ClientOptions = new(clientTargetHost) { Namespace = clientNamespace });

/// <summary>
/// Add a hosted Temporal worker service as a <see cref="IHostedService" /> that expects
/// an injected <see cref="ITemporalClient" /> (or the returned builder
/// can have client options populated). Use
/// <see cref="AddHostedTemporalWorker(IServiceCollection, string, string, string)" /> to
/// not expect an injected instance and instead connect to a client on worker start. The
/// <see cref="AddHostedTemporalWorker(IServiceCollection, string, string, string, string?)" />
/// to not expect an injected instance and instead connect to a client on worker start. The
/// worker service will be registered as a singleton. The result is an options builder that
/// can be used to configure the service.
/// </summary>
/// <param name="services">Service collection to create hosted worker on.</param>
/// <param name="taskQueue">Task queue for the worker.</param>
/// <param name="buildId">
/// Build ID for the worker. Set to non-null to opt in to versioning. If versioning is
/// wanted, this must be set here and not later via configure options. This is because the
/// combination of task queue and build ID make up the unique identifier for a worker in the
/// service collection.
/// </param>
/// <returns>Options builder to configure the service.</returns>
public static ITemporalWorkerServiceOptionsBuilder AddHostedTemporalWorker(
this IServiceCollection services, string taskQueue)
this IServiceCollection services, string taskQueue, string? buildId = null)
{
// We have to use AddSingleton instead of AddHostedService because the latter does
// not allow us to register multiple of the same type, see
// https://github.com/dotnet/runtime/issues/38751
// https://github.com/dotnet/runtime/issues/38751.
services.AddSingleton<IHostedService>(provider =>
ActivatorUtilities.CreateInstance<TemporalWorkerService>(provider, taskQueue));
return new TemporalWorkerServiceOptionsBuilder(taskQueue, services).ConfigureOptions(
options => options.TaskQueue = taskQueue);
ActivatorUtilities.CreateInstance<TemporalWorkerService>(
provider, (TaskQueue: taskQueue, BuildId: buildId)));
return new TemporalWorkerServiceOptionsBuilder(taskQueue, buildId, services).ConfigureOptions(
options =>
{
options.TaskQueue = taskQueue;
options.BuildId = buildId;
options.UseWorkerVersioning = buildId != null;
},
// Disallow duplicate options registrations because that means multiple worker
// services with the same task queue + build ID were added.
disallowDuplicates: true);
}

/// <summary>
Expand Down
61 changes: 55 additions & 6 deletions src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,75 @@ public TemporalWorkerService(

/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerService"/> class using
/// options and possibly an existing client. This constructor is mostly used by DI
/// containers. The task queue is used as the name on the options monitor to lookup the
/// options for the worker service.
/// options and possibly an existing client. This constructor was used by DI
/// containers and is now DEPRECATED.
/// </summary>
/// <param name="taskQueue">Task queue which is the options name.</param>
/// <param name="taskQueue">Task queue which is included in the options name.</param>
/// <param name="buildId">Build ID which is included in the options name.</param>
/// <param name="optionsMonitor">Used to lookup the options to build the worker with.
/// </param>
/// <param name="existingClient">Existing client to use if the options don't specify
/// client connection options (connected when run if lazy and not connected).</param>
/// <param name="loggerFactory">Logger factory to use if not already on the worker options.
/// The worker options logger factory or this one will be also be used for the client if an
/// existing client does not exist (regardless of client options' logger factory).</param>
[ActivatorUtilitiesConstructor]
[Obsolete("Deprecated older form of DI constructor, task queue + build ID tuple one is used instead.")]
public TemporalWorkerService(
string taskQueue,
string? buildId,
IOptionsMonitor<TemporalWorkerServiceOptions> optionsMonitor,
ITemporalClient? existingClient = null,
ILoggerFactory? loggerFactory = null)
: this((taskQueue, buildId), optionsMonitor, existingClient, loggerFactory)
{
var options = (TemporalWorkerServiceOptions)optionsMonitor.Get(taskQueue).Clone();
}

/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerService"/> class using
/// options and possibly an existing client. This constructor is only for use by DI
/// containers. The task queue and build ID are used as the name for the options monitor to
/// lookup the options for the worker service.
/// </summary>
/// <param name="taskQueueAndBuildId">Task queue and build ID for the options name.</param>
/// <param name="optionsMonitor">Used to lookup the options to build the worker with.
/// </param>
/// <param name="existingClient">Existing client to use if the options don't specify
/// client connection options (connected when run if lazy and not connected).</param>
/// <param name="loggerFactory">Logger factory to use if not already on the worker options.
/// The worker options logger factory or this one will be also be used for the client if an
/// existing client does not exist (regardless of client options' logger factory).</param>
/// <remarks>
/// WARNING: Do not rely on the signature of this constructor, it is for DI container use
/// only and may change in incompatible ways.
/// </remarks>
[ActivatorUtilitiesConstructor]
public TemporalWorkerService(
(string TaskQueue, string? BuildId) taskQueueAndBuildId,
IOptionsMonitor<TemporalWorkerServiceOptions> optionsMonitor,
ITemporalClient? existingClient = null,
ILoggerFactory? loggerFactory = null)
{
var options = (TemporalWorkerServiceOptions)optionsMonitor.Get(
TemporalWorkerServiceOptions.GetUniqueOptionsName(
taskQueueAndBuildId.TaskQueue, taskQueueAndBuildId.BuildId)).Clone();

// Make sure options values match the ones given in constructor
if (options.TaskQueue != taskQueueAndBuildId.TaskQueue)
{
throw new InvalidOperationException(
$"Task queue '{taskQueueAndBuildId.TaskQueue}' on constructor different than '{options.TaskQueue}' on options");
}
if (options.BuildId != taskQueueAndBuildId.BuildId)
{
throw new InvalidOperationException(
$"Build ID '{taskQueueAndBuildId.BuildId ?? "<unset>"}' on constructor different than '{options.BuildId ?? "<unset>"}' on options");
}
if (options.UseWorkerVersioning != (taskQueueAndBuildId.BuildId != null))
{
throw new InvalidOperationException(
$"Use versioning option is {options.UseWorkerVersioning}, but constructor expects different");
}

newClientOptions = options.ClientOptions;
if (newClientOptions == null)
{
Expand Down
15 changes: 15 additions & 0 deletions src/Temporalio.Extensions.Hosting/TemporalWorkerServiceOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,20 @@ public override object Clone()
}
return options;
}

/// <summary>
/// Get an options name for the given task queue and build ID.
/// </summary>
/// <param name="taskQueue">Task queue.</param>
/// <param name="buildId">Build ID.</param>
/// <returns>Unique string name for the options.</returns>
internal static string GetUniqueOptionsName(string taskQueue, string? buildId)
{
if (buildId == null)
{
return taskQueue;
}
return taskQueue + "!!__temporal__!!" + buildId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,30 @@ public class TemporalWorkerServiceOptionsBuilder : ITemporalWorkerServiceOptions
/// <param name="taskQueue">Task queue for the worker.</param>
/// <param name="services">Service collection being configured.</param>
public TemporalWorkerServiceOptionsBuilder(string taskQueue, IServiceCollection services)
: this(taskQueue, null, services)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerServiceOptionsBuilder" />
/// class.
/// </summary>
/// <param name="taskQueue">Task queue for the worker.</param>
/// <param name="buildId">Build ID for the worker.</param>
/// <param name="services">Service collection being configured.</param>
public TemporalWorkerServiceOptionsBuilder(string taskQueue, string? buildId, IServiceCollection services)
{
TaskQueue = taskQueue;
BuildId = buildId;
Services = services;
}

/// <inheritdoc />
public string TaskQueue { get; private init; }

/// <inheritdoc />
public string? BuildId { get; private init; }

/// <inheritdoc />
public IServiceCollection Services { get; private init; }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -167,22 +168,46 @@ public static ITemporalWorkerServiceOptionsBuilder AddWorkflow(
/// Get an options builder to configure worker service options.
/// </summary>
/// <param name="builder">Builder to use.</param>
/// <param name="disallowDuplicates">If true, will fail if options already registered for
/// this builder's task queue and build ID.</param>
/// <returns>Options builder.</returns>
public static OptionsBuilder<TemporalWorkerServiceOptions> ConfigureOptions(
this ITemporalWorkerServiceOptionsBuilder builder) =>
builder.Services.AddOptions<TemporalWorkerServiceOptions>(builder.TaskQueue);
this ITemporalWorkerServiceOptionsBuilder builder,
bool disallowDuplicates = false)
{
// To ensure the user isn't accidentally registering a duplicate task queue + build ID
// worker, we check here that there aren't duplicate options
var optionsName = TemporalWorkerServiceOptions.GetUniqueOptionsName(
builder.TaskQueue, builder.BuildId);
if (disallowDuplicates)
{
var any = builder.Services.Any(s =>
s.ImplementationInstance is ConfigureNamedOptions<TemporalWorkerServiceOptions> instance &&
instance.Name == optionsName);
if (any)
{
throw new InvalidOperationException(
$"Worker service for task queue '{builder.TaskQueue}' and build ID '{builder.BuildId ?? "<unset>"}' already on collection");
}
}

return builder.Services.AddOptions<TemporalWorkerServiceOptions>(optionsName);
}

/// <summary>
/// Configure worker service options using an action.
/// </summary>
/// <param name="builder">Builder to use.</param>
/// <param name="configure">Configuration action.</param>
/// <param name="disallowDuplicates">If true, will fail if options already registered for
/// this builder's task queue and build ID.</param>
/// <returns>Same builder instance.</returns>
public static ITemporalWorkerServiceOptionsBuilder ConfigureOptions(
this ITemporalWorkerServiceOptionsBuilder builder,
Action<TemporalWorkerServiceOptions> configure)
Action<TemporalWorkerServiceOptions> configure,
bool disallowDuplicates = false)
{
builder.ConfigureOptions().Configure(configure);
builder.ConfigureOptions(disallowDuplicates).Configure(configure);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,72 @@ public async Task TemporalWorkerService_ExecuteAsync_MultipleWorkers()
},
result);
}

[Workflow("Workflow")]
public class WorkflowV1
{
[WorkflowRun]
public async Task<string> RunAsync() => "done-v1";
}

[Workflow("Workflow")]
public class WorkflowV2
{
[WorkflowRun]
public async Task<string> RunAsync() => "done-v2";
}

[Fact]
public async Task TemporalWorkerService_ExecuteAsync_MultipleVersionsSameQueue()
{
var taskQueue = $"tq-{Guid.NewGuid()}";
// Build with two workers on same queue but different versions
var bld = Host.CreateApplicationBuilder();
bld.Services.AddSingleton(Client);
bld.Services.
AddHostedTemporalWorker(taskQueue, "1.0").
AddWorkflow<WorkflowV1>();
bld.Services.
AddHostedTemporalWorker(taskQueue, "2.0").
AddWorkflow<WorkflowV2>();

// Start the host
using var tokenSource = new CancellationTokenSource();
using var host = bld.Build();
var hostTask = Task.Run(() => host.RunAsync(tokenSource.Token));

// Set 1.0 as default and run
await Env.Client.UpdateWorkerBuildIdCompatibilityAsync(
taskQueue, new BuildIdOp.AddNewDefault("1.0"));
var res = await Client.ExecuteWorkflowAsync(
(WorkflowV1 wf) => wf.RunAsync(),
new($"wf-{Guid.NewGuid()}", taskQueue));
Assert.Equal("done-v1", res);

// Update default and run again
await Env.Client.UpdateWorkerBuildIdCompatibilityAsync(
taskQueue, new BuildIdOp.AddNewDefault("2.0"));
res = await Client.ExecuteWorkflowAsync(
(WorkflowV1 wf) => wf.RunAsync(),
new($"wf-{Guid.NewGuid()}", taskQueue));
Assert.Equal("done-v2", res);
}

[Fact]
public async Task TemporalWorkerService_ExecuteAsync_DuplicateQueue()
{
var taskQueue = $"tq-{Guid.NewGuid()}";
// Build with two workers on same queue but different versions
var bld = Host.CreateApplicationBuilder();
bld.Services.AddSingleton(Client);
bld.Services.
AddHostedTemporalWorker(taskQueue).
AddWorkflow<WorkflowV1>();
var exc = Assert.Throws<InvalidOperationException>(() =>
bld.Services.
AddHostedTemporalWorker(taskQueue).
AddWorkflow<WorkflowV2>());
Assert.StartsWith("Worker service", exc.Message);
Assert.EndsWith("already on collection", exc.Message);
}
}

0 comments on commit 740dfa2

Please sign in to comment.