Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Idempotent Producer Service Integration #15332

Merged
merged 1 commit into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
Expand Down Expand Up @@ -294,12 +295,16 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition to which the link should be bound; if unbound, <c>null</c>.</param>
/// <param name="features">The set of features which are active for the producer requesting the link.</param>
/// <param name="options">The set of options to consider when creating the link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A link for use with producer operations.</returns>
///
public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitionId,
TransportProducerFeatures features,
PartitionPublishingOptions options,
TimeSpan timeout,
CancellationToken cancellationToken)
{
Expand All @@ -312,7 +317,7 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var link = await CreateSendingLinkAsync(connection, producerEndpoint, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
var link = await CreateSendingLinkAsync(connection, producerEndpoint, features, options, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
Expand Down Expand Up @@ -523,7 +528,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon

if (ownerLevel.HasValue)
{
linkSettings.AddProperty(AmqpProperty.OwnerLevel, ownerLevel.Value);
linkSettings.AddProperty(AmqpProperty.ConsumerOwnerLevel, ownerLevel.Value);
}

if (trackLastEnqueuedEventProperties)
Expand Down Expand Up @@ -578,13 +583,17 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
///
/// <param name="connection">The active and opened AMQP connection to use for this link.</param>
/// <param name="endpoint">The fully qualified endpoint to open the link for.</param>
/// <param name="features">The set of features which are active for the producer requesting the link.</param>
/// <param name="options">The set of options to consider when creating the link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A link for use for operations related to receiving events.</returns>
///
protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnection connection,
Uri endpoint,
TransportProducerFeatures features,
PartitionPublishingOptions options,
TimeSpan timeout,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -623,6 +632,22 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnect
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
linkSettings.AddProperty(AmqpProperty.EntityType, (int)AmqpProperty.Entity.EventHub);

if ((features & TransportProducerFeatures.IdempotentPublishing) != 0)
{
linkSettings.DesiredCapabilities ??= new Multiple<AmqpSymbol>();
linkSettings.DesiredCapabilities.Add(AmqpProperty.EnableIdempotentPublishing);
}

// These values must either all be specified or none. It is valid for an individual value to be null to signal the
// service to supply the value, so long as the key is present.

if ((options.ProducerGroupId.HasValue) || (options.OwnerLevel.HasValue) || (options.StartingSequenceNumber.HasValue))
{
linkSettings.AddProperty(AmqpProperty.ProducerGroupId, options.ProducerGroupId);
linkSettings.AddProperty(AmqpProperty.ProducerOwnerLevel, options.OwnerLevel);
linkSettings.AddProperty(AmqpProperty.PublishedSequenceNumber, options.StartingSequenceNumber);
}

var link = new SendingAmqpLink(linkSettings);
linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
link.AttachTo(session);
Expand Down
26 changes: 26 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ internal static class AmqpError
///
public static AmqpSymbol ArgumentOutOfRangeError { get; } = AmqpConstants.Vendor + ":argument-out-of-range";

/// <summary>
/// Indicates that a sequence number was out of order.
/// </summary>
///
public static AmqpSymbol SequenceOutOfOrderError { get; } = AmqpConstants.Vendor + ":out-of-order-sequence";

/// <summary>
/// Indicates that a partition was stolen by another producer with exclusive access.
/// </summary>
///
public static AmqpSymbol ProducerStolenError { get; } = AmqpConstants.Vendor + ":producer-epoch-stolen";

/// <summary>
/// The expression to test for when the service returns a "Not Found" response to determine the context.
/// </summary>
Expand Down Expand Up @@ -179,6 +191,20 @@ private static Exception CreateException(string condition,
return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.ConsumerDisconnected);
}

// The producer was superseded by one with a higher owner level.

if (string.Equals(condition, ProducerStolenError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.ProducerDisconnected);
}

// The client-supplied sequence number was not in the expected order.

if (string.Equals(condition, SequenceOutOfOrderError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.InvalidClientState);
}

// Authorization was denied.

if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value, StringComparison.InvariantCultureIgnoreCase))
Expand Down
31 changes: 14 additions & 17 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,10 @@ internal class AmqpEventBatch : TransportEventBatch
public override long SizeInBytes => _sizeBytes;

/// <summary>
/// A flag that indicates whether space should be reserved for a publishing
/// sequence number when the event size is measured. If <c>false</c>, a sequence
/// number is not used in size calculations.
/// The flags specifying the set of special transport features that have been opted-into.
/// </summary>
///
/// <remarks>
/// The sequence number is only populated and relevant when certain features
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public override bool ReserveSpaceForSequenceNumber { get; }
public override TransportProducerFeatures ActiveFeatures { get; }

/// <summary>
/// The count of events contained in the batch.
Expand Down Expand Up @@ -94,11 +87,11 @@ internal class AmqpEventBatch : TransportEventBatch
///
/// <param name="messageConverter">The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.</param>
/// <param name="options">The set of options to apply to the batch.</param>
/// <param name="reserveSpaceForSequenceNumber">A flag that indicates whether space should be reserved for a publishing sequence number when the event size is measured. If <c>false</c>, a sequence number is not used in size calculations.</param>
/// <param name="activeFeatures">The flags specifying the set of special transport features have been opted-into.</param>
///
public AmqpEventBatch(AmqpMessageConverter messageConverter,
CreateBatchOptions options,
bool reserveSpaceForSequenceNumber)
TransportProducerFeatures activeFeatures)
{
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
Argument.AssertNotNull(options, nameof(options));
Expand All @@ -107,7 +100,7 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter,
MessageConverter = messageConverter;
Options = options;
MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
ReserveSpaceForSequenceNumber = reserveSpaceForSequenceNumber;
ActiveFeatures = activeFeatures;

// Initialize the size by reserving space for the batch envelope.

Expand All @@ -130,15 +123,20 @@ public override bool TryAdd(EventData eventData)
Argument.AssertNotNull(eventData, nameof(eventData));
Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch));

if (ReserveSpaceForSequenceNumber)
// Reserve space for producer-owned fields that correspond to special
// features, if enabled.

if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0)
{
eventData.PendingPublishSequenceNumber = int.MaxValue;
eventData.PendingProducerGroupId = long.MaxValue;
eventData.PendingProducerOwnerLevel = short.MaxValue;
}

var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);

try
{
using var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);

// Calculate the size for the event, based on the AMQP message size and accounting for a
// bit of reserved overhead size.

Expand All @@ -160,8 +158,7 @@ public override bool TryAdd(EventData eventData)
}
finally
{
eventData.PendingPublishSequenceNumber = default;
eventMessage?.Dispose();
eventData.ClearPublishingState();
}
}

Expand Down
15 changes: 15 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,21 @@ private static AmqpMessage BuildAmqpMessageFromEvent(EventData source,
message.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey;
}

if (source.PendingPublishSequenceNumber.HasValue)
{
message.MessageAnnotations.Map[AmqpProperty.PublishedSequenceNumber] = source.PendingPublishSequenceNumber;
}

if (source.PendingProducerGroupId.HasValue)
{
message.MessageAnnotations.Map[AmqpProperty.ProducerGroupId] = source.PendingProducerGroupId;
}

if (source.PendingProducerOwnerLevel.HasValue)
{
message.MessageAnnotations.Map[AmqpProperty.ProducerOwnerLevel] = source.PendingProducerOwnerLevel;
}

return message;
}

Expand Down
50 changes: 25 additions & 25 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ internal class AmqpProducer : TransportProducer
///
private TransportProducerFeatures ActiveFeatures { get; }

/// <summary>
/// The set of options currently active for the partition associated with this producer.
/// </summary>
///
/// <remarks>
/// These options are managed by the producer and will be mutated as part of state
/// updates.
/// </remarks>
///
private PartitionPublishingOptions ActiveOptions { get; }

/// <summary>
/// The policy to use for determining retry behavior for when an operation fails.
/// </summary>
Expand Down Expand Up @@ -139,8 +150,6 @@ public AmqpProducer(string eventHubName,
TransportProducerFeatures requestedFeatures = TransportProducerFeatures.None,
PartitionPublishingOptions partitionOptions = null)
{
partitionOptions ??= new PartitionPublishingOptions();

Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
Expand All @@ -152,9 +161,10 @@ public AmqpProducer(string eventHubName,
ConnectionScope = connectionScope;
MessageConverter = messageConverter;
ActiveFeatures = requestedFeatures;
ActiveOptions = partitionOptions?.Clone() ?? new PartitionPublishingOptions();

SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, partitionOptions, timeout, CancellationToken.None),
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, ActiveOptions, timeout, CancellationToken.None),
link =>
{
link.Session?.SafeClose();
Expand Down Expand Up @@ -280,7 +290,7 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
options.MaximumSizeInBytes ??= MaximumMessageSize;
Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes));

return new AmqpEventBatch(MessageConverter, options, IsSequenceMeasurementRequired(ActiveFeatures));
return new AmqpEventBatch(MessageConverter, options, ActiveFeatures);
}

/// <summary>
Expand Down Expand Up @@ -538,7 +548,7 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs

try
{
link = await ConnectionScope.OpenProducerLinkAsync(partitionId, timeout, cancellationToken).ConfigureAwait(false);
link = await ConnectionScope.OpenProducerLinkAsync(partitionId, ActiveFeatures, partitionOptions, timeout, cancellationToken).ConfigureAwait(false);

if (!MaximumMessageSize.HasValue)
{
Expand All @@ -556,14 +566,16 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs

if (InitializedPartitionProperties == null)
{
if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0)
{
throw new NotImplementedException(nameof(CreateLinkAndEnsureProducerStateAsync));
}
else
{
InitializedPartitionProperties = new PartitionPublishingProperties(false, null, null, null);
}
var producerGroup = link.ExtractSettingPropertyValueOrDefault(AmqpProperty.ProducerGroupId, default(long?));
var ownerLevel = link.ExtractSettingPropertyValueOrDefault(AmqpProperty.ProducerOwnerLevel, default(short?));
var sequence = link.ExtractSettingPropertyValueOrDefault(AmqpProperty.SequenceNumber, default(int?));

// Once the properties are initialized, clear the starting sequence number to ensure that the current
// sequence tracked by the service is used should the link need to be recreated; this avoids the need for
// the transport producer to have awareness of the sequence numbers of events being sent.

InitializedPartitionProperties = new PartitionPublishingProperties(false, producerGroup, ownerLevel, sequence);
partitionOptions.StartingSequenceNumber = null;
}

}
Expand All @@ -575,18 +587,6 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
return link;
}

/// <summary>
/// Determines if measuring a sequence number is required to accurately calculate
/// the size of an event.
/// </summary>
///
/// <param name="activeFeatures">The set of features which are active for the producer.</param>
///
/// <returns><c>true</c> if a sequence number should be measured; otherwise, <c>false</c>.</returns>
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool IsSequenceMeasurementRequired(TransportProducerFeatures activeFeatures) => ((activeFeatures & TransportProducerFeatures.IdempotentPublishing) != 0);

/// <summary>
/// Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
/// </summary>
Expand Down
Loading