Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait started per transport #488

Merged
merged 3 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/ConfigSample/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},

"EventBus": {
"WaitTransportStarted": false,
"DefaultTransportWaitStarted": false,
"Naming": {
"Convention": "DotCase",
"UseFullTypeNames": false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Amazon;
using Amazon.Runtime;
using Microsoft.Extensions.Options;
using Tingle.EventBus.Configuration;

namespace Microsoft.Extensions.DependencyInjection;
Expand All @@ -14,7 +15,9 @@ public abstract class AmazonTransportConfigureOptions<TOptions> : EventBusTransp
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
public AmazonTransportConfigureOptions(IEventBusConfigurationProvider configurationProvider) : base(configurationProvider) { }
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AmazonTransportConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override void PostConfigure(string? name, TOptions options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,14 @@ namespace Microsoft.Extensions.DependencyInjection;
/// </summary>
internal class AmazonKinesisConfigureOptions : AmazonTransportConfigureOptions<AmazonKinesisTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="AmazonKinesisConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AmazonKinesisConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override void PostConfigure(string? name, AmazonKinesisTransportOptions options)
Expand All @@ -39,11 +34,11 @@ public override void PostConfigure(string? name, AmazonKinesisTransportOptions o
}

// Ensure the entity names are not longer than the limits
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,14 @@ namespace Microsoft.Extensions.DependencyInjection;
/// </summary>
internal class AmazonSqsConfigureOptions : AmazonTransportConfigureOptions<AmazonSqsTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="AmazonSqsConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AmazonSqsConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override void PostConfigure(string? name, AmazonSqsTransportOptions options)
Expand All @@ -36,11 +31,11 @@ public override void PostConfigure(string? name, AmazonSqsTransportOptions optio
options.SnsConfig.RegionEndpoint ??= options.Region;

// Ensure the entity names are not longer than the limits
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ public abstract class AzureTransportConfigureOptions<TCredential, TOptions> : Ev
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
public AzureTransportConfigureOptions(IEventBusConfigurationProvider configurationProvider) : base(configurationProvider) { }
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AzureTransportConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override ValidateOptionsResult Validate(string? name, TOptions options)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Azure.Messaging.EventHubs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Tingle.EventBus.Configuration;

Expand All @@ -11,19 +10,14 @@ namespace Microsoft.Extensions.DependencyInjection;
internal class AzureEventHubsConfigureOptions : AzureTransportConfigureOptions<AzureEventHubsTransportCredentials, AzureEventHubsTransportOptions>,
IConfigureNamedOptions<AzureEventHubsTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="AzureEventHubsConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AzureEventHubsConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
protected override void Configure(IConfiguration configuration, AzureEventHubsTransportOptions options)
Expand Down Expand Up @@ -76,7 +70,7 @@ public override void PostConfigure(string? name, AzureEventHubsTransportOptions
options.CheckpointInterval = Math.Max(options.CheckpointInterval, 1);

// If there are consumers for this transport, we must check azure blob storage
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
if (registrations.Any(r => r.Consumers.Count > 0))
{
// ensure the connection string for blob storage or token credential is provided
Expand Down Expand Up @@ -106,7 +100,7 @@ public override void PostConfigure(string? name, AzureEventHubsTransportOptions
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,14 @@ namespace Microsoft.Extensions.DependencyInjection;
internal class AzureQueueStorageConfigureOptions : AzureTransportConfigureOptions<AzureQueueStorageTransportCredentials, AzureQueueStorageTransportOptions>,
IConfigureNamedOptions<AzureQueueStorageTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="AzureQueueStorageConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AzureQueueStorageConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
protected override void Configure(IConfiguration configuration, AzureQueueStorageTransportOptions options)
Expand Down Expand Up @@ -57,7 +52,7 @@ public override void PostConfigure(string? name, AzureQueueStorageTransportOptio
}

// Ensure there's only one consumer per event
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
var multiple = registrations.FirstOrDefault(r => r.Consumers.Count > 1);
if (multiple is not null)
{
Expand All @@ -70,7 +65,7 @@ public override void PostConfigure(string? name, AzureQueueStorageTransportOptio
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,14 @@ namespace Microsoft.Extensions.DependencyInjection;
internal class AzureServiceBusConfigureOptions : AzureTransportConfigureOptions<AzureServiceBusTransportCredentials, AzureServiceBusTransportOptions>,
IConfigureNamedOptions<AzureServiceBusTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="AzureServiceBusConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public AzureServiceBusConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
protected override void Configure(IConfiguration configuration, AzureServiceBusTransportOptions options)
Expand Down Expand Up @@ -58,11 +53,11 @@ public override void PostConfigure(string? name, AzureServiceBusTransportOptions

// Ensure the entity names are not longer than the limits
// See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quotas#messaging-quotas
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,26 @@ namespace Microsoft.Extensions.DependencyInjection;
/// </summary>
internal class InMemoryTransportConfigureOptions : EventBusTransportConfigureOptions<InMemoryTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="InMemoryTransportConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>\
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public InMemoryTransportConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override void PostConfigure(string? name, InMemoryTransportOptions options)
{
base.PostConfigure(name, options);
if (name is null) throw new ArgumentNullException(nameof(name));

var registrations = busOptions.GetRegistrations(name);
var registrations = BusOptions.GetRegistrations(name);
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);
Expand Down
11 changes: 3 additions & 8 deletions src/Tingle.EventBus.Transports.Kafka/KafkaConfigureOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,14 @@ namespace Microsoft.Extensions.DependencyInjection;
/// </summary>
internal class KafkaConfigureOptions : EventBusTransportConfigureOptions<KafkaTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="KafkaConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public KafkaConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override void PostConfigure(string? name, KafkaTransportOptions options)
Expand Down Expand Up @@ -53,7 +48,7 @@ public override void PostConfigure(string? name, KafkaTransportOptions options)
options.CheckpointInterval = Math.Max(options.CheckpointInterval, 1);

// ensure there's only one consumer per event
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
var multiple = registrations.FirstOrDefault(r => r.Consumers.Count > 1);
if (multiple is not null)
{
Expand All @@ -66,7 +61,7 @@ public override void PostConfigure(string? name, KafkaTransportOptions options)
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,34 @@ namespace Microsoft.Extensions.DependencyInjection;
/// </summary>
internal class RabbitMqConfigureOptions : EventBusTransportConfigureOptions<RabbitMqTransportOptions>
{
private readonly EventBusOptions busOptions;

/// <summary>
/// Initializes a new <see cref="RabbitMqConfigureOptions"/> given the configuration
/// provided by the <paramref name="configurationProvider"/>.
/// </summary>
/// <param name="configurationProvider">An <see cref="IEventBusConfigurationProvider"/> instance.</param>
/// <param name="busOptionsAccessor">An <see cref="IOptions{TOptions}"/> for bus configuration.</param>\
public RabbitMqConfigureOptions(IEventBusConfigurationProvider configurationProvider, IOptions<EventBusOptions> busOptionsAccessor)
: base(configurationProvider)
{
busOptions = busOptionsAccessor?.Value ?? throw new ArgumentNullException(nameof(busOptionsAccessor));
}
: base(configurationProvider, busOptionsAccessor) { }

/// <inheritdoc/>
public override void PostConfigure(string? name, RabbitMqTransportOptions options)
{
base.PostConfigure(name, options);

// If there are consumers for this transport, confirm the right Bus options
var registrations = busOptions.GetRegistrations(name!);
var registrations = BusOptions.GetRegistrations(name!);
if (registrations.Any(r => r.Consumers.Count > 0))
{
// we need full type names
if (!busOptions.Naming.UseFullTypeNames)
if (!BusOptions.Naming.UseFullTypeNames)
{
throw new NotSupportedException($"When using RabbitMQ transport '{nameof(busOptions.Naming.UseFullTypeNames)}' must be 'true'");
throw new NotSupportedException($"When using RabbitMQ transport '{nameof(BusOptions.Naming.UseFullTypeNames)}' must be 'true'");
}

// consumer names must be suffixed
if (!busOptions.Naming.SuffixConsumerName)
if (!BusOptions.Naming.SuffixConsumerName)
{
throw new NotSupportedException($"When using RabbitMQ transport '{nameof(busOptions.Naming.SuffixConsumerName)}' must be 'true'");
throw new NotSupportedException($"When using RabbitMQ transport '{nameof(BusOptions.Naming.SuffixConsumerName)}' must be 'true'");
}
}

Expand Down Expand Up @@ -77,7 +72,7 @@ public override void PostConfigure(string? name, RabbitMqTransportOptions option
foreach (var reg in registrations)
{
// Set the IdFormat
options.SetEventIdFormat(reg, busOptions);
options.SetEventIdFormat(reg, BusOptions);

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);
Expand Down
Loading