diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs
index 125e30a9..b8a850f5 100644
--- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs
+++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs
@@ -39,14 +39,6 @@ public AmazonKinesisTransport(IServiceScopeFactory serviceScopeFactory,
clientConfig: TransportOptions.KinesisConfig);
}
- ///
- public override async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- _ = await kinesisClient.ListStreamsAsync(cancellationToken);
- return true;
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
index 3f09a159..f24f646c 100644
--- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
+++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
@@ -54,16 +54,6 @@ public AmazonSqsTransport(IServiceScopeFactory serviceScopeFactory,
clientConfig: TransportOptions.SqsConfig);
}
- ///
- public override async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- _ = await snsClient.ListTopicsAsync(cancellationToken);
- var prefix = BusOptions.Naming.Scope ?? "";
- _ = await sqsClient.ListQueuesAsync(prefix, cancellationToken);
- return true;
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs
index 8eedcc43..a9d1ac60 100644
--- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs
+++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs
@@ -44,22 +44,6 @@ public AzureEventHubsTransport(IServiceScopeFactory serviceScopeFactory,
{
}
- ///
- public override async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- // get properties for the producers
- var producers = producersCache.Values;
- foreach (var proc in producers)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- await proc.GetEventHubPropertiesAsync(cancellationToken);
- }
-
- return true;
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs
index fbaeff5c..faec5c34 100644
--- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs
+++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs
@@ -45,15 +45,6 @@ public AzureQueueStorageTransport(IServiceScopeFactory serviceScopeFactory,
: new QueueServiceClient(connectionString: (string)cred);
}
- ///
- public override async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- /* GetStatisticsAsync fails to work as expected, instead we use GetPropertiesAsync */
- await serviceClient.GetPropertiesAsync(cancellationToken);
- return true;
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs
index 59286872..949bfb8c 100644
--- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs
+++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs
@@ -60,30 +60,6 @@ public AzureServiceBusTransport(IServiceScopeFactory serviceScopeFactory,
}
}
- ///
- public override async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- Logger.LogDebug("Listing Queues ...");
- var queues = managementClient.GetQueuesRuntimePropertiesAsync(cancellationToken).AsPages();
- await foreach (var _ in queues) ; // there's nothing to do
- var registrations = GetRegistrations();
- if (registrations.Any(r => r.EntityKind == EntityKind.Broadcast))
- {
- Logger.LogDebug("Listing Topics ...");
- var topics = managementClient.GetTopicsRuntimePropertiesAsync(cancellationToken);
- await foreach (var t in topics)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- Logger.LogDebug("Listing Subscriptions for '{TopicName}' topic ...", t.Name);
- var subscriptions = managementClient.GetSubscriptionsRuntimePropertiesAsync(t.Name, cancellationToken);
- await foreach (var _ in subscriptions) ; // there's nothing to do
- }
- }
- return true;
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
index 3fc9e459..5ac8932c 100644
--- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
@@ -65,14 +65,6 @@ public InMemoryTransport(IServiceScopeFactory serviceScopeFactory,
///
internal ConcurrentBag Failed => failed;
- ///
- public override Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- // InMemory is always healthy
- return Task.FromResult(true);
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
index 0eee1d62..6423fc9d 100644
--- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
+++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
@@ -69,14 +69,6 @@ public KafkaTransport(IHostEnvironment environment,
.Build();
}
- ///
- public override Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- adminClient.GetMetadata(StandardTimeout);
- return Task.FromResult(true);
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
index a5113659..68badc01 100644
--- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
+++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
@@ -56,19 +56,6 @@ public RabbitMqTransport(IServiceScopeFactory serviceScopeFactory,
});
}
- ///
- public override async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- if (!IsConnected)
- {
- await TryConnectAsync(cancellationToken);
- }
-
- using var channel = connection!.CreateModel();
- return channel.IsOpen;
- }
-
///
public override async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/src/Tingle.EventBus/ConstStrings.cs b/src/Tingle.EventBus/ConstStrings.cs
deleted file mode 100644
index 7ebd0817..00000000
--- a/src/Tingle.EventBus/ConstStrings.cs
+++ /dev/null
@@ -1,11 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace Tingle.EventBus
-{
- internal class ConstStrings
- {
- public const string HealthChecksObsolete = "Migrate to AspNetCore.Diagnostics.HealthChecks or build custom health checks for your workflow.";
- }
-}
diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
index 168b4876..e714d44c 100644
--- a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
+++ b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
@@ -36,7 +37,11 @@ public EventBusBuilder(IServiceCollection services)
Services.AddSingleton();
Services.AddHostedService(p => p.GetRequiredService());
UseDefaultSerializer();
- UseDefaultReadinessProvider();
+
+ // Register health/readiness services needed
+ Services.AddSingleton();
+ Services.AddSingleton(p => p.GetRequiredService());
+ Services.AddSingleton(p => p.GetRequiredService());
}
///
diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs
index 0952ce8e..60a746e1 100644
--- a/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs
+++ b/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs
@@ -20,13 +20,5 @@ public class EventBusReadinessOptions
/// The default value is 5 minutes. Max value is 15 minutes and minimum is 5 seconds.
///
public TimeSpan Timeout { get; set; } = TimeSpan.FromMinutes(5);
-
- ///
- /// Whether to exclude the EventBus health checks when checking for readiness
- /// in the default implementation of .
- /// Defaults to .
- /// Setting is useful when using multiple transports.
- ///
- public bool ExcludeSelf { get; set; } = false;
}
}
diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs
index d366a343..0e41f0fe 100644
--- a/src/Tingle.EventBus/EventBus.cs
+++ b/src/Tingle.EventBus/EventBus.cs
@@ -52,35 +52,6 @@ public EventBus(IReadinessProvider readinessProvider,
logger = loggerFactory?.CreateLogger(LogCategoryNames.EventBus) ?? throw new ArgumentNullException(nameof(logger));
}
- ///
- /// Checks for health of the bus.
- /// This function can be used by the Health Checks framework and may throw and exception during execution.
- ///
- /// Additional key-value pairs describing the health of the bus.
- ///
- /// A value indicating if the bus is healthy.
- [Obsolete(ConstStrings.HealthChecksObsolete)]
- public async Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default)
- {
- var healthy = true;
-
- // Ensure each transport is healthy
- foreach (var t in transports)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- // Check the health on the transport
- var tData = new Dictionary();
- healthy &= await t.CheckHealthAsync(tData, cancellationToken);
-
- // Combine the data dictionaries into one, keyed by the transport name
- data[t.Name] = tData;
- }
-
- return healthy;
- }
-
///
/// Publish an event.
///
diff --git a/src/Tingle.EventBus/Health/EventBusHealthCheck.cs b/src/Tingle.EventBus/Health/EventBusHealthCheck.cs
deleted file mode 100644
index 82766d15..00000000
--- a/src/Tingle.EventBus/Health/EventBusHealthCheck.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-using Microsoft.Extensions.Diagnostics.HealthChecks;
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Tingle.EventBus.Health
-{
- ///
- /// Implementation of for
- ///
- [Obsolete(ConstStrings.HealthChecksObsolete)]
- public class EventBusHealthCheck : IHealthCheck
- {
- private readonly EventBus bus;
-
- ///
- /// Creates an instance of
- ///
- /// The instance of to use.
- public EventBusHealthCheck(EventBus bus)
- {
- this.bus = bus ?? throw new ArgumentNullException(nameof(bus));
- }
-
- ///
- public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
- {
- try
- {
- var data = new Dictionary();
- var healthy = await bus.CheckHealthAsync(data, cancellationToken);
- return healthy ? HealthCheckResult.Healthy(data: data)
- : HealthCheckResult.Unhealthy(data: data);
- }
- catch (Exception ex)
- {
- return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
- }
- }
- }
-}
diff --git a/src/Tingle.EventBus/Health/IHealthChecksBuilderExtensions.cs b/src/Tingle.EventBus/Health/IHealthChecksBuilderExtensions.cs
deleted file mode 100644
index 9ec41864..00000000
--- a/src/Tingle.EventBus/Health/IHealthChecksBuilderExtensions.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-using Tingle.EventBus;
-using Tingle.EventBus.Health;
-
-namespace Microsoft.Extensions.DependencyInjection
-{
- ///
- /// Extension methods on for .
- ///
- public static class IHealthChecksBuilderExtensions
- {
- ///
- /// Add a health check for the event bus.
- /// must be resolvable from the services provider.
- ///
- /// The .
- /// The health check name.
- /// The .
- [System.Obsolete(ConstStrings.HealthChecksObsolete)]
- public static IHealthChecksBuilder AddEventBus(this IHealthChecksBuilder builder, string name = "eventbus")
- {
- return builder.AddCheck(name, tags: new[] { "eventbus", });
- }
- }
-}
diff --git a/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs b/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs
index 4eaa8834..390b7bf5 100644
--- a/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs
+++ b/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs
@@ -11,30 +11,29 @@
namespace Tingle.EventBus.Readiness
{
- internal class DefaultReadinessProvider : IReadinessProvider
+ internal class DefaultReadinessProvider : IReadinessProvider, IHealthCheckPublisher
{
- private readonly IServiceScopeFactory scopeFactory;
private readonly EventBusReadinessOptions options;
private readonly ILogger logger;
- public DefaultReadinessProvider(IServiceScopeFactory scopeFactory,
- IOptions optionsAccessor,
+ private HealthReport? healthReport;
+
+ public DefaultReadinessProvider(IOptions optionsAccessor,
ILogger logger)
{
- this.scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
options = optionsAccessor?.Value?.Readiness ?? throw new ArgumentNullException(nameof(optionsAccessor));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
///
public Task IsReadyAsync(CancellationToken cancellationToken = default)
- => InternalIsReadyAsync(allowed: null /* no filters */, cancellationToken: cancellationToken);
+ => Task.FromResult(InternalIsReady(allowed: null /* no filters */));
///
public Task IsReadyAsync(EventRegistration reg,
EventConsumerRegistration ecr,
CancellationToken cancellationToken = default)
- => InternalIsReadyAsync(allowed: ecr.ReadinessTags, cancellationToken: cancellationToken);
+ => Task.FromResult(InternalIsReady(allowed: ecr.ReadinessTags));
///
public async Task WaitReadyAsync(CancellationToken cancellationToken = default)
@@ -47,16 +46,15 @@ public async Task WaitReadyAsync(CancellationToken cancellationToken = default)
logger.ReadinessCheck(timeout);
using var cts_timeout = new CancellationTokenSource(timeout);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts_timeout.Token);
- var ct = cts.Token;
var ready = false;
try
{
do
{
- ready = await InternalIsReadyAsync(allowed: null /* no filters */, cancellationToken: ct);
+ ready = InternalIsReady(allowed: null /* no filters */);
if (!ready)
{
- await Task.Delay(TimeSpan.FromSeconds(1), ct); // delay for a second
+ await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); // delay for a second
}
} while (!ready);
}
@@ -67,7 +65,7 @@ public async Task WaitReadyAsync(CancellationToken cancellationToken = default)
}
}
- private async Task InternalIsReadyAsync(ICollection? allowed, CancellationToken cancellationToken)
+ private bool InternalIsReady(ICollection? allowed)
{
// If disabled, do not proceed
if (!options.Enabled)
@@ -76,33 +74,24 @@ private async Task InternalIsReadyAsync(ICollection? allowed, Canc
return true;
}
- /*
- * Simplest implementation is to use the health checks registered in the application.
- *
- * If the customization is required per event or consumer, the reg and ecr arguments
- * will serve that purpose but only in a custom implementation of IReadinessProvider
- */
- using var scope = scopeFactory.CreateScope();
- var provider = scope.ServiceProvider;
- var hcs = provider.GetService();
- if (hcs != null)
- {
- bool predicate(HealthCheckRegistration r) => ShouldInclude(registration: r, excludeSelf: options.ExcludeSelf, allowed: allowed);
- var report = await hcs.CheckHealthAsync(predicate: predicate, cancellationToken: cancellationToken);
- return report.Status == HealthStatus.Healthy;
- }
+ // if the health report is not set, we are not ready.
+ if (healthReport is null) return false;
- return true;
+ // if the report already says healthy, there is no need to proceed
+ if (healthReport.Status == HealthStatus.Healthy) return true;
+
+ // at this point, we are not healthy but what we care about might be healthy
+ // so we filter by tags
+ var entries = healthReport.Entries.Select(kvp => kvp.Value).Where(e => ShouldInclude(e, allowed));
+ return entries.All(e => e.Status == HealthStatus.Healthy);
}
- private static bool ShouldInclude(HealthCheckRegistration registration, bool excludeSelf, ICollection? allowed)
+ private static bool ShouldInclude(HealthReportEntry entry, ICollection? allowed)
{
- if (registration is null) throw new ArgumentNullException(nameof(registration));
-
- var r_tags = registration.Tags?.ToList() ?? new List();
+ var r_tags = entry.Tags ?? Array.Empty();
// Exclude the bus if configured to do so
- if (excludeSelf && r_tags.Contains("eventbus")) return false;
+ if (r_tags.Contains("eventbus")) return false;
/*
* If allowed is null, means there is no filtering so we include the registration.
@@ -111,5 +100,11 @@ private static bool ShouldInclude(HealthCheckRegistration registration, bool exc
*/
return allowed == null || allowed.Intersect(r_tags, StringComparer.OrdinalIgnoreCase).Any();
}
+
+ Task IHealthCheckPublisher.PublishAsync(HealthReport report, CancellationToken cancellationToken)
+ {
+ Interlocked.Exchange(ref healthReport, report);
+ return Task.CompletedTask;
+ }
}
}
diff --git a/src/Tingle.EventBus/Tingle.EventBus.csproj b/src/Tingle.EventBus/Tingle.EventBus.csproj
index c47b08da..9ff11b83 100644
--- a/src/Tingle.EventBus/Tingle.EventBus.csproj
+++ b/src/Tingle.EventBus/Tingle.EventBus.csproj
@@ -9,7 +9,10 @@
-
+
+
+
+
diff --git a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs
index a57c650d..b7270ea9 100644
--- a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs
+++ b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs
@@ -66,10 +66,6 @@ public EventBusTransportBase(IServiceScopeFactory scopeFactory,
///
EventBusTransportOptionsBase IEventBusTransportWithOptions.GetOptions() => TransportOptions;
- ///
- public abstract Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default);
-
///
public abstract Task PublishAsync(EventContext @event,
EventRegistration registration,
diff --git a/src/Tingle.EventBus/Transports/IEventBusTransport.cs b/src/Tingle.EventBus/Transports/IEventBusTransport.cs
index 80c7da9e..30c98e70 100644
--- a/src/Tingle.EventBus/Transports/IEventBusTransport.cs
+++ b/src/Tingle.EventBus/Transports/IEventBusTransport.cs
@@ -17,17 +17,6 @@ public interface IEventBusTransport
///
string Name { get; }
- ///
- /// Checks for health of the transport.
- /// This function can be used by the Health Checks framework and may throw and exception during execution.
- ///
- /// Additional key-value pairs describing the health of the transport.
- ///
- /// A value indicating if the bus is healthy.
- [Obsolete(ConstStrings.HealthChecksObsolete)]
- Task CheckHealthAsync(Dictionary data,
- CancellationToken cancellationToken = default);
-
///
/// Publish an event on the transport.
///