diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs index 12e720513448..c65321595b7f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs @@ -14,6 +14,7 @@ using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Diagnostics; +using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Encoding; using Microsoft.Azure.Amqp.Framing; @@ -294,12 +295,16 @@ public virtual async Task OpenConsumerLinkAsync(string consum /// /// /// The identifier of the Event Hub partition to which the link should be bound; if unbound, null. + /// The set of features which are active for the producer requesting the link. + /// The set of options to consider when creating the link. /// The timeout to apply when creating the link. /// An optional instance to signal the request to cancel the operation. /// /// A link for use with producer operations. /// public virtual async Task OpenProducerLinkAsync(string partitionId, + TransportProducerFeatures features, + PartitionPublishingOptions options, TimeSpan timeout, CancellationToken cancellationToken) { @@ -312,7 +317,7 @@ public virtual async Task OpenProducerLinkAsync(string partitio var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); - var link = await CreateSendingLinkAsync(connection, producerEndpoint, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false); + var link = await CreateSendingLinkAsync(connection, producerEndpoint, features, options, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false); @@ -523,7 +528,7 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon if (ownerLevel.HasValue) { - linkSettings.AddProperty(AmqpProperty.OwnerLevel, ownerLevel.Value); + linkSettings.AddProperty(AmqpProperty.ConsumerOwnerLevel, ownerLevel.Value); } if (trackLastEnqueuedEventProperties) @@ -578,6 +583,8 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon /// /// The active and opened AMQP connection to use for this link. /// The fully qualified endpoint to open the link for. + /// The set of features which are active for the producer requesting the link. + /// The set of options to consider when creating the link. /// The timeout to apply when creating the link. /// An optional instance to signal the request to cancel the operation. /// @@ -585,6 +592,8 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon /// protected virtual async Task CreateSendingLinkAsync(AmqpConnection connection, Uri endpoint, + TransportProducerFeatures features, + PartitionPublishingOptions options, TimeSpan timeout, CancellationToken cancellationToken) { @@ -623,6 +632,22 @@ protected virtual async Task CreateSendingLinkAsync(AmqpConnect linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds); linkSettings.AddProperty(AmqpProperty.EntityType, (int)AmqpProperty.Entity.EventHub); + if ((features & TransportProducerFeatures.IdempotentPublishing) != 0) + { + linkSettings.DesiredCapabilities ??= new Multiple(); + linkSettings.DesiredCapabilities.Add(AmqpProperty.EnableIdempotentPublishing); + } + + // These values must either all be specified or none. It is valid for an individual value to be null to signal the + // service to supply the value, so long as the key is present. + + if ((options.ProducerGroupId.HasValue) || (options.OwnerLevel.HasValue) || (options.StartingSequenceNumber.HasValue)) + { + linkSettings.AddProperty(AmqpProperty.ProducerGroupId, options.ProducerGroupId); + linkSettings.AddProperty(AmqpProperty.ProducerOwnerLevel, options.OwnerLevel); + linkSettings.AddProperty(AmqpProperty.PublishedSequenceNumber, options.StartingSequenceNumber); + } + var link = new SendingAmqpLink(linkSettings); linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }"; link.AttachTo(session); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs old mode 100755 new mode 100644 index f05502d8e299..8948b807a4c6 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs @@ -47,6 +47,18 @@ internal static class AmqpError /// public static AmqpSymbol ArgumentOutOfRangeError { get; } = AmqpConstants.Vendor + ":argument-out-of-range"; + /// + /// Indicates that a sequence number was out of order. + /// + /// + public static AmqpSymbol SequenceOutOfOrderError { get; } = AmqpConstants.Vendor + ":out-of-order-sequence"; + + /// + /// Indicates that a partition was stolen by another producer with exclusive access. + /// + /// + public static AmqpSymbol ProducerStolenError { get; } = AmqpConstants.Vendor + ":producer-epoch-stolen"; + /// /// The expression to test for when the service returns a "Not Found" response to determine the context. /// @@ -179,6 +191,20 @@ private static Exception CreateException(string condition, return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.ConsumerDisconnected); } + // The producer was superseded by one with a higher owner level. + + if (string.Equals(condition, ProducerStolenError.Value, StringComparison.InvariantCultureIgnoreCase)) + { + return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.ProducerDisconnected); + } + + // The client-supplied sequence number was not in the expected order. + + if (string.Equals(condition, SequenceOutOfOrderError.Value, StringComparison.InvariantCultureIgnoreCase)) + { + return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.InvalidClientState); + } + // Authorization was denied. if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value, StringComparison.InvariantCultureIgnoreCase)) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs index b7da325e8e76..bbc3b0d75f87 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs @@ -52,17 +52,10 @@ internal class AmqpEventBatch : TransportEventBatch public override long SizeInBytes => _sizeBytes; /// - /// A flag that indicates whether space should be reserved for a publishing - /// sequence number when the event size is measured. If false, a sequence - /// number is not used in size calculations. + /// The flags specifying the set of special transport features that have been opted-into. /// /// - /// - /// The sequence number is only populated and relevant when certain features - /// of the producer are enabled. For example, it is used by idempotent publishing. - /// - /// - public override bool ReserveSpaceForSequenceNumber { get; } + public override TransportProducerFeatures ActiveFeatures { get; } /// /// The count of events contained in the batch. @@ -94,11 +87,11 @@ internal class AmqpEventBatch : TransportEventBatch /// /// The converter to use for translating into the corresponding AMQP message. /// The set of options to apply to the batch. - /// A flag that indicates whether space should be reserved for a publishing sequence number when the event size is measured. If false, a sequence number is not used in size calculations. + /// The flags specifying the set of special transport features have been opted-into. /// public AmqpEventBatch(AmqpMessageConverter messageConverter, CreateBatchOptions options, - bool reserveSpaceForSequenceNumber) + TransportProducerFeatures activeFeatures) { Argument.AssertNotNull(messageConverter, nameof(messageConverter)); Argument.AssertNotNull(options, nameof(options)); @@ -107,7 +100,7 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter, MessageConverter = messageConverter; Options = options; MaximumSizeInBytes = options.MaximumSizeInBytes.Value; - ReserveSpaceForSequenceNumber = reserveSpaceForSequenceNumber; + ActiveFeatures = activeFeatures; // Initialize the size by reserving space for the batch envelope. @@ -130,15 +123,20 @@ public override bool TryAdd(EventData eventData) Argument.AssertNotNull(eventData, nameof(eventData)); Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch)); - if (ReserveSpaceForSequenceNumber) + // Reserve space for producer-owned fields that correspond to special + // features, if enabled. + + if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0) { eventData.PendingPublishSequenceNumber = int.MaxValue; + eventData.PendingProducerGroupId = long.MaxValue; + eventData.PendingProducerOwnerLevel = short.MaxValue; } - var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey); - try { + using var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey); + // Calculate the size for the event, based on the AMQP message size and accounting for a // bit of reserved overhead size. @@ -160,8 +158,7 @@ public override bool TryAdd(EventData eventData) } finally { - eventData.PendingPublishSequenceNumber = default; - eventMessage?.Dispose(); + eventData.ClearPublishingState(); } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs old mode 100755 new mode 100644 index 5549207e3f54..d3536f3cba95 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs @@ -325,6 +325,21 @@ private static AmqpMessage BuildAmqpMessageFromEvent(EventData source, message.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey; } + if (source.PendingPublishSequenceNumber.HasValue) + { + message.MessageAnnotations.Map[AmqpProperty.PublishedSequenceNumber] = source.PendingPublishSequenceNumber; + } + + if (source.PendingProducerGroupId.HasValue) + { + message.MessageAnnotations.Map[AmqpProperty.ProducerGroupId] = source.PendingProducerGroupId; + } + + if (source.PendingProducerOwnerLevel.HasValue) + { + message.MessageAnnotations.Map[AmqpProperty.ProducerOwnerLevel] = source.PendingProducerOwnerLevel; + } + return message; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs index 1f6e499fa152..f62109397abb 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs @@ -65,6 +65,17 @@ internal class AmqpProducer : TransportProducer /// private TransportProducerFeatures ActiveFeatures { get; } + /// + /// The set of options currently active for the partition associated with this producer. + /// + /// + /// + /// These options are managed by the producer and will be mutated as part of state + /// updates. + /// + /// + private PartitionPublishingOptions ActiveOptions { get; } + /// /// The policy to use for determining retry behavior for when an operation fails. /// @@ -139,8 +150,6 @@ public AmqpProducer(string eventHubName, TransportProducerFeatures requestedFeatures = TransportProducerFeatures.None, PartitionPublishingOptions partitionOptions = null) { - partitionOptions ??= new PartitionPublishingOptions(); - Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName)); Argument.AssertNotNull(connectionScope, nameof(connectionScope)); Argument.AssertNotNull(messageConverter, nameof(messageConverter)); @@ -152,9 +161,10 @@ public AmqpProducer(string eventHubName, ConnectionScope = connectionScope; MessageConverter = messageConverter; ActiveFeatures = requestedFeatures; + ActiveOptions = partitionOptions?.Clone() ?? new PartitionPublishingOptions(); SendLink = new FaultTolerantAmqpObject( - timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, partitionOptions, timeout, CancellationToken.None), + timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, ActiveOptions, timeout, CancellationToken.None), link => { link.Session?.SafeClose(); @@ -280,7 +290,7 @@ public override async ValueTask CreateBatchAsync(CreateBatc options.MaximumSizeInBytes ??= MaximumMessageSize; Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes)); - return new AmqpEventBatch(MessageConverter, options, IsSequenceMeasurementRequired(ActiveFeatures)); + return new AmqpEventBatch(MessageConverter, options, ActiveFeatures); } /// @@ -538,7 +548,7 @@ protected virtual async Task CreateLinkAndEnsureProducerStateAs try { - link = await ConnectionScope.OpenProducerLinkAsync(partitionId, timeout, cancellationToken).ConfigureAwait(false); + link = await ConnectionScope.OpenProducerLinkAsync(partitionId, ActiveFeatures, partitionOptions, timeout, cancellationToken).ConfigureAwait(false); if (!MaximumMessageSize.HasValue) { @@ -556,14 +566,16 @@ protected virtual async Task CreateLinkAndEnsureProducerStateAs if (InitializedPartitionProperties == null) { - if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0) - { - throw new NotImplementedException(nameof(CreateLinkAndEnsureProducerStateAsync)); - } - else - { - InitializedPartitionProperties = new PartitionPublishingProperties(false, null, null, null); - } + var producerGroup = link.ExtractSettingPropertyValueOrDefault(AmqpProperty.ProducerGroupId, default(long?)); + var ownerLevel = link.ExtractSettingPropertyValueOrDefault(AmqpProperty.ProducerOwnerLevel, default(short?)); + var sequence = link.ExtractSettingPropertyValueOrDefault(AmqpProperty.SequenceNumber, default(int?)); + + // Once the properties are initialized, clear the starting sequence number to ensure that the current + // sequence tracked by the service is used should the link need to be recreated; this avoids the need for + // the transport producer to have awareness of the sequence numbers of events being sent. + + InitializedPartitionProperties = new PartitionPublishingProperties(false, producerGroup, ownerLevel, sequence); + partitionOptions.StartingSequenceNumber = null; } } @@ -575,18 +587,6 @@ protected virtual async Task CreateLinkAndEnsureProducerStateAs return link; } - /// - /// Determines if measuring a sequence number is required to accurately calculate - /// the size of an event. - /// - /// - /// The set of features which are active for the producer. - /// - /// true if a sequence number should be measured; otherwise, false. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static bool IsSequenceMeasurementRequired(TransportProducerFeatures activeFeatures) => ((activeFeatures & TransportProducerFeatures.IdempotentPublishing) != 0); - /// /// Uses the minimum value of the two specified instances. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs old mode 100755 new mode 100644 index b7e1bac2d720..2ecc701f8dec --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs @@ -14,10 +14,16 @@ namespace Azure.Messaging.EventHubs internal static class AmqpProperty { /// - /// The owner level (a.k.a. epoch) to associate with a link. + /// The owner level (a.k.a. epoch) to associate with a receiver link. /// /// - public static AmqpSymbol OwnerLevel { get; } = AmqpConstants.Vendor + ":epoch"; + public static AmqpSymbol ConsumerOwnerLevel { get; } = AmqpConstants.Vendor + ":epoch"; + + /// + /// The owner level (a.k.a. epoch) to associate with a sending link. + /// + /// + public static AmqpSymbol ProducerOwnerLevel { get; } = AmqpConstants.Vendor + ":producer-epoch"; /// /// The type of Event Hubs entity to associate with a link. @@ -31,6 +37,24 @@ internal static class AmqpProperty /// public static AmqpSymbol TrackLastEnqueuedEventProperties { get; } = AmqpConstants.Vendor + ":enable-receiver-runtime-metric"; + /// + /// The capability for opting-into idempotent publishing. + /// + /// + public static AmqpSymbol EnableIdempotentPublishing { get; } = AmqpConstants.Vendor + ":idempotent-producer"; + + /// + /// The identifier of the producer group to associate with a producer. + /// + /// + public static AmqpSymbol ProducerGroupId { get; } = AmqpConstants.Vendor + ":producer-sequence-number"; + + /// + /// The sequence number assigned by a producer to an event when it was published. + /// + /// + public static AmqpSymbol PublishedSequenceNumber { get; } = AmqpConstants.Vendor + ":producer-id"; + /// /// The timeout to associate with a link. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs index 168d315736b1..b46550d06ef4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs @@ -31,17 +31,10 @@ internal abstract class TransportEventBatch : IDisposable public abstract long SizeInBytes { get; } /// - /// A flag that indicates whether space should be reserved for a publishing - /// sequence number when the event size is measured. If false, a sequence - /// number is not used in size calculations. + /// The flags specifying the set of special transport features that have been opted-into. /// /// - /// - /// The sequence number is only populated and relevant when certain features - /// of the producer are enabled. For example, it is used by idempotent publishing. - /// - /// - public abstract bool ReserveSpaceForSequenceNumber { get; } + public abstract TransportProducerFeatures ActiveFeatures { get; } /// /// The count of events contained in the batch. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs index cf4c0dae67a3..28d0cfb7ed42 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs @@ -145,24 +145,6 @@ public Stream BodyAsStream /// public string PartitionKey { get; } - /// - /// The publishing sequence number assigned to the event as part of a publishing operation. - /// - /// - /// - /// The sequence number that was assigned during publishing, if the event was successfully - /// published by a sequence-aware producer. If the producer was not configured to apply - /// sequence numbering or if the event has not yet been successfully published, this member - /// will be null. - /// - /// - /// - /// The published sequence number is only populated and relevant when certain features - /// of the producer are enabled. For example, it is used by idempotent publishing. - /// - /// - internal int? PendingPublishSequenceNumber { get; set; } - /// /// The sequence number of the event that was last enqueued into the Event Hub partition from which this /// event was received. @@ -211,6 +193,54 @@ public Stream BodyAsStream /// internal DateTimeOffset? LastPartitionPropertiesRetrievalTime { get; } + /// + /// The publishing sequence number assigned to the event as part of a publishing operation. + /// + /// + /// + /// This member is only populated while a publishing operation is taking place; once the + /// operation has completed, successfully or not, the value is cleared. + /// + /// + /// + /// The published sequence number is only populated and relevant when certain features + /// of the producer are enabled. For example, it is used by idempotent publishing. + /// + /// + internal int? PendingPublishSequenceNumber { get; set; } + + /// + /// The producer group identifier assigned to the event as part of a publishing operation. + /// + /// + /// + /// This member is only populated while a publishing operation is taking place; once the + /// operation has completed, successfully or not, the value is cleared. + /// + /// + /// + /// The producer group identifier is only populated and relevant when certain features + /// of the producer are enabled. For example, it is used by idempotent publishing. + /// + /// + internal long? PendingProducerGroupId { get; set; } + + /// + /// The producer owner level assigned to the event as part of a publishing operation. + /// + /// + /// + /// This member is only populated while a publishing operation is taking place; once the + /// operation has completed, successfully or not, the value is cleared. + /// + /// + /// + /// The producer group identifier is only populated and relevant when certain features + /// of the producer are enabled. For example, it is used by idempotent publishing. + /// + /// + internal short? PendingProducerOwnerLevel { get; set; } + /// /// Initializes a new instance of the class. /// @@ -238,6 +268,8 @@ public EventData(ReadOnlyMemory eventBody) : this(eventBody, lastPartition /// The date and time, in UTC, that the last event information for the Event Hub partition was retrieved from the service. /// The publishing sequence number assigned to the event at the time it was successfully published. /// The publishing sequence number assigned to the event as part of a publishing operation. + /// The producer group identifier assigned to the event as part of a publishing operation. + /// The producer owner level assigned to the event as part of a publishing operation. /// internal EventData(ReadOnlyMemory eventBody, IDictionary properties = null, @@ -251,7 +283,9 @@ internal EventData(ReadOnlyMemory eventBody, DateTimeOffset? lastPartitionEnqueuedTime = null, DateTimeOffset? lastPartitionPropertiesRetrievalTime = null, int? publishedSequenceNumber = null, - int? pendingPublishSequenceNumber = null) + int? pendingPublishSequenceNumber = null, + long? pendingProducerGroupId = null, + short? pendingOwnerLevel = null) { Body = eventBody; Properties = properties ?? new Dictionary(); @@ -260,12 +294,14 @@ internal EventData(ReadOnlyMemory eventBody, Offset = offset; EnqueuedTime = enqueuedTime; PartitionKey = partitionKey; - PendingPublishSequenceNumber = pendingPublishSequenceNumber; LastPartitionSequenceNumber = lastPartitionSequenceNumber; LastPartitionOffset = lastPartitionOffset; LastPartitionEnqueuedTime = lastPartitionEnqueuedTime; LastPartitionPropertiesRetrievalTime = lastPartitionPropertiesRetrievalTime; PublishedSequenceNumber = publishedSequenceNumber; + PendingPublishSequenceNumber = pendingPublishSequenceNumber; + PendingProducerGroupId = pendingProducerGroupId; + PendingProducerOwnerLevel = pendingOwnerLevel; } /// @@ -320,12 +356,23 @@ protected EventData(ReadOnlyMemory eventBody, public override string ToString() => base.ToString(); /// - /// Transitions the pending publishing sequence number to the published sequence number. + /// Transitions the pending state to its permanent form. /// /// internal void CommitPublishingState() { PublishedSequenceNumber = PendingPublishSequenceNumber; + ClearPublishingState(); + } + + /// + /// Clears the pending publishing state. + /// + /// + internal void ClearPublishingState() + { + PendingProducerGroupId = default; + PendingProducerOwnerLevel = default; PendingPublishSequenceNumber = default; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index 493c86c46da5..3c26d54e5faa 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -884,6 +884,8 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, { lastSequence = NextSequence(lastSequence); eventData.PendingPublishSequenceNumber = lastSequence; + eventData.PendingProducerGroupId = partitionState.ProducerGroupId; + eventData.PendingProducerOwnerLevel = partitionState.OwnerLevel; } // Publish the events. @@ -893,7 +895,7 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, EventHubsEventSource.Log.IdempotentSequencePublish(EventHubName, options.PartitionId, firstSequence, lastSequence); await SendInternalAsync(eventSet, options, cancellationToken).ConfigureAwait(false); - // Update state and commit the sequencing. + // Update state and commit the state. EventHubsEventSource.Log.IdempotentSequenceUpdate(EventHubName, options.PartitionId, partitionState.LastPublishedSequenceNumber.Value, lastSequence); partitionState.LastPublishedSequenceNumber = lastSequence; @@ -905,11 +907,11 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, } catch { - // Clear the pending sequence numbers in the face of an exception. + // Clear the pending state in the face of an exception. foreach (var eventData in eventSet) { - eventData.PendingPublishSequenceNumber = null; + eventData.ClearPublishingState(); } throw; @@ -983,6 +985,8 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch, { lastSequence = NextSequence(lastSequence); eventData.PendingPublishSequenceNumber = lastSequence; + eventData.PendingProducerGroupId = partitionState.ProducerGroupId; + eventData.PendingProducerOwnerLevel = partitionState.OwnerLevel; } // Publish the events. @@ -1005,7 +1009,7 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch, foreach (var eventData in eventSet) { - eventData.PendingPublishSequenceNumber = null; + eventData.ClearPublishingState(); } throw; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs index e06736ad275a..88c2b515cb2b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System; +using System.Collections.Generic; using System.Collections.Concurrent; using System.Linq; using System.Net; @@ -12,12 +13,15 @@ using Azure.Messaging.EventHubs.Amqp; using Azure.Messaging.EventHubs.Authorization; using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.Amqp.Transport; using Moq; using Moq.Protected; using NUnit.Framework; +using NUnit.Framework.Constraints; namespace Azure.Messaging.EventHubs.Tests { @@ -29,6 +33,23 @@ namespace Azure.Messaging.EventHubs.Tests [TestFixture] public class AmqpConnectionScopeTests { + /// + /// The set test cases for partially populated partition publishing options. + /// + /// + public static IEnumerable PartitionPublishingPartialOptionsTestCases + { + get + { + yield return new object[] { new PartitionPublishingOptions { ProducerGroupId = 123 } }; + yield return new object[] { new PartitionPublishingOptions { OwnerLevel = 123 } }; + yield return new object[] { new PartitionPublishingOptions { StartingSequenceNumber = 123 } }; + yield return new object[] { new PartitionPublishingOptions { ProducerGroupId = 123, OwnerLevel = 789 } }; + yield return new object[] { new PartitionPublishingOptions { ProducerGroupId = 123, StartingSequenceNumber = 789 } }; + yield return new object[] { new PartitionPublishingOptions { StartingSequenceNumber = 123, OwnerLevel = 789 } }; + } + } + /// /// Verifies functionality of the constructor. /// @@ -498,7 +519,7 @@ public async Task OpenConsumerLinkAsyncConfiguresTheLink() Assert.That(link.Settings.TotalLinkCredit, Is.EqualTo(prefetchCount), "The prefetch count should have been used to set the credits."); Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.EntityType.ToString()), Is.True, "There should be an entity type specified."); - Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.OwnerLevel, -1), Is.EqualTo(ownerLevel), "The owner level should have been used."); + Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.ConsumerOwnerLevel, -1), Is.EqualTo(ownerLevel), "The owner level should have been used."); Assert.That(link.Settings.DesiredCapabilities, Is.Not.Null, "There should have been a set of desired capabilities created."); Assert.That(link.Settings.DesiredCapabilities.Contains(AmqpProperty.TrackLastEnqueuedEventProperties), Is.True, "Last event tracking should be requested."); @@ -565,7 +586,7 @@ public async Task OpenConsumerLinkAsyncRespectsTheOwnerLevelOption() var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, prefetchSizeInBytes, ownerLevel, trackLastEvent, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); - Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.OwnerLevel, long.MinValue), Is.EqualTo(long.MinValue), "The owner level should have been used."); + Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.ConsumerOwnerLevel, long.MinValue), Is.EqualTo(long.MinValue), "The owner level should have been used."); } /// @@ -950,7 +971,7 @@ public void OpenProducerLinkAsyncRespectsTokenCancellation() var cancellationSource = new CancellationTokenSource(); cancellationSource.Cancel(); - Assert.That(() => scope.OpenProducerLinkAsync(partitionId, TimeSpan.FromDays(1), cancellationSource.Token), Throws.InstanceOf()); + Assert.That(() => scope.OpenProducerLinkAsync(partitionId, TransportProducerFeatures.None, new PartitionPublishingOptions(), TimeSpan.FromDays(1), cancellationSource.Token), Throws.InstanceOf()); } /// @@ -970,7 +991,7 @@ public void OpenProducerLinkAsyncRespectsDisposal() var scope = new AmqpConnectionScope(endpoint, eventHub, credential.Object, transport, null, identifier); scope.Dispose(); - Assert.That(() => scope.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf()); + Assert.That(() => scope.OpenProducerLinkAsync(null, TransportProducerFeatures.None, new PartitionPublishingOptions(), TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf()); } /// @@ -987,6 +1008,8 @@ public async Task OpenProducerLinkAsyncRequestsTheLink() var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); var transport = EventHubsTransportType.AmqpTcp; var identifier = "customIdentIFIER"; + var features = TransportProducerFeatures.IdempotentPublishing; + var options = new PartitionPublishingOptions(); var cancellationSource = new CancellationTokenSource(); var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); @@ -1014,6 +1037,8 @@ public async Task OpenProducerLinkAsyncRequestsTheLink() .Setup>("CreateSendingLinkAsync", ItExpr.Is(value => value == mockConnection), ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)), + ItExpr.Is(value => value == features), + ItExpr.Is(value => value == options), ItExpr.IsAny(), ItExpr.Is(value => value == cancellationSource.Token)) .Returns(Task.FromResult(mockLink)) @@ -1027,7 +1052,7 @@ public async Task OpenProducerLinkAsyncRequestsTheLink() .Returns(Task.CompletedTask) .Verifiable(); - var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, TimeSpan.FromDays(1), cancellationSource.Token); + var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, features, options, TimeSpan.FromDays(1), cancellationSource.Token); Assert.That(link, Is.EqualTo(mockLink), "The mock return was incorrect"); mockScope.VerifyAll(); @@ -1047,6 +1072,214 @@ public async Task OpenProducerLinkAsyncConfiguresTheLink() var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); var transport = EventHubsTransportType.AmqpTcp; var identifier = "customIdentIFIER"; + var features = TransportProducerFeatures.IdempotentPublishing; + var cancellationSource = new CancellationTokenSource(); + var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); + + var mockScope = new Mock(endpoint, eventHub, credential.Object, transport, null, identifier) + { + CallBase = true + }; + + var options = new PartitionPublishingOptions + { + ProducerGroupId = 123, + OwnerLevel = 456, + StartingSequenceNumber = 789 + }; + + mockScope + .Protected() + .Setup>("CreateAndOpenConnectionAsync", + ItExpr.IsAny(), + ItExpr.Is(value => value == endpoint), + ItExpr.Is(value => value == transport), + ItExpr.Is(value => value == null), + ItExpr.Is(value => value == identifier), + ItExpr.IsAny()) + .Returns(Task.FromResult(mockConnection)); + + mockScope + .Protected() + .Setup>("RequestAuthorizationUsingCbsAsync", + ItExpr.Is(value => value == mockConnection), + ItExpr.IsAny(), + ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Send), + ItExpr.IsAny()) + .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1))); + + mockScope + .Protected() + .Setup("OpenAmqpObjectAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .Returns(Task.CompletedTask); + + var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, features, options, TimeSpan.FromDays(1), cancellationSource.Token); + Assert.That(link, Is.Not.Null, "The link produced was null"); + + var linkTarget = (Target)link.Settings.Target; + Assert.That(linkTarget.Address.ToString(), Contains.Substring($"/{ partitionId }"), "The partition identifier should have been part of the link address."); + Assert.That(link.Settings.DesiredCapabilities.Contains(AmqpProperty.EnableIdempotentPublishing), Is.True, "The idempotent publishing capability should have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.EntityType.ToString()), Is.True, "There should be an entity type specified."); + Assert.That(link.Settings.Properties[AmqpProperty.ProducerGroupId], Is.EqualTo(options.ProducerGroupId), "The producer group should have been set."); + Assert.That(link.Settings.Properties[AmqpProperty.ProducerOwnerLevel], Is.EqualTo(options.OwnerLevel), "The owner level should have been set."); + Assert.That(link.Settings.Properties[AmqpProperty.PublishedSequenceNumber], Is.EqualTo(options.StartingSequenceNumber), "The published sequence number should have been set."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task OpenProducerLinkAsyncConfiguresTheLinkWhenOptionsAreEmpty() + { + var endpoint = new Uri("amqp://test.service.gov"); + var eventHub = "myHub"; + var partitionId = "00_partition_00"; + var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); + var transport = EventHubsTransportType.AmqpTcp; + var identifier = "customIdentIFIER"; + var options = new PartitionPublishingOptions(); + var features = TransportProducerFeatures.IdempotentPublishing; + var cancellationSource = new CancellationTokenSource(); + var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); + + var mockScope = new Mock(endpoint, eventHub, credential.Object, transport, null, identifier) + { + CallBase = true + }; + + mockScope + .Protected() + .Setup>("CreateAndOpenConnectionAsync", + ItExpr.IsAny(), + ItExpr.Is(value => value == endpoint), + ItExpr.Is(value => value == transport), + ItExpr.Is(value => value == null), + ItExpr.Is(value => value == identifier), + ItExpr.IsAny()) + .Returns(Task.FromResult(mockConnection)); + + mockScope + .Protected() + .Setup>("RequestAuthorizationUsingCbsAsync", + ItExpr.Is(value => value == mockConnection), + ItExpr.IsAny(), + ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Send), + ItExpr.IsAny()) + .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1))); + + mockScope + .Protected() + .Setup("OpenAmqpObjectAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .Returns(Task.CompletedTask); + + var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, features, options, TimeSpan.FromDays(1), cancellationSource.Token); + Assert.That(link, Is.Not.Null, "The link produced was null"); + + var linkTarget = (Target)link.Settings.Target; + Assert.That(linkTarget.Address.ToString(), Contains.Substring($"/{ partitionId }"), "The partition identifier should have been part of the link address."); + Assert.That(link.Settings.DesiredCapabilities.Contains(AmqpProperty.EnableIdempotentPublishing), Is.True, "The idempotent publishing capability should have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.EntityType.ToString()), Is.True, "There should be an entity type specified."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.ProducerGroupId.ToString()), Is.False, "The producer group should not have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.ProducerOwnerLevel.ToString()), Is.False, "The owner level should not have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.PublishedSequenceNumber.ToString()), Is.False, "The published sequence number should not have been set."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCaseSource(nameof(PartitionPublishingPartialOptionsTestCases))] + public async Task OpenProducerLinkAsyncConfiguresTheLinkWhenOptionsAreEmpty(PartitionPublishingOptions options) + { + var endpoint = new Uri("amqp://test.service.gov"); + var eventHub = "myHub"; + var partitionId = "00_partition_00"; + var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); + var transport = EventHubsTransportType.AmqpTcp; + var identifier = "customIdentIFIER"; + var features = TransportProducerFeatures.IdempotentPublishing; + var cancellationSource = new CancellationTokenSource(); + var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); + + var mockScope = new Mock(endpoint, eventHub, credential.Object, transport, null, identifier) + { + CallBase = true + }; + + mockScope + .Protected() + .Setup>("CreateAndOpenConnectionAsync", + ItExpr.IsAny(), + ItExpr.Is(value => value == endpoint), + ItExpr.Is(value => value == transport), + ItExpr.Is(value => value == null), + ItExpr.Is(value => value == identifier), + ItExpr.IsAny()) + .Returns(Task.FromResult(mockConnection)); + + mockScope + .Protected() + .Setup>("RequestAuthorizationUsingCbsAsync", + ItExpr.Is(value => value == mockConnection), + ItExpr.IsAny(), + ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Send), + ItExpr.IsAny()) + .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1))); + + mockScope + .Protected() + .Setup("OpenAmqpObjectAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .Returns(Task.CompletedTask); + + var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, features, options, TimeSpan.FromDays(1), cancellationSource.Token); + Assert.That(link, Is.Not.Null, "The link produced was null"); + + var linkTarget = (Target)link.Settings.Target; + Assert.That(linkTarget.Address.ToString(), Contains.Substring($"/{ partitionId }"), "The partition identifier should have been part of the link address."); + Assert.That(link.Settings.DesiredCapabilities.Contains(AmqpProperty.EnableIdempotentPublishing), Is.True, "The idempotent publishing capability should have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.EntityType.ToString()), Is.True, "There should be an entity type specified."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.ProducerGroupId.ToString()), Is.True, "The producer group should have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.ProducerOwnerLevel.ToString()), Is.True, "The owner level should have been set."); + Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.PublishedSequenceNumber.ToString()), Is.True, "The published sequence number should have been set."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task OpenProducerLinkAsyncConfiguresTheLinkWhenNoFeaturesAreActive() + { + var endpoint = new Uri("amqp://test.service.gov"); + var eventHub = "myHub"; + var partitionId = "00_partition_00"; + var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); + var transport = EventHubsTransportType.AmqpTcp; + var identifier = "customIdentIFIER"; + var features = TransportProducerFeatures.None; var cancellationSource = new CancellationTokenSource(); var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); @@ -1056,6 +1289,13 @@ public async Task OpenProducerLinkAsyncConfiguresTheLink() CallBase = true }; + var options = new PartitionPublishingOptions + { + ProducerGroupId = 123, + OwnerLevel = 456, + StartingSequenceNumber = 789 + }; + mockScope .Protected() .Setup>("CreateAndOpenConnectionAsync", @@ -1086,12 +1326,16 @@ public async Task OpenProducerLinkAsyncConfiguresTheLink() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, TimeSpan.FromDays(1), cancellationSource.Token); + var link = await mockScope.Object.OpenProducerLinkAsync(partitionId, features, options, TimeSpan.FromDays(1), cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var linkTarget = (Target)link.Settings.Target; Assert.That(linkTarget.Address.ToString(), Contains.Substring($"/{ partitionId }"), "The partition identifier should have been part of the link address."); + Assert.That(link.Settings.DesiredCapabilities, Is.Null, "The idempotent publishing capability should not have been set."); Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.EntityType.ToString()), Is.True, "There should be an entity type specified."); + Assert.That(link.Settings.Properties[AmqpProperty.ProducerGroupId], Is.EqualTo(options.ProducerGroupId), "The producer group should have been set."); + Assert.That(link.Settings.Properties[AmqpProperty.ProducerOwnerLevel], Is.EqualTo(options.OwnerLevel), "The owner level should have been set."); + Assert.That(link.Settings.Properties[AmqpProperty.PublishedSequenceNumber], Is.EqualTo(options.StartingSequenceNumber), "The published sequence number should have been set."); } /// @@ -1150,7 +1394,7 @@ public async Task OpenProducerLinkAsyncManagesActiveLinks() Assert.That(activeLinks, Is.Not.Null, "The set of active links was null."); Assert.That(activeLinks.Count, Is.Zero, "There should be no active links when none have been created."); - var link = await mockScope.Object.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token); + var link = await mockScope.Object.OpenProducerLinkAsync(null, TransportProducerFeatures.None, new PartitionPublishingOptions(), TimeSpan.FromDays(1), cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); Assert.That(activeLinks.Count, Is.EqualTo(1), "There should be an active link being tracked."); @@ -1236,7 +1480,7 @@ public async Task OpenProducerLinkAsyncConfiguresAuthorizationRefresh() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token); + var link = await mockScope.Object.OpenProducerLinkAsync(null, TransportProducerFeatures.None, new PartitionPublishingOptions(), TimeSpan.FromDays(1), cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var activeLinks = GetActiveLinks(mockScope.Object); @@ -1312,7 +1556,7 @@ public async Task OpenProducerLinkAsyncRefreshesAuthorization() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token); + var link = await mockScope.Object.OpenProducerLinkAsync(null, TransportProducerFeatures.None, new PartitionPublishingOptions(), TimeSpan.FromDays(1), cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var activeLinks = GetActiveLinks(mockScope.Object); @@ -1398,7 +1642,7 @@ public async Task AuthorizationTimerCallbackToleratesDisposal() var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); var mockScope = new DisposeOnAuthorizationTimerCallbackMockScope(endpoint, eventHub, credential.Object, transport, null); - var link = await mockScope.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token); + var link = await mockScope.OpenProducerLinkAsync(null, TransportProducerFeatures.None, new PartitionPublishingOptions(), TimeSpan.FromDays(1), cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var activeLinks = GetActiveLinks(mockScope); @@ -1701,7 +1945,6 @@ public async Task RequestAuthorizationUsingCbsAsyncRespectsTheConnectionClosing( "The token should not have been requested."); } - /// /// Gets the active connection for the given scope, using the /// private property accessor. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs old mode 100755 new mode 100644 index 3122ceb9f431..390db4ce4e99 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs @@ -31,6 +31,8 @@ public static IEnumerable SimpleConditionExceptionMatchTestCases() yield return new object[] { AmqpError.TimeoutError, typeof(EventHubsException), EventHubsException.FailureReason.ServiceTimeout }; yield return new object[] { AmqpError.ServerBusyError, typeof(EventHubsException), EventHubsException.FailureReason.ServiceBusy }; + yield return new object[] { AmqpError.ProducerStolenError, typeof(EventHubsException), EventHubsException.FailureReason.ProducerDisconnected }; + yield return new object[] { AmqpError.SequenceOutOfOrderError, typeof(EventHubsException), EventHubsException.FailureReason.InvalidClientState }; yield return new object[] { AmqpError.ArgumentError, typeof(ArgumentException), null }; yield return new object[] { AmqpError.ArgumentOutOfRangeError, typeof(ArgumentOutOfRangeException), null }; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs index e8a621c1081d..856d1cfa4896 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs @@ -6,11 +6,13 @@ using System.Linq; using System.Reflection; using Azure.Messaging.EventHubs.Amqp; +using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; using Moq; using NUnit.Framework; +using NUnit.Framework.Constraints; namespace Azure.Messaging.EventHubs.Tests { @@ -145,13 +147,15 @@ public void TryAddValidatesNotDisposed() /// /// [Test] - [TestCase(true)] - [TestCase(false)] - public void TryAddHonorsTheMeasureSequenceNumber(bool measureSequenceNumber) + [TestCase(TransportProducerFeatures.None)] + [TestCase(TransportProducerFeatures.IdempotentPublishing)] + public void TryAddHonorStatefulFeatures(byte activeFeatures) { var maximumSize = 50; var batchEnvelopeSize = 0; var capturedSequence = default(int?); + var capturedGroupId = default(long?); + var capturedOwnerLevel = default(short?); var options = new CreateBatchOptions { MaximumSizeInBytes = maximumSize }; var mockEnvelope = new Mock(); var mockEvent = new Mock(); @@ -163,6 +167,8 @@ public void TryAddHonorsTheMeasureSequenceNumber(bool measureSequenceNumber) CreateMessageFromEventHandler = (_e, _p) => { capturedSequence = _e.PendingPublishSequenceNumber; + capturedGroupId = _e.PendingProducerGroupId; + capturedOwnerLevel = _e.PendingProducerOwnerLevel; return mockEvent.Object; } }; @@ -175,14 +181,17 @@ public void TryAddHonorsTheMeasureSequenceNumber(bool measureSequenceNumber) .Setup(message => message.SerializedMessageSize) .Returns(maximumSize); - var batch = new AmqpEventBatch(mockConverter, options, measureSequenceNumber); + var batch = new AmqpEventBatch(mockConverter, options, (TransportProducerFeatures)activeFeatures); batch.TryAdd(EventGenerator.CreateEvents(1).Single()); - var expectationConstraint = (measureSequenceNumber) - ? Is.Not.Null - : Is.Null; + NullConstraint generateConstraint() => + ((TransportProducerFeatures)activeFeatures == TransportProducerFeatures.None) + ? Is.Null + : Is.Not.Null; - Assert.That(capturedSequence, expectationConstraint); + Assert.That(capturedSequence, generateConstraint(), "The sequence was not set as expected."); + Assert.That(capturedGroupId, generateConstraint(), "The group identifier was not set as expected."); + Assert.That(capturedOwnerLevel, generateConstraint(), "The owner level was not set as expected."); } /// @@ -191,7 +200,7 @@ public void TryAddHonorsTheMeasureSequenceNumber(bool measureSequenceNumber) /// /// [Test] - public void TryAddRemovesTheMeasureSequenceNumber() + public void TryAddResetsPublishingState() { var maximumSize = 50; var batchEnvelopeSize = 0; @@ -219,10 +228,13 @@ public void TryAddRemovesTheMeasureSequenceNumber() .Setup(message => message.SerializedMessageSize) .Returns(maximumSize); - var batch = new AmqpEventBatch(mockConverter, options, true); + var batch = new AmqpEventBatch(mockConverter, options, TransportProducerFeatures.IdempotentPublishing); batch.TryAdd(EventGenerator.CreateEvents(1).Single()); - Assert.That(capturedEvent.PublishedSequenceNumber, Is.Null); + Assert.That(capturedEvent.PublishedSequenceNumber, Is.Null, "The final sequence should not have been set."); + Assert.That(capturedEvent.PendingPublishSequenceNumber, Is.Null, "The pending sequence was not cleared."); + Assert.That(capturedEvent.PendingProducerGroupId, Is.Null, "The group identifier was not cleared."); + Assert.That(capturedEvent.PendingProducerOwnerLevel, Is.Null, "The owner level was not cleared."); } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs old mode 100755 new mode 100644 index e57dac26034b..a361e50c5ba3 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs @@ -59,6 +59,27 @@ public static IEnumerable StreamPropertyTestCases() yield return new object[] { new BufferedStream(new MemoryStream(contents, false), 512), contents }; } + /// + /// The set of test cases for optional idempotent publishing properties. + /// + /// + public static IEnumerable IdempotentPropertyTestCases() + { + // The values represent the test arguments: + // - Pending Sequence Number (int?) + // - Pending Producer Group Id (long?) + // - Pending Owner Level (short?) + + yield return new object[] { (int?)123, (long?)456, (short?)789 }; + yield return new object[] { null, (long?)456, (short?)789 }; + yield return new object[] { (int?)123, null, (short?)789 }; + yield return new object[] { (int?)123, (long?)456, null }; + yield return new object[] { (int?)123, null, null }; + yield return new object[] { null, (long?)456, null }; + yield return new object[] { null, null, (short?)789 }; + yield return new object[] { null, null, null }; + } + /// /// Verifies functionality of the /// method. @@ -182,6 +203,58 @@ public void CreateMessageFromEventPopulatesSimpleApplicationProperties() } } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCaseSource(nameof(IdempotentPropertyTestCases))] + public void CreateMessageFromEventPopulatesIdempotentAnnotations(int? pendingSequenceNumber, + long? pendingGroupId, + short? pendingOwnerLevel) + { + var eventData = new EventData(new byte[] { 0x11, 0x22, 0x33 }); + eventData.PendingPublishSequenceNumber = pendingSequenceNumber; + eventData.PendingProducerGroupId = pendingGroupId; + eventData.PendingProducerOwnerLevel = pendingOwnerLevel; + + using AmqpMessage message = new AmqpMessageConverter().CreateMessageFromEvent(eventData); + + Assert.That(message, Is.Not.Null, "The AMQP message should have been created."); + Assert.That(message.DataBody, Is.Not.Null, "The AMQP message should a body."); + Assert.That(message.MessageAnnotations, Is.Not.Null, "The AMQP message annotations should be present."); + + // Each annotation should only be present if a value was assigned. + + if (pendingSequenceNumber.HasValue) + { + Assert.That(message.MessageAnnotations.Map[AmqpProperty.PublishedSequenceNumber], Is.EqualTo(eventData.PendingPublishSequenceNumber.Value), "The publishing sequence number should have been set."); + } + else + { + Assert.That(message.MessageAnnotations.Map.Any(item => item.Key.ToString() == AmqpProperty.PublishedSequenceNumber.Value), Is.False, "The publishing sequence number should not have been set."); + } + + if (pendingGroupId.HasValue) + { + Assert.That(message.MessageAnnotations.Map[AmqpProperty.ProducerGroupId], Is.EqualTo(eventData.PendingProducerGroupId.Value), "The producer group should have been set."); + } + else + { + Assert.That(message.MessageAnnotations.Map.Any(item => item.Key.ToString() == AmqpProperty.ProducerGroupId.Value), Is.False, "The producer group should not have been set."); + } + + if (pendingOwnerLevel.HasValue) + { + Assert.That(message.MessageAnnotations.Map[AmqpProperty.ProducerOwnerLevel], Is.EqualTo(eventData.PendingProducerOwnerLevel.Value), "The producer owner level should have been set."); + } + else + { + Assert.That(message.MessageAnnotations.Map.Any(item => item.Key.ToString() == AmqpProperty.ProducerOwnerLevel.Value), Is.False, "The producer owner level should not have been set."); + } + } + /// /// Verifies functionality of the /// method. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs index 1566216b40bd..69dbb63f8ff4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs @@ -12,6 +12,7 @@ using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; +using Microsoft.Identity.Client; using Moq; using Moq.Protected; using NUnit.Framework; @@ -125,15 +126,55 @@ public async Task CreateLinkAndEnsureProducerStateAsyncRespectsPartitionOptions( await producer.Object.ReadInitializationPublishingPropertiesAsync(default); + Func areOptionsEqual = (first, second) => + first.ProducerGroupId == second.ProducerGroupId + && first.OwnerLevel == second.OwnerLevel + && first.StartingSequenceNumber == second.StartingSequenceNumber; + producer .Protected() .Verify>("CreateLinkAndEnsureProducerStateAsync", Times.Once(), ItExpr.IsAny(), - ItExpr.Is(options => object.ReferenceEquals(options, expectedOptions)), + ItExpr.Is(options => areOptionsEqual(options, expectedOptions)), ItExpr.IsAny(), ItExpr.IsAny()); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task CreateLinkAndEnsureProducerStateAsyncClearsTheStartingSequenceNumberAfterInitialization() + { + var expectedOptions = new PartitionPublishingOptions { ProducerGroupId = 1, OwnerLevel = 4, StartingSequenceNumber = 88 }; + var retryPolicy = new BasicRetryPolicy(new EventHubsRetryOptions { TryTimeout = TimeSpan.FromSeconds(17) }); + var mockConnectionScope = new Mock(); + + var producer = new Mock("aHub", null, mockConnectionScope.Object, new AmqpMessageConverter(), retryPolicy, TransportProducerFeatures.IdempotentPublishing, expectedOptions) + { + CallBase = true + }; + + mockConnectionScope + .Setup(scope => scope.OpenProducerLinkAsync( + It.IsAny(), + It.IsAny(), + It.Is(options => options.StartingSequenceNumber == expectedOptions.StartingSequenceNumber), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(new SendingAmqpLink(new AmqpLinkSettings { MaxMessageSize = 512 }))) + .Verifiable(); + + Assert.That(GetPartitionPublishingOptions(producer.Object).StartingSequenceNumber, Is.EqualTo(expectedOptions.StartingSequenceNumber), "The initial options should have the sequence number set."); + + await producer.Object.ReadInitializationPublishingPropertiesAsync(default); + Assert.That(GetPartitionPublishingOptions(producer.Object).StartingSequenceNumber, Is.Null, "After initializing state, the active options should not have a sequence number."); + + mockConnectionScope.VerifyAll(); + } + /// /// Verifies functionality of the /// method. @@ -1690,6 +1731,20 @@ private static CreateBatchOptions GetEventBatchOptions(AmqpEventBatch batch) => .GetProperty("Options", BindingFlags.Instance | BindingFlags.NonPublic) .GetValue(batch); + /// + /// Gets the active set of publishing options for a partition by accessing its private field. + /// + /// + /// The producer to retrieve the options from. + /// + /// The partition publishing options. + /// + private static PartitionPublishingOptions GetPartitionPublishingOptions(AmqpProducer target) => + (PartitionPublishingOptions) + typeof(AmqpProducer) + .GetProperty("ActiveOptions", BindingFlags.Instance | BindingFlags.NonPublic) + .GetValue(target); + /// /// Sets the maximum message size for the given producer, using its /// private accessor. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs index fc900112c0ca..7f88ebf83bb6 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs @@ -275,7 +275,7 @@ private class MockTransportBatch : TransportEventBatch public override long SizeInBytes { get; } = 100; - public override bool ReserveSpaceForSequenceNumber { get; } = true; + public override TransportProducerFeatures ActiveFeatures { get; } = TransportProducerFeatures.None; public override int Count { get; } = 400; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs index db3240f12ba2..dcfc2f92986f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics.Tracing; using System.Linq; using System.Reflection; using System.Threading; @@ -13,7 +12,6 @@ using Azure.Messaging.EventHubs.Authorization; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Producer; -using Microsoft.Azure.Amqp; using Moq; using NUnit.Framework; @@ -2651,7 +2649,7 @@ private class MockTransportBatch : TransportEventBatch public override long MaximumSizeInBytes { get; } public override long SizeInBytes { get; } - public override bool ReserveSpaceForSequenceNumber { get; } + public override TransportProducerFeatures ActiveFeatures { get; } public override int Count => Events.Count; public override IEnumerable AsEnumerable() => (IEnumerable)Events; public override void Clear() => Events.Clear(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs index 0aa222530a52..f0c03aa7775c 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs @@ -382,7 +382,7 @@ private class MockTransportBatch : TransportEventBatch public override long MaximumSizeInBytes { get; } public override long SizeInBytes { get; } public override int Count { get; } - public override bool ReserveSpaceForSequenceNumber { get; } + public override TransportProducerFeatures ActiveFeatures { get; } public override bool TryAdd(EventData eventData) => throw new NotImplementedException(); public override IEnumerable AsEnumerable() => throw new NotImplementedException(); public override void Dispose() => throw new NotImplementedException();