Skip to content

Commit

Permalink
[Host.RabbitMQ] Automatic or manual message acks #21
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Mar 3, 2024
1 parent 85791c0 commit a1282a8
Show file tree
Hide file tree
Showing 36 changed files with 454 additions and 120 deletions.
56 changes: 47 additions & 9 deletions docs/provider_rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration](#configuration)
- [Producers](#producers)
- [Consumers](#consumers)
- [Consumer Error Handling](#consumer-error-handling)
- [Dead Letter Exchange](#dead-letter-exchange)
- [Custom Consumer Error Handler](#custom-consumer-error-handler)
- [Consumer Concurrency Level](#consumer-concurrency-level)
- [Acknowledgment Mode](#acknowledgment-mode)
- [Consumer Error Handling](#consumer-error-handling)
- [Dead Letter Exchange](#dead-letter-exchange)
- [Custom Consumer Error Handler](#custom-consumer-error-handler)
- [Consumer Concurrency Level](#consumer-concurrency-level)
- [Request-Response](#request-response)
- [Topology Provisioning](#topology-provisioning)
- [Not Supported](#not-supported)
Expand Down Expand Up @@ -113,7 +114,7 @@ services.AddSlimMessageBus((mbb) =>

### Consumers

Consumers need to specify the queue name from which the consumer should be reading from. SMB will provison the specified queue.
Consumers need to specify the queue name from which the consumer should be reading from. SMB will provision the specified queue.
Additionally,

- when the exchange name binding is specified then SMB will provision that binding with the broker,
Expand Down Expand Up @@ -143,7 +144,44 @@ We can specify defaults for all consumers on the bus level:
});
```

### Consumer Error Handling
#### Acknowledgment Mode

When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments out which two are available in SMB:

- Ack - to indicate the message was successfully processed and it should be removed from the queue.
- Nack (negative ack) - when the message was processed but resulted in an error, while still it needs to be removed from the queue or retried (depending what the user chooses in the given use case).

In SMB we can set the acknowledgment mode for each consumer:

```cs
builder.Consume<PingMessage>(x => x
.Queue("subscriber", autoDelete: false)
.ExchangeBinding(topic)
.AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade) // sets the acknowledgement
.WithConsumer<PingConsumer>());
```

Alternatively, a bus wide default can be specified for all consumers:

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
cfg.AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade) // sets the acknowledgement
});
});
```

See the [RabbitMqMessageAcknowledgementMode](../src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageAcknowledgementMode.cs) has the available options.

By default (`ConfirmAfterMessageProcessingWhenNoManualConfirmMade`), messages are acknowledge (Ack) after the message processing finish with success.
If an exception where to happen the message is rejected (Nack) (or else whatever the [custom error handler](#consumer-error-handling) logic does).
In that default mode, the user can still Ack or Nack the message depending on the need inside of the consumer or interceptor using the Ack() / Nack() methods exposed on the [ConsumerContext](intro.md#consumer-context-additional-message-information) - this allows for manual acknowledgements. The default setting is optimal and safe. However, message retries could happen (at-least-once delivery).

The other acknowledgement modes will ack the message before processing, but are less safe as it will lead to at-most-once delivery.

#### Consumer Error Handling

By default the the transport implementation performs a negative ack (nack) in the AMQP protocol for any message that failed in the consumer. As a result the message will be marked as failed and routed to an dead letter exchange or discarded by the RabbitMQ broker.

Expand All @@ -152,7 +190,7 @@ The recommendation here is to either:
- configure a [dead letter exchange](#dead-letter-exchange) configured on the consumer queue,
- or provide a [custom error handler](#custom-consumer-error-handler) (retry the message couple of times, if failed send to a dead letter exchange).

#### Dead Letter Exchange
##### Dead Letter Exchange

The [Dead Letter Exchanges](https://www.rabbitmq.com/dlx.html) is a feature of RabbitMQ that will forward failed messages from a particular queue to a special exchange.

Expand Down Expand Up @@ -192,7 +230,7 @@ services.AddSlimMessageBus((mbb) =>
});
```

#### Custom Consumer Error Handler
##### Custom Consumer Error Handler

Define a custom consumer error handler implementation of `RabbitMqConsumerErrorHandler<>`:

Expand Down Expand Up @@ -234,7 +272,7 @@ services.AddTransient(typeof(RabbitMqConsumerErrorHandler<>), typeof(CustomRabbi

> When error handler is not found in the DI or it returns `false` then default error handling will be applied.
### Consumer Concurrency Level
#### Consumer Concurrency Level

By default each consumer in the service process will handle one message at the same time.
In order to increase the desired concurrency, set the [`ConsumerDispatchConcurrency`](https://www.rabbitmq.com/dotnet-api-guide.html#consumer-callbacks-and-ordering) to a value greater than 1.
Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.2.3</Version>
<Version>2.3.0-rc1</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task ProcessEventAsync(ProcessEventArgs args)
_lastMessage = args;

var headers = GetHeadersFromTransportMessage(args.Data);
var (lastException, _, _, _) = await MessageProcessor.ProcessMessage(args.Data, headers, args.CancellationToken).ConfigureAwait(false);
var (lastException, _, _, _) = await MessageProcessor.ProcessMessage(args.Data, headers, cancellationToken: args.CancellationToken).ConfigureAwait(false);
if (lastException != null)
{
// Note: The OnMessageFaulted was called at this point by the MessageProcessor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage messa
return;
}

var (exception, _, _, _) = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, token).ConfigureAwait(false);
var (exception, _, _, _) = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, cancellationToken: token).ConfigureAwait(false);
if (exception != null)
{
Logger.LogError(exception, "Abandon message (exception occured while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public async Task OnMessage(ConsumeResult message)
_lastOffset = message.TopicPartitionOffset;

var messageHeaders = message.ToHeaders(_headerSerializer);
var (lastException, consumerSettings, response, _) = await _messageProcessor.ProcessMessage(message, messageHeaders, _cancellationTokenSource.Token).ConfigureAwait(false);
var (lastException, consumerSettings, response, _) = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
if (lastException != null)
{
// ToDo: Retry logic
Expand Down
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object me
? requestHeaders as IReadOnlyDictionary<string, object> ?? new Dictionary<string, object>(requestHeaders)
: null;

var (exception, _, response, _) = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, cancellationToken, currentServiceProvider);
var (exception, _, response, _) = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, currentServiceProvider: currentServiceProvider, cancellationToken: cancellationToken);
if (exception != null)
{

// We want to pass the same exception to the sender as it happened in the handler/consumer
ExceptionDispatchInfo.Capture(exception).Throw();
}
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
headers[prop.Name] = prop.Value;
}
}
return consumer.MessageProcessor.ProcessMessage(arg.ApplicationMessage, headers, CancellationToken);
return consumer.MessageProcessor.ProcessMessage(arg.ApplicationMessage, headers, cancellationToken: CancellationToken);
}
return Task.CompletedTask;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace SlimMessageBus.Host.RabbitMQ;

[Flags]
internal enum RabbitMqMessageConfirmOption
public enum RabbitMqMessageConfirmOption
{
Ack = 1,
Nack = 2,
Expand Down
5 changes: 5 additions & 0 deletions src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@
/// <returns></returns>
public delegate void RabbitMqMessagePropertiesModifier<T>(T message, IBasicProperties messageProperties);

/// <summary>
/// Represents an action to confirm the message.
/// </summary>
/// <param name="option"></param>
public delegate void RabbitMqMessageConfirmAction(RabbitMqMessageConfirmOption option);
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,19 @@ public static TConsumerBuilder DeadLetterExchange<TConsumerBuilder>(this TConsum

return builder;
}

/// <summary>
/// Sets the message Acknowledgement Mode for the consumer.
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="mode"></param>
/// <returns></returns>
public static TConsumerBuilder AcknowledgementMode<TConsumerBuilder>(this TConsumerBuilder builder, RabbitMqMessageAcknowledgementMode mode)
where TConsumerBuilder : AbstractConsumerBuilder
{
builder.ConsumerSettings.Properties[RabbitMqProperties.MessageAcknowledgementMode] = mode;
return builder;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,59 @@

public static class RabbitMqConsumerContextExtensions
{
private static readonly string Key = "RabbitMq_MessageConfirmAction";

public static BasicDeliverEventArgs GetTransportMessage(this IConsumerContext context)
{
#if NETSTANDARD2_0
if (context is null) throw new ArgumentNullException(nameof(context));

#else
ArgumentNullException.ThrowIfNull(context);
#endif

return context.GetPropertyOrDefault<BasicDeliverEventArgs>(RabbitMqProperties.Message);
}

static internal void SetTransportMessage(this ConsumerContext context, BasicDeliverEventArgs message)
static internal void SetTransportMessage(this IConsumerContext context, BasicDeliverEventArgs message)
{
#if NETSTANDARD2_0
if (context is null) throw new ArgumentNullException(nameof(context));
#else
ArgumentNullException.ThrowIfNull(context);
#endif

context.Properties[RabbitMqProperties.Message] = message;
}

static internal void SetConfirmAction(this IConsumerContext consumerContext, RabbitMqMessageConfirmAction messageConfirmAction)
=> consumerContext.Properties[Key] = messageConfirmAction;

static internal void ConfirmAction(this IConsumerContext consumerContext, RabbitMqMessageConfirmOption option)
{
var messageConfirmAction = consumerContext.GetPropertyOrDefault<RabbitMqMessageConfirmAction>(Key)
?? throw new ConsumerMessageBusException("Cannnot perform RabbitMq message confirmation at this point");

messageConfirmAction(option);
}

/// <summary>
/// Sends an Ack confirm for the processed message.
/// </summary>
/// <param name="consumerContext"></param>
public static void Ack(this IConsumerContext consumerContext)
=> ConfirmAction(consumerContext, RabbitMqMessageConfirmOption.Ack);

/// <summary>
/// Sends an Nack (negative ack) for the processed message with the setting to NOT redeliver it.
/// </summary>
/// <param name="consumerContext"></param>
public static void Nack(this IConsumerContext consumerContext)
=> ConfirmAction(consumerContext, RabbitMqMessageConfirmOption.Nack);

/// <summary>
/// Sends an Nack (negative ack) for the processed message with the setting to requeue it.
/// </summary>
/// <param name="consumerContext"></param>
public static void NackWithRequeue(this IConsumerContext consumerContext)
=> ConfirmAction(consumerContext, RabbitMqMessageConfirmOption.Nack | RabbitMqMessageConfirmOption.Requeue);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace SlimMessageBus.Host.RabbitMQ;

/// <summary>
/// Specifies how messages are confirmed with RabbitMq
/// </summary>
public enum RabbitMqMessageAcknowledgementMode
{
/// <summary>
/// Each message will get an Ack after successful processing, or when error happens the message will get an Nack in the end.
/// However, if the user made any manual ConsumerContext.Ack() or ConsumerContext.Nack() during the consumption process (or in an interceptor), that will be used to confirm the message instead.
/// This results in at-least-once delivery guarantee and a safe processing.
/// </summary>
/// <remarks>That is the default option</remarks>
ConfirmAfterMessageProcessingWhenNoManualConfirmMade = 0,

/// <summary>
/// The message will already be considered as Ack upon recieve. See https://www.rabbitmq.com/docs/confirms#acknowledgement-modes for details.
/// This results in at-most-once delivery guarantee (messages could be lost if processing would not fully finish).
/// This is managed by the protocol and should give faster throughput than <see cref="RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing"/> while leading to same delivery guarantees.
/// </summary>
AckAutomaticByRabbit = 1,

/// <summary>
/// The message will be Ack-ed by SMB before the actuall message processing starts.
/// This results in at-most-once delivery guarantee (messages could be lost if processing would not fully finish).
/// </summary>
AckMessageBeforeProcessing = 2,
}
Loading

0 comments on commit a1282a8

Please sign in to comment.