Skip to content

Commit

Permalink
[Host.RabbitMQ] Automatically ack messages #21
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Mar 3, 2024
1 parent b188799 commit 708278a
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ public static class RabbitMqMessageBusSettingsExtensions
/// <param name="action">Action to be executed, the first param is the RabbitMQ <see cref="IModel"/> from the underlying client, and second parameter represents the SMB exchange, queue and binding setup</param>
public static RabbitMqMessageBusSettings UseTopologyInitializer(this RabbitMqMessageBusSettings settings, RabbitMqTopologyInitializer action)
{
if (action == null) throw new ArgumentNullException(nameof(action));
if (settings is null) throw new ArgumentNullException(nameof(settings));
if (action is null) throw new ArgumentNullException(nameof(action));

settings.Properties[RabbitMqProperties.TopologyInitializer] = action;
return settings;
Expand Down Expand Up @@ -74,6 +75,8 @@ public static RabbitMqMessageBusSettings UseDeadLetterExchangeDefaults(this Rabb
/// <returns></returns>
public static RabbitMqMessageBusSettings UseQueueDefaults(this RabbitMqMessageBusSettings settings, bool? durable = null, bool? autoDelete = null)
{
if (settings is null) throw new ArgumentNullException(nameof(settings));

if (durable != null)
{
settings.Properties[RabbitMqProperties.QueueDurable] = durable.Value;
Expand All @@ -93,7 +96,8 @@ public static RabbitMqMessageBusSettings UseQueueDefaults(this RabbitMqMessageBu
/// <returns></returns>
public static RabbitMqMessageBusSettings UseMessagePropertiesModifier(this RabbitMqMessageBusSettings settings, RabbitMqMessagePropertiesModifier<object> messagePropertiesModifier)
{
if (messagePropertiesModifier == null) throw new ArgumentNullException(nameof(messagePropertiesModifier));
if (settings is null) throw new ArgumentNullException(nameof(settings));
if (messagePropertiesModifier is null) throw new ArgumentNullException(nameof(messagePropertiesModifier));

settings.Properties[RabbitMqProperties.MessagePropertiesModifier] = messagePropertiesModifier;
return settings;
Expand All @@ -107,7 +111,8 @@ public static RabbitMqMessageBusSettings UseMessagePropertiesModifier(this Rabbi
/// <returns></returns>
public static RabbitMqMessageBusSettings UseRoutingKeyProvider(this RabbitMqMessageBusSettings settings, RabbitMqMessageRoutingKeyProvider<object> routingKeyProvider)
{
if (routingKeyProvider == null) throw new ArgumentNullException(nameof(routingKeyProvider));
if (settings is null) throw new ArgumentNullException(nameof(settings));
if (routingKeyProvider is null) throw new ArgumentNullException(nameof(routingKeyProvider));

settings.Properties[RabbitMqProperties.MessageRoutingKeyProvider] = routingKeyProvider;
return settings;
Expand All @@ -119,8 +124,10 @@ public static RabbitMqMessageBusSettings UseRoutingKeyProvider(this RabbitMqMess
/// <param name="settings"></param>
/// <param name="mode"></param>
/// <returns></returns>
public static RabbitMqMessageBusSettings UseAcknowledgementMode(this RabbitMqMessageBusSettings settings, RabbitMqMessageAcknowledgementMode mode)
public static RabbitMqMessageBusSettings AcknowledgementMode(this RabbitMqMessageBusSettings settings, RabbitMqMessageAcknowledgementMode mode)
{
if (settings is null) throw new ArgumentNullException(nameof(settings));

settings.Properties[RabbitMqProperties.MessageAcknowledgementMode] = mode;
return settings;
}
Expand Down
1 change: 1 addition & 0 deletions src/Tests/Host.Test.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.10.0" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="AutoFixture" Version="4.18.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace SlimMessageBus.Host.RabbitMQ.Test.Config;

public class RabbitMqConsumerBuilderExtensionsTests
{
private readonly MessageBusSettings _settings = new();
private readonly ConsumerBuilder<object> _consumerBuilder;
private readonly Fixture _fixture = new();

public RabbitMqConsumerBuilderExtensionsTests()
{
_consumerBuilder = new ConsumerBuilder<object>(_settings);
}

[Fact]
internal void When_UseAcknowledgementMode_Then_ValueStoredProperly()
{
// arrange
var mode = _fixture.Create<RabbitMqMessageAcknowledgementMode>();

// act
_consumerBuilder.AcknowledgementMode(mode);

// assert
var modeReturned = _consumerBuilder.Settings.GetOrDefault<RabbitMqMessageAcknowledgementMode>(RabbitMqProperties.MessageAcknowledgementMode);
modeReturned.Should().Be(mode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace SlimMessageBus.Host.RabbitMQ.Test.Config;

public class RabbitMqMessageBusSettingsExtensionsTests
{
private readonly RabbitMqMessageBusSettings _settings = new();
private readonly Fixture _fixture = new();

[Fact]
internal void When_UseAcknowledgementMode_Then_ValueStoredProperly()
{
// arrange
var mode = _fixture.Create<RabbitMqMessageAcknowledgementMode>();

// act
_settings.AcknowledgementMode(mode);

// assert
var modeReturned = _settings.GetOrDefault<RabbitMqMessageAcknowledgementMode>(RabbitMqProperties.MessageAcknowledgementMode);
modeReturned.Should().Be(mode);
}

[Fact]
internal void When_UseRoutingKeyProvider_Then_ValueStoredProperly()
{
// arrange
var routingKeyProviderMock = new Mock<RabbitMqMessageRoutingKeyProvider<object>>();

// act
_settings.UseRoutingKeyProvider(routingKeyProviderMock.Object);

// assert
var routingKeyProviderReturned = _settings.GetOrDefault<RabbitMqMessageRoutingKeyProvider<object>>(RabbitMqProperties.MessageRoutingKeyProvider);
routingKeyProviderReturned.Should().BeSameAs(routingKeyProviderMock.Object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ protected override void SetupServices(ServiceCollection services, IConfiguration

public IMessageBus MessageBus => ServiceProvider.GetRequiredService<IMessageBus>();

[Fact]
public async Task PubSubOnFanoutExchange()
[Theory]
[InlineData(RabbitMqMessageAcknowledgementMode.AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade)]
[InlineData(RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)]
[InlineData(RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing)]
public async Task PubSubOnFanoutExchange(RabbitMqMessageAcknowledgementMode acknowledgementMode)
{
var subscribers = 2;
var topic = "test-ping";
Expand Down Expand Up @@ -99,6 +102,7 @@ public async Task PubSubOnFanoutExchange()
.Queue($"subscriber-{i}", autoDelete: false)
.ExchangeBinding(topic)
.DeadLetterExchange("subscriber-dlq")
.AcknowledgementMode(acknowledgementMode)
.WithConsumer<PingConsumer>()
.WithConsumer<PingDerivedConsumer, PingDerivedMessage>());
}));
Expand Down Expand Up @@ -173,8 +177,11 @@ private async Task BasicPubSub(int expectedMessageCopies, Action<TestData> addit
additionalAssertion?.Invoke(new TestData { ProducedMessages = producedMessages, ConsumedMessages = consumedMessages.Snapshot() });
}

[Fact]
public async Task BasicReqRespOnTopic()
[Theory]
[InlineData(RabbitMqMessageAcknowledgementMode.AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade)]
[InlineData(RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)]
[InlineData(RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing)]
public async Task BasicReqRespOnTopic(RabbitMqMessageAcknowledgementMode acknowledgementMode)
{
var topic = "test-echo";

Expand All @@ -199,6 +206,7 @@ public async Task BasicReqRespOnTopic()
.ExchangeBinding("test-echo")
// If the request handling fails, the failed messages will be routed to the DLQ exchange
.DeadLetterExchange("echo-request-handler-dlq")
.AcknowledgementMode(acknowledgementMode)
.WithHandler<EchoRequestHandler>())
.ExpectRequestResponses(x =>
{
Expand Down
2 changes: 2 additions & 0 deletions src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Usings.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
global using AutoFixture;

global using FluentAssertions;

global using Microsoft.Extensions.Configuration;
Expand Down

0 comments on commit 708278a

Please sign in to comment.