Skip to content

Commit

Permalink
Added general global bookmark provider for all KafkaMessageReceivedOv…
Browse files Browse the repository at this point in the history
…erride Triggers (#3807)

Co-authored-by: Yannick Laubscher <[email protected]>
  • Loading branch information
Snotax and yannicklaubscherswt authored Mar 20, 2023
1 parent 580fef3 commit 31575ac
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Elsa.Activities.Kafka.Activities.KafkaMessageReceived;
using Elsa.Activities.Kafka.Configuration;
using Elsa.Services;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Activities.Kafka.Bookmarks
{
/// <summary>
/// General Bookmark provider for all KafkaMessageReceived override activities.
/// </summary>
internal class OverrideKafkaBookmarkProvider : BookmarkProvider<Elsa.Activities.Kafka.Bookmarks.MessageReceivedBookmark, KafkaMessageReceived>
{

private readonly IKafkaCustomActivityProvider _customActivityProvider;
public OverrideKafkaBookmarkProvider(IKafkaCustomActivityProvider customActivityProvider)
{
_customActivityProvider = customActivityProvider;
}

public override bool SupportsActivity(BookmarkProviderContext<KafkaMessageReceived> context)
{
if (_customActivityProvider != null &&
_customActivityProvider.KafkaOverrideTriggers != null
&& _customActivityProvider.KafkaOverrideTriggers.Contains(context.ActivityExecutionContext.ActivityBlueprint.Type))
{
return true;
}
else
{
return base.SupportsActivity(context);
}
}

public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<KafkaMessageReceived> context, CancellationToken cancellationToken) =>
new[]
{
Result(
new Elsa.Activities.Kafka.Bookmarks.MessageReceivedBookmark(
topic: (await context.ReadActivityPropertyAsync(x => x.Topic, cancellationToken))!,
group: (await context.ReadActivityPropertyAsync(x => x.Group, cancellationToken))!,
connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!,
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken) ?? new Dictionary<string, string>())!,
autoOffsetReset: Enum.Parse<Confluent.Kafka.AutoOffsetReset>(await context.ReadActivityPropertyAsync(x => x.AutoOffsetReset, cancellationToken) ?? ((int)Confluent.Kafka.AutoOffsetReset.Earliest).ToString())!
))
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public static ElsaOptionsBuilder AddKafkaActivities(this ElsaOptionsBuilder opti
.AddHostedService<StartKafkaQueues>()
.AddSingleton<IKafkaTenantIdResolver, DefaultKafkaTenantIdResolver>()
.AddSingleton<IKafkaCustomActivityProvider, KafkaCustomActivityProvider>()
.AddBookmarkProvider<OverrideKafkaBookmarkProvider>()
.AddBookmarkProvider<QueueMessageReceivedBookmarkProvider>();

options.AddPubSubConsumer<UpdateWorkers, TriggerIndexingFinished>("WorkflowManagementEvents");
Expand Down

0 comments on commit 31575ac

Please sign in to comment.