Skip to content

Commit

Permalink
Arrange pool slots in pages. The previous way had weaknesses: If a si…
Browse files Browse the repository at this point in the history
…ngle connection takes very long for 2 requests and then finishes those two request right after another, the connection slot gets put on the stack twice at the top. Now, we take from pages where each page contains all the slots, and put back into that very page.
  • Loading branch information
danielcweber committed Nov 13, 2023
1 parent 07cdaca commit a9b970e
Showing 1 changed file with 57 additions and 66 deletions.
123 changes: 57 additions & 66 deletions src/Providers.Core/Factory/GremlinqClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,51 @@ public void Dispose()
}
}

private ImmutableStack<Slot>? _slots;
private readonly struct SlotLease : IDisposable
{
public SlotLease(ConcurrentStack<Slot> slots, Slot slot)
{
Slot = slot;
Slots = slots;
}

public void Dispose()
{
Slots.Push(Slot);
}

public PoolGremlinqClientFactory<TBaseFactory>.PoolGremlinqClient.Slot Slot { get; }
public ConcurrentStack<PoolGremlinqClientFactory<TBaseFactory>.PoolGremlinqClient.Slot> Slots { get; }
}

private readonly IGremlinqClientFactory _baseFactory;
private readonly IGremlinQueryEnvironment _environment;

private readonly Slot[] _allSlots;
private readonly ConcurrentStack<Slot>[] _pages;

public PoolGremlinqClient(IGremlinqClientFactory baseFactory, uint poolSize, uint maxInProcessPerConnection, IGremlinQueryEnvironment environment)
{
_baseFactory = baseFactory;
_environment = environment;

var slotStack = ImmutableStack<Slot>.Empty;
_allSlots = new Slot[poolSize];
_pages = new ConcurrentStack<Slot>[maxInProcessPerConnection];

var slots = Enumerable.Range(0, (int)poolSize)
.Select(x => new Slot(this))
.ToArray();
for (var i = 0 ; i < poolSize; i++)
{
_allSlots[i] = new Slot(this);
}

for (var i = 0; i < maxInProcessPerConnection; i++)
{
foreach (var slot in slots)
_pages[i] = new ();

for (var j = 0 ; j < poolSize; j++)
{
slotStack = slotStack.Push(slot);
_pages[i].Push(_allSlots[j]);
}
}

_slots = slotStack;
}

public IAsyncEnumerable<ResponseMessage<T>> SubmitAsync<T>(RequestMessage message)
Expand All @@ -103,86 +123,57 @@ public IAsyncEnumerable<ResponseMessage<T>> SubmitAsync<T>(RequestMessage messag

static async IAsyncEnumerable<ResponseMessage<T>> Core(RequestMessage message, PoolGremlinqClient @this, [EnumeratorCancellation] CancellationToken ct = default)
{
if (@this.TryGetSlot() is { } slot)
using (var lease = @this.GetSlot())
{
try
{
var client = slot.GetClient();
var slot = lease.Slot;

var client = slot.GetClient();

while (true)
while (true)
{
await using (var e = client.SubmitAsync<T>(message).WithCancellation(ct).GetAsyncEnumerator())
{
await using (var e = client.SubmitAsync<T>(message).WithCancellation(ct).GetAsyncEnumerator())
while (true)
{
while (true)
try
{
try
{
if (!await e.MoveNextAsync())
break;
}
catch
{
slot.Invalidate(client);

throw;
}

yield return e.Current;
if (!await e.MoveNextAsync())
break;
}
}
catch
{
slot.Invalidate(client);

yield break;
throw;
}

yield return e.Current;
}
}
}
finally
{
@this.ReturnSlot(slot);

yield break;
}
}
}
}

public void Dispose()
{
if (Interlocked.Exchange(ref _slots, null) is { } slots)
foreach (var slot in _allSlots)
{
foreach (var slot in slots)
{
slot.Dispose();
}
slot.Dispose();
}
}

private Slot? TryGetSlot()
private SlotLease GetSlot()
{
while (true)
for (var i = 0; i < _pages.Length; i++)
{
var currentSlots = _slots;

if (currentSlots?.IsEmpty is false)
{
var newSlots = currentSlots.Pop(out var slot);

if (Interlocked.CompareExchange(ref _slots, newSlots, currentSlots) == currentSlots)
return slot;
}

return null;
if (_pages[i].TryPop(out var slot))
return new SlotLease(_pages[i], slot);
}
}

private void ReturnSlot(Slot slot)
{
while (true)
{
var maybeCurrentSlots = _slots;

if (maybeCurrentSlots is { } currentSlots)
{
if (Interlocked.CompareExchange(ref _slots, currentSlots.Push(slot), currentSlots) == currentSlots)
return;
}
}
throw new InvalidOperationException();
}
}

Expand Down

0 comments on commit a9b970e

Please sign in to comment.