Skip to content

Commit

Permalink
Remove abstraction layers from EventHubs WebJobs extensions (#17271)
Browse files Browse the repository at this point in the history
There are some layers in EventHubs WebJobs extensions that were the artifact of Track 2 refactoring. Remove them to simplify the code a bit.

Major changes:
* Merge `Processor` and `EventProcessorHost`
* Merge `ProcessorPartitionContext` and `Partition`, rename to `EventProcessorHostPartition`
* Remove ICheckpointer
  • Loading branch information
pakrym authored Dec 2, 2020
1 parent f4dc2e9 commit 51ebdb1
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,12 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName;

// Use blob prefix support available in EPH starting in 2.2.6
EventProcessorHost host = new EventProcessorHost(
EventProcessorHost host = new EventProcessorHost(consumerGroup: consumerGroup,
connectionString: creds.EventHubConnectionString,
eventHubName: eventHubName,
consumerGroupName: consumerGroup,
eventHubConnectionString: creds.EventHubConnectionString,
exceptionHandler: _exceptionHandler);
options: this.EventProcessorOptions,
eventBatchMaximumCount: _maxBatchSize,
invokeProcessorAfterReceiveTimeout: InvokeProcessorAfterReceiveTimeout, exceptionHandler: _exceptionHandler);

return host;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca
private readonly BlobsCheckpointStore _checkpointStore;
private readonly EventHubOptions _options;
private readonly ILogger _logger;
private bool _started;

private Lazy<EventHubsScaleMonitor> _scaleMonitor;

Expand Down Expand Up @@ -70,17 +69,12 @@ void IDisposable.Dispose()

public async Task StartAsync(CancellationToken cancellationToken)
{
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.MaxBatchSize, _options.InvokeProcessorAfterReceiveTimeout, _checkpointStore, _options.EventProcessorOptions).ConfigureAwait(false);
_started = true;
await _eventProcessorHost.StartProcessingAsync(this, _checkpointStore, cancellationToken).ConfigureAwait(false);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if (_started)
{
await _eventProcessorHost.UnregisterEventProcessorAsync().ConfigureAwait(false);
}
_started = false;
await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
}

IEventProcessor IEventProcessorFactory.CreateEventProcessor()
Expand All @@ -93,37 +87,27 @@ public IScaleMonitor GetMonitor()
return _scaleMonitor.Value;
}

/// <summary>
/// Wrapper for un-mockable checkpoint APIs to aid in unit testing
/// </summary>
internal interface ICheckpointer
{
Task CheckpointAsync(ProcessorPartitionContext context);
}

// We get a new instance each time Start() is called.
// We'll get a listener per partition - so they can potentialy run in parallel even on a single machine.
internal class EventProcessor : IEventProcessor, IDisposable, ICheckpointer
internal class EventProcessor : IEventProcessor, IDisposable
{
private readonly ITriggeredFunctionExecutor _executor;
private readonly bool _singleDispatch;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly ICheckpointer _checkpointer;
private readonly int _batchCheckpointFrequency;
private int _batchCounter;
private bool _disposed;

public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null)
public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
{
_checkpointer = checkpointer ?? this;
_executor = executor;
_singleDispatch = singleDispatch;
_batchCheckpointFrequency = options.BatchCheckpointFrequency;
_logger = logger;
}

public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReason reason)
public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
{
// signal cancellation for any in progress executions
_cts.Cancel();
Expand All @@ -132,13 +116,13 @@ public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReaso
return Task.CompletedTask;
}

public Task OpenAsync(ProcessorPartitionContext context)
public Task OpenAsync(EventProcessorHostPartition context)
{
_logger.LogDebug(GetOperationDetails(context, "OpenAsync"));
return Task.CompletedTask;
}

public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error)
public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error)
{
string errorDetails = $"Processing error (Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}').";

Expand All @@ -147,7 +131,7 @@ public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error
return Task.CompletedTask;
}

public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumerable<EventData> messages)
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages)
{
var triggerInput = new EventHubTriggerInput
{
Expand Down Expand Up @@ -209,8 +193,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
// code, and capture/log/persist failed events, since they won't be retried.
if (messages.Any())
{
context.CheckpointEvent = messages.Last();
await CheckpointAsync(context).ConfigureAwait(false);
await CheckpointAsync(messages.Last(), context).ConfigureAwait(false);
}
}

Expand All @@ -222,12 +205,12 @@ private async Task TryExecuteWithLoggingAsync(TriggeredFunctionData input, Event
}
}

private async Task CheckpointAsync(ProcessorPartitionContext context)
private async Task CheckpointAsync(EventData checkpointEvent, EventProcessorHostPartition context)
{
bool checkpointed = false;
if (_batchCheckpointFrequency == 1)
{
await _checkpointer.CheckpointAsync(context).ConfigureAwait(false);
await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false);
checkpointed = true;
}
else
Expand All @@ -236,7 +219,7 @@ private async Task CheckpointAsync(ProcessorPartitionContext context)
if (++_batchCounter >= _batchCheckpointFrequency)
{
_batchCounter = 0;
await _checkpointer.CheckpointAsync(context).ConfigureAwait(false);
await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false);
checkpointed = true;
}
}
Expand Down Expand Up @@ -264,11 +247,6 @@ public void Dispose()
Dispose(true);
}

async Task ICheckpointer.CheckpointAsync(ProcessorPartitionContext context)
{
await context.CheckpointAsync().ConfigureAwait(false);
}

private static Dictionary<string, object> GetLinksScope(EventData message)
{
if (TryGetLinkedActivity(message, out var link))
Expand Down Expand Up @@ -319,7 +297,7 @@ private static bool TryGetLinkedActivity(EventData message, out Activity link)
return false;
}

private static string GetOperationDetails(ProcessorPartitionContext context, string operation)
private static string GetOperationDetails(EventProcessorHostPartition context, string operation)
{
StringWriter sw = new StringWriter();
using (JsonTextWriter writer = new JsonTextWriter(sw) { Formatting = Formatting.None })
Expand Down
Loading

0 comments on commit 51ebdb1

Please sign in to comment.