Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.AzureServiceBus] Modify user properties on failure / Supply reason and description on dead-letter #365

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -1263,5 +1263,5 @@ This allows to recreate missing elements in the infrastructure without restartin

## Versions

- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/tree/release/v3).
- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/3.0.0).
- The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0).
53 changes: 48 additions & 5 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Default Subscription Name](#default-subscription-name)
- [Consumer context](#consumer-context)
- [Exception Handling for Consumers](#exception-handling-for-consumers)
- [DeadLetter: Application-Level Dead-Lettering](#deadletter-application-level-dead-lettering)
- [Failure: Modify Application Properties on Failure](#failure-modify-application-properties-on-failure)
- [Transport Specific Settings](#transport-specific-settings)
- [Request-Response Configuration](#request-response-configuration)
- [Produce Request Messages](#produce-request-messages)
Expand All @@ -19,6 +21,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Validation of Topology](#validation-of-topology)
- [Trigger Topology Provisioning](#trigger-topology-provisioning)


## Configuration

Azure Service Bus provider requires a connection string:
Expand Down Expand Up @@ -193,12 +196,52 @@ This could be useful to extract the message's `CorrelationId` or `ApplicationPro

### Exception Handling for Consumers

In case the consumer was to throw an exception while processing a message, SMB marks the message as abandoned.
This results in a message delivery retry performed by Azure SB (potentially event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt the message Azure SB moves the message to a dead letter queue (DLQ). More information [here](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues).
In the case where the consumer throws an exception while processing a message, SMB marks the message as abandoned.
This results in a message delivery retry performed by Azure SB (potentially as an event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt, Azure SB will move the message to the [dead letter queue (DLQ)](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues).

SMB will also add a user property, `SMB.Exception`, on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

For finer control, a custom error handler can be added by registering an instance of `IConsumerErrorHandler<T>` with the DI. The offending message and the raised exception can then be inspected to determine if the message should be retried (in proceess), failed, or considered as having executed successfully.

In addition to the standard `IConsumerErrorHandler<T>` return types, `ServiceBusConsumerErrorHandler<T>` provides additional, specialized responses for use with the Azure Service Bus transport.

#### DeadLetter: Application-Level Dead-Lettering

[Application-level dead-lettering](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#application-level-dead-lettering) is supported via `DeadLetter(string reason, string description)`. If neither a `reason` nor `description` are supplied, the raised exception type and message will be used as the `reason` and `description`.

If you need to send only selected messages to DLQ, wrap the body of your consumer method in a `try-catch` block and rethrow the exception for only the messages you want to be moved to DLQ (after the retry limit is reached).
```cs
public sealed class SampleConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
{
public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
return Task.FromResult(DeadLetter("reason", "description"));
}
}
```

#### Failure: Modify Application Properties on Failure

An overload to `Failure(IReadOnlyDictionary<string, object)` is included to facilitate the modification of application properites on a failed message. This includes the `SMB.Exception` property should alternative detail be required.
zarusz marked this conversation as resolved.
Show resolved Hide resolved
zarusz marked this conversation as resolved.
Show resolved Hide resolved

```cs
public sealed class SampleConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
{
public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
var properties = new Dictionary<string, object>
{
{ "Key", "value" },
{ "Attempts", attempts },
{ "SMB.Exception", exception.ToString().Substring(0, 1000) }
};

return Task.FromResult(Failure(properties));
}
}
```

> By using `IConsumerContext.Properties` (`IConsumerWithContext`) to pass state to the `IConsumerErrorHandler<T>` instance, consumer state can be persisted with the message. This can then be retrieved from `IConsumerContext.Headers` in a subsequent execution to resume processing from a checkpoint, supporting idempotency, especially when distributed transactions are not possible.

SMB will also set a user property `SMB.Exception` on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

### Transport Specific Settings

Expand Down Expand Up @@ -480,4 +523,4 @@ However, in situations when the underlying ASB topology changes (queue / topic i
ITopologyControl ctrl = // injected

await ctrl.ProvisionTopology();
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
protected IMessageProcessor<ServiceBusReceivedMessage> MessageProcessor { get; }
protected TopicSubscriptionParams TopicSubscription { get; }

protected AsbBaseConsumer(ServiceBusMessageBus messageBus,

Check warning on line 14 in src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs

View workflow job for this annotation

GitHub Actions / build

Refactor this constructor to reduce its Cognitive Complexity from 18 to the 15 allowed. (https://rules.sonarsource.com/csharp/RSPEC-3776)
ServiceBusClient serviceBusClient,
TopicSubscriptionParams subscriptionFactoryParams,
IMessageProcessor<ServiceBusReceivedMessage> messageProcessor,
Expand Down Expand Up @@ -165,6 +165,8 @@
Func<ServiceBusReceivedMessage, string, string, CancellationToken, Task> deadLetterMessage,
CancellationToken token)
{
const string smbException = "SMB.Exception";

// Process the message.
Logger.LogDebug("Received message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);

Expand All @@ -188,16 +190,33 @@
await completeMessage(message, token).ConfigureAwait(false);
return;

case ServiceBusProcessResult.DeadLetterState:
case ServiceBusProcessResult.DeadLetterState deadLetterState:
Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false);

var reason = deadLetterState.Reason ?? r.Exception?.GetType().Name ?? string.Empty;
var descripiton = deadLetterState.Description ?? r.Exception?.GetType().Name ?? string.Empty;
await deadLetterMessage(message, reason, descripiton, token).ConfigureAwait(false);
return;

case ServiceBusProcessResult.FailureStateWithProperties withProperties:
var dict = new Dictionary<string, object>(withProperties.Properties.Count + 1);
foreach (var properties in withProperties.Properties)
{
dict.Add(properties.Key, properties.Value);
}

// Set the exception message if it has not been provided
dict.TryAdd(smbException, r.Exception.Message);

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await abandonMessage(message, dict, token).ConfigureAwait(false);
return;

case ProcessResult.FailureState:
var messageProperties = new Dictionary<string, object>();
{
// Set the exception message
messageProperties.Add("SMB.Exception", r.Exception.Message);
messageProperties.Add(smbException, r.Exception.Message);
}

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,19 @@ public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>, IServiceBusConsumerErrorHandler<T>
{
public ProcessResult DeadLetter() => ServiceBusProcessResult.DeadLetter;
public ProcessResult DeadLetter(string reason = null, string description = null)
{
return reason == null && description == null
? ServiceBusProcessResult.DeadLetter
: new ServiceBusProcessResult.DeadLetterState(reason, description);
}

public ProcessResult Failure(IReadOnlyDictionary<string, object> properties)
{
return properties != null && properties.Count > 0
? new ServiceBusProcessResult.FailureStateWithProperties(properties)
: Failure();
}
}

public record ServiceBusProcessResult : ProcessResult
Expand All @@ -14,5 +26,25 @@ public record ServiceBusProcessResult : ProcessResult
/// </summary>
public static readonly ProcessResult DeadLetter = new DeadLetterState();

public record DeadLetterState() : ProcessResult();
public record DeadLetterState : ProcessResult
{
public DeadLetterState(string reason = null, string description = null)
{
Reason = reason;
Description = description;
}

public string Reason { get; }
public string Description { get; }
}

public record FailureStateWithProperties : FailureState
{
public FailureStateWithProperties(IReadOnlyDictionary<string, object> properties)
{
Properties = properties;
}

public IReadOnlyDictionary<string, object> Properties { get; }
}
}
2 changes: 1 addition & 1 deletion src/SlimMessageBus/IConsumerWithContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace SlimMessageBus;

/// <summary>
/// An extension point for <see cref="IConsumer{TMessage}"/> to recieve provider specific (for current message subject to processing).
/// An extension point for <see cref="IConsumer{TMessage}"/> to receive provider specific (for current message subject to processing).
/// </summary>
public interface IConsumerWithContext
{
Expand Down
Loading
Loading