diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs index 125e30a9..b8a850f5 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs @@ -39,14 +39,6 @@ public AmazonKinesisTransport(IServiceScopeFactory serviceScopeFactory, clientConfig: TransportOptions.KinesisConfig); } - /// - public override async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - _ = await kinesisClient.ListStreamsAsync(cancellationToken); - return true; - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs index 3f09a159..f24f646c 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs @@ -54,16 +54,6 @@ public AmazonSqsTransport(IServiceScopeFactory serviceScopeFactory, clientConfig: TransportOptions.SqsConfig); } - /// - public override async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - _ = await snsClient.ListTopicsAsync(cancellationToken); - var prefix = BusOptions.Naming.Scope ?? ""; - _ = await sqsClient.ListQueuesAsync(prefix, cancellationToken); - return true; - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index 8eedcc43..a9d1ac60 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -44,22 +44,6 @@ public AzureEventHubsTransport(IServiceScopeFactory serviceScopeFactory, { } - /// - public override async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - // get properties for the producers - var producers = producersCache.Values; - foreach (var proc in producers) - { - cancellationToken.ThrowIfCancellationRequested(); - - await proc.GetEventHubPropertiesAsync(cancellationToken); - } - - return true; - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs index fbaeff5c..faec5c34 100644 --- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs @@ -45,15 +45,6 @@ public AzureQueueStorageTransport(IServiceScopeFactory serviceScopeFactory, : new QueueServiceClient(connectionString: (string)cred); } - /// - public override async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - /* GetStatisticsAsync fails to work as expected, instead we use GetPropertiesAsync */ - await serviceClient.GetPropertiesAsync(cancellationToken); - return true; - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs index 59286872..949bfb8c 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs @@ -60,30 +60,6 @@ public AzureServiceBusTransport(IServiceScopeFactory serviceScopeFactory, } } - /// - public override async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - Logger.LogDebug("Listing Queues ..."); - var queues = managementClient.GetQueuesRuntimePropertiesAsync(cancellationToken).AsPages(); - await foreach (var _ in queues) ; // there's nothing to do - var registrations = GetRegistrations(); - if (registrations.Any(r => r.EntityKind == EntityKind.Broadcast)) - { - Logger.LogDebug("Listing Topics ..."); - var topics = managementClient.GetTopicsRuntimePropertiesAsync(cancellationToken); - await foreach (var t in topics) - { - cancellationToken.ThrowIfCancellationRequested(); - - Logger.LogDebug("Listing Subscriptions for '{TopicName}' topic ...", t.Name); - var subscriptions = managementClient.GetSubscriptionsRuntimePropertiesAsync(t.Name, cancellationToken); - await foreach (var _ in subscriptions) ; // there's nothing to do - } - } - return true; - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs index 3fc9e459..5ac8932c 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs @@ -65,14 +65,6 @@ public InMemoryTransport(IServiceScopeFactory serviceScopeFactory, /// internal ConcurrentBag Failed => failed; - /// - public override Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - // InMemory is always healthy - return Task.FromResult(true); - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs index 0eee1d62..6423fc9d 100644 --- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs +++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs @@ -69,14 +69,6 @@ public KafkaTransport(IHostEnvironment environment, .Build(); } - /// - public override Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - adminClient.GetMetadata(StandardTimeout); - return Task.FromResult(true); - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs index a5113659..68badc01 100644 --- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs +++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs @@ -56,19 +56,6 @@ public RabbitMqTransport(IServiceScopeFactory serviceScopeFactory, }); } - /// - public override async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - if (!IsConnected) - { - await TryConnectAsync(cancellationToken); - } - - using var channel = connection!.CreateModel(); - return channel.IsOpen; - } - /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Tingle.EventBus/ConstStrings.cs b/src/Tingle.EventBus/ConstStrings.cs deleted file mode 100644 index 7ebd0817..00000000 --- a/src/Tingle.EventBus/ConstStrings.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Tingle.EventBus -{ - internal class ConstStrings - { - public const string HealthChecksObsolete = "Migrate to AspNetCore.Diagnostics.HealthChecks or build custom health checks for your workflow."; - } -} diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs index 168b4876..e714d44c 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; @@ -36,7 +37,11 @@ public EventBusBuilder(IServiceCollection services) Services.AddSingleton(); Services.AddHostedService(p => p.GetRequiredService()); UseDefaultSerializer(); - UseDefaultReadinessProvider(); + + // Register health/readiness services needed + Services.AddSingleton(); + Services.AddSingleton(p => p.GetRequiredService()); + Services.AddSingleton(p => p.GetRequiredService()); } /// diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs index 0952ce8e..60a746e1 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusReadinessOptions.cs @@ -20,13 +20,5 @@ public class EventBusReadinessOptions /// The default value is 5 minutes. Max value is 15 minutes and minimum is 5 seconds. /// public TimeSpan Timeout { get; set; } = TimeSpan.FromMinutes(5); - - /// - /// Whether to exclude the EventBus health checks when checking for readiness - /// in the default implementation of . - /// Defaults to . - /// Setting is useful when using multiple transports. - /// - public bool ExcludeSelf { get; set; } = false; } } diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs index d366a343..0e41f0fe 100644 --- a/src/Tingle.EventBus/EventBus.cs +++ b/src/Tingle.EventBus/EventBus.cs @@ -52,35 +52,6 @@ public EventBus(IReadinessProvider readinessProvider, logger = loggerFactory?.CreateLogger(LogCategoryNames.EventBus) ?? throw new ArgumentNullException(nameof(logger)); } - /// - /// Checks for health of the bus. - /// This function can be used by the Health Checks framework and may throw and exception during execution. - /// - /// Additional key-value pairs describing the health of the bus. - /// - /// A value indicating if the bus is healthy. - [Obsolete(ConstStrings.HealthChecksObsolete)] - public async Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default) - { - var healthy = true; - - // Ensure each transport is healthy - foreach (var t in transports) - { - cancellationToken.ThrowIfCancellationRequested(); - - // Check the health on the transport - var tData = new Dictionary(); - healthy &= await t.CheckHealthAsync(tData, cancellationToken); - - // Combine the data dictionaries into one, keyed by the transport name - data[t.Name] = tData; - } - - return healthy; - } - /// /// Publish an event. /// diff --git a/src/Tingle.EventBus/Health/EventBusHealthCheck.cs b/src/Tingle.EventBus/Health/EventBusHealthCheck.cs deleted file mode 100644 index 82766d15..00000000 --- a/src/Tingle.EventBus/Health/EventBusHealthCheck.cs +++ /dev/null @@ -1,42 +0,0 @@ -using Microsoft.Extensions.Diagnostics.HealthChecks; -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace Tingle.EventBus.Health -{ - /// - /// Implementation of for - /// - [Obsolete(ConstStrings.HealthChecksObsolete)] - public class EventBusHealthCheck : IHealthCheck - { - private readonly EventBus bus; - - /// - /// Creates an instance of - /// - /// The instance of to use. - public EventBusHealthCheck(EventBus bus) - { - this.bus = bus ?? throw new ArgumentNullException(nameof(bus)); - } - - /// - public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) - { - try - { - var data = new Dictionary(); - var healthy = await bus.CheckHealthAsync(data, cancellationToken); - return healthy ? HealthCheckResult.Healthy(data: data) - : HealthCheckResult.Unhealthy(data: data); - } - catch (Exception ex) - { - return new HealthCheckResult(context.Registration.FailureStatus, exception: ex); - } - } - } -} diff --git a/src/Tingle.EventBus/Health/IHealthChecksBuilderExtensions.cs b/src/Tingle.EventBus/Health/IHealthChecksBuilderExtensions.cs deleted file mode 100644 index 9ec41864..00000000 --- a/src/Tingle.EventBus/Health/IHealthChecksBuilderExtensions.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Tingle.EventBus; -using Tingle.EventBus.Health; - -namespace Microsoft.Extensions.DependencyInjection -{ - /// - /// Extension methods on for . - /// - public static class IHealthChecksBuilderExtensions - { - /// - /// Add a health check for the event bus. - /// must be resolvable from the services provider. - /// - /// The . - /// The health check name. - /// The . - [System.Obsolete(ConstStrings.HealthChecksObsolete)] - public static IHealthChecksBuilder AddEventBus(this IHealthChecksBuilder builder, string name = "eventbus") - { - return builder.AddCheck(name, tags: new[] { "eventbus", }); - } - } -} diff --git a/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs b/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs index 4eaa8834..390b7bf5 100644 --- a/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs +++ b/src/Tingle.EventBus/Readiness/DefaultReadinessProvider.cs @@ -11,30 +11,29 @@ namespace Tingle.EventBus.Readiness { - internal class DefaultReadinessProvider : IReadinessProvider + internal class DefaultReadinessProvider : IReadinessProvider, IHealthCheckPublisher { - private readonly IServiceScopeFactory scopeFactory; private readonly EventBusReadinessOptions options; private readonly ILogger logger; - public DefaultReadinessProvider(IServiceScopeFactory scopeFactory, - IOptions optionsAccessor, + private HealthReport? healthReport; + + public DefaultReadinessProvider(IOptions optionsAccessor, ILogger logger) { - this.scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory)); options = optionsAccessor?.Value?.Readiness ?? throw new ArgumentNullException(nameof(optionsAccessor)); this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// public Task IsReadyAsync(CancellationToken cancellationToken = default) - => InternalIsReadyAsync(allowed: null /* no filters */, cancellationToken: cancellationToken); + => Task.FromResult(InternalIsReady(allowed: null /* no filters */)); /// public Task IsReadyAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken = default) - => InternalIsReadyAsync(allowed: ecr.ReadinessTags, cancellationToken: cancellationToken); + => Task.FromResult(InternalIsReady(allowed: ecr.ReadinessTags)); /// public async Task WaitReadyAsync(CancellationToken cancellationToken = default) @@ -47,16 +46,15 @@ public async Task WaitReadyAsync(CancellationToken cancellationToken = default) logger.ReadinessCheck(timeout); using var cts_timeout = new CancellationTokenSource(timeout); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts_timeout.Token); - var ct = cts.Token; var ready = false; try { do { - ready = await InternalIsReadyAsync(allowed: null /* no filters */, cancellationToken: ct); + ready = InternalIsReady(allowed: null /* no filters */); if (!ready) { - await Task.Delay(TimeSpan.FromSeconds(1), ct); // delay for a second + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); // delay for a second } } while (!ready); } @@ -67,7 +65,7 @@ public async Task WaitReadyAsync(CancellationToken cancellationToken = default) } } - private async Task InternalIsReadyAsync(ICollection? allowed, CancellationToken cancellationToken) + private bool InternalIsReady(ICollection? allowed) { // If disabled, do not proceed if (!options.Enabled) @@ -76,33 +74,24 @@ private async Task InternalIsReadyAsync(ICollection? allowed, Canc return true; } - /* - * Simplest implementation is to use the health checks registered in the application. - * - * If the customization is required per event or consumer, the reg and ecr arguments - * will serve that purpose but only in a custom implementation of IReadinessProvider - */ - using var scope = scopeFactory.CreateScope(); - var provider = scope.ServiceProvider; - var hcs = provider.GetService(); - if (hcs != null) - { - bool predicate(HealthCheckRegistration r) => ShouldInclude(registration: r, excludeSelf: options.ExcludeSelf, allowed: allowed); - var report = await hcs.CheckHealthAsync(predicate: predicate, cancellationToken: cancellationToken); - return report.Status == HealthStatus.Healthy; - } + // if the health report is not set, we are not ready. + if (healthReport is null) return false; - return true; + // if the report already says healthy, there is no need to proceed + if (healthReport.Status == HealthStatus.Healthy) return true; + + // at this point, we are not healthy but what we care about might be healthy + // so we filter by tags + var entries = healthReport.Entries.Select(kvp => kvp.Value).Where(e => ShouldInclude(e, allowed)); + return entries.All(e => e.Status == HealthStatus.Healthy); } - private static bool ShouldInclude(HealthCheckRegistration registration, bool excludeSelf, ICollection? allowed) + private static bool ShouldInclude(HealthReportEntry entry, ICollection? allowed) { - if (registration is null) throw new ArgumentNullException(nameof(registration)); - - var r_tags = registration.Tags?.ToList() ?? new List(); + var r_tags = entry.Tags ?? Array.Empty(); // Exclude the bus if configured to do so - if (excludeSelf && r_tags.Contains("eventbus")) return false; + if (r_tags.Contains("eventbus")) return false; /* * If allowed is null, means there is no filtering so we include the registration. @@ -111,5 +100,11 @@ private static bool ShouldInclude(HealthCheckRegistration registration, bool exc */ return allowed == null || allowed.Intersect(r_tags, StringComparer.OrdinalIgnoreCase).Any(); } + + Task IHealthCheckPublisher.PublishAsync(HealthReport report, CancellationToken cancellationToken) + { + Interlocked.Exchange(ref healthReport, report); + return Task.CompletedTask; + } } } diff --git a/src/Tingle.EventBus/Tingle.EventBus.csproj b/src/Tingle.EventBus/Tingle.EventBus.csproj index c47b08da..9ff11b83 100644 --- a/src/Tingle.EventBus/Tingle.EventBus.csproj +++ b/src/Tingle.EventBus/Tingle.EventBus.csproj @@ -9,7 +9,10 @@ - + + + + diff --git a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs index a57c650d..b7270ea9 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs @@ -66,10 +66,6 @@ public EventBusTransportBase(IServiceScopeFactory scopeFactory, /// EventBusTransportOptionsBase IEventBusTransportWithOptions.GetOptions() => TransportOptions; - /// - public abstract Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default); - /// public abstract Task PublishAsync(EventContext @event, EventRegistration registration, diff --git a/src/Tingle.EventBus/Transports/IEventBusTransport.cs b/src/Tingle.EventBus/Transports/IEventBusTransport.cs index 80c7da9e..30c98e70 100644 --- a/src/Tingle.EventBus/Transports/IEventBusTransport.cs +++ b/src/Tingle.EventBus/Transports/IEventBusTransport.cs @@ -17,17 +17,6 @@ public interface IEventBusTransport /// string Name { get; } - /// - /// Checks for health of the transport. - /// This function can be used by the Health Checks framework and may throw and exception during execution. - /// - /// Additional key-value pairs describing the health of the transport. - /// - /// A value indicating if the bus is healthy. - [Obsolete(ConstStrings.HealthChecksObsolete)] - Task CheckHealthAsync(Dictionary data, - CancellationToken cancellationToken = default); - /// /// Publish an event on the transport. ///