diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageEventBus.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageEventBus.cs index b4bed2e3..213bb60c 100644 --- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageEventBus.cs +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageEventBus.cs @@ -60,6 +60,7 @@ public override async Task CheckHealthAsync(CancellationToken cancellation public override Task StartAsync(CancellationToken cancellationToken) { var registrations = BusOptions.GetConsumerRegistrations(); + logger.StartingBus(registrations.Count); foreach (var reg in registrations) { _ = ReceiveAsync(reg); @@ -71,6 +72,7 @@ public override Task StartAsync(CancellationToken cancellationToken) /// public override Task StopAsync(CancellationToken cancellationToken) { + logger.StoppingBus(); receiveCancellationTokenSource.Cancel(); // TODO: figure out a way to wait for notification of termination in all receivers return Task.CompletedTask; @@ -98,6 +100,7 @@ public override async Task PublishAsync(EventContext @ev // get the queue client and send the message var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken); var message = Encoding.UTF8.GetString(ms.ToArray()); + logger.LogInformation("Sending {EventId} to '{QueueName}'", @event.EventId, queueClient.Name); var response = await queueClient.SendMessageAsync(messageText: message, visibilityTimeout: visibilityTimeout, timeToLive: ttl, @@ -134,6 +137,7 @@ public override async Task> PublishAsync(IList(string id, CancellationToken canc var reg = BusOptions.GetOrCreateEventRegistration(); var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken); var parts = id.Split(SequenceNumberSeparator); - await queueClient.DeleteMessageAsync(messageId: parts[0], - popReceipt: parts[1], + string messageId = parts[0], popReceipt = parts[1]; + logger.LogInformation("Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'", messageId, popReceipt, queueClient.Name); + await queueClient.DeleteMessageAsync(messageId: messageId, + popReceipt: popReceipt, cancellationToken: cancellationToken); } @@ -183,6 +189,7 @@ public override async Task CancelAsync(IList ids, CancellationTo var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken); foreach (var (messageId, popReceipt) in splits) { + logger.LogInformation("Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'", messageId, popReceipt, queueClient.Name); await queueClient.DeleteMessageAsync(messageId: messageId, popReceipt: popReceipt, cancellationToken: cancellationToken); @@ -211,7 +218,8 @@ private async Task GetQueueClientAsync(EventRegistration reg, bool MessageEncoding = QueueMessageEncoding.Base64, }); - // ensure queue is created + // ensure queue is created if it does not exist + logger.LogInformation("Ensuring queue '{QueueName}' exists", name); await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken); queueClientsCache[(reg.EventType, deadletter)] = queueClient; @@ -242,10 +250,13 @@ private async Task ReceiveAsync(ConsumerRegistration reg) // if the response is empty, introduce a delay if (messages.Length == 0) { - await Task.Delay(TransportOptions.EmptyResultsDelay, cancellationToken); + var delay = TransportOptions.EmptyResultsDelay; + logger.LogTrace("No messages on '{QueueName}', delaying check for {Delay}", queueClient.Name, delay); + await Task.Delay(delay, cancellationToken); } else { + logger.LogDebug("Received {MessageCount} messages on '{QueueName}'", messages.Length, queueClient.Name); foreach (var message in messages) { await (Task)method.Invoke(this, new object[] { reg, queueClient, message, cancellationToken, }); @@ -268,12 +279,17 @@ private async Task OnMessageReceivedAsync(ConsumerRegistratio try { + logger.LogDebug("Processing '{MessageId}|{PopReceipt}'", message.MessageId, message.PopReceipt); using var ms = new MemoryStream(Encoding.UTF8.GetBytes(message.MessageText)); var contentType = new ContentType("*/*"); var context = await DeserializeAsync(body: ms, contentType: contentType, registration: reg, cancellationToken: cancellationToken); + logger.LogInformation("Received message: '{MessageId}|{PopReceipt}' containing Event '{EventId}'", + message.MessageId, + message.PopReceipt, + context.EventId); await PushToConsumerAsync(context, cancellationToken); } catch (Exception ex) @@ -286,6 +302,10 @@ private async Task OnMessageReceivedAsync(ConsumerRegistratio } // always delete the message from the current queue + logger.LogTrace("Deleting '{MessageId}|{PopReceipt}' on '{QueueName}'", + message.MessageId, + message.PopReceipt, + queueClient.Name); await queueClient.DeleteMessageAsync(messageId: message.MessageId, popReceipt: message.PopReceipt, cancellationToken: cancellationToken); diff --git a/src/Tingle.EventBus/Logging/ILoggerExtensions.cs b/src/Tingle.EventBus/Logging/ILoggerExtensions.cs new file mode 100644 index 00000000..6e4c3fdc --- /dev/null +++ b/src/Tingle.EventBus/Logging/ILoggerExtensions.cs @@ -0,0 +1,26 @@ +using System; + +namespace Microsoft.Extensions.Logging +{ + /// + /// Extensions on for the EventBus + /// + internal static class ILoggerExtensions + { + private static readonly Action _startingBus + = LoggerMessage.Define( + eventId: new EventId(1, nameof(StartingBus)), + logLevel: LogLevel.Debug, + formatString: "Starting bus receivers. Consumers: '{ConsumersCount}'"); + + private static readonly Action _stoppingBus + = LoggerMessage.Define( + eventId: new EventId(2, nameof(StoppingBus)), + logLevel: LogLevel.Debug, + formatString: "Stopping bus receivers."); + + public static void StartingBus(this ILogger logger, int consumersCount) => _startingBus(logger, consumersCount, null); + + public static void StoppingBus(this ILogger logger) => _stoppingBus(logger, null); + } +}