Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Load Balancer Options #13047

Merged
merged 1 commit into from
Jun 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.TestFramework",
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{797FF941-76FD-45FD-AC17-A73DFE2BA621}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary until the Event Hubs core package is shipped and we can return to a package dependency.

EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -35,12 +37,17 @@ Global
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.Build.0 = Release|Any CPU
{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
{65ECFE8C-4D8C-454B-AC63-4559FBE5AF7A} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
public partial class EventProcessorClientOptions
{
public EventProcessorClientOptions() { }
public int CacheEventCount { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
public string Identifier { get { throw null; } set { } }
public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
<Folder Include="Properties\" />
</ItemGroup>

<!-- Project reference is necessary until the Event Hubs Core library 5.2.1-preview.1 is available in package form.-->
<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
</ItemGroup>

<ItemGroup>
<!--<PackageReference Include="Azure.Messaging.EventHubs" />-->
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ namespace Azure.Messaging.EventHubs
[SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal is managed internally as part of the Stop operation.")]
public class EventProcessorClient : EventProcessor<EventProcessorPartition>
{
/// <summary>The number of events to request as the maximum size for batches read from a partition.</summary>
private const int ReadBatchSize = 15;

/// <summary>The delegate to invoke when attempting to update a checkpoint using an empty event.</summary>
private static readonly Func<CancellationToken, Task> EmptyEventUpdateCheckpoint = cancellationToken => throw new InvalidOperationException(Resources.CannotCreateCheckpointForEmptyEvent);

/// <summary>The set of default options for the processor.</summary>
private static readonly EventProcessorClientOptions DefaultClientOptions = new EventProcessorClientOptions();

/// <summary>The default starting position for the processor.</summary>
private readonly EventPosition DefaultStartingPosition = new EventProcessorOptions().DefaultStartingPosition;

/// <summary>The set of default starting positions for partitions being processed; these are collected at initialization and are surfaced as checkpoints to override defaults on a partition-specific basis.</summary>
private readonly ConcurrentDictionary<string, EventPosition> PartitionStartingPositionDefaults = new ConcurrentDictionary<string, EventPosition>();

/// <summary>The primitive for synchronizing access during start and set handler operations.</summary>
private readonly SemaphoreSlim ProcessorStatusGuard = new SemaphoreSlim(1, 1);

/// <summary>The active default starting position for the processor.</summary>
private readonly EventPosition DefaultStartingPosition = new EventProcessorOptions().DefaultStartingPosition;

/// <summary>The handler to be called just before event processing starts for a given partition.</summary>
private Func<PartitionInitializingEventArgs, Task> _partitionInitializingAsync;

Expand Down Expand Up @@ -364,7 +364,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore,
string consumerGroup,
string connectionString,
string eventHubName,
EventProcessorClientOptions clientOptions) : base(ReadBatchSize, consumerGroup, connectionString, eventHubName, CreateOptions(clientOptions))
EventProcessorClientOptions clientOptions) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, connectionString, eventHubName, CreateOptions(clientOptions))
{
Argument.AssertNotNull(checkpointStore, nameof(checkpointStore));
StorageManager = CreateStorageManager(checkpointStore);
Expand All @@ -391,7 +391,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore,
string fullyQualifiedNamespace,
string eventHubName,
TokenCredential credential,
EventProcessorClientOptions clientOptions = default) : base(ReadBatchSize, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions))
EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions))
{
Argument.AssertNotNull(checkpointStore, nameof(checkpointStore));
StorageManager = CreateStorageManager(checkpointStore);
Expand All @@ -405,6 +405,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore,
/// <param name="consumerGroup">The name of the consumer group this processor is associated with. Events are read in the context of this group.</param>
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
/// <param name="cacheEventCount">The maximum number of events that will be read from the Event Hubs service and held in a local memory cache when reading is active and events are being emitted to an enumerator for processing.</param>
/// <param name="credential">An Azure identity credential to satisfy base class requirements; this credential may not be <c>null</c> but will only be used in the case that <see cref="CreateConnection" /> has not been overridden.</param>
/// <param name="clientOptions">The set of options to use for this processor.</param>
///
Expand All @@ -416,8 +417,9 @@ internal EventProcessorClient(StorageManager storageManager,
string consumerGroup,
string fullyQualifiedNamespace,
string eventHubName,
int cacheEventCount,
TokenCredential credential,
EventProcessorOptions clientOptions) : base(ReadBatchSize, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, clientOptions)
EventProcessorOptions clientOptions) : base(cacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, clientOptions)
{
Argument.AssertNotNull(storageManager, nameof(storageManager));

Expand Down Expand Up @@ -988,15 +990,17 @@ private void EnsureNotRunningAndInvoke(Action action)
///
private static EventProcessorOptions CreateOptions(EventProcessorClientOptions clientOptions)
{
clientOptions ??= new EventProcessorClientOptions();
clientOptions ??= DefaultClientOptions;

return new EventProcessorOptions
{
ConnectionOptions = clientOptions.ConnectionOptions.Clone(),
RetryOptions = clientOptions.RetryOptions.Clone(),
Identifier = clientOptions.Identifier,
MaximumWaitTime = clientOptions.MaximumWaitTime,
TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties
TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties,
LoadBalancingStrategy = clientOptions.LoadBalancingStrategy,
PrefetchCount = clientOptions.PrefetchCount
};
}

Expand Down
94 changes: 94 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ public class EventProcessorClientOptions
/// <summary>The maximum amount of time to wait for an event to become available before emitting an <c>null</c> value.</summary>
private TimeSpan? _maximumWaitTime = null;

/// <summary>The event catch count to use when reading events.</summary>
private int _cacheEventCount = 100;

/// <summary>The prefetch count to use when reading events.</summary>
private int _prefetchCount = 300;

/// <summary>The set of options to use for configuring the connection to the Event Hubs service.</summary>
private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions();

Expand Down Expand Up @@ -48,6 +54,16 @@ public class EventProcessorClientOptions
///
public bool TrackLastEnqueuedEventProperties { get; set; } = true;

/// <summary>
/// The strategy that an event processor will use to make decisions about
/// partition ownership when performing load balancing to share work with
/// other event processors.
/// </summary>
///
/// <seealso cref="Processor.LoadBalancingStrategy" />
///
public LoadBalancingStrategy LoadBalancingStrategy { get; set; } = LoadBalancingStrategy.Balanced;

/// <summary>
/// The maximum amount of time to wait for an event to become available for a given partition before emitting
/// an empty event.
Expand Down Expand Up @@ -81,6 +97,81 @@ public TimeSpan? MaximumWaitTime
}
}

/// <summary>
/// The maximum number of events that will be read from the Event Hubs service and held in a local memory
/// cache when reading is active and events are being emitted to an enumerator for processing.
/// </summary>
///
/// <value>
/// The <see cref="CacheEventCount" /> is a control that developers can use to help tune performance for the specific
/// needs of an application, given its expected size of events, throughput needs, and expected scenarios for using
/// Event Hubs.
/// </value>
///
/// <remarks>
/// The size of this cache has an influence on the efficiency of reading events from the Event Hubs service. The
/// larger the size of the cache, the more efficiently service operations can be buffered in the background to
/// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O.
///
/// For scenarios where the size of events is small and many events are flowing through the system, using a larger
/// <see cref="CacheEventCount"/> and <see cref="PrefetchCount" /> may help improve throughput. For scenarios where
/// the size of events is larger or when processing of events is expected to be a heavier and slower operation, using
/// a smaller size <see cref="CacheEventCount"/> and <see cref="PrefetchCount"/> may help manage resource use without
/// incurring a non-trivial cost to throughput.
///
/// Regardless of the values, it is generally recommended that the <see cref="PrefetchCount" /> be at least 2-3
/// times as large as the <see cref="CacheEventCount" /> to allow for efficient buffering of service operations.
/// </remarks>
///
public int CacheEventCount
{
get => _cacheEventCount;

set
{
Argument.AssertAtLeast(value, 1, nameof(CacheEventCount));
_cacheEventCount = value;
}
}

/// <summary>
/// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to
/// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than
/// readers needing to wait for service operations to complete.
/// </summary>
///
/// <value>
/// The <see cref="PrefetchCount" /> is a control that developers can use to help tune performance for the specific
/// needs of an application, given its expected size of events, throughput needs, and expected scenarios for using
/// Event Hubs.
/// </value>
///
/// <remarks>
/// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The
/// larger the size of the cache, the more efficiently service operations can be buffered in the background to
/// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O.
///
/// For scenarios where the size of events is small and many events are flowing through the system, using a larger
/// <see cref="CacheEventCount"/> and <see cref="PrefetchCount" /> may help improve throughput. For scenarios where
/// the size of events is larger or when processing of events is expected to be a heavier and slower operation, using
/// a smaller size <see cref="CacheEventCount"/> and <see cref="PrefetchCount"/> may help manage resource use without
/// incurring a non-trivial cost to throughput.
///
/// Regardless of the values, it is generally recommended that the <see cref="PrefetchCount" /> be at least 2-3
/// times as large as the <see cref="CacheEventCount" /> to allow for efficient buffering of service operations.
/// </remarks>
///
public int PrefetchCount
{
get => _prefetchCount;

set
{
Argument.AssertAtLeast(value, 0, nameof(PrefetchCount));
_prefetchCount = value;
}
}

/// <summary>
/// Gets or sets the options used for configuring the connection to the Event Hubs service.
/// </summary>
Expand Down Expand Up @@ -151,7 +242,10 @@ internal EventProcessorClientOptions Clone() =>
{
Identifier = Identifier,
TrackLastEnqueuedEventProperties = TrackLastEnqueuedEventProperties,
LoadBalancingStrategy = LoadBalancingStrategy,
_maximumWaitTime = _maximumWaitTime,
_cacheEventCount = _cacheEventCount,
_prefetchCount = _prefetchCount,
_connectionOptions = ConnectionOptions.Clone(),
_retryOptions = RetryOptions.Clone()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<ProjectReference Include="..\samples\Azure.Messaging.EventHubs.Processor.Samples.csproj" />
</ItemGroup>

<!-- Project reference is necessary until the Event Hubs Core library 5.2.1-preview.1 is available in package form.-->
<ItemGroup>
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
</ItemGroup>

<!-- Import Event Hubs shared source -->
<Import Project="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs.Shared\src\Azure.Messaging.EventHubs.Shared.Testing.projitems" Label="Testing" />

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task UpdateCheckpointAsyncCreatesScope()
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockContext = new Mock<PartitionContext>("65");
var mockLogger = new Mock<EventProcessorClientEventSource>();
var mockProcessor = new Mock<EventProcessorClient>(Mock.Of<StorageManager>(), "cg", "host", "hub", Mock.Of<TokenCredential>(), null) { CallBase = true };
var mockProcessor = new Mock<EventProcessorClient>(Mock.Of<StorageManager>(), "cg", "host", "hub", 50, Mock.Of<TokenCredential>(), null) { CallBase = true };

mockProcessor
.Protected()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ internal TestEventProcessorClient(StorageManager storageManager,
string eventHubName,
TokenCredential credential,
Func<EventHubConnection> connectionFactory,
EventProcessorOptions options) : base(storageManager, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
EventProcessorOptions options) : base(storageManager, consumerGroup, fullyQualifiedNamespace, eventHubName, 100, credential, options)
{
InjectedConnectionFactory = connectionFactory;
}
Expand Down
Loading