From 88b5d254d44d12dc5eca86353fc7daaea3130353 Mon Sep 17 00:00:00 2001 From: Maxwell Weru Date: Sun, 7 Aug 2022 13:58:26 +0300 Subject: [PATCH] Change EventBus to be independent of host --- .../DependencyInjection/EventBusBuilder.cs | 6 +- src/Tingle.EventBus/EventBus.cs | 52 ++++------------ src/Tingle.EventBus/EventBusHost.cs | 62 +++++++++++++++++++ .../Extensions/ILoggerExtensions.cs | 3 + 4 files changed, 81 insertions(+), 42 deletions(-) create mode 100644 src/Tingle.EventBus/EventBusHost.cs diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs index 671c66cc..a7267e0e 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs @@ -27,10 +27,12 @@ public EventBusBuilder(IServiceCollection services) Services.AddSingleton(); Services.AddSingleton(); + // Register the bus and its host + Services.AddSingleton(); + Services.AddHostedService(); + // Register necessary services Services.AddTransient(); - Services.AddSingleton(); - Services.AddHostedService(p => p.GetRequiredService()); UseDefaultSerializer(); // Register health/readiness services needed diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs index 9934f4af..23513293 100644 --- a/src/Tingle.EventBus/EventBus.cs +++ b/src/Tingle.EventBus/EventBus.cs @@ -1,5 +1,4 @@ using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Diagnostics; @@ -12,11 +11,10 @@ namespace Tingle.EventBus; /// -/// The abstractions for an event bus +/// The event bus /// -public class EventBus : BackgroundService +public class EventBus { - private readonly IHostApplicationLifetime lifetime; private readonly IReadinessProvider readinessProvider; private readonly IEventIdGenerator idGenerator; private readonly IList transports; @@ -27,22 +25,19 @@ public class EventBus : BackgroundService /// /// /// - /// /// /// /// /// /// /// - public EventBus(IHostApplicationLifetime lifetime, - IReadinessProvider readinessProvider, + public EventBus(IReadinessProvider readinessProvider, IEventIdGenerator idGenerator, IEnumerable transports, IEnumerable configurators, IOptions optionsAccessor, ILoggerFactory loggerFactory) { - this.lifetime = lifetime ?? throw new ArgumentNullException(nameof(lifetime)); this.readinessProvider = readinessProvider ?? throw new ArgumentNullException(nameof(readinessProvider)); this.idGenerator = idGenerator ?? throw new ArgumentNullException(nameof(idGenerator)); this.configurators = configurators?.ToList() ?? throw new ArgumentNullException(nameof(configurators)); @@ -205,15 +200,9 @@ public async Task CancelAsync(IList ids, CancellationToken cance await transport.CancelAsync(ids: ids, registration: reg, cancellationToken: cancellationToken); } - /// - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + /// + public async Task StartAsync(CancellationToken cancellationToken) { - if (!await WaitForAppStartupAsync(lifetime, stoppingToken).ConfigureAwait(false)) - { - logger.ApplicationDidNotStartup(); - return; - } - // If a startup delay has been specified, apply it var delay = options.StartupDelay; if (delay != null && delay > TimeSpan.Zero) @@ -223,8 +212,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { logger.DelayedBusStartup(delay.Value); - await Task.Delay(delay.Value, stoppingToken); - await StartTransportsAsync(stoppingToken); + await Task.Delay(delay.Value, cancellationToken); + await StartTransportsAsync(cancellationToken); } catch (Exception ex) when (!(ex is OperationCanceledException || ex is TaskCanceledException)) // skip operation cancel @@ -235,25 +224,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) else { // Without a delay, just start the transports directly - await StartTransportsAsync(stoppingToken); + await StartTransportsAsync(cancellationToken); } } - private static async Task WaitForAppStartupAsync(IHostApplicationLifetime lifetime, CancellationToken stoppingToken) - { - var startedTcs = new TaskCompletionSource(); - var cancelledTcs = new TaskCompletionSource(); - - // register result setting using the cancellation tokens - lifetime.ApplicationStarted.Register(() => startedTcs.SetResult(new { })); - stoppingToken.Register(() => cancelledTcs.SetResult(new { })); - - var completedTask = await Task.WhenAny(startedTcs.Task, cancelledTcs.Task).ConfigureAwait(false); - - // if the completed task was the "app started" one, return true - return completedTask == startedTcs.Task; - } - private async Task StartTransportsAsync(CancellationToken cancellationToken) { if (options.Readiness.Enabled) @@ -279,13 +253,11 @@ private async Task StartTransportsAsync(CancellationToken cancellationToken) } } - /// - public override async Task StopAsync(CancellationToken cancellationToken) + /// + public async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); - - // Stop the bus and its transports in parallel - logger.StoppingBus(); + // Stop the transports in parallel + logger.StoppingTransports(); var tasks = transports.Select(t => t.StopAsync(cancellationToken)); await Task.WhenAll(tasks); } diff --git a/src/Tingle.EventBus/EventBusHost.cs b/src/Tingle.EventBus/EventBusHost.cs new file mode 100644 index 00000000..e6d741ff --- /dev/null +++ b/src/Tingle.EventBus/EventBusHost.cs @@ -0,0 +1,62 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Tingle.EventBus; + +/// +/// Host for . +/// +internal class EventBusHost : BackgroundService +{ + private readonly IHostApplicationLifetime lifetime; + private readonly EventBus bus; + private readonly ILogger logger; + + /// + /// + /// + /// + /// + /// + public EventBusHost(IHostApplicationLifetime lifetime, EventBus bus, ILogger logger) + { + this.lifetime = lifetime ?? throw new ArgumentNullException(nameof(lifetime)); + this.bus = bus ?? throw new ArgumentNullException(nameof(bus)); + this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!await WaitForAppStartupAsync(lifetime, stoppingToken).ConfigureAwait(false)) + { + logger.ApplicationDidNotStartup(); + return; + } + + await bus.StartAsync(stoppingToken); + } + + /// + public override async Task StopAsync(CancellationToken cancellationToken) + { + logger.StoppingBus(); + await base.StopAsync(cancellationToken); + await bus.StopAsync(cancellationToken); + } + + private static async Task WaitForAppStartupAsync(IHostApplicationLifetime lifetime, CancellationToken stoppingToken) + { + var startedTcs = new TaskCompletionSource(); + var cancelledTcs = new TaskCompletionSource(); + + // register result setting using the cancellation tokens + lifetime.ApplicationStarted.Register(() => startedTcs.SetResult(new { })); + stoppingToken.Register(() => cancelledTcs.SetResult(new { })); + + var completedTask = await Task.WhenAny(startedTcs.Task, cancelledTcs.Task).ConfigureAwait(false); + + // if the completed task was the "app started" one, return true + return completedTask == startedTcs.Task; + } +} diff --git a/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs b/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs index e8118d90..c9e9d6c3 100644 --- a/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs +++ b/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs @@ -31,6 +31,9 @@ internal static partial class ILoggerExtensions [LoggerMessage(107, LogLevel.Debug, "Stopping bus.")] public static partial void StoppingBus(this ILogger logger); + [LoggerMessage(108, LogLevel.Debug, "Stopping transports.")] + public static partial void StoppingTransports(this ILogger logger); + #endregion #region Transports (200 series)