Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure we eagerly return the outbound array to the array pool #52

Merged
merged 6 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions elastic-ingest-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Elastic.Ingest.Elasticsearc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Performance.Common", "benchmarks\Performance.Common\Performance.Common.csproj", "{077B0564-26D1-4FF1-98A5-633EAA9BE051}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "playground", "examples\playground\playground.csproj", "{C0E73713-EA13-403C-BE43-F2164AB6CF73}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -138,6 +140,10 @@ Global
{077B0564-26D1-4FF1-98A5-633EAA9BE051}.Debug|Any CPU.Build.0 = Debug|Any CPU
{077B0564-26D1-4FF1-98A5-633EAA9BE051}.Release|Any CPU.ActiveCfg = Release|Any CPU
{077B0564-26D1-4FF1-98A5-633EAA9BE051}.Release|Any CPU.Build.0 = Release|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -159,6 +165,7 @@ Global
{1609BC01-FEA6-4818-9C86-9DB7B82FCB48} = {1C174D50-20D2-4006-9681-FDDB39351D99}
{DE05C98F-C410-4ED0-A77B-36B66C784105} = {1C174D50-20D2-4006-9681-FDDB39351D99}
{077B0564-26D1-4FF1-98A5-633EAA9BE051} = {1C174D50-20D2-4006-9681-FDDB39351D99}
{C0E73713-EA13-403C-BE43-F2164AB6CF73} = {B67CBB46-74C1-47EB-9E41-D55C5E0E0D85}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {87AD3256-8FD8-4C94-AB66-5ADBAC939722}
Expand Down
17 changes: 11 additions & 6 deletions examples/Elastic.Channels.Continuous/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
using Elastic.Channels;
using Elastic.Channels.Diagnostics;

var ctxs = new CancellationTokenSource();
Console.CancelKeyPress += (sender, eventArgs) => {
ctxs.Cancel();
eventArgs.Cancel = true;
};

var options = new NoopBufferedChannel.NoopChannelOptions
{
BufferOptions = new BufferOptions()
Expand All @@ -19,19 +25,18 @@

};
var channel = new DiagnosticsBufferedChannel(options);
for (long i = 0; i < long.MaxValue; i++)
await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) =>
{
var e = new NoopBufferedChannel.NoopEvent { Id = i };
var written = false;
var ready = await channel.WaitToWriteAsync();
//Console.Write('.');
var ready = await channel.WaitToWriteAsync(ctx);
if (ready) written = channel.TryWrite(e);
if (!written || channel.BufferMismatches > 0)
if (!written)
{
Console.WriteLine();
Console.WriteLine(channel);
Console.WriteLine(i);
Environment.Exit(1);
}

}

});
95 changes: 95 additions & 0 deletions examples/playground/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Text.Json.Serialization;
using Elastic.Channels;
using Elastic.Elasticsearch.Ephemeral;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Transport;

var random = new Random();
var ctxs = new CancellationTokenSource();
var parallelOpts = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token };
const int numDocs = 1_000_000;
var bufferOptions = new BufferOptions { InboundBufferMaxSize = numDocs, OutboundBufferMaxSize = 10_000 };
var config = new EphemeralClusterConfiguration("8.13.0");
using var cluster = new EphemeralCluster(config);
using var channel = SetupElasticsearchChannel();

Console.CancelKeyPress += (sender, eventArgs) =>
{
ctxs.Cancel();
cluster.Dispose();
eventArgs.Cancel = true;
};


using var started = cluster.Start();

var memoryBefore = GC.GetTotalMemory(false);

await PushToChannel(channel);

// This is not really indicative because the channel is still draining at this point in time
var memoryAfter = GC.GetTotalMemory(false);
Console.WriteLine($"Memory before: {memoryBefore} bytes");
Console.WriteLine($"Memory after: {memoryAfter} bytes");
var memoryUsed = memoryAfter - memoryBefore;
Console.WriteLine($"Memory used: {memoryUsed} bytes");

Console.WriteLine($"Press any key...");
Console.ReadKey();


async Task PushToChannel(DataStreamChannel<EcsDocument> c)
{
if (c == null) throw new ArgumentNullException(nameof(c));

await c.BootstrapElasticsearchAsync(BootstrapMethod.Failure);
await Parallel.ForEachAsync(Enumerable.Range(0, numDocs), parallelOpts, async (i, ctx) =>
{
await DoChannelWrite(i, ctx);
});

async Task DoChannelWrite(int i, CancellationToken cancellationToken)
{
var message = $"Logging information {i} - Random value: {random.NextDouble()}";
var doc = new EcsDocument { Timestamp = DateTimeOffset.UtcNow, Message = message };
if (await c.WaitToWriteAsync(cancellationToken) && c.TryWrite(doc))
return;

Console.WriteLine("Failed To write");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
}

DataStreamChannel<EcsDocument> SetupElasticsearchChannel()
{
var transportConfiguration = new TransportConfiguration(new Uri("http://localhost:9200"));
var c = new DataStreamChannel<EcsDocument>(
new DataStreamChannelOptions<EcsDocument>(new DistributedTransport(transportConfiguration))
{
BufferOptions = bufferOptions,
CancellationToken = ctxs.Token
, ExportResponseCallback = (c, t) =>
{
var error = c.Items.Select(i=>i.Error).FirstOrDefault(e => e != null);
if (error == null) return;
Console.WriteLine(error.ToString());
}
});

return c;
}

public class EcsDocument
{
[JsonPropertyName("@timestamp")]
public DateTimeOffset Timestamp { init; get; }

[JsonPropertyName("message")]
public string Message { init; get; } = null!;
}

18 changes: 18 additions & 0 deletions examples/playground/playground.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Elastic.Ingest.Elasticsearch\Elastic.Ingest.Elasticsearch.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Elastic.Elasticsearch.Ephemeral" Version="0.5.0" />
</ItemGroup>

</Project>
9 changes: 7 additions & 2 deletions src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ public class BufferOptions

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
/// <para>Defaults to <c>1</c>, increase to introduce concurrency.</para>
/// <para>Defaults to the lesser of:</para>
/// <list type="bullet">
/// <item><see cref="InboundBufferMaxSize"/>/<see cref="OutboundBufferMaxSize"/></item>
/// <item>OR <see cref="Environment.ProcessorCount"/></item>
/// </list>
/// <para>, increase to introduce concurrency.</para>
/// </summary>
public int ExportMaxConcurrency { get; set; } = 1;
public int? ExportMaxConcurrency { get; set; }

/// <summary>
/// The times to retry an export if <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields items to retry.
Expand Down
74 changes: 41 additions & 33 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
{
private readonly Task _inTask;
private readonly Task _outTask;
private readonly int _maxConcurrency;
private readonly SemaphoreSlim _throttleTasks;
private readonly CountdownEvent? _signal;

Expand All @@ -74,7 +75,9 @@ protected BufferedChannelBase(TChannelOptions options) : this(options, null) { }
/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
{
TokenSource = new CancellationTokenSource();
TokenSource = options.CancellationToken.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value)
: new CancellationTokenSource();
Options = options;

var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
Expand All @@ -90,10 +93,17 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
}
_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);

var maxConsumers = Math.Max(1, BufferOptions.ExportMaxConcurrency);
_throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers);
_signal = options.BufferOptions.WaitHandle;
var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize);
// The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize
var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
var defaultMaxConcurrency = (int)Math.Ceiling(maxIn / (double)maxOut);
_maxConcurrency =
BufferOptions.ExportMaxConcurrency.HasValue
? BufferOptions.ExportMaxConcurrency.Value
: Math.Min(defaultMaxConcurrency, Environment.ProcessorCount * 2);

_throttleTasks = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
_signal = options.BufferOptions.WaitHandle;
InChannel = Channel.CreateBounded<TEvent>(new BoundedChannelOptions(maxIn)
{
SingleReader = false,
Expand All @@ -105,8 +115,6 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
// DropWrite will make `TryWrite` always return true, which is not what we want.
FullMode = BoundedChannelFullMode.Wait
});
// The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize
var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
new BoundedChannelOptions(maxOut)
{
Expand All @@ -123,16 +131,16 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
InboundBuffer = new InboundBuffer<TEvent>(maxOut, BufferOptions.OutboundBufferMaxLifetime);

_outTask = Task.Factory.StartNew(async () =>
await ConsumeOutboundEventsAsync().ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
await ConsumeOutboundEventsAsync().ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);

_inTask = Task.Factory.StartNew(async () =>
await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
}

/// <summary>
Expand All @@ -144,7 +152,9 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime)
/// <summary>The channel options currently in use</summary>
public TChannelOptions Options { get; }

private CancellationTokenSource TokenSource { get; }
/// <summary> An overall cancellation token that may be externally provided </summary>
protected CancellationTokenSource TokenSource { get; }

private Channel<IOutboundBuffer<TEvent>> OutChannel { get; }
private Channel<TEvent> InChannel { get; }
private BufferOptions BufferOptions => Options.BufferOptions;
Expand Down Expand Up @@ -173,6 +183,7 @@ public override bool TryWrite(TEvent item)
/// <inheritdoc cref="IBufferedChannel{TEvent}.WaitToWriteManyAsync"/>
public async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
var allWritten = true;
foreach (var e in events)
{
Expand All @@ -187,7 +198,7 @@ public bool TryWriteMany(IEnumerable<TEvent> events)
{
var written = true;

foreach (var @event in events)
foreach (var @event in events)
{
written = TryWrite(@event);
}
Expand Down Expand Up @@ -222,31 +233,27 @@ private async Task ConsumeOutboundEventsAsync()
{
_callbacks.OutboundChannelStartedCallback?.Invoke();

var maxConsumers = Options.BufferOptions.ExportMaxConcurrency;
var taskList = new List<Task>(maxConsumers);
var taskList = new List<Task>(_maxConcurrency);

while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
// ReSharper disable once RemoveRedundantBraces
// ReSharper disable once RemoveRedundantBraces
{
if (TokenSource.Token.IsCancellationRequested) break;
if (_signal is { IsSet: true }) break;

while (OutChannel.Reader.TryRead(out var buffer))
{
using (buffer)
var items = buffer.GetArraySegment();
await _throttleTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false);
var t = ExportBufferAsync(items, buffer);
taskList.Add(t);

if (taskList.Count >= _maxConcurrency)
{
var items = buffer.GetArraySegment();
await _throttleTasks.WaitAsync().ConfigureAwait(false);
var t = ExportBufferAsync(items, buffer);
taskList.Add(t);

if (taskList.Count >= maxConsumers)
{
var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false);
taskList.Remove(completedTask);
}
_throttleTasks.Release();
var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false);
taskList.Remove(completedTask);
}
_throttleTasks.Release();
}
}
await Task.WhenAll(taskList).ConfigureAwait(false);
Expand All @@ -255,6 +262,7 @@ private async Task ConsumeOutboundEventsAsync()

private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer<TEvent> buffer)
{
using var outboundBuffer = buffer;
var maxRetries = Options.BufferOptions.ExportMaxRetries;
for (var i = 0; i <= maxRetries && items.Count > 0; i++)
{
Expand All @@ -269,7 +277,7 @@ private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer
try
{
response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false);
_callbacks.ExportResponseCallback?.Invoke(response, buffer);
_callbacks.ExportResponseCallback?.Invoke(response, outboundBuffer);
}
catch (Exception e)
{
Expand All @@ -280,7 +288,7 @@ private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer

items = response == null
? EmptyArraySegments<TEvent>.Empty
: RetryBuffer(response, items, buffer);
: RetryBuffer(response, items, outboundBuffer);
if (items.Count > 0 && i == 0)
_callbacks.ExportRetryableCountCallback?.Invoke(items.Count);

Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Channels/ChannelOptionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public abstract class ChannelOptionsBase<TEvent, TResponse> : IChannelCallbacks<
/// </summary>
public bool DisableDiagnostics { get; set; }

/// <summary> Provide an external cancellation token </summary>
public CancellationToken? CancellationToken { get; set; }

/// <summary>
/// Optionally provides a custom write implementation to a channel. Concrete channel implementations are not required to adhere to this config
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public virtual async Task<bool> BootstrapElasticsearchAsync(BootstrapMethod boot
{
if (bootstrapMethod == BootstrapMethod.None) return true;

ctx = ctx == default ? TokenSource.Token : ctx;

var name = TemplateName;
var match = TemplateWildcard;
if (await IndexTemplateExistsAsync(name, ctx).ConfigureAwait(false)) return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
/// <inheritdoc cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.ExportAsync(Elastic.Transport.ITransport,System.ArraySegment{TEvent},System.Threading.CancellationToken)"/>
protected override Task<BulkResponse> ExportAsync(ITransport transport, ArraySegment<TEvent> page, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
#if NETSTANDARD2_1
// Option is obsolete to prevent external users to set it.
#pragma warning disable CS0618
Expand Down
Loading