Skip to content

Commit

Permalink
[Event Hubs Client] Idempotent Producer Client (#15034)
Browse files Browse the repository at this point in the history
The focus of these changes is implementing the idempotent publishing feature
infrastructure into the `EventHubProducerClient` and associated types,
refactoring the current structure to support idempotent and non-idempotent
publishing as unique code paths.

Not included in these changes is the addition of the `ReadPartitionPublishingProperties`
member and live tests; those will be covered in dedicated work streams.
  • Loading branch information
jsquire authored Sep 11, 2020
1 parent 4ed7fdf commit 3d59356
Show file tree
Hide file tree
Showing 27 changed files with 2,827 additions and 354 deletions.
24 changes: 23 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs
100755 → 100644

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
<value>The requested transport type, '{0}' is not supported.</value>
</data>
<data name="CannotSendWithPartitionIdAndPartitionKey" xml:space="preserve">
<value>A producer created for a specific partition cannot send events using a partition key. This producer is associated with partition '{0}'.</value>
<value>An event cannot be published using both a partition key and a partition identifier. This operation specified partition key `{0}` and partition id `{1}`.</value>
</data>
<data name="UnsupportedCredential" xml:space="preserve">
<value>The credential is not a known and supported credential type. Please use a JWT credential or shared key credential.</value>
Expand Down Expand Up @@ -289,6 +289,12 @@
<value>One or more exceptions occured during event processing. Please see the inner exceptions for more detail.</value>
</data>
<data name="OnlyOneSharedAccessAuthorizationMayBeSpecified" xml:space="preserve">
<value>The authorization for a connection string may specifiy a shared key or precomputed shared access signature, but not both. Please verify that your connection string does not have the `SharedAccessSignature` token if you are passing the `SharedKeyName` and `SharedKey`.</value>
<value>The authorization for a connection string may specify a shared key or pre-computed shared access signature, but not both. Please verify that your connection string does not have the `SharedAccessSignature` token if you are passing the `SharedKeyName` and `SharedKey`.</value>
</data>
<data name="CannotPublishToGateway" xml:space="preserve">
<value>The producer was configured to use features that require publishing to a specific partition. Publishing with automatic routing or using a partition key is not supported by this producer.</value>
</data>
<data name="IdempotentAlreadyPublished" xml:space="preserve">
<value>These events have already been successfully published. When idempotent publishing is enabled, events that were acknowledged by the Event Hubs service may not be published again.</value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
Expand Down Expand Up @@ -495,9 +495,9 @@ public partial class PartitionPublishingProperties
{
protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnabled, long? producerGroupId, short? ownerLevel, int? lastPublishedSequenceNumber) { }
public bool IsIdempotentPublishingEnabled { get { throw null; } }
public int? LastPublishedSequenceNumber { get { throw null; } set { } }
public short? OwnerLevel { get { throw null; } set { } }
public long? ProducerGroupId { get { throw null; } set { } }
public int? LastPublishedSequenceNumber { get { throw null; } }
public short? OwnerLevel { get { throw null; } }
public long? ProducerGroupId { get { throw null; } }
}
public partial class SendEventOptions
{
Expand Down
9 changes: 8 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Amqp;

namespace Azure.Messaging.EventHubs.Amqp
Expand Down Expand Up @@ -374,11 +375,15 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
/// </summary>
///
/// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; if <c>null</c>, the producer is unbound.</param>
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
///
/// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
///
public override TransportProducer CreateProducer(string partitionId,
TransportProducerFeatures requestedFeatures,
PartitionPublishingOptions partitionOptions,
EventHubsRetryPolicy retryPolicy)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));
Expand All @@ -389,7 +394,9 @@ public override TransportProducer CreateProducer(string partitionId,
partitionId,
ConnectionScope,
MessageConverter,
retryPolicy
retryPolicy,
requestedFeatures,
partitionOptions
);
}

Expand Down
126 changes: 113 additions & 13 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ internal class AmqpProducer : TransportProducer
///
private string PartitionId { get; }

/// <summary>
/// The flags specifying the set of special transport features that this producer has opted-into.
/// </summary>
///
private TransportProducerFeatures ActiveFeatures { get; }

/// <summary>
/// The policy to use for determining retry behavior for when an operation fails.
/// </summary>
Expand Down Expand Up @@ -92,6 +98,17 @@ internal class AmqpProducer : TransportProducer
///
private long? MaximumMessageSize { get; set; }

/// <summary>
/// The set of partition publishing properties active for this producer at the time it was initialized.
/// </summary>
///
/// <remarks>
/// It is important to note that these properties are a snapshot of the service state at the time when the
/// producer was initialized; they do not necessarily represent the current state of the service.
/// </remarks>
///
private PartitionPublishingProperties InitializedPartitionProperties { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="AmqpProducer"/> class.
/// </summary>
Expand All @@ -101,6 +118,8 @@ internal class AmqpProducer : TransportProducer
/// <param name="connectionScope">The AMQP connection context for operations.</param>
/// <param name="messageConverter">The converter to use for translating between AMQP messages and client types.</param>
/// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -115,8 +134,12 @@ public AmqpProducer(string eventHubName,
string partitionId,
AmqpConnectionScope connectionScope,
AmqpMessageConverter messageConverter,
EventHubsRetryPolicy retryPolicy)
EventHubsRetryPolicy retryPolicy,
TransportProducerFeatures requestedFeatures = TransportProducerFeatures.None,
PartitionPublishingOptions partitionOptions = null)
{
partitionOptions ??= new PartitionPublishingOptions();

Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
Expand All @@ -127,9 +150,10 @@ public AmqpProducer(string eventHubName,
RetryPolicy = retryPolicy;
ConnectionScope = connectionScope;
MessageConverter = messageConverter;
ActiveFeatures = requestedFeatures;

SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None),
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, partitionOptions, timeout, CancellationToken.None),
link =>
{
link.Session?.SafeClose();
Expand Down Expand Up @@ -211,7 +235,6 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
if (!MaximumMessageSize.HasValue)
{
var failedAttemptCount = 0;
var retryDelay = default(TimeSpan?);
var tryTimeout = RetryPolicy.CalculateTryTimeout(0);

while ((!cancellationToken.IsCancellationRequested) && (!_closed))
Expand All @@ -223,13 +246,13 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
}
catch (Exception ex)
{
Exception activeEx = ex.TranslateServiceException(EventHubName);
++failedAttemptCount;

// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
// Otherwise, bubble the exception.

++failedAttemptCount;
retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
var activeEx = ex.TranslateServiceException(EventHubName);
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);

if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
{
Expand All @@ -247,13 +270,7 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
}
}

// If MaximumMessageSize has not been populated nor exception thrown
// by this point, then cancellation has been requested.

if (!MaximumMessageSize.HasValue)
{
throw new TaskCanceledException();
}
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
}

// Ensure that there was a maximum size populated; if none was provided,
Expand All @@ -265,6 +282,74 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
return new AmqpEventBatch(MessageConverter, options);
}

/// <summary>
/// Reads the set of partition publishing properties active for this producer at the time it was initialized.
/// </summary>
///
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
///
/// <returns>The set of <see cref="PartitionPublishingProperties" /> observed when the producer was initialized.</returns>
///
/// <remarks>
/// It is important to note that these properties are a snapshot of the service state at the time when the
/// producer was initialized; they do not necessarily represent the current state of the service.
/// </remarks>
///
public override async ValueTask<PartitionPublishingProperties> ReadInitializationPublishingPropertiesAsync(CancellationToken cancellationToken)
{
Argument.AssertNotClosed(_closed, nameof(AmqpProducer));

// If the properties were already initialized, use them.

if (InitializedPartitionProperties != null)
{
return InitializedPartitionProperties;
}

// Initialize the properties by forcing the link to be opened.

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var failedAttemptCount = 0;
var tryTimeout = RetryPolicy.CalculateTryTimeout(0);

while ((!cancellationToken.IsCancellationRequested) && (!_closed))
{
try
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).ConfigureAwait(false);
break;
}
catch (Exception ex)
{
++failedAttemptCount;

// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
// Otherwise, bubble the exception.

var activeEx = ex.TranslateServiceException(EventHubName);
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);

if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
{
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
}
else if (ex is AmqpException)
{
ExceptionDispatchInfo.Capture(activeEx).Throw();
}
else
{
throw;
}
}
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
return InitializedPartitionProperties;
}

/// <summary>
/// Closes the connection to the transport producer instance.
/// </summary>
Expand Down Expand Up @@ -430,6 +515,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>null</c>.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
///
Expand All @@ -443,6 +529,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
/// </remarks>
///
protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAsync(string partitionId,
PartitionPublishingOptions partitionOptions,
TimeSpan timeout,
CancellationToken cancellationToken)
{
Expand All @@ -465,6 +552,19 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaximumMessageSize = (long)link.Settings.MaxMessageSize;
}

if (InitializedPartitionProperties == null)
{
if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0)
{
throw new NotImplementedException(nameof(CreateLinkAndEnsureProducerStateAsync));
}
else
{
InitializedPartitionProperties = new PartitionPublishingProperties(false, null, null, null);
}
}

}
catch (Exception ex)
{
Expand Down
Loading

0 comments on commit 3d59356

Please sign in to comment.