Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fire and forget anonymous send #121

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/ArtemisNetCoreClient/AnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ public ValueTask DisposeAsync()
return session.RemoveProducerAsync(ProducerId);
}

public void SendMessage(string address, RoutingType? routingType, Message message)
{
message.Address = address;
if (routingType != null)
{
message.RoutingType = routingType;
}

session.SendMessage(message: message, producerId: ProducerId);
}

public async ValueTask SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default)
{
message.Address = address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,35 @@ namespace ActiveMQ.Artemis.Core.Client;
public static class AnonymousProducerExtensions
{
/// <summary>
/// Sends a message to the specified address.
/// Sends a message synchronously to the broker. This method is primarily used for non-durable
/// message delivery since it does not wait for a confirmation from the broker and operates
/// in a fire-and-forget manner.
/// </summary>
/// <remarks>
/// This method should typically be used when message delivery speed is prioritized over reliability.
/// The message will be sent with 'at most once' delivery guarantee, as there's no acknowledgment
/// from the broker that the message has been received or persisted.
/// </remarks>
/// <param name="anonymousProducer">The <see cref="IAnonymousProducer"/> instance that this method extends.</param>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="message">The message to send.</param>
public static void SendMessage(this IAnonymousProducer anonymousProducer, string address, Message message)
{
anonymousProducer.SendMessage(address, routingType: null, message);
}

/// <summary>
/// Sends a message asynchronously to the broker. This method supports both durable and non-durable message
/// delivery modes, as specified by the message's durable property. It awaits for a confirmation
/// from the broker, ensuring that the message is either stored (for durable messages) or acknowledged
/// (for non-durable messages) before completing.
/// </summary>
/// <remarks>
/// This method should be used when reliability is required, and it supports awaiting the acknowledgment
/// from the broker. The delivery semantics are 'at least once' for durable messages, where the broker
/// confirms the persistence of the message. For non-durable messages, the completion of the task
/// indicates that the broker has received the message.
/// </remarks>
/// <param name="anonymousProducer">The <see cref="IAnonymousProducer"/> instance that this method extends.</param>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="message">The message to send.</param>
Expand Down
28 changes: 27 additions & 1 deletion src/ArtemisNetCoreClient/IAnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,34 @@ namespace ActiveMQ.Artemis.Core.Client;
public interface IAnonymousProducer : IAsyncDisposable
{
/// <summary>
/// Sends a message to the specified address.
/// Sends a message synchronously to the broker. This method is primarily used for non-durable
/// message delivery since it does not wait for a confirmation from the broker and operates
/// in a fire-and-forget manner.
/// </summary>
/// <remarks>
/// This method should typically be used when message delivery speed is prioritized over reliability.
/// The message will be sent with 'at most once' delivery guarantee, as there's no acknowledgment
/// from the broker that the message has been received or persisted.
/// </remarks>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="routingType">
/// The routing type to use when sending the message. Ensures that this message is only routed to queues with matching routing type.
/// </param>
/// <param name="message">The message to send.</param>
void SendMessage(string address, RoutingType? routingType, Message message);

/// <summary>
/// Sends a message asynchronously to the broker. This method supports both durable and non-durable message
/// delivery modes, as specified by the message's durable property. It awaits for a confirmation
/// from the broker, ensuring that the message is either stored (for durable messages) or acknowledged
/// (for non-durable messages) before completing.
/// </summary>
/// <remarks>
/// This method should be used when reliability is required, and it supports awaiting the acknowledgment
/// from the broker. The delivery semantics are 'at least once' for durable messages, where the broker
/// confirms the persistence of the message. For non-durable messages, the completion of the task
/// indicates that the broker has received the message.
/// </remarks>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="routingType">
/// The routing type to use when sending the message. Ensures that this message is only routed to queues with matching routing type.
Expand Down
48 changes: 48 additions & 0 deletions test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,52 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
});
});
}

[Fact]
public async Task Should_send_message_to_specific_address_in_a_fire_and_forget_manner()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper));

await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var (addressName, queueName) = await scenario.Step("Create address and queue", async () =>
{
var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast);
return (addressName, queueName);
});

await scenario.Step("Send a message in a fire-and-forget manner", async () =>
{
await using var producer = await session.CreateAnonymousProducerAsync(testFixture.CancellationToken);

// ReSharper disable once MethodHasAsyncOverload
producer.SendMessage(address: addressName, routingType: RoutingType.Anycast, message: new Message
{
Body = "fire_and_forget_msg"u8.ToArray(),
});
});

await scenario.Step("Confirm message count (one message should be available on the queue)", async () =>
{
await RetryUtil.RetryUntil(
func: () => session.GetQueueInfoAsync(queueName, testFixture.CancellationToken),
until: info => info?.MessageCount == 1,
cancellationToken: testFixture.CancellationToken
);
});

await scenario.Step("Verify message payload", async () =>
{
await using var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
}, testFixture.CancellationToken);
var message = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);
Assert.NotNull(message);
Assert.Equal("fire_and_forget_msg"u8.ToArray(), message.Body.ToArray());
});
}
}
Loading