Skip to content

Commit

Permalink
[Event Hubs Client] Idempotent Producer Client
Browse files Browse the repository at this point in the history
The focus of these changes is to enable the `EventDataBatch` to reserve
appropriate space for publishing sequence numbers, when required by the active
producer configuration and to complete the client API surface by implementing
the `ReadPartitionPublishingProperties` method.
  • Loading branch information
jsquire committed Sep 14, 2020
1 parent 5351298 commit 1d2d651
Show file tree
Hide file tree
Showing 17 changed files with 531 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,10 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
public override int GetHashCode() { throw null; }
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Producer.PartitionPublishingProperties> ReadPartitionPublishingPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
Expand All @@ -490,6 +491,12 @@ public PartitionPublishingOptions() { }
public short? OwnerLevel { get { throw null; } set { } }
public long? ProducerGroupId { get { throw null; } set { } }
public int? StartingSequenceNumber { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
public partial class PartitionPublishingProperties
{
Expand All @@ -498,6 +505,12 @@ protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnab
public int? LastPublishedSequenceNumber { get { throw null; } }
public short? OwnerLevel { get { throw null; } }
public long? ProducerGroupId { get { throw null; } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
public partial class SendEventOptions
{
Expand Down
23 changes: 16 additions & 7 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ internal class AmqpEventBatch : TransportEventBatch
public override long SizeInBytes => _sizeBytes;

/// <summary>
/// The publishing sequence number assigned to the first event in the batch at the time
/// the batch was successfully published.
/// A flag that indicates whether space should be reserved for a publishing
/// sequence number when the event size is measured. If <c>false</c>, a sequence
/// number is not used in size calculations.
/// </summary>
///
/// <remarks>
/// The starting published sequence number is only populated and relevant when certain features
/// The sequence number is only populated and relevant when certain features
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public override int? StartingPublishedSequenceNumber { get; set; }
public override bool ReserveSpaceForSequenceNumber { get; }

/// <summary>
/// The count of events contained in the batch.
Expand Down Expand Up @@ -93,9 +94,11 @@ internal class AmqpEventBatch : TransportEventBatch
///
/// <param name="messageConverter">The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.</param>
/// <param name="options">The set of options to apply to the batch.</param>
/// <param name="reserveSpaceForSequenceNumber">A flag that indicates whether space should be reserved for a publishing sequence number when the event size is measured. If <c>false</c>, a sequence number is not used in size calculations.</param>
///
public AmqpEventBatch(AmqpMessageConverter messageConverter,
CreateBatchOptions options)
CreateBatchOptions options,
bool reserveSpaceForSequenceNumber)
{
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
Argument.AssertNotNull(options, nameof(options));
Expand All @@ -104,13 +107,13 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter,
MessageConverter = messageConverter;
Options = options;
MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
ReserveSpaceForSequenceNumber = reserveSpaceForSequenceNumber;

// Initialize the size by reserving space for the batch envelope.

using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.PartitionKey);
ReservedSize = envelope.SerializedMessageSize;
_sizeBytes = ReservedSize;

}

/// <summary>
Expand All @@ -127,7 +130,12 @@ public override bool TryAdd(EventData eventData)
Argument.AssertNotNull(eventData, nameof(eventData));
Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch));

AmqpMessage eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);
if (ReserveSpaceForSequenceNumber)
{
eventData.PendingPublishSequenceNumber = int.MaxValue;
}

var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);

try
{
Expand All @@ -152,6 +160,7 @@ public override bool TryAdd(EventData eventData)
}
finally
{
eventData.PendingPublishSequenceNumber = default;
eventMessage?.Dispose();
}
}
Expand Down
17 changes: 15 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -277,9 +278,9 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
// default to the maximum size allowed by the link.

options.MaximumSizeInBytes ??= MaximumMessageSize;

Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes));
return new AmqpEventBatch(MessageConverter, options);

return new AmqpEventBatch(MessageConverter, options, IsSequenceMeasurementRequired(ActiveFeatures));
}

/// <summary>
Expand Down Expand Up @@ -574,6 +575,18 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
return link;
}

/// <summary>
/// Determines if measuring a sequence number is required to accurately calculate
/// the size of an event.
/// </summary>
///
/// <param name="activeFeatures">The set of features which are active for the producer.</param>
///
/// <returns><c>true</c> if a sequence number should be measured; otherwise, <c>false</c>.</returns>
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool IsSequenceMeasurementRequired(TransportProducerFeatures activeFeatures) => ((activeFeatures & TransportProducerFeatures.IdempotentPublishing) != 0);

/// <summary>
/// Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,17 @@ internal abstract class TransportEventBatch : IDisposable
public abstract long SizeInBytes { get; }

/// <summary>
/// The publishing sequence number assigned to the first event in the batch at the time
/// the batch was successfully published.
/// A flag that indicates whether space should be reserved for a publishing
/// sequence number when the event size is measured. If <c>false</c>, a sequence
/// number is not used in size calculations.
/// </summary>
///
/// <value>
/// The sequence number of the first event in the batch, if the batch was successfully
/// published by a sequence-aware producer. If the producer was not configured to apply
/// sequence numbering or if the batch has not yet been successfully published, this member
/// will be <c>null</c>.
///</value>
///
/// <remarks>
/// The starting published sequence number is only populated and relevant when certain features
/// The sequence number is only populated and relevant when certain features
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public abstract int? StartingPublishedSequenceNumber { get; set; }
public abstract bool ReserveSpaceForSequenceNumber { get; }

/// <summary>
/// The count of events contained in the batch.
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ protected EventData(ReadOnlyMemory<byte> eventBody,
/// Transitions the pending publishing sequence number to the published sequence number.
/// </summary>
///
internal void CommitPublishingSequenceNumber()
internal void CommitPublishingState()
{
PublishedSequenceNumber = PendingPublishSequenceNumber;
PendingPublishSequenceNumber = default;
Expand Down
28 changes: 14 additions & 14 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,6 @@ public long? MaximumSizeInBytes
}
}

/// <summary>
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
/// </summary>
///
/// <returns>A new copy of <see cref="CreateBatchOptions" />.</returns>
///
internal CreateBatchOptions Clone() =>
new CreateBatchOptions
{
PartitionId = PartitionId,
PartitionKey = PartitionKey,
_maximumSizeInBytes = MaximumSizeInBytes
};

/// <summary>
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
/// </summary>
Expand Down Expand Up @@ -82,5 +68,19 @@ internal CreateBatchOptions Clone() =>
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => base.ToString();

/// <summary>
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
/// </summary>
///
/// <returns>A new copy of <see cref="CreateBatchOptions" />.</returns>
///
internal new CreateBatchOptions Clone() =>
new CreateBatchOptions
{
PartitionId = PartitionId,
PartitionKey = PartitionKey,
_maximumSizeInBytes = _maximumSizeInBytes
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ public sealed class EventDataBatch : IDisposable
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public int? StartingPublishedSequenceNumber
{
get => InnerBatch.StartingPublishedSequenceNumber;
internal set => InnerBatch.StartingPublishedSequenceNumber = value;
}
public int? StartingPublishedSequenceNumber { get; internal set; }

/// <summary>
/// The count of events contained in the batch.
Expand Down
Loading

0 comments on commit 1d2d651

Please sign in to comment.