Skip to content

Commit

Permalink
Implement a much simpler version of PoolGremlinqClient that still use…
Browse files Browse the repository at this point in the history
…s a minimum amount of connections if the number of simultaneous requests allows, but once it exceeds this, requests are round-robin distributed between available slots.
  • Loading branch information
danielcweber committed Nov 15, 2023
1 parent 6764c5b commit 1f39916
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 86 deletions.
4 changes: 2 additions & 2 deletions src/Core.AspNet/Extensions/GremlinqConfiguratorExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ public static TConfigurator ConfigureWebSocket<TConfigurator>(this TConfigurator
if (section["Uri"] is { } uri)
configurator = configurator.At(uri);

if (uint.TryParse(connectionPoolSection[$"{nameof(ConnectionPoolSettings.MaxInProcessPerConnection)}"], out var maxInProcessPerConnection))
if (int.TryParse(connectionPoolSection[$"{nameof(ConnectionPoolSettings.MaxInProcessPerConnection)}"], out var maxInProcessPerConnection))
{
configurator = configurator
.ConfigureClientFactory(factory => factory
.WithMaxInProcessPerConnection(maxInProcessPerConnection));
}

if (uint.TryParse(connectionPoolSection[$"{nameof(ConnectionPoolSettings.PoolSize)}"], out var poolSize))
if (int.TryParse(connectionPoolSection[$"{nameof(ConnectionPoolSettings.PoolSize)}"], out var poolSize))
{
configurator = configurator
.ConfigureClientFactory(factory => factory
Expand Down
126 changes: 52 additions & 74 deletions src/Providers.Core/Factory/GremlinqClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ public IGremlinqClient GetClient()
: throw new ObjectDisposedException(nameof(Slot));
}

Interlocked.CompareExchange(ref _subClient, _poolClient._baseFactory.Create(_poolClient._environment), null);
var newClient = _poolClient._baseFactory
.Create(_poolClient._environment)
.Throttle(_poolClient._maxInProcessPerConnection);

if (Interlocked.CompareExchange(ref _subClient, newClient, null) != null)
newClient.Dispose();

return GetClient();
}
Expand All @@ -65,55 +70,29 @@ public void Invalidate(IGremlinqClient client)

public void Dispose()
{
if (Interlocked.Exchange(ref _subClient, GremlinqClient.Disposed) is { } subClient)
subClient.Dispose();
Interlocked.Exchange(ref _subClient, GremlinqClient.Disposed)?.Dispose();
}
}

private readonly struct SlotLease : IDisposable
{
public SlotLease(ConcurrentStack<Slot> slots, Slot slot)
{
Slot = slot;
Slots = slots;
}

public void Dispose()
{
Slots.Push(Slot);
}

public PoolGremlinqClientFactory<TBaseFactory>.PoolGremlinqClient.Slot Slot { get; }
public ConcurrentStack<PoolGremlinqClientFactory<TBaseFactory>.PoolGremlinqClient.Slot> Slots { get; }
}

private readonly Slot[] _slots;
private readonly int _maxInProcessPerConnection;
private readonly IGremlinqClientFactory _baseFactory;
private readonly IGremlinQueryEnvironment _environment;

private readonly Slot[] _allSlots;
private readonly ConcurrentStack<Slot>[] _pages;
private int _maxRequestsInUse = 1;
private int _currentSlotIndex = -1;
private int _currentRequestsInUse = 0;

public PoolGremlinqClient(IGremlinqClientFactory baseFactory, uint poolSize, uint maxInProcessPerConnection, IGremlinQueryEnvironment environment)
public PoolGremlinqClient(IGremlinqClientFactory baseFactory, int poolSize, int maxInProcessPerConnection, IGremlinQueryEnvironment environment)
{
_baseFactory = baseFactory;
_environment = environment;

_allSlots = new Slot[poolSize];
_pages = new ConcurrentStack<Slot>[maxInProcessPerConnection];
_slots = new Slot[poolSize];
_maxInProcessPerConnection = maxInProcessPerConnection;

for (var i = 0 ; i < poolSize; i++)
{
_allSlots[i] = new Slot(this);
}

for (var i = 0; i < maxInProcessPerConnection; i++)
{
_pages[i] = new ();

for (var j = 0 ; j < poolSize; j++)
{
_pages[i].Push(_allSlots[j]);
}
_slots[i] = new Slot(this);
}
}

Expand All @@ -123,69 +102,68 @@ public IAsyncEnumerable<ResponseMessage<T>> SubmitAsync<T>(RequestMessage messag

static async IAsyncEnumerable<ResponseMessage<T>> Core(RequestMessage message, PoolGremlinqClient @this, [EnumeratorCancellation] CancellationToken ct = default)
{
using (var lease = @this.GetSlot())
{
var slot = lease.Slot;

var client = slot.GetClient();
var currentRequestsInUse = Interlocked.Increment(ref @this._currentRequestsInUse);

try
{
while (true)
{
await using (var e = client.SubmitAsync<T>(message).WithCancellation(ct).GetAsyncEnumerator())
var maxRequestsInUse = Volatile.Read(ref @this._maxRequestsInUse);

if (currentRequestsInUse < 0 || currentRequestsInUse <= maxRequestsInUse || Interlocked.CompareExchange(ref @this._maxRequestsInUse, Math.Min(currentRequestsInUse, @this._slots.Length), maxRequestsInUse) == maxRequestsInUse)
{
while (true)
var slot = @this._slots[Math.Abs(Interlocked.Increment(ref @this._currentSlotIndex) % maxRequestsInUse)];
var client = slot.GetClient();

await using (var e = client.SubmitAsync<T>(message).WithCancellation(ct).GetAsyncEnumerator())
{
try
{
if (!await e.MoveNextAsync())
break;
}
catch
while (true)
{
slot.Invalidate(client);

throw;
try
{
if (!await e.MoveNextAsync())
break;
}
catch
{
slot.Invalidate(client);

throw;
}

yield return e.Current;
}

yield return e.Current;
}
}

yield break;
break;
}
}
}
finally
{
Interlocked.Decrement(ref @this._currentRequestsInUse);
}
}
}

public void Dispose()
{
foreach (var slot in _allSlots)
foreach (var slot in _slots)
{
slot.Dispose();
}
}

private SlotLease GetSlot()
{
for (var i = 0; i < _pages.Length; i++)
{
if (_pages[i].TryPop(out var slot))
return new SlotLease(_pages[i], slot);
}

throw new InvalidOperationException();
}
}

private readonly uint _poolSize;
private readonly int _poolSize;
private readonly TBaseFactory _baseFactory;
private readonly uint _maxInProcessPerConnection;
private readonly int _maxInProcessPerConnection;

public PoolGremlinqClientFactory(TBaseFactory baseFactory) : this(baseFactory, 8, 16)
{
}

public PoolGremlinqClientFactory(TBaseFactory baseFactory, uint poolSize, uint maxInProcessPerConnection)
public PoolGremlinqClientFactory(TBaseFactory baseFactory, int poolSize, int maxInProcessPerConnection)
{
_poolSize = poolSize;
_baseFactory = baseFactory;
Expand All @@ -194,11 +172,11 @@ public PoolGremlinqClientFactory(TBaseFactory baseFactory, uint poolSize, uint m

public IPoolGremlinqClientFactory<TBaseFactory> ConfigureBaseFactory(Func<TBaseFactory, TBaseFactory> transformation) => new PoolGremlinqClientFactory<TBaseFactory>(transformation(_baseFactory));

public IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(uint maxInProcessPerConnection) => maxInProcessPerConnection <= 64
public IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(int maxInProcessPerConnection) => maxInProcessPerConnection > 0 && maxInProcessPerConnection <= 64
? new PoolGremlinqClientFactory<TBaseFactory>(_baseFactory, _poolSize, maxInProcessPerConnection)
: throw new ArgumentOutOfRangeException(nameof(maxInProcessPerConnection));

public IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(uint poolSize) => poolSize <= 8
public IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(int poolSize) => poolSize > 0 && poolSize <= 8
? new PoolGremlinqClientFactory<TBaseFactory>(_baseFactory, poolSize, _maxInProcessPerConnection)
: throw new ArgumentOutOfRangeException(nameof(poolSize));

Expand Down
4 changes: 2 additions & 2 deletions src/Providers.Core/Factory/IPoolGremlinqClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ public interface IPoolGremlinqClientFactory<TBaseFactory> : IGremlinqClientFacto
{
IPoolGremlinqClientFactory<TBaseFactory> ConfigureBaseFactory(Func<TBaseFactory, TBaseFactory> transformation);

IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(uint poolSize);
IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(int poolSize);

IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(uint maxInProcessPerConnection);
IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(int maxInProcessPerConnection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public static class GremlinClientExtensions
{
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient ObserveResultStatusAttributes(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Action<Gremlin.Net.Driver.Messages.RequestMessage, System.Collections.Generic.IReadOnlyDictionary<string, object>> observer) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient Throttle(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, int maxConcurrency) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient TransformRequest(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Func<Gremlin.Net.Driver.Messages.RequestMessage, System.Threading.Tasks.Task<Gremlin.Net.Driver.Messages.RequestMessage>> transformation) { }
}
public static class GremlinServerExtensions
Expand Down Expand Up @@ -33,8 +34,8 @@ public interface IPoolGremlinqClientFactory<TBaseFactory> : ExRam.Gremlinq.Provi
where TBaseFactory : ExRam.Gremlinq.Providers.Core.IGremlinqClientFactory
{
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> ConfigureBaseFactory(System.Func<TBaseFactory, TBaseFactory> transformation);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(uint maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(uint poolSize);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(int maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(int poolSize);
}
public interface IProviderConfigurator<out TSelf, TClientFactory> : ExRam.Gremlinq.Core.IGremlinQuerySourceTransformation, ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
where out TSelf : ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public static class GremlinClientExtensions
{
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient ObserveResultStatusAttributes(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Action<Gremlin.Net.Driver.Messages.RequestMessage, System.Collections.Generic.IReadOnlyDictionary<string, object>> observer) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient Throttle(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, int maxConcurrency) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient TransformRequest(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Func<Gremlin.Net.Driver.Messages.RequestMessage, System.Threading.Tasks.Task<Gremlin.Net.Driver.Messages.RequestMessage>> transformation) { }
}
public static class GremlinServerExtensions
Expand Down Expand Up @@ -33,8 +34,8 @@ public interface IPoolGremlinqClientFactory<TBaseFactory> : ExRam.Gremlinq.Provi
where TBaseFactory : ExRam.Gremlinq.Providers.Core.IGremlinqClientFactory
{
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> ConfigureBaseFactory(System.Func<TBaseFactory, TBaseFactory> transformation);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(uint maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(uint poolSize);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(int maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(int poolSize);
}
public interface IProviderConfigurator<out TSelf, TClientFactory> : ExRam.Gremlinq.Core.IGremlinQuerySourceTransformation, ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
where out TSelf : ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public static class GremlinClientExtensions
{
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient ObserveResultStatusAttributes(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Action<Gremlin.Net.Driver.Messages.RequestMessage, System.Collections.Generic.IReadOnlyDictionary<string, object>> observer) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient Throttle(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, int maxConcurrency) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient TransformRequest(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Func<Gremlin.Net.Driver.Messages.RequestMessage, System.Threading.Tasks.Task<Gremlin.Net.Driver.Messages.RequestMessage>> transformation) { }
}
public static class GremlinServerExtensions
Expand Down Expand Up @@ -33,8 +34,8 @@ public interface IPoolGremlinqClientFactory<TBaseFactory> : ExRam.Gremlinq.Provi
where TBaseFactory : ExRam.Gremlinq.Providers.Core.IGremlinqClientFactory
{
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> ConfigureBaseFactory(System.Func<TBaseFactory, TBaseFactory> transformation);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(uint maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(uint poolSize);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(int maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(int poolSize);
}
public interface IProviderConfigurator<out TSelf, TClientFactory> : ExRam.Gremlinq.Core.IGremlinQuerySourceTransformation, ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
where out TSelf : ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public static class GremlinClientExtensions
{
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient ObserveResultStatusAttributes(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Action<Gremlin.Net.Driver.Messages.RequestMessage, System.Collections.Generic.IReadOnlyDictionary<string, object>> observer) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient Throttle(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, int maxConcurrency) { }
public static ExRam.Gremlinq.Providers.Core.IGremlinqClient TransformRequest(this ExRam.Gremlinq.Providers.Core.IGremlinqClient client, System.Func<Gremlin.Net.Driver.Messages.RequestMessage, System.Threading.Tasks.Task<Gremlin.Net.Driver.Messages.RequestMessage>> transformation) { }
}
public static class GremlinServerExtensions
Expand Down Expand Up @@ -33,8 +34,8 @@ public interface IPoolGremlinqClientFactory<TBaseFactory> : ExRam.Gremlinq.Provi
where TBaseFactory : ExRam.Gremlinq.Providers.Core.IGremlinqClientFactory
{
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> ConfigureBaseFactory(System.Func<TBaseFactory, TBaseFactory> transformation);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(uint maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(uint poolSize);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithMaxInProcessPerConnection(int maxInProcessPerConnection);
ExRam.Gremlinq.Providers.Core.IPoolGremlinqClientFactory<TBaseFactory> WithPoolSize(int poolSize);
}
public interface IProviderConfigurator<out TSelf, TClientFactory> : ExRam.Gremlinq.Core.IGremlinQuerySourceTransformation, ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
where out TSelf : ExRam.Gremlinq.Core.IGremlinqConfigurator<TSelf>
Expand Down

0 comments on commit 1f39916

Please sign in to comment.