From 4a462e5927b94e2a03b9c9ba5cd899626a5b3395 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Thu, 25 Jun 2020 15:55:23 -0400 Subject: [PATCH] [Event Hubs Client] Load Balancer Options The focus of these changes is to allow for additional options to control load balancing within the event processor types set the stage for implementing different strategies for load balancing. Also included are surfacing options within the consumer client to allow tuning the prefetch count and batch size, to allow performance tuning. --- .../Azure.Messaging.EventHubs.Processor.sln | 7 ++ ...ging.EventHubs.Processor.netstandard2.0.cs | 3 + ...Azure.Messaging.EventHubs.Processor.csproj | 7 +- .../src/EventProcessorClient.cs | 26 ++--- .../src/EventProcessorClientOptions.cs | 94 +++++++++++++++++++ ...Messaging.EventHubs.Processor.Tests.csproj | 5 + .../tests/Diagnostics/DiagnosticsTests.cs | 2 +- .../EventProcessorClientLiveTests.cs | 2 +- .../EventProcessorClientOptionsTests.cs | 40 ++++++++ .../Processor/EventProcessorClientTests.cs | 23 ++--- .../src/Processor/PartitionLoadBalancer.cs | 2 +- .../MigrationGuide.md | 8 +- ...zure.Messaging.EventHubs.netstandard2.0.cs | 8 ++ .../src/Consumer/EventHubConsumerClient.cs | 26 +++-- .../src/Consumer/ReadEventOptions.cs | 85 ++++++++++++++++- .../src/Primitives/EventProcessorOptions.cs | 32 ++++++- .../src/Processor/LoadBalancingStrategy.cs | 42 +++++++++ .../Consumer/EventHubConsumerClientTests.cs | 32 +++---- .../tests/Consumer/ReadOptionsTests.cs | 53 ++++++++++- .../Primitives/EventProcessorOptionsTests.cs | 23 +++++ 20 files changed, 450 insertions(+), 70 deletions(-) mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Processor/LoadBalancingStrategy.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln index 6d9664bd9835..7f8b923ff0b6 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln @@ -13,6 +13,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.TestFramework", EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{797FF941-76FD-45FD-AC17-A73DFE2BA621}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,12 +37,17 @@ Global {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Debug|Any CPU.Build.0 = Debug|Any CPU {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.ActiveCfg = Release|Any CPU {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.Build.0 = Release|Any CPU + {65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} + {65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs old mode 100755 new mode 100644 index aec628865911..34b87ff51cf4 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs @@ -39,9 +39,12 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt public partial class EventProcessorClientOptions { public EventProcessorClientOptions() { } + public int CacheEventCount { get { throw null; } set { } } public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } } public string Identifier { get { throw null; } set { } } + public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } } public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } } + public int PrefetchCount { get { throw null; } set { } } public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } } public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj index b691f3c920a8..e0d4d93eccc4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj @@ -11,8 +11,13 @@ + - + + + + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs index f4e1cdbad097..bf7dbdd275e2 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs @@ -31,21 +31,21 @@ namespace Azure.Messaging.EventHubs [SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal is managed internally as part of the Stop operation.")] public class EventProcessorClient : EventProcessor { - /// The number of events to request as the maximum size for batches read from a partition. - private const int ReadBatchSize = 15; - /// The delegate to invoke when attempting to update a checkpoint using an empty event. private static readonly Func EmptyEventUpdateCheckpoint = cancellationToken => throw new InvalidOperationException(Resources.CannotCreateCheckpointForEmptyEvent); + /// The set of default options for the processor. + private static readonly EventProcessorClientOptions DefaultClientOptions = new EventProcessorClientOptions(); + + /// The default starting position for the processor. + private readonly EventPosition DefaultStartingPosition = new EventProcessorOptions().DefaultStartingPosition; + /// The set of default starting positions for partitions being processed; these are collected at initialization and are surfaced as checkpoints to override defaults on a partition-specific basis. private readonly ConcurrentDictionary PartitionStartingPositionDefaults = new ConcurrentDictionary(); /// The primitive for synchronizing access during start and set handler operations. private readonly SemaphoreSlim ProcessorStatusGuard = new SemaphoreSlim(1, 1); - /// The active default starting position for the processor. - private readonly EventPosition DefaultStartingPosition = new EventProcessorOptions().DefaultStartingPosition; - /// The handler to be called just before event processing starts for a given partition. private Func _partitionInitializingAsync; @@ -364,7 +364,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore, string consumerGroup, string connectionString, string eventHubName, - EventProcessorClientOptions clientOptions) : base(ReadBatchSize, consumerGroup, connectionString, eventHubName, CreateOptions(clientOptions)) + EventProcessorClientOptions clientOptions) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, connectionString, eventHubName, CreateOptions(clientOptions)) { Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); StorageManager = CreateStorageManager(checkpointStore); @@ -391,7 +391,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore, string fullyQualifiedNamespace, string eventHubName, TokenCredential credential, - EventProcessorClientOptions clientOptions = default) : base(ReadBatchSize, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions)) + EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions)) { Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); StorageManager = CreateStorageManager(checkpointStore); @@ -405,6 +405,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore, /// The name of the consumer group this processor is associated with. Events are read in the context of this group. /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub to associate the processor with. + /// The maximum number of events that will be read from the Event Hubs service and held in a local memory cache when reading is active and events are being emitted to an enumerator for processing. /// An Azure identity credential to satisfy base class requirements; this credential may not be null but will only be used in the case that has not been overridden. /// The set of options to use for this processor. /// @@ -416,8 +417,9 @@ internal EventProcessorClient(StorageManager storageManager, string consumerGroup, string fullyQualifiedNamespace, string eventHubName, + int cacheEventCount, TokenCredential credential, - EventProcessorOptions clientOptions) : base(ReadBatchSize, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, clientOptions) + EventProcessorOptions clientOptions) : base(cacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, clientOptions) { Argument.AssertNotNull(storageManager, nameof(storageManager)); @@ -988,7 +990,7 @@ private void EnsureNotRunningAndInvoke(Action action) /// private static EventProcessorOptions CreateOptions(EventProcessorClientOptions clientOptions) { - clientOptions ??= new EventProcessorClientOptions(); + clientOptions ??= DefaultClientOptions; return new EventProcessorOptions { @@ -996,7 +998,9 @@ private static EventProcessorOptions CreateOptions(EventProcessorClientOptions c RetryOptions = clientOptions.RetryOptions.Clone(), Identifier = clientOptions.Identifier, MaximumWaitTime = clientOptions.MaximumWaitTime, - TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties + TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties, + LoadBalancingStrategy = clientOptions.LoadBalancingStrategy, + PrefetchCount = clientOptions.PrefetchCount }; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs old mode 100755 new mode 100644 index 21dba38e35e6..3fcf22e7d72b --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs @@ -19,6 +19,12 @@ public class EventProcessorClientOptions /// The maximum amount of time to wait for an event to become available before emitting an null value. private TimeSpan? _maximumWaitTime = null; + /// The event catch count to use when reading events. + private int _cacheEventCount = 100; + + /// The prefetch count to use when reading events. + private int _prefetchCount = 300; + /// The set of options to use for configuring the connection to the Event Hubs service. private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions(); @@ -48,6 +54,16 @@ public class EventProcessorClientOptions /// public bool TrackLastEnqueuedEventProperties { get; set; } = true; + /// + /// The strategy that an event processor will use to make decisions about + /// partition ownership when performing load balancing to share work with + /// other event processors. + /// + /// + /// + /// + public LoadBalancingStrategy LoadBalancingStrategy { get; set; } = LoadBalancingStrategy.Balanced; + /// /// The maximum amount of time to wait for an event to become available for a given partition before emitting /// an empty event. @@ -81,6 +97,81 @@ public TimeSpan? MaximumWaitTime } } + /// + /// The maximum number of events that will be read from the Event Hubs service and held in a local memory + /// cache when reading is active and events are being emitted to an enumerator for processing. + /// + /// + /// + /// The is a control that developers can use to help tune performance for the specific + /// needs of an application, given its expected size of events, throughput needs, and expected scenarios for using + /// Event Hubs. + /// + /// + /// + /// The size of this cache has an influence on the efficiency of reading events from the Event Hubs service. The + /// larger the size of the cache, the more efficiently service operations can be buffered in the background to + /// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O. + /// + /// For scenarios where the size of events is small and many events are flowing through the system, using a larger + /// and may help improve throughput. For scenarios where + /// the size of events is larger or when processing of events is expected to be a heavier and slower operation, using + /// a smaller size and may help manage resource use without + /// incurring a non-trivial cost to throughput. + /// + /// Regardless of the values, it is generally recommended that the be at least 2-3 + /// times as large as the to allow for efficient buffering of service operations. + /// + /// + public int CacheEventCount + { + get => _cacheEventCount; + + set + { + Argument.AssertAtLeast(value, 1, nameof(CacheEventCount)); + _cacheEventCount = value; + } + } + + /// + /// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to + /// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than + /// readers needing to wait for service operations to complete. + /// + /// + /// + /// The is a control that developers can use to help tune performance for the specific + /// needs of an application, given its expected size of events, throughput needs, and expected scenarios for using + /// Event Hubs. + /// + /// + /// + /// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The + /// larger the size of the cache, the more efficiently service operations can be buffered in the background to + /// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O. + /// + /// For scenarios where the size of events is small and many events are flowing through the system, using a larger + /// and may help improve throughput. For scenarios where + /// the size of events is larger or when processing of events is expected to be a heavier and slower operation, using + /// a smaller size and may help manage resource use without + /// incurring a non-trivial cost to throughput. + /// + /// Regardless of the values, it is generally recommended that the be at least 2-3 + /// times as large as the to allow for efficient buffering of service operations. + /// + /// + public int PrefetchCount + { + get => _prefetchCount; + + set + { + Argument.AssertAtLeast(value, 0, nameof(PrefetchCount)); + _prefetchCount = value; + } + } + /// /// Gets or sets the options used for configuring the connection to the Event Hubs service. /// @@ -151,7 +242,10 @@ internal EventProcessorClientOptions Clone() => { Identifier = Identifier, TrackLastEnqueuedEventProperties = TrackLastEnqueuedEventProperties, + LoadBalancingStrategy = LoadBalancingStrategy, _maximumWaitTime = _maximumWaitTime, + _cacheEventCount = _cacheEventCount, + _prefetchCount = _prefetchCount, _connectionOptions = ConnectionOptions.Clone(), _retryOptions = RetryOptions.Clone() }; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj old mode 100755 new mode 100644 index 2d4629bb7524..5a853a4d5ebc --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj @@ -30,6 +30,11 @@ + + + + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs old mode 100755 new mode 100644 index 76b905fbdb7b..f0574bd250fc --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs @@ -49,7 +49,7 @@ public async Task UpdateCheckpointAsyncCreatesScope() var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var mockContext = new Mock("65"); var mockLogger = new Mock(); - var mockProcessor = new Mock(Mock.Of(), "cg", "host", "hub", Mock.Of(), null) { CallBase = true }; + var mockProcessor = new Mock(Mock.Of(), "cg", "host", "hub", 50, Mock.Of(), null) { CallBase = true }; mockProcessor .Protected() diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs old mode 100755 new mode 100644 index 669d75335cf9..eea8e8f69864 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs @@ -579,7 +579,7 @@ internal TestEventProcessorClient(StorageManager storageManager, string eventHubName, TokenCredential credential, Func connectionFactory, - EventProcessorOptions options) : base(storageManager, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + EventProcessorOptions options) : base(storageManager, consumerGroup, fullyQualifiedNamespace, eventHubName, 100, credential, options) { InjectedConnectionFactory = connectionFactory; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs old mode 100755 new mode 100644 index 1ac245bc0563..8d501432ed73 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs @@ -3,6 +3,7 @@ using System; using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Processor; using NUnit.Framework; namespace Azure.Messaging.EventHubs.Tests @@ -27,7 +28,10 @@ public void CloneProducesACopy() { Identifier = Guid.NewGuid().ToString(), TrackLastEnqueuedEventProperties = false, + LoadBalancingStrategy = LoadBalancingStrategy.Greedy, MaximumWaitTime = TimeSpan.FromMinutes(65), + CacheEventCount = 1, + PrefetchCount = 0, RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(1), Delay = TimeSpan.FromMinutes(4) }, ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets } }; @@ -38,7 +42,10 @@ public void CloneProducesACopy() Assert.That(clone.Identifier, Is.EqualTo(options.Identifier), "The identifier of the clone should match."); Assert.That(clone.TrackLastEnqueuedEventProperties, Is.EqualTo(options.TrackLastEnqueuedEventProperties), "The tracking of last event information of the clone should match."); + Assert.That(clone.LoadBalancingStrategy, Is.EqualTo(options.LoadBalancingStrategy), "The load balancing strategy of the clone should match."); Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time of the clone should match."); + Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache size of the clone should match."); + Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match."); Assert.That(clone.ConnectionOptions.TransportType, Is.EqualTo(options.ConnectionOptions.TransportType), "The connection options of the clone should copy properties."); Assert.That(clone.ConnectionOptions, Is.Not.SameAs(options.ConnectionOptions), "The connection options of the clone should be a copy, not the same instance."); Assert.That(clone.RetryOptions.IsEquivalentTo(options.RetryOptions), Is.True, "The retry options of the clone should be considered equal."); @@ -73,6 +80,39 @@ public void MaximumWaitTimeAllowsNull() Assert.That(() => options.MaximumWaitTime = null, Throws.Nothing); } + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void CacheEventCountIsValidated() + { + Assert.That(() => new EventProcessorClientOptions { CacheEventCount = 0 }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchCountIsValidated() + { + Assert.That(() => new EventProcessorClientOptions { PrefetchCount = -1 }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchCountAllowsZero() + { + Assert.That(() => new EventProcessorClientOptions { PrefetchCount = 0 }, Throws.Nothing); + } + /// /// Verifies functionality of the /// property. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs old mode 100755 new mode 100644 index a2c024e70f0a..30f814ddbd64 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs @@ -165,7 +165,7 @@ void assertOptionsMatch(EventProcessorOptions expected, description = "{{ internal testing constructor }}"; expectedOptions = new EventProcessorOptions(); - processorClient = new EventProcessorClient(Mock.Of(), "consumerGroup", "namespace", "theHub", Mock.Of(), expectedOptions); + processorClient = new EventProcessorClient(Mock.Of(), "consumerGroup", "namespace", "theHub", 100, Mock.Of(), expectedOptions); actualOptions = GetBaseOptions(processorClient); assertOptionsMatch(expectedOptions, actualOptions, description); } @@ -1406,7 +1406,9 @@ public void ClientOptionsCanBeTranslated() RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 }, Identifier = "OMG, HAI!", MaximumWaitTime = TimeSpan.FromDays(54), - TrackLastEnqueuedEventProperties = true + TrackLastEnqueuedEventProperties = true, + LoadBalancingStrategy = LoadBalancingStrategy.Greedy, + PrefetchCount = 9990 }; var defaultOptions = new EventProcessorOptions(); @@ -1419,24 +1421,15 @@ public void ClientOptionsCanBeTranslated() Assert.That(processorOptions.RetryOptions.MaximumRetries, Is.EqualTo(clientOptions.RetryOptions.MaximumRetries), "The retry options should have been set."); Assert.That(processorOptions.Identifier, Is.EqualTo(clientOptions.Identifier), "The identifier should have been set."); Assert.That(processorOptions.MaximumWaitTime, Is.EqualTo(clientOptions.MaximumWaitTime), "The maximum wait time should have been set."); - Assert.That(processorOptions.TrackLastEnqueuedEventProperties, Is.EqualTo(clientOptions.TrackLastEnqueuedEventProperties), "The flack for last event tracking should have been set."); + Assert.That(processorOptions.TrackLastEnqueuedEventProperties, Is.EqualTo(clientOptions.TrackLastEnqueuedEventProperties), "The flag for last event tracking should have been set."); + Assert.That(processorOptions.LoadBalancingStrategy, Is.EqualTo(clientOptions.LoadBalancingStrategy), "The load balancing strategy should have been set."); + Assert.That(processorOptions.PrefetchCount, Is.EqualTo(clientOptions.PrefetchCount), "The prefetch count should have been set."); Assert.That(processorOptions.DefaultStartingPosition, Is.EqualTo(defaultOptions.DefaultStartingPosition), "The default starting position should not have been set."); Assert.That(processorOptions.LoadBalancingUpdateInterval, Is.EqualTo(defaultOptions.LoadBalancingUpdateInterval), "The load balancing interval should not have been set."); Assert.That(processorOptions.PartitionOwnershipExpirationInterval, Is.EqualTo(defaultOptions.PartitionOwnershipExpirationInterval), "The partition ownership interval should not have been set."); - Assert.That(processorOptions.PrefetchCount, Is.EqualTo(defaultOptions.PrefetchCount), "The prefetch count should not have been set."); } - /// - /// Converts an Event Hubs connection into a factory function, returning the . - /// - /// - /// The connection to return from the factory. - /// - /// A factory function, returning the . - /// - private static Func ToConnectionFactory(EventHubConnection connection) => () => connection; - /// /// Retrieves the StorageManager for the processor client using its private accessor. /// @@ -1500,7 +1493,7 @@ internal TestEventProcessorClient(StorageManager storageManager, string eventHubName, TokenCredential credential, EventHubConnection connection, - EventProcessorOptions options) : base(storageManager, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + EventProcessorOptions options) : base(storageManager, consumerGroup, fullyQualifiedNamespace, eventHubName, 100, credential, options) { InjectedConnection = connection; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs old mode 100755 new mode 100644 index 3e325df78298..049ee1e12ed3 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs @@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Primitives { /// - /// Handles all load balancing concerns for an EventProcessorClient including claiming, stealing, and relinquishing ownership. + /// Handles all load balancing concerns for an event processor including claiming, stealing, and relinquishing ownership. /// /// internal class PartitionLoadBalancer diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md b/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md index d4d9221ef5c1..62be1ccf95a9 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md @@ -59,17 +59,17 @@ In order to allow for a single focus and clear responsibility, the core function - The [EventProcessorClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventprocessorclient?view=azure-dotnet) is responsible for reading and processing events for all partitions of an Event Hub. It will collaborate with other instances for the same Event Hub and consumer group pairing to balance work between them. A high degree of fault tolerance is built-in, allowing the processor to be resilient in the face of errors. The `EventProcessorClient` can be found in the new [Azure.Messaging.EventHubs.Processor](https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor/) package which replaces the older [Microsoft.Azure.EventHubs.Processor](https://www.nuget.org/packages/Microsoft.Azure.EventHubs.Processor/) package. - One of the key features of the `EventProcessorClient` is enabling tracking of which events have been processed by interacting with a durable storage provider. This process is commonly referred to as [checkpointing](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#checkpointing) and the persisted state as a checkpoint. This version of the `EventProcessorClient` only supports Azure Storage Blobs. + One of the key features of the `EventProcessorClient` is enabling tracking of which events have been processed by interacting with a durable storage provider. This process is commonly referred to as [checkpointing](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#checkpointing) and the persisted state as a checkpoint. This version of the `EventProcessorClient` supports only Azure Storage Blobs as a backing store. **_Important note on checkpoints:_** The [EventProcessorClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventprocessorclient?view=azure-dotnet) does not support legacy checkpoint data created using the v4 `EventProcessorHost`. In order to allow for a unified format for checkpoint data across languages, a more efficient approach to data storage, and improvements to the algorithm used for managing partition ownership, breaking changes were necessary. An approach for migrating the v4 `EventProcessorHost` checkpoints can be found in the [migration samples](#migrating-event-processor-checkpoints) below. #### Specialized -- The [PartitionReceiver](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.primitives.partitionreceiver?view=azure-dotnet) is responsible for reading events from a specific partition of an Event Hub, with a greater level of control over communication with the Event Hubs service than is offered by other event consumers. More detail on the design and philosophy for the `PartitionReceiver` can be found in the [design document](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/design/partition-receiver-proposal.md). +- The [PartitionReceiver](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.primitives.partitionreceiver?view=azure-dotnet) is responsible for reading events from a specific partition of an Event Hub, with a greater level of control over communication with the Event Hubs service than is offered by other event consumers. More detail on the design and philosophy for the `PartitionReceiver` can be found in its [design document](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/design/partition-receiver-proposal.md). -- The [EventProcessor](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.primitives.eventprocessor-1?view=azure-dotnet) provides a base for creating a custom processor for for reading and processing events for all partitions of an Event Hub. The `EventProcessor` fills a similar role as the EventProcessorClient, with cooperative load balancing and resiliency as its core features. However, it also offers native batch processing, the ability to customize checkpoint storage, a greater level of control over communication with the Event Hubs service, and a less opinionated API. The caveat is that this comes with additional complexity and exists as of an abstract base, which needs to be extended and the core “handler” activities implemented via override. +- The [EventProcessor](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.primitives.eventprocessor-1?view=azure-dotnet) provides a base for creating a custom processor for for reading and processing events for all partitions of an Event Hub. The `EventProcessor` fills a similar role as the EventProcessorClient, with cooperative load balancing and resiliency as its core features. However, it also offers native batch processing, the ability to customize checkpoint storage, a greater level of control over communication with the Event Hubs service, and a less opinionated API. The caveat is that this comes with additional complexity and exists as of an abstract base, which needs to be extended and the core "handler" activities implemented via override. - Generally speaking, the `EventProcessorClient` was designed to provide a familiar API to that of the `EventHubConsumerClient` and offer an intuitive "step-up" experience for developers exploring Event Hubs as they advance to production scenarios. For a large portion of our library users, that covers their needs well. There's definitely a point, however, where an application requires more control to handle higher throughput or unique needs - that's where the `EventProcessor` is intended to help. More on the design and philosophy behind this can be found [design document](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/design/event-processor%7BT%7D-proposal.md). + Generally speaking, the `EventProcessorClient` was designed to provide a familiar API to that of the `EventHubConsumerClient` and offer an intuitive "step-up" experience for developers exploring Event Hubs as they advance to production scenarios. For a large portion of our library users, that covers their needs well. There's definitely a point, however, where an application requires more control to handle higher throughput or unique needs - that's where the `EventProcessor` is intended to help. More on the design and philosophy behind this type can be found in its [design document](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/design/event-processor%7BT%7D-proposal.md). ### Client constructors diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs old mode 100755 new mode 100644 index d95500eda1dc..cc7314f3b01f --- a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs @@ -230,8 +230,10 @@ public partial struct PartitionEvent public partial class ReadEventOptions { public ReadEventOptions() { } + public int CacheEventCount { get { throw null; } set { } } public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } } public long? OwnerLevel { get { throw null; } set { } } + public int PrefetchCount { get { throw null; } set { } } public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } @@ -258,6 +260,7 @@ public EventProcessorOptions() { } public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } } public Azure.Messaging.EventHubs.Consumer.EventPosition DefaultStartingPosition { get { throw null; } set { } } public string Identifier { get { throw null; } set { } } + public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } } public System.TimeSpan LoadBalancingUpdateInterval { get { throw null; } set { } } public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } } public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } } @@ -364,6 +367,11 @@ public PartitionReceiverOptions() { } } namespace Azure.Messaging.EventHubs.Processor { + public enum LoadBalancingStrategy + { + Balanced = 0, + Greedy = 1, + } public partial class PartitionClosingEventArgs { public PartitionClosingEventArgs(string partitionId, Azure.Messaging.EventHubs.Processor.ProcessingStoppedReason reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs old mode 100755 new mode 100644 index 60a1b6e9cac2..ded33a6b593b --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs @@ -37,9 +37,6 @@ public class EventHubConsumerClient : IAsyncDisposable /// The name of the default consumer group in the Event Hubs service. public const string DefaultConsumerGroupName = "$Default"; - /// The size of event batch requested by the background publishing operation used for subscriptions. - private const int BackgroundPublishReceiveBatchSize = 50; - /// The maximum wait time for receiving an event batch for the background publishing operation used for subscriptions. private readonly TimeSpan BackgroundPublishingWaitTime = TimeSpan.FromMilliseconds(250); @@ -391,8 +388,10 @@ public virtual async IAsyncEnumerable ReadEventsFromPartitionAsy try { - eventChannel = CreateEventChannel((BackgroundPublishReceiveBatchSize * 4)); - cancelPublishingAsync = await PublishPartitionEventsToChannelAsync(partitionId, startingPosition, options.TrackLastEnqueuedEventProperties, options.OwnerLevel, eventChannel, cancellationSource).ConfigureAwait(false); + var channelSize = options.CacheEventCount * 4L; + + eventChannel = CreateEventChannel((int)Math.Min(channelSize, int.MaxValue)); + cancelPublishingAsync = await PublishPartitionEventsToChannelAsync(partitionId, startingPosition, options.TrackLastEnqueuedEventProperties, options.CacheEventCount, options.OwnerLevel, (uint)options.PrefetchCount, eventChannel, cancellationSource).ConfigureAwait(false); } catch (Exception ex) { @@ -538,7 +537,9 @@ public virtual async IAsyncEnumerable ReadEventsAsync(bool start // Determine the partitions for the Event Hub and create the shared channel. var partitions = await GetPartitionIdsAsync(cancellationToken).ConfigureAwait(false); - eventChannel = CreateEventChannel((BackgroundPublishReceiveBatchSize * partitions.Length * 2)); + + var channelSize = options.CacheEventCount * partitions.Length * 2L; + eventChannel = CreateEventChannel((int)Math.Min(channelSize, int.MaxValue)); // Start publishing for all partitions. @@ -546,7 +547,7 @@ public virtual async IAsyncEnumerable ReadEventsAsync(bool start for (var index = 0; index < partitions.Length; ++index) { - publishingTasks[index] = PublishPartitionEventsToChannelAsync(partitions[index], startingPosition, options.TrackLastEnqueuedEventProperties, options.OwnerLevel, eventChannel, cancellationSource); + publishingTasks[index] = PublishPartitionEventsToChannelAsync(partitions[index], startingPosition, options.TrackLastEnqueuedEventProperties, options.CacheEventCount, options.OwnerLevel, (uint)options.PrefetchCount, eventChannel, cancellationSource); } // Capture the callbacks to cancel publishing for all events. @@ -712,7 +713,9 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau /// The identifier of the partition from which events should be read. /// The position within the partition's event stream that reading should begin from. /// Indicates whether information on the last enqueued event on the partition is sent as events are received. + /// The batch size to use when receiving events. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. + /// The count of events requested eagerly and queued without regard to whether a read was requested. /// The channel to which events should be published. /// A cancellation source which can be used for signaling publication to stop. /// @@ -726,7 +729,9 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau private async Task> PublishPartitionEventsToChannelAsync(string partitionId, EventPosition startingPosition, bool trackLastEnqueuedEventProperties, + int receiveBatchSize, long? ownerLevel, + uint prefetchCount, Channel channel, CancellationTokenSource publishingCancellationSource) { @@ -788,7 +793,7 @@ async Task performCleanup() try { - transportConsumer = Connection.CreateTransportConsumer(ConsumerGroup, partitionId, startingPosition, RetryPolicy, trackLastEnqueuedEventProperties, ownerLevel); + transportConsumer = Connection.CreateTransportConsumer(ConsumerGroup, partitionId, startingPosition, RetryPolicy, trackLastEnqueuedEventProperties, ownerLevel, prefetchCount); if (!ActiveConsumers.TryAdd(publisherId, transportConsumer)) { @@ -815,6 +820,7 @@ void exceptionCallback(Exception ex) transportConsumer, channel, new PartitionContext(partitionId, transportConsumer), + receiveBatchSize, exceptionCallback, publishingCancellationSource.Token ); @@ -838,6 +844,7 @@ void exceptionCallback(Exception ex) /// The consumer to use for receiving events. /// The channel to which received events should be published. /// The context that represents the partition from which events being received. + /// The batch size to use when receiving events. /// An action to be invoked when an exception is encountered during publishing. /// The to signal the request to cancel the background publishing. /// @@ -846,6 +853,7 @@ void exceptionCallback(Exception ex) private Task StartBackgroundChannelPublishingAsync(TransportConsumer transportConsumer, Channel channel, PartitionContext partitionContext, + int receiveBatchSize, Action notifyException, CancellationToken cancellationToken) => Task.Run(async () => @@ -865,7 +873,7 @@ private Task StartBackgroundChannelPublishingAsync(TransportConsumer transportCo if (receivedItems == default) { - receivedItems = await transportConsumer.ReceiveAsync(BackgroundPublishReceiveBatchSize, BackgroundPublishingWaitTime, cancellationToken).ConfigureAwait(false); + receivedItems = await transportConsumer.ReceiveAsync(receiveBatchSize, BackgroundPublishingWaitTime, cancellationToken).ConfigureAwait(false); } foreach (EventData item in receivedItems) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs old mode 100755 new mode 100644 index f096e350b7a0..b1976610177c --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs @@ -17,6 +17,12 @@ public class ReadEventOptions /// The maximum amount of time to wait to for an event to be available before emitting an empty item; if null, empty items will not be emitted. private TimeSpan? _maximumWaitTime = null; + /// The event catch count to use when reading events. + private int _cacheEventCount = 100; + + /// The prefetch count to use when reading events. + private int _prefetchCount = 300; + /// /// When populated, the owner level indicates that a reading is intended to be performed exclusively for events in the /// requested partition and for the associated consumer group. To do so, reading will attempt to assert ownership @@ -84,6 +90,81 @@ public TimeSpan? MaximumWaitTime } } + /// + /// The maximum number of events that will be read from the Event Hubs service and held in a local memory + /// cache when reading is active and events are being emitted to an enumerator for processing. + /// + /// + /// + /// The is a control that developers can use to help tune performance for the specific + /// needs of an application, given its expected size of events, throughput needs, and expected scenarios for using + /// Event Hubs. + /// + /// + /// + /// The size of this cache has an influence on the efficiency of reading events from the Event Hubs service. The + /// larger the size of the cache, the more efficiently service operations can be buffered in the background to + /// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O. + /// + /// For scenarios where the size of events is small and many events are flowing through the system, using a larger + /// and may help improve throughput. For scenarios where + /// the size of events is larger or when processing of events is expected to be a heavier and slower operation, using + /// a smaller size and may help manage resource use without + /// incurring a non-trivial cost to throughput. + /// + /// Regardless of the values, it is generally recommended that the be at least 2-3 + /// times as large as the to allow for efficient buffering of service operations. + /// + /// + public int CacheEventCount + { + get => _cacheEventCount; + + set + { + Argument.AssertAtLeast(value, 1, nameof(CacheEventCount)); + _cacheEventCount = value; + } + } + + /// + /// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to + /// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than + /// readers needing to wait for service operations to complete. + /// + /// + /// + /// The is a control that developers can use to help tune performance for the specific + /// needs of an application, given its expected size of events, throughput needs, and expected scenarios for using + /// Event Hubs. + /// + /// + /// + /// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The + /// larger the size of the cache, the more efficiently service operations can be buffered in the background to + /// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O. + /// + /// For scenarios where the size of events is small and many events are flowing through the system, using a larger + /// and may help improve throughput. For scenarios where + /// the size of events is larger or when processing of events is expected to be a heavier and slower operation, using + /// a smaller size and may help manage resource use without + /// incurring a non-trivial cost to throughput. + /// + /// Regardless of the values, it is generally recommended that the be at least 2-3 + /// times as large as the to allow for efficient buffering of service operations. + /// + /// + public int PrefetchCount + { + get => _prefetchCount; + + set + { + Argument.AssertAtLeast(value, 0, nameof(PrefetchCount)); + _prefetchCount = value; + } + } + /// /// Determines whether the specified is equal to this instance. /// @@ -124,7 +205,9 @@ internal ReadEventOptions Clone() => { OwnerLevel = OwnerLevel, TrackLastEnqueuedEventProperties = TrackLastEnqueuedEventProperties, - MaximumWaitTime = MaximumWaitTime + _maximumWaitTime = _maximumWaitTime, + _cacheEventCount = _cacheEventCount, + _prefetchCount = _prefetchCount, }; } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs old mode 100755 new mode 100644 index 749d28c79ea2..429e17aa7cbc --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs @@ -6,6 +6,7 @@ using Azure.Core; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Processor; namespace Azure.Messaging.EventHubs.Primitives { @@ -93,9 +94,9 @@ public TimeSpan? MaximumWaitTime } /// - /// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to - /// whether a processing is currently active, intended to help maximize throughput by allowing the event processor to read - /// from a local cache rather than waiting on a service request. + /// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to + /// whether the processor is currently active, intended to help maximize throughput by buffering service operations rather than + /// readers needing to wait for service operations to complete. /// /// /// @@ -104,6 +105,21 @@ public TimeSpan? MaximumWaitTime /// Event Hubs. /// /// + /// + /// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The + /// larger the size of the cache, the more efficiently service operations can be buffered in the background to + /// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O. + /// + /// For scenarios where the size of events is small and many events are flowing through the system, requesting more + /// events in a batch and using a higher may help improve throughput. For scenarios where + /// the size of events is larger or when processing of events is expected to be a heavier and slower operation, requesting + /// fewer events in a batch and using a smaller may help manage resource use without + /// incurring a non-trivial cost to throughput. + /// + /// Regardless of the values, it is generally recommended that the be at least 2-3 + /// times as large as the number of events in a batch to allow for efficient buffering of service operations. + /// + /// public int PrefetchCount { get => _prefetchCount; @@ -191,6 +207,16 @@ public TimeSpan PartitionOwnershipExpirationInterval /// public EventPosition DefaultStartingPosition { get; set; } = EventPosition.Earliest; + /// + /// The strategy that an event processor will use to make decisions about + /// partition ownership when performing load balancing to share work with + /// other event processors. + /// + /// + /// + /// + public LoadBalancingStrategy LoadBalancingStrategy { get; set; } = LoadBalancingStrategy.Balanced; + /// /// Determines whether the specified is equal to this instance. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Processor/LoadBalancingStrategy.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Processor/LoadBalancingStrategy.cs new file mode 100644 index 000000000000..bf0686699fbf --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Processor/LoadBalancingStrategy.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Messaging.EventHubs.Processor +{ + /// + /// The strategy that an event processor will use to make decisions about + /// partition ownership when performing load balancing to share work with + /// other event processors. + /// + /// + public enum LoadBalancingStrategy + { + /// + /// An event processor will take a measured approach to requesting + /// partition ownership when balancing work with other processors, slowly + /// claiming partitions until a stabilized distribution is achieved. + /// + /// When using this strategy, it may take longer for all partitions of + /// an Event Hub to be owned by a processor when processing first starts, the + /// number of active processors changes, or when partitions are scaled. The + /// Balanced strategy will reduce contention for a partition, ensuring that once + /// it is claimed, processing will be more likely to be steady and consistent. + /// + /// + Balanced, + + /// + /// An event processor will attempt to claim ownership of its fair share of + /// partitions aggressively when balancing work with other processors. + /// + /// When using this strategy, all partitions of an Event Hub will be claimed + /// quickly when processing first starts, the number of active processors changes, or + /// when partitions are scaled. The Greedy strategy is likely to cause competition for + /// ownership of a given partition, causing it to see sporadic processing and some amount of + /// duplicate events until balance has been reached and work is distributed equally among the + /// active processors. + /// + /// + Greedy + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs old mode 100755 new mode 100644 index d3db19f096fb..90fa89430f83 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs @@ -884,20 +884,22 @@ public async Task ReadEventsFromPartitionAsyncPublishesEventsWithMultipleIterato [Test] public async Task ReadEventsFromPartitionAsyncPublishesEventsWithOneIteratorAndMultipleBatches() { + var batchSize = 100; var events = new List(); var transportConsumer = new PublishingTransportConsumerMock(events); var mockConnection = new MockConnection(() => transportConsumer); var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, mockConnection); + var readOptions = new ReadEventOptions { CacheEventCount = batchSize }; var receivedEvents = new List(); events.AddRange( - Enumerable.Range(0, (GetBackgroundPublishReceiveBatchSize(consumer) * 3)) + Enumerable.Range(0, (batchSize * 3)) .Select(index => new EventData(Encoding.UTF8.GetBytes($"Event Number { index }"))) ); using var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(60)); - await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync("0", EventPosition.FromSequenceNumber(123), cancellation.Token)) + await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync("0", EventPosition.FromSequenceNumber(123), readOptions, cancellation.Token)) { receivedEvents.Add(partitionEvent.Data); @@ -919,19 +921,20 @@ public async Task ReadEventsFromPartitionAsyncPublishesEventsWithOneIteratorAndM [Test] public async Task ReadEventsFromPartitionAsyncPublishesEventsWithMultipleIteratorsAndMultipleBatches() { - var options = new ReadEventOptions { MaximumWaitTime = TimeSpan.FromMilliseconds(5) }; var events = new List(); var partition = "0"; + var batchSize = 50; var position = EventPosition.FromSequenceNumber(453); var mockConnection = new MockConnection(() => new PublishingTransportConsumerMock(events)); var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, mockConnection); + var options = new ReadEventOptions { MaximumWaitTime = TimeSpan.FromMilliseconds(5), CacheEventCount = batchSize }; var firstSubscriberEvents = new List(); var secondSubscriberEvents = new List(); var firstCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var secondCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); events.AddRange( - Enumerable.Range(0, (GetBackgroundPublishReceiveBatchSize(consumer) * 3)) + Enumerable.Range(0, (batchSize * 3)) .Select(index => new EventData(Encoding.UTF8.GetBytes($"Event Number { index }"))) ); @@ -2105,7 +2108,7 @@ public async Task StartBackgroundChannelPublishingAsyncToleratesRetriableExcepti }); using var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(15)); - var publishingTask = InvokeStartBackgroundChannelPublishingAsync(consumer, mockTransportConsumer, mockChannel, partitionContext, ex => capturedException = ex, cancellation.Token); + var publishingTask = InvokeStartBackgroundChannelPublishingAsync(consumer, mockTransportConsumer, mockChannel, partitionContext, 50, ex => capturedException = ex, cancellation.Token); // Allow publishing to continue until the timeout is reached or until the right number of events was // written. If publishing sends duplicate events, there will be a mismatch when comparing the event @@ -2179,7 +2182,7 @@ public async Task StartBackgroundChannelPublishingAsyncStopsForNonRetriableExcep }); using var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(2)); - var publishingTask = InvokeStartBackgroundChannelPublishingAsync(consumer, mockTransportConsumer, mockChannel, partitionContext, ex => capturedException = ex, cancellation.Token); + var publishingTask = InvokeStartBackgroundChannelPublishingAsync(consumer, mockTransportConsumer, mockChannel, partitionContext, 100, ex => capturedException = ex, cancellation.Token); // Allow publishing to continue until the timeout is reached or until the right number of events was // written. If publishing sends duplicate events, there will be a mismatch when comparing the event @@ -2230,26 +2233,13 @@ private static Task InvokeStartBackgroundChannelPublishingAsync(EventHubConsumer TransportConsumer transportConsumer, Channel channel, PartitionContext partitionContext, + int receiveBatchSize, Action notifyException, CancellationToken cancellationToken) => (Task) typeof(EventHubConsumerClient) .GetMethod("StartBackgroundChannelPublishingAsync", BindingFlags.Instance | BindingFlags.NonPublic) - .Invoke(consumer, new object[] { transportConsumer, channel, partitionContext, notifyException, cancellationToken }); - - /// - /// Retrieves the number of background publish event batch size for a consumer, using its private field. - /// - /// - /// The consumer to retrieve the channels for. - /// - /// The size of the batch that is received when publishing events in the background. - /// - private int GetBackgroundPublishReceiveBatchSize(EventHubConsumerClient consumer) => - (int) - typeof(EventHubConsumerClient) - .GetField("BackgroundPublishReceiveBatchSize", BindingFlags.Static | BindingFlags.NonPublic) - .GetValue(consumer); + .Invoke(consumer, new object[] { transportConsumer, channel, partitionContext, receiveBatchSize, notifyException, cancellationToken }); /// /// Allows for observation of operations performed by the consumer for testing purposes. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs old mode 100755 new mode 100644 index f4ee582cb8f9..3cb9a118ba92 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs @@ -27,7 +27,9 @@ public void CloneProducesACopy() { OwnerLevel = 99, TrackLastEnqueuedEventProperties = false, - MaximumWaitTime = TimeSpan.FromMinutes(65) + MaximumWaitTime = TimeSpan.FromMinutes(65), + CacheEventCount = 1, + PrefetchCount = 0 }; ReadEventOptions clone = options.Clone(); @@ -35,7 +37,9 @@ public void CloneProducesACopy() Assert.That(clone.OwnerLevel, Is.EqualTo(options.OwnerLevel), "The owner level of the clone should match."); Assert.That(clone.TrackLastEnqueuedEventProperties, Is.EqualTo(options.TrackLastEnqueuedEventProperties), "The tracking of last event information of the clone should match."); - Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The default maximum wait time of the clone should match."); + Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time of the clone should match."); + Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache count of the clone should match."); + Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match."); } /// @@ -48,5 +52,50 @@ public void MaximumWaitTimeIsValidated() { Assert.That(() => new ReadEventOptions { MaximumWaitTime = TimeSpan.FromMilliseconds(-1) }, Throws.InstanceOf()); } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void MaximumWaitTimeAllowsNull() + { + var options = new ReadEventOptions(); + Assert.That(() => options.MaximumWaitTime = null, Throws.Nothing); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void CacheEventCountIsValidated() + { + Assert.That(() => new ReadEventOptions { CacheEventCount = 0 }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchCountIsValidated() + { + Assert.That(() => new ReadEventOptions { PrefetchCount = -1 }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchCountAllowsZero() + { + Assert.That(() => new ReadEventOptions { PrefetchCount = 0 }, Throws.Nothing); + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs old mode 100755 new mode 100644 index 1d4757d15af5..fb29bc48ffd5 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs @@ -91,6 +91,18 @@ public void MaximumWaitTimeIsValidated(int waitTimeSeconds) Assert.That(() => new EventProcessorOptions { MaximumWaitTime = TimeSpan.FromSeconds(waitTimeSeconds) }, Throws.InstanceOf()); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void MaximumWaitTimeAllowsNull() + { + var options = new EventProcessorOptions(); + Assert.That(() => options.MaximumWaitTime = null, Throws.Nothing); + } + /// /// Verifies functionality of the /// property. @@ -105,6 +117,17 @@ public void PrefetchCountIsValidated(int count) Assert.That(() => new EventProcessorOptions { PrefetchCount = count }, Throws.InstanceOf()); } + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchCountAllowsZero() + { + Assert.That(() => new EventProcessorOptions { PrefetchCount = 0 }, Throws.Nothing); + } + /// /// Verifies functionality of the /// property.