diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs index 4fbf27b89056..23fa767b1cf1 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs @@ -254,11 +254,12 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; // Use blob prefix support available in EPH starting in 2.2.6 - EventProcessorHost host = new EventProcessorHost( + EventProcessorHost host = new EventProcessorHost(consumerGroup: consumerGroup, + connectionString: creds.EventHubConnectionString, eventHubName: eventHubName, - consumerGroupName: consumerGroup, - eventHubConnectionString: creds.EventHubConnectionString, - exceptionHandler: _exceptionHandler); + options: this.EventProcessorOptions, + eventBatchMaximumCount: _maxBatchSize, + invokeProcessorAfterReceiveTimeout: InvokeProcessorAfterReceiveTimeout, exceptionHandler: _exceptionHandler); return host; } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs index 653f9f0d636e..75639ba2f554 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs @@ -30,7 +30,6 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca private readonly BlobsCheckpointStore _checkpointStore; private readonly EventHubOptions _options; private readonly ILogger _logger; - private bool _started; private Lazy _scaleMonitor; @@ -70,17 +69,12 @@ void IDisposable.Dispose() public async Task StartAsync(CancellationToken cancellationToken) { - await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.MaxBatchSize, _options.InvokeProcessorAfterReceiveTimeout, _checkpointStore, _options.EventProcessorOptions).ConfigureAwait(false); - _started = true; + await _eventProcessorHost.StartProcessingAsync(this, _checkpointStore, cancellationToken).ConfigureAwait(false); } public async Task StopAsync(CancellationToken cancellationToken) { - if (_started) - { - await _eventProcessorHost.UnregisterEventProcessorAsync().ConfigureAwait(false); - } - _started = false; + await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false); } IEventProcessor IEventProcessorFactory.CreateEventProcessor() @@ -93,37 +87,27 @@ public IScaleMonitor GetMonitor() return _scaleMonitor.Value; } - /// - /// Wrapper for un-mockable checkpoint APIs to aid in unit testing - /// - internal interface ICheckpointer - { - Task CheckpointAsync(ProcessorPartitionContext context); - } - // We get a new instance each time Start() is called. // We'll get a listener per partition - so they can potentialy run in parallel even on a single machine. - internal class EventProcessor : IEventProcessor, IDisposable, ICheckpointer + internal class EventProcessor : IEventProcessor, IDisposable { private readonly ITriggeredFunctionExecutor _executor; private readonly bool _singleDispatch; private readonly ILogger _logger; private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - private readonly ICheckpointer _checkpointer; private readonly int _batchCheckpointFrequency; private int _batchCounter; private bool _disposed; - public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null) + public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch) { - _checkpointer = checkpointer ?? this; _executor = executor; _singleDispatch = singleDispatch; _batchCheckpointFrequency = options.BatchCheckpointFrequency; _logger = logger; } - public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReason reason) + public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason) { // signal cancellation for any in progress executions _cts.Cancel(); @@ -132,13 +116,13 @@ public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReaso return Task.CompletedTask; } - public Task OpenAsync(ProcessorPartitionContext context) + public Task OpenAsync(EventProcessorHostPartition context) { _logger.LogDebug(GetOperationDetails(context, "OpenAsync")); return Task.CompletedTask; } - public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error) + public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error) { string errorDetails = $"Processing error (Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}')."; @@ -147,7 +131,7 @@ public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error return Task.CompletedTask; } - public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumerable messages) + public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages) { var triggerInput = new EventHubTriggerInput { @@ -209,8 +193,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera // code, and capture/log/persist failed events, since they won't be retried. if (messages.Any()) { - context.CheckpointEvent = messages.Last(); - await CheckpointAsync(context).ConfigureAwait(false); + await CheckpointAsync(messages.Last(), context).ConfigureAwait(false); } } @@ -222,12 +205,12 @@ private async Task TryExecuteWithLoggingAsync(TriggeredFunctionData input, Event } } - private async Task CheckpointAsync(ProcessorPartitionContext context) + private async Task CheckpointAsync(EventData checkpointEvent, EventProcessorHostPartition context) { bool checkpointed = false; if (_batchCheckpointFrequency == 1) { - await _checkpointer.CheckpointAsync(context).ConfigureAwait(false); + await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false); checkpointed = true; } else @@ -236,7 +219,7 @@ private async Task CheckpointAsync(ProcessorPartitionContext context) if (++_batchCounter >= _batchCheckpointFrequency) { _batchCounter = 0; - await _checkpointer.CheckpointAsync(context).ConfigureAwait(false); + await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false); checkpointed = true; } } @@ -264,11 +247,6 @@ public void Dispose() Dispose(true); } - async Task ICheckpointer.CheckpointAsync(ProcessorPartitionContext context) - { - await context.CheckpointAsync().ConfigureAwait(false); - } - private static Dictionary GetLinksScope(EventData message) { if (TryGetLinkedActivity(message, out var link)) @@ -319,7 +297,7 @@ private static bool TryGetLinkedActivity(EventData message, out Activity link) return false; } - private static string GetOperationDetails(ProcessorPartitionContext context, string operation) + private static string GetOperationDetails(EventProcessorHostPartition context, string operation) { StringWriter sw = new StringWriter(); using (JsonTextWriter writer = new JsonTextWriter(sw) { Formatting = Formatting.None }) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 5ff36cc9a2c7..066d708668c4 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -13,155 +13,124 @@ namespace Microsoft.Azure.WebJobs.EventHubs.Processor { - internal class EventProcessorHost + internal class EventProcessorHost : EventProcessor { - public string EventHubName { get; } - public string ConsumerGroupName { get; } - public string EventHubConnectionString { get; } - private Processor CurrentProcessor { get; set; } - private Action ExceptionHandler { get; } - - public EventProcessorHost(string eventHubName, string consumerGroupName, string eventHubConnectionString, Action exceptionHandler) + private readonly bool _invokeProcessorAfterReceiveTimeout; + private readonly Action _exceptionHandler; + private readonly ConcurrentDictionary _leaseInfos; + private IEventProcessorFactory _processorFactory; + private BlobsCheckpointStore _checkpointStore; + + /// + /// Mocking constructor + /// + protected EventProcessorHost() { - EventHubName = eventHubName; - ConsumerGroupName = consumerGroupName; - EventHubConnectionString = eventHubConnectionString; - ExceptionHandler = exceptionHandler; } - public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory factory, int maxBatchSize, bool invokeProcessorAfterReceiveTimeout, BlobsCheckpointStore checkpointStore, EventProcessorOptions options) + public EventProcessorHost(string consumerGroup, + string connectionString, + string eventHubName, + EventProcessorOptions options, + int eventBatchMaximumCount, + bool invokeProcessorAfterReceiveTimeout, + Action exceptionHandler) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) { - if (CurrentProcessor != null) - { - throw new InvalidOperationException("Processor has already been started"); - } - - CurrentProcessor = new Processor(maxBatchSize, - ConsumerGroupName, - EventHubConnectionString, - EventHubName, - options, - factory, - invokeProcessorAfterReceiveTimeout, - ExceptionHandler, - checkpointStore - ); - await CurrentProcessor.StartProcessingAsync().ConfigureAwait(false); + _invokeProcessorAfterReceiveTimeout = invokeProcessorAfterReceiveTimeout; + _exceptionHandler = exceptionHandler; + _leaseInfos = new ConcurrentDictionary(); } - public async Task UnregisterEventProcessorAsync() + protected override async Task> ClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) { - if (CurrentProcessor == null) - { - throw new InvalidOperationException("Processor has not been started"); - } - - await CurrentProcessor.StopProcessingAsync().ConfigureAwait(false); - CurrentProcessor = null; + return await _checkpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken).ConfigureAwait(false); } - internal class Partition : EventProcessorPartition + protected override async Task> ListCheckpointsAsync(CancellationToken cancellationToken) { - public IEventProcessor Processor { get; set; } - public ProcessorPartitionContext Context { get; set; } + return await _checkpointStore.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); } - internal class Processor : EventProcessor + protected override async Task> ListOwnershipAsync(CancellationToken cancellationToken) { - private IEventProcessorFactory ProcessorFactory { get; } - private bool InvokeProcessorAfterRecieveTimeout { get; } - private Action ExceptionHandler { get; } - private ConcurrentDictionary LeaseInfos { get; } - private BlobsCheckpointStore CheckpointStore { get; } + return await _checkpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); + } - public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobsCheckpointStore checkpointStore) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) + internal virtual async Task CheckpointAsync(string partitionId, EventData checkpointEvent, CancellationToken cancellationToken = default) + { + await _checkpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint() { - ProcessorFactory = processorFactory; - InvokeProcessorAfterRecieveTimeout = invokeProcessorAfterRecieveTimeout; - ExceptionHandler = exceptionHandler; - LeaseInfos = new ConcurrentDictionary(); - CheckpointStore = checkpointStore; - } + PartitionId = partitionId, + ConsumerGroup = ConsumerGroup, + EventHubName = EventHubName, + FullyQualifiedNamespace = FullyQualifiedNamespace + }, checkpointEvent, cancellationToken).ConfigureAwait(false); + } - protected override async Task> ClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) + internal virtual LeaseInfo GetLeaseInfo(string partitionId) + { + if (_leaseInfos.TryGetValue(partitionId, out LeaseInfo lease)) { - return await CheckpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken).ConfigureAwait(false); + return lease; } - protected override async Task> ListCheckpointsAsync(CancellationToken cancellationToken) - { - return await CheckpointStore.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); - } + return null; + } - protected override async Task> ListOwnershipAsync(CancellationToken cancellationToken) + protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorHostPartition partition, string operationDescription, CancellationToken cancellationToken) + { + if (partition != null) { - return await CheckpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); + return partition.EventProcessor.ProcessErrorAsync(partition, exception); } - internal virtual async Task CheckpointAsync(string partitionId, EventData checkpointEvent, CancellationToken cancellationToken = default) + try { - await CheckpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint() - { - PartitionId = partitionId, - ConsumerGroup = ConsumerGroup, - EventHubName = EventHubName, - FullyQualifiedNamespace = FullyQualifiedNamespace - }, checkpointEvent, cancellationToken).ConfigureAwait(false); + _exceptionHandler(new ExceptionReceivedEventArgs(Identifier, operationDescription, null, exception)); } - - internal virtual LeaseInfo GetLeaseInfo(string partitionId) + catch { - if (LeaseInfos.TryGetValue(partitionId, out LeaseInfo lease)) { - return lease; - } - - return null; + // ignore } - protected override Task OnProcessingErrorAsync(Exception exception, Partition partition, string operationDescription, CancellationToken cancellationToken) - { - if (partition != null) - { - return partition.Processor.ProcessErrorAsync(partition.Context, exception); - } - - try - { - ExceptionHandler(new ExceptionReceivedEventArgs(Identifier, operationDescription, null, exception)); - } - catch (Exception) - { - // Best effort logging. - } + return Task.CompletedTask; + } + protected override Task OnProcessingEventBatchAsync(IEnumerable events, EventProcessorHostPartition partition, CancellationToken cancellationToken) + { + if ((events == null || !events.Any()) && !_invokeProcessorAfterReceiveTimeout) + { return Task.CompletedTask; } - protected override Task OnProcessingEventBatchAsync(IEnumerable events, Partition partition, CancellationToken cancellationToken) - { - if ((events == null || !events.Any()) && !InvokeProcessorAfterRecieveTimeout) - { - return Task.CompletedTask; - } - - return partition.Processor.ProcessEventsAsync(partition.Context, events); - } + return partition.EventProcessor.ProcessEventsAsync(partition, events); + } - protected override Task OnInitializingPartitionAsync(Partition partition, CancellationToken cancellationToken) - { - partition.Processor = ProcessorFactory.CreateEventProcessor(); - partition.Context = new ProcessorPartitionContext(partition.PartitionId, this, ReadLastEnqueuedEventProperties); + protected override Task OnInitializingPartitionAsync(EventProcessorHostPartition partition, CancellationToken cancellationToken) + { + partition.ProcessorHost = this; + partition.EventProcessor = _processorFactory.CreateEventProcessor(); + partition.ReadLastEnqueuedEventPropertiesFunc = ReadLastEnqueuedEventProperties; + + // Since we are re-initializing this partition, any cached information we have about the parititon will be incorrect. + // Clear it out now, if there is any, we'll refresh it in ListCheckpointsAsync, which EventProcessor will call before starting to pump messages. + _leaseInfos.TryRemove(partition.PartitionId, out _); + return partition.EventProcessor.OpenAsync(partition); + } - // Since we are re-initializing this partition, any cached information we have about the parititon will be incorrect. - // Clear it out now, if there is any, we'll refresh it in ListCheckpointsAsync, which EventProcessor will call before starting to pump messages. - LeaseInfos.TryRemove(partition.PartitionId, out _); - return partition.Processor.OpenAsync(partition.Context); - } + protected override Task OnPartitionProcessingStoppedAsync(EventProcessorHostPartition partition, ProcessingStoppedReason reason, CancellationToken cancellationToken) + { + return partition.EventProcessor.CloseAsync(partition, reason); + } - protected override Task OnPartitionProcessingStoppedAsync(Partition partition, ProcessingStoppedReason reason, CancellationToken cancellationToken) - { - return partition.Processor.CloseAsync(partition.Context, reason); - } + public async Task StartProcessingAsync( + IEventProcessorFactory processorFactory, + BlobsCheckpointStore checkpointStore, + CancellationToken cancellationToken) + { + _processorFactory = processorFactory; + _checkpointStore = checkpointStore; + await StartProcessingAsync(cancellationToken).ConfigureAwait(false); } } -} +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHostPartition.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHostPartition.cs new file mode 100644 index 000000000000..c6d4b9f29dd2 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHostPartition.cs @@ -0,0 +1,52 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Primitives; + +namespace Microsoft.Azure.WebJobs.EventHubs.Processor +{ + internal class EventProcessorHostPartition : EventProcessorPartition + { + public EventProcessorHostPartition() + { + } + + public EventProcessorHostPartition(string partitionId) + { + PartitionId = partitionId; + } + + public string Owner => ProcessorHost.Identifier; + public string EventHubPath => ProcessorHost.EventHubName; + public LeaseInfo Lease => ProcessorHost.GetLeaseInfo(PartitionId); + + public LastEnqueuedEventProperties? LastEnqueuedEventProperties + { + get + { + try + { + return ReadLastEnqueuedEventPropertiesFunc?.Invoke(PartitionId); + } + catch (EventHubsException e) when (e.Reason == EventHubsException.FailureReason.ClientClosed) + { + // If the connection is closed, just return default value. This could be called before our connection is established (e.g. the context is passed to OnPartitionIntializingAsync, but the + // connection for that partion has not been made yet, so the above call will fail). + return default; + } + } + } + public IEventProcessor EventProcessor { get; set; } + public EventProcessorHost ProcessorHost { get; set; } + public Func ReadLastEnqueuedEventPropertiesFunc { get; set; } + + public async Task CheckpointAsync(EventData checkpointEvent) + { + await ProcessorHost.CheckpointAsync(PartitionId, checkpointEvent).ConfigureAwait(false); + } + } +} diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs index dacd2c00e1d1..fddf25887750 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs @@ -5,16 +5,15 @@ using System.Collections.Generic; using System.Threading.Tasks; using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; namespace Microsoft.Azure.WebJobs.EventHubs.Processor { internal interface IEventProcessor { - Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReason reason); - Task OpenAsync(ProcessorPartitionContext context); - Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error); - Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumerable messages); + Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason); + Task OpenAsync(EventProcessorHostPartition context); + Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error); + Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages); } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/ProcessorPartitionContext.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/ProcessorPartitionContext.cs deleted file mode 100644 index 0c102ef8baf0..000000000000 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/ProcessorPartitionContext.cs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; - -namespace Microsoft.Azure.WebJobs.EventHubs.Processor -{ - internal class ProcessorPartitionContext : PartitionContext - { - public string Owner { get => Processor.Identifier; } - public string EventHubPath { get => Processor.EventHubName; } - public LeaseInfo Lease { get => Processor.GetLeaseInfo(PartitionId); } - public LastEnqueuedEventProperties? LastEnqueuedEventProperties - { - get - { - try - { - return ReadLastEnqueuedEventPropertiesFunc(PartitionId); - } - catch (EventHubsException e) when (e.Reason == EventHubsException.FailureReason.ClientClosed) - { - // If the connection is closed, just return default value. This could be called before our connection is established (e.g. the context is passed to OnPartitionIntializingAsync, but the - // connection for that partion has not been made yet, so the above call will fail). - return default; - } - } - } - private EventProcessorHost.Processor Processor { get; } - private Func ReadLastEnqueuedEventPropertiesFunc { get; } - internal EventData CheckpointEvent { get; set; } - - public ProcessorPartitionContext(string partitionId, EventProcessorHost.Processor processor, Func readLastEnqueuedEventPropertiesFunc) : base(partitionId) - { - Processor = processor; - ReadLastEnqueuedEventPropertiesFunc = readLastEnqueuedEventPropertiesFunc; - } - - public async Task CheckpointAsync() - { - await Processor.CheckpointAsync(PartitionId, CheckpointEvent).ConfigureAwait(false); - } - - public override LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() - { - return ReadLastEnqueuedEventPropertiesFunc(PartitionId); - } - } -} diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs index 3b1711263acf..4b6348b2b2fd 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs @@ -5,6 +5,8 @@ using Azure.Messaging.EventHubs.Consumer; using System.Collections.Generic; using System.Globalization; +using Azure.Messaging.EventHubs.Primitives; +using Microsoft.Azure.WebJobs.EventHubs.Processor; namespace Microsoft.Azure.WebJobs.EventHubs { @@ -17,7 +19,7 @@ internal sealed class EventHubTriggerInput internal EventData[] Events { get; set; } - internal PartitionContext PartitionContext { get; set; } + internal EventProcessorPartition PartitionContext { get; set; } public bool IsSingleDispatch { @@ -55,7 +57,7 @@ public EventData GetSingleEventData() return this.Events[this._selector]; } - public Dictionary GetTriggerDetails(PartitionContext context) + public Dictionary GetTriggerDetails(EventProcessorPartition context) { if (Events.Length == 0) { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 794be1dd6981..6920dbd99450 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Listeners; @@ -37,15 +38,18 @@ public async Task ProcessEvents_SingleDispatch_CheckpointsCorrectly(int batchChe { BatchCheckpointFrequency = batchCheckpointFrequency }; - var checkpointer = new Mock(MockBehavior.Strict); - checkpointer.Setup(p => p.CheckpointAsync(partitionContext)).Callback(c => + var processor = new Mock(MockBehavior.Strict); + processor.Setup(p => p.GetLeaseInfo(partitionContext.PartitionId)).Returns((LeaseInfo)null); + processor.Setup(p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny())).Callback(() => { checkpoints++; }).Returns(Task.CompletedTask); + partitionContext.ProcessorHost = processor.Object; + var loggerMock = new Mock(); var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true, checkpointer.Object); + var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true); for (int i = 0; i < 100; i++) { @@ -68,12 +72,16 @@ public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchC { BatchCheckpointFrequency = batchCheckpointFrequency }; - var checkpointer = new Mock(MockBehavior.Strict); - checkpointer.Setup(p => p.CheckpointAsync(partitionContext)).Returns(Task.CompletedTask); + + var processor = new Mock(MockBehavior.Strict); + processor.Setup(p => p.GetLeaseInfo(partitionContext.PartitionId)).Returns((LeaseInfo)null); + processor.Setup(p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny())).Returns(Task.CompletedTask); + partitionContext.ProcessorHost = processor.Object; + var loggerMock = new Mock(); var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, false, checkpointer.Object); + var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, false); for (int i = 0; i < 100; i++) { @@ -81,7 +89,9 @@ public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchC await eventProcessor.ProcessEventsAsync(partitionContext, events); } - checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Exactly(expected)); + processor.Verify( + p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), + Times.Exactly(expected)); } /// @@ -94,8 +104,11 @@ public async Task ProcessEvents_Failure_Checkpoints() { var partitionContext = EventHubTests.GetPartitionContext(); var options = new EventHubOptions(); - var checkpointer = new Mock(MockBehavior.Strict); - checkpointer.Setup(p => p.CheckpointAsync(partitionContext)).Returns(Task.CompletedTask); + + var processor = new Mock(MockBehavior.Strict); + processor.Setup(p => p.GetLeaseInfo(partitionContext.PartitionId)).Returns((LeaseInfo)null); + processor.Setup(p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny())).Returns(Task.CompletedTask); + partitionContext.ProcessorHost = processor.Object; List events = new List(); List results = new List(); @@ -116,11 +129,13 @@ public async Task ProcessEvents_Failure_Checkpoints() var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true, checkpointer.Object); + var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true); await eventProcessor.ProcessEventsAsync(partitionContext, events); - checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Once); + processor.Verify( + p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), + Times.Once); } [Test] @@ -128,14 +143,21 @@ public async Task CloseAsync_Shutdown_DoesNotCheckpoint() { var partitionContext = EventHubTests.GetPartitionContext(); var options = new EventHubOptions(); - var checkpointer = new Mock(MockBehavior.Strict); + + var processor = new Mock(MockBehavior.Strict); + processor.Setup(p => p.GetLeaseInfo(partitionContext.PartitionId)).Returns((LeaseInfo)null); + processor.Setup(p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny())).Returns(Task.CompletedTask); + partitionContext.ProcessorHost = processor.Object; + var executor = new Mock(MockBehavior.Strict); var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true, checkpointer.Object); + var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true); await eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown); - checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Never); + processor.Verify( + p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), + Times.Never); } [Test] @@ -143,10 +165,9 @@ public async Task ProcessErrorsAsync_LoggedAsError() { var partitionContext = EventHubTests.GetPartitionContext(partitionId: "123", eventHubPath: "abc", owner: "def"); var options = new EventHubOptions(); - var checkpointer = new Mock(MockBehavior.Strict); var executor = new Mock(MockBehavior.Strict); var testLogger = new TestLogger("Test"); - var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true, checkpointer.Object); + var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true); var ex = new InvalidOperationException("My InvalidOperationException!"); @@ -162,10 +183,9 @@ public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation() { var partitionContext = EventHubTests.GetPartitionContext(partitionId: "123", eventHubPath: "abc", owner: "def"); var options = new EventHubOptions(); - var checkpointer = new Mock(MockBehavior.Strict); var executor = new Mock(MockBehavior.Strict); var testLogger = new TestLogger("Test"); - var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true, checkpointer.Object); + var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true); var disconnectedEx = new EventHubsException(true, "My ReceiverDisconnectedException!", EventHubsException.FailureReason.ConsumerDisconnected); @@ -193,10 +213,12 @@ public void GetMonitor_ReturnsExpectedValue() var eventHubName = "EventHubName"; var consumerGroup = "ConsumerGroup"; var testLogger = new TestLogger("Test"); - var host = new EventProcessorHost( + var host = new EventProcessorHost(consumerGroup, + "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", eventHubName, - consumerGroup, - "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", null); + new EventProcessorOptions(), + 3, + false, null); var consumerClientMock = new Mock(); consumerClientMock.SetupGet(c => c.ConsumerGroup).Returns(consumerGroup); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs index 7fdf92089844..33aa82cbe9c6 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs @@ -247,19 +247,19 @@ public void InitializeFromHostMetadata() // Assert.AreEqual(21, options.PartitionManagerOptions.RenewInterval.TotalSeconds); } - internal static ProcessorPartitionContext GetPartitionContext(string partitionId = "0", string eventHubPath = "path", + internal static EventProcessorHostPartition GetPartitionContext(string partitionId = "0", string eventHubPath = "path", string consumerGroupName = "group", string owner = null) { - var processor = new EventProcessorHost.Processor(Int32.MaxValue, - consumerGroupName, + var processor = new EventProcessorHost(consumerGroupName, "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", eventHubPath, new EventProcessorOptions(), - null, - false, - null, - Mock.Of()); - return new ProcessorPartitionContext(partitionId, processor, s => default); + Int32.MaxValue, + false, null); + return new EventProcessorHostPartition(partitionId) + { + ProcessorHost = processor + }; } } }