From 9aedd3e2044982c7a37fd680d035f112dc037283 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Mon, 8 Apr 2024 17:16:00 +0200 Subject: [PATCH 1/6] Ensure we eagerly return the outbound array to the array pool --- elastic-ingest-dotnet.sln | 7 ++ .../Elastic.Channels.Continuous/Program.cs | 17 +-- examples/playground/Program.cs | 81 ++++++++++++++ examples/playground/playground.csproj | 18 ++++ src/Elastic.Channels/BufferedChannelBase.cs | 100 ++++++++++-------- src/Elastic.Channels/ChannelOptionsBase.cs | 3 + .../ElasticsearchChannelBase.Bootstrap.cs | 2 + .../ElasticsearchChannelBase.cs | 1 + ...gest.Elasticsearch.IntegrationTests.csproj | 2 +- 9 files changed, 178 insertions(+), 53 deletions(-) create mode 100644 examples/playground/Program.cs create mode 100644 examples/playground/playground.csproj diff --git a/elastic-ingest-dotnet.sln b/elastic-ingest-dotnet.sln index 5144a12..0140dc6 100644 --- a/elastic-ingest-dotnet.sln +++ b/elastic-ingest-dotnet.sln @@ -68,6 +68,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Elastic.Ingest.Elasticsearc EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Performance.Common", "benchmarks\Performance.Common\Performance.Common.csproj", "{077B0564-26D1-4FF1-98A5-633EAA9BE051}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "playground", "examples\playground\playground.csproj", "{C0E73713-EA13-403C-BE43-F2164AB6CF73}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -138,6 +140,10 @@ Global {077B0564-26D1-4FF1-98A5-633EAA9BE051}.Debug|Any CPU.Build.0 = Debug|Any CPU {077B0564-26D1-4FF1-98A5-633EAA9BE051}.Release|Any CPU.ActiveCfg = Release|Any CPU {077B0564-26D1-4FF1-98A5-633EAA9BE051}.Release|Any CPU.Build.0 = Release|Any CPU + {C0E73713-EA13-403C-BE43-F2164AB6CF73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C0E73713-EA13-403C-BE43-F2164AB6CF73}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C0E73713-EA13-403C-BE43-F2164AB6CF73}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C0E73713-EA13-403C-BE43-F2164AB6CF73}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -159,6 +165,7 @@ Global {1609BC01-FEA6-4818-9C86-9DB7B82FCB48} = {1C174D50-20D2-4006-9681-FDDB39351D99} {DE05C98F-C410-4ED0-A77B-36B66C784105} = {1C174D50-20D2-4006-9681-FDDB39351D99} {077B0564-26D1-4FF1-98A5-633EAA9BE051} = {1C174D50-20D2-4006-9681-FDDB39351D99} + {C0E73713-EA13-403C-BE43-F2164AB6CF73} = {B67CBB46-74C1-47EB-9E41-D55C5E0E0D85} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {87AD3256-8FD8-4C94-AB66-5ADBAC939722} diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs index e5ab816..7aa6b89 100644 --- a/examples/Elastic.Channels.Continuous/Program.cs +++ b/examples/Elastic.Channels.Continuous/Program.cs @@ -5,6 +5,12 @@ using Elastic.Channels; using Elastic.Channels.Diagnostics; +var ctxs = new CancellationTokenSource(); +Console.CancelKeyPress += (sender, eventArgs) => { + ctxs.Cancel(); + eventArgs.Cancel = true; +}; + var options = new NoopBufferedChannel.NoopChannelOptions { BufferOptions = new BufferOptions() @@ -19,19 +25,18 @@ }; var channel = new DiagnosticsBufferedChannel(options); -for (long i = 0; i < long.MaxValue; i++) +await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) => { var e = new NoopBufferedChannel.NoopEvent { Id = i }; var written = false; - var ready = await channel.WaitToWriteAsync(); + //Console.Write('.'); + var ready = await channel.WaitToWriteAsync(ctx); if (ready) written = channel.TryWrite(e); - if (!written || channel.BufferMismatches > 0) + if (!written) { Console.WriteLine(); Console.WriteLine(channel); Console.WriteLine(i); Environment.Exit(1); } - -} - +}); diff --git a/examples/playground/Program.cs b/examples/playground/Program.cs new file mode 100644 index 0000000..08626dd --- /dev/null +++ b/examples/playground/Program.cs @@ -0,0 +1,81 @@ +using Elastic.Channels; +using Elastic.Elasticsearch.Ephemeral; +using Elastic.Ingest.Elasticsearch; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Transport; + +var random = new Random(); +var ctxs = new CancellationTokenSource(); +var parallelOpts = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }; +const int numDocs = 1_000_000; +var bufferOptions = new BufferOptions { InboundBufferMaxSize = numDocs / 100, ExportMaxConcurrency = 1, OutboundBufferMaxSize = 10_000 }; +var config = new EphemeralClusterConfiguration("8.13.0"); +using var cluster = new EphemeralCluster(config); +using var channel = SetupElasticsearchChannel(); + +Console.CancelKeyPress += (sender, eventArgs) => +{ + ctxs.Cancel(); + cluster.Dispose(); + eventArgs.Cancel = true; +}; + + +using var started = cluster.Start(); + +var memoryBefore = GC.GetTotalMemory(false); + +await PushToChannel(channel); + +// This is not really indicative because the channel is still draining at this point in time +var memoryAfter = GC.GetTotalMemory(false); +Console.WriteLine($"Memory before: {memoryBefore} bytes"); +Console.WriteLine($"Memory after: {memoryAfter} bytes"); +var memoryUsed = memoryAfter - memoryBefore; +Console.WriteLine($"Memory used: {memoryUsed} bytes"); + +Console.WriteLine($"Press any key..."); +Console.ReadKey(); + + +async Task PushToChannel(DataStreamChannel c) +{ + if (c == null) throw new ArgumentNullException(nameof(c)); + + await c.BootstrapElasticsearchAsync(BootstrapMethod.Failure); + await Parallel.ForEachAsync(Enumerable.Range(0, numDocs), parallelOpts, async (i, ctx) => + { + await DoChannelWrite(i, ctx); + }); + + async Task DoChannelWrite(int i, CancellationToken cancellationToken) + { + var message = $"Logging information {i} - Random value: {random.NextDouble()}"; + var doc = new EcsDocument { Timestamp = DateTimeOffset.UtcNow, Message = message }; + if (await c.WaitToWriteAsync(cancellationToken) && c.TryWrite(doc)) + return; + + Console.WriteLine("Failed To write"); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } +} + +DataStreamChannel SetupElasticsearchChannel() +{ + var transportConfiguration = new TransportConfiguration(new Uri("http://localhost:9200")); + var c = new DataStreamChannel( + new DataStreamChannelOptions(new DistributedTransport(transportConfiguration)) + { + BufferOptions = bufferOptions, + CancellationToken = ctxs.Token + }); + + return c; +} + +public class EcsDocument +{ + public DateTimeOffset Timestamp { init; get; } + public string Message { init; get; } = null!; +} + diff --git a/examples/playground/playground.csproj b/examples/playground/playground.csproj new file mode 100644 index 0000000..4724cce --- /dev/null +++ b/examples/playground/playground.csproj @@ -0,0 +1,18 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index c001e10..164aa03 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -74,7 +74,9 @@ protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } /// protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) { - TokenSource = new CancellationTokenSource(); + TokenSource = options.CancellationToken.HasValue + ? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value) + : new CancellationTokenSource(); Options = options; var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray(); @@ -123,16 +125,16 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(maxOut, BufferOptions.OutboundBufferMaxLifetime); _outTask = Task.Factory.StartNew(async () => - await ConsumeOutboundEventsAsync().ConfigureAwait(false), - CancellationToken.None, - TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, - TaskScheduler.Default); + await ConsumeOutboundEventsAsync().ConfigureAwait(false), + CancellationToken.None, + TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, + TaskScheduler.Default); _inTask = Task.Factory.StartNew(async () => - await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false), - CancellationToken.None, - TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, - TaskScheduler.Default); + await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false), + CancellationToken.None, + TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, + TaskScheduler.Default); } /// @@ -144,7 +146,9 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime) /// The channel options currently in use public TChannelOptions Options { get; } - private CancellationTokenSource TokenSource { get; } + /// An overall cancellation token that may be externally provided + protected CancellationTokenSource TokenSource { get; } + private Channel> OutChannel { get; } private Channel InChannel { get; } private BufferOptions BufferOptions => Options.BufferOptions; @@ -173,6 +177,7 @@ public override bool TryWrite(TEvent item) /// public async Task WaitToWriteManyAsync(IEnumerable events, CancellationToken ctx = default) { + ctx = ctx == default ? TokenSource.Token : ctx; var allWritten = true; foreach (var e in events) { @@ -187,7 +192,7 @@ public bool TryWriteMany(IEnumerable events) { var written = true; - foreach (var @event in events) + foreach (var @event in events) { written = TryWrite(@event); } @@ -226,7 +231,7 @@ private async Task ConsumeOutboundEventsAsync() var taskList = new List(maxConsumers); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) - // ReSharper disable once RemoveRedundantBraces + // ReSharper disable once RemoveRedundantBraces { if (TokenSource.Token.IsCancellationRequested) break; if (_signal is { IsSet: true }) break; @@ -255,47 +260,50 @@ private async Task ConsumeOutboundEventsAsync() private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer buffer) { - var maxRetries = Options.BufferOptions.ExportMaxRetries; - for (var i = 0; i <= maxRetries && items.Count > 0; i++) + using (buffer) { - if (TokenSource.Token.IsCancellationRequested) break; - if (_signal is { IsSet: true }) break; + var maxRetries = Options.BufferOptions.ExportMaxRetries; + for (var i = 0; i <= maxRetries && items.Count > 0; i++) + { + if (TokenSource.Token.IsCancellationRequested) break; + if (_signal is { IsSet: true }) break; - _callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count); - TResponse? response = null; + _callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count); + TResponse? response = null; - // delay if we still have items and we are not at the end of the max retry cycle - var atEndOfRetries = i == maxRetries; - try - { - response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false); - _callbacks.ExportResponseCallback?.Invoke(response, buffer); - } - catch (Exception e) - { - _callbacks.ExportExceptionCallback?.Invoke(e); - if (atEndOfRetries) - break; - } + // delay if we still have items and we are not at the end of the max retry cycle + var atEndOfRetries = i == maxRetries; + try + { + response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false); + _callbacks.ExportResponseCallback?.Invoke(response, buffer); + } + catch (Exception e) + { + _callbacks.ExportExceptionCallback?.Invoke(e); + if (atEndOfRetries) + break; + } - items = response == null - ? EmptyArraySegments.Empty - : RetryBuffer(response, items, buffer); - if (items.Count > 0 && i == 0) - _callbacks.ExportRetryableCountCallback?.Invoke(items.Count); + items = response == null + ? EmptyArraySegments.Empty + : RetryBuffer(response, items, buffer); + if (items.Count > 0 && i == 0) + _callbacks.ExportRetryableCountCallback?.Invoke(items.Count); - if (items.Count > 0 && !atEndOfRetries) - { - await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); - _callbacks.ExportRetryCallback?.Invoke(items); + if (items.Count > 0 && !atEndOfRetries) + { + await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); + _callbacks.ExportRetryCallback?.Invoke(items); + } + // otherwise if retryable items still exist and the user wants to be notified notify the user + else if (items.Count > 0 && atEndOfRetries) + _callbacks.ExportMaxRetriesCallback?.Invoke(items); } - // otherwise if retryable items still exist and the user wants to be notified notify the user - else if (items.Count > 0 && atEndOfRetries) - _callbacks.ExportMaxRetriesCallback?.Invoke(items); + _callbacks.ExportBufferCallback?.Invoke(); + if (_signal is { IsSet: false }) + _signal.Signal(); } - _callbacks.ExportBufferCallback?.Invoke(); - if (_signal is { IsSet: false }) - _signal.Signal(); } private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan maxInterval) diff --git a/src/Elastic.Channels/ChannelOptionsBase.cs b/src/Elastic.Channels/ChannelOptionsBase.cs index 41c8061..bf8a749 100644 --- a/src/Elastic.Channels/ChannelOptionsBase.cs +++ b/src/Elastic.Channels/ChannelOptionsBase.cs @@ -29,6 +29,9 @@ public abstract class ChannelOptionsBase : IChannelCallbacks< /// public bool DisableDiagnostics { get; set; } + /// Provide an external cancellation token + public CancellationToken? CancellationToken { get; set; } + /// /// Optionally provides a custom write implementation to a channel. Concrete channel implementations are not required to adhere to this config /// diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bootstrap.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bootstrap.cs index 71238f4..579cebd 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bootstrap.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bootstrap.cs @@ -33,6 +33,8 @@ public virtual async Task BootstrapElasticsearchAsync(BootstrapMethod boot { if (bootstrapMethod == BootstrapMethod.None) return true; + ctx = ctx == default ? TokenSource.Token : ctx; + var name = TemplateName; var match = TemplateWildcard; if (await IndexTemplateExistsAsync(name, ctx).ConfigureAwait(false)) return false; diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index 6539e5d..7405066 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -67,6 +67,7 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) => /// protected override Task ExportAsync(ITransport transport, ArraySegment page, CancellationToken ctx = default) { + ctx = ctx == default ? TokenSource.Token : ctx; #if NETSTANDARD2_1 // Option is obsolete to prevent external users to set it. #pragma warning disable CS0618 diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/Elastic.Ingest.Elasticsearch.IntegrationTests.csproj b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/Elastic.Ingest.Elasticsearch.IntegrationTests.csproj index 9fa458a..80f436f 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/Elastic.Ingest.Elasticsearch.IntegrationTests.csproj +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/Elastic.Ingest.Elasticsearch.IntegrationTests.csproj @@ -8,7 +8,7 @@ - + From ca3cb1cb7489e5156d4b4b91a5f115a081a65793 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Mon, 8 Apr 2024 17:28:08 +0200 Subject: [PATCH 2/6] localize using buffer to ExportBufferAsync --- src/Elastic.Channels/BufferedChannelBase.cs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 164aa03..a223c85 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -238,20 +238,17 @@ private async Task ConsumeOutboundEventsAsync() while (OutChannel.Reader.TryRead(out var buffer)) { - using (buffer) + var items = buffer.GetArraySegment(); + await _throttleTasks.WaitAsync().ConfigureAwait(false); + var t = ExportBufferAsync(items, buffer); + taskList.Add(t); + + if (taskList.Count >= maxConsumers) { - var items = buffer.GetArraySegment(); - await _throttleTasks.WaitAsync().ConfigureAwait(false); - var t = ExportBufferAsync(items, buffer); - taskList.Add(t); - - if (taskList.Count >= maxConsumers) - { - var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false); - taskList.Remove(completedTask); - } - _throttleTasks.Release(); + var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false); + taskList.Remove(completedTask); } + _throttleTasks.Release(); } } await Task.WhenAll(taskList).ConfigureAwait(false); From 43e1122b61f2a5ad53aed34e38fa051109a20981 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 9 Apr 2024 10:30:52 +0200 Subject: [PATCH 3/6] license header --- examples/playground/Program.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/playground/Program.cs b/examples/playground/Program.cs index 08626dd..4db875c 100644 --- a/examples/playground/Program.cs +++ b/examples/playground/Program.cs @@ -1,4 +1,8 @@ -using Elastic.Channels; +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Channels; using Elastic.Elasticsearch.Ephemeral; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.DataStreams; From 23b391f245271d2b6a22d1ba8bc53112bda14a20 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 9 Apr 2024 10:38:39 +0200 Subject: [PATCH 4/6] remove BOM --- examples/playground/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/playground/Program.cs b/examples/playground/Program.cs index 4db875c..f7580a6 100644 --- a/examples/playground/Program.cs +++ b/examples/playground/Program.cs @@ -1,4 +1,4 @@ -// Licensed to Elasticsearch B.V under one or more agreements. +// Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information From 25326b5a4f4c8a5bbdd213ee49fa2d59f8b7be43 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 9 Apr 2024 10:52:31 +0200 Subject: [PATCH 5/6] add Jsonpropertynames --- examples/playground/Program.cs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/examples/playground/Program.cs b/examples/playground/Program.cs index f7580a6..34e64ab 100644 --- a/examples/playground/Program.cs +++ b/examples/playground/Program.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Text.Json.Serialization; using Elastic.Channels; using Elastic.Elasticsearch.Ephemeral; using Elastic.Ingest.Elasticsearch; @@ -72,6 +73,12 @@ DataStreamChannel SetupElasticsearchChannel() { BufferOptions = bufferOptions, CancellationToken = ctxs.Token + , ExportResponseCallback = (c, t) => + { + var error = c.Items.Select(i=>i.Error).FirstOrDefault(e => e != null); + if (error == null) return; + Console.WriteLine(error.ToString()); + } }); return c; @@ -79,7 +86,10 @@ DataStreamChannel SetupElasticsearchChannel() public class EcsDocument { + [JsonPropertyName("@timestamp")] public DateTimeOffset Timestamp { init; get; } + + [JsonPropertyName("message")] public string Message { init; get; } = null!; } From 721935b1152c537c44627d8e703c2ba2639f6018 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 9 Apr 2024 11:14:16 +0200 Subject: [PATCH 6/6] Add better defaults for MaxConcurrency and allow injection of CancellationToken --- examples/playground/Program.cs | 2 +- src/Elastic.Channels/BufferOptions.cs | 9 +- src/Elastic.Channels/BufferedChannelBase.cs | 95 +++++++++++---------- 3 files changed, 57 insertions(+), 49 deletions(-) diff --git a/examples/playground/Program.cs b/examples/playground/Program.cs index 34e64ab..62a6824 100644 --- a/examples/playground/Program.cs +++ b/examples/playground/Program.cs @@ -13,7 +13,7 @@ var ctxs = new CancellationTokenSource(); var parallelOpts = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }; const int numDocs = 1_000_000; -var bufferOptions = new BufferOptions { InboundBufferMaxSize = numDocs / 100, ExportMaxConcurrency = 1, OutboundBufferMaxSize = 10_000 }; +var bufferOptions = new BufferOptions { InboundBufferMaxSize = numDocs, OutboundBufferMaxSize = 10_000 }; var config = new EphemeralClusterConfiguration("8.13.0"); using var cluster = new EphemeralCluster(config); using var channel = SetupElasticsearchChannel(); diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs index 4e70f24..9c17cb5 100644 --- a/src/Elastic.Channels/BufferOptions.cs +++ b/src/Elastic.Channels/BufferOptions.cs @@ -34,9 +34,14 @@ public class BufferOptions /// /// The maximum number of consumers allowed to poll for new events on the channel. - /// Defaults to 1, increase to introduce concurrency. + /// Defaults to the lesser of: + /// + /// / + /// OR + /// + /// , increase to introduce concurrency. /// - public int ExportMaxConcurrency { get; set; } = 1; + public int? ExportMaxConcurrency { get; set; } /// /// The times to retry an export if yields items to retry. diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index a223c85..b20a941 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -60,6 +60,7 @@ public abstract class BufferedChannelBase { private readonly Task _inTask; private readonly Task _outTask; + private readonly int _maxConcurrency; private readonly SemaphoreSlim _throttleTasks; private readonly CountdownEvent? _signal; @@ -92,10 +93,17 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(listeners); - var maxConsumers = Math.Max(1, BufferOptions.ExportMaxConcurrency); - _throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers); - _signal = options.BufferOptions.WaitHandle; var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize); + // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize + var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize)); + var defaultMaxConcurrency = (int)Math.Ceiling(maxIn / (double)maxOut); + _maxConcurrency = + BufferOptions.ExportMaxConcurrency.HasValue + ? BufferOptions.ExportMaxConcurrency.Value + : Math.Min(defaultMaxConcurrency, Environment.ProcessorCount * 2); + + _throttleTasks = new SemaphoreSlim(_maxConcurrency, _maxConcurrency); + _signal = options.BufferOptions.WaitHandle; InChannel = Channel.CreateBounded(new BoundedChannelOptions(maxIn) { SingleReader = false, @@ -107,8 +115,6 @@ protected BufferedChannelBase(TChannelOptions options, ICollection>( new BoundedChannelOptions(maxOut) { @@ -227,8 +233,7 @@ private async Task ConsumeOutboundEventsAsync() { _callbacks.OutboundChannelStartedCallback?.Invoke(); - var maxConsumers = Options.BufferOptions.ExportMaxConcurrency; - var taskList = new List(maxConsumers); + var taskList = new List(_maxConcurrency); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) // ReSharper disable once RemoveRedundantBraces @@ -239,11 +244,11 @@ private async Task ConsumeOutboundEventsAsync() while (OutChannel.Reader.TryRead(out var buffer)) { var items = buffer.GetArraySegment(); - await _throttleTasks.WaitAsync().ConfigureAwait(false); + await _throttleTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false); var t = ExportBufferAsync(items, buffer); taskList.Add(t); - if (taskList.Count >= maxConsumers) + if (taskList.Count >= _maxConcurrency) { var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false); taskList.Remove(completedTask); @@ -257,50 +262,48 @@ private async Task ConsumeOutboundEventsAsync() private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer buffer) { - using (buffer) + using var outboundBuffer = buffer; + var maxRetries = Options.BufferOptions.ExportMaxRetries; + for (var i = 0; i <= maxRetries && items.Count > 0; i++) { - var maxRetries = Options.BufferOptions.ExportMaxRetries; - for (var i = 0; i <= maxRetries && items.Count > 0; i++) - { - if (TokenSource.Token.IsCancellationRequested) break; - if (_signal is { IsSet: true }) break; + if (TokenSource.Token.IsCancellationRequested) break; + if (_signal is { IsSet: true }) break; - _callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count); - TResponse? response = null; + _callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count); + TResponse? response = null; - // delay if we still have items and we are not at the end of the max retry cycle - var atEndOfRetries = i == maxRetries; - try - { - response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false); - _callbacks.ExportResponseCallback?.Invoke(response, buffer); - } - catch (Exception e) - { - _callbacks.ExportExceptionCallback?.Invoke(e); - if (atEndOfRetries) - break; - } + // delay if we still have items and we are not at the end of the max retry cycle + var atEndOfRetries = i == maxRetries; + try + { + response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false); + _callbacks.ExportResponseCallback?.Invoke(response, outboundBuffer); + } + catch (Exception e) + { + _callbacks.ExportExceptionCallback?.Invoke(e); + if (atEndOfRetries) + break; + } - items = response == null - ? EmptyArraySegments.Empty - : RetryBuffer(response, items, buffer); - if (items.Count > 0 && i == 0) - _callbacks.ExportRetryableCountCallback?.Invoke(items.Count); + items = response == null + ? EmptyArraySegments.Empty + : RetryBuffer(response, items, outboundBuffer); + if (items.Count > 0 && i == 0) + _callbacks.ExportRetryableCountCallback?.Invoke(items.Count); - if (items.Count > 0 && !atEndOfRetries) - { - await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); - _callbacks.ExportRetryCallback?.Invoke(items); - } - // otherwise if retryable items still exist and the user wants to be notified notify the user - else if (items.Count > 0 && atEndOfRetries) - _callbacks.ExportMaxRetriesCallback?.Invoke(items); + if (items.Count > 0 && !atEndOfRetries) + { + await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); + _callbacks.ExportRetryCallback?.Invoke(items); } - _callbacks.ExportBufferCallback?.Invoke(); - if (_signal is { IsSet: false }) - _signal.Signal(); + // otherwise if retryable items still exist and the user wants to be notified notify the user + else if (items.Count > 0 && atEndOfRetries) + _callbacks.ExportMaxRetriesCallback?.Invoke(items); } + _callbacks.ExportBufferCallback?.Invoke(); + if (_signal is { IsSet: false }) + _signal.Signal(); } private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan maxInterval)