Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose requests on JobPool Dispose #53

Merged
merged 10 commits into from
Mar 13, 2024
4 changes: 2 additions & 2 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ jobs:
- name: Install GitVersion
uses: gittools/actions/gitversion/setup@v0
with:
versionSpec: '6.x'
versionSpec: '5.x'
includePrerelease: true
preferLatestVersion: true

- name: Determine Version
id: gitversion
uses: gittools/actions/gitversion/execute@v0
Expand Down
48 changes: 48 additions & 0 deletions Sally7.Tests/RequestExecutor/JobPoolTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.Threading.Channels;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class JobPoolTests
{
[Fact]
public async Task RentJobIdAsync_Throws_If_Disposed_And_Depleted()
{
// Arrange
var sut = new JobPool(1);
sut.Dispose();
_ = await sut.RentJobIdAsync(CancellationToken.None); // Empty the pool

// Act
// Assert
await Should.ThrowAsync<ChannelClosedException>(() => sut.RentJobIdAsync(CancellationToken.None).AsTask());
}

[Fact]
public void ReturnJobId_Does_Not_Throw_If_Disposed()
{
// Arrange
var sut = new JobPool(1);
var jobId = sut.RentJobIdAsync(CancellationToken.None).Result;
sut.Dispose();

// Act
// Assert
sut.ReturnJobId(jobId);
}

[Fact]
public void Dispose_Calls_Dispose_On_Requests()
{
// Arrange
var sut = new JobPool(1);
var jobId = sut.RentJobIdAsync(CancellationToken.None).Result;
var request = sut.GetRequest(jobId);

// Act
sut.Dispose();

// Assert
Should.Throw<ObjectDisposedException>(() => request.GetResult());
}
}
35 changes: 35 additions & 0 deletions Sally7.Tests/RequestExecutor/RequestTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using FakeItEasy;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class RequestTests
{
[Fact]
public void Completes_On_Dispose()
{
// Arrange
var sut = new Request();
var callback = A.Fake<Action>();
sut.OnCompleted(callback);

// Act
sut.Dispose();

// Assert
A.CallTo(() => callback.Invoke()).MustHaveHappenedOnceExactly();
}

[Fact]
public async Task Throws_When_Awaited_After_Dispose()
{
// Arrange
var sut = new Request();

// Act
sut.Dispose();

// Assert
await Should.ThrowAsync<ObjectDisposedException>(async () => await sut);
}
}
19 changes: 19 additions & 0 deletions Sally7.Tests/RequestExecutor/SignalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Channels;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class SignalTests
{
[Fact]
public async Task WaitAsync_Throws_If_Disposed()
{
// Arrange
var sut = new Signal();
sut.Dispose();

// Act
// Assert
await Should.ThrowAsync<ChannelClosedException>(() => sut.WaitAsync(CancellationToken.None).AsTask());
}
}
8 changes: 8 additions & 0 deletions Sally7.Tests/Sally7.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
<TargetFramework>net6</TargetFramework>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<Using Include="Shouldly" />
<Using Include="Xunit" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="FakeItEasy" Version="8.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="Shouldly" Version="4.2.1" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
Expand Down
13 changes: 13 additions & 0 deletions Sally7/Internal/DisposableHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;

namespace Sally7.Internal;

internal static class DisposableHelper
{
public static void ThrowIf(bool condition, object instance)
mycroes marked this conversation as resolved.
Show resolved Hide resolved
{
void Throw() => throw new ObjectDisposedException(instance.ToString());
mycroes marked this conversation as resolved.
Show resolved Hide resolved

if (condition) Throw();
}
}
117 changes: 0 additions & 117 deletions Sally7/RequestExecutor/ConcurrentRequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
using System.Buffers;
using System.Diagnostics;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

Expand Down Expand Up @@ -187,120 +185,5 @@ public async ValueTask<Memory<byte>> PerformRequest(ReadOnlyMemory<byte> request
jobPool.ReturnJobId(jobId);
}
}

private class JobPool : IDisposable
{
private readonly Channel<int> jobIdPool;
private readonly Request[] requests;
private volatile bool disposed;

public JobPool(int maxNumberOfConcurrentRequests)
{
jobIdPool = Channel.CreateBounded<int>(maxNumberOfConcurrentRequests);
requests = new Request[maxNumberOfConcurrentRequests];

for (int i = 0; i < maxNumberOfConcurrentRequests; ++i)
{
if (!jobIdPool.Writer.TryWrite(i + 1))
{
Sally7Exception.ThrowFailedToInitJobPool();
}

requests[i] = new Request();
}
}

public void Dispose()
{
disposed = true;
jobIdPool.Writer.Complete();
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => jobIdPool.Reader.ReadAsync(cancellationToken);

public void ReturnJobId(int jobId)
{
if (!jobIdPool.Writer.TryWrite(jobId) && !disposed)
{
Sally7Exception.ThrowFailedToReturnJobIDToPool(jobId);
}
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => requests[jobId - 1];

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Request req = GetRequest(jobId);
req.Reset();
req.SetBuffer(buffer);
}
}

[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
private class Signal : IDisposable
{
private readonly Channel<int> channel = Channel.CreateBounded<int>(1);

public void Dispose() => channel.Writer.Complete();

public bool TryInit() => channel.Writer.TryWrite(0);

public ValueTask<int> WaitAsync(CancellationToken cancellationToken) => channel.Reader.ReadAsync(cancellationToken);

public bool TryRelease() => channel.Writer.TryWrite(0);

private bool NeedToWait => channel.Reader.Count == 0;
}

private class Request : INotifyCompletion
{
private static readonly Action Sentinel = () => { };

private Memory<byte> buffer;

public bool IsCompleted { get; private set; }
private int length;
private Action? continuation = Sentinel;

public Memory<byte> Buffer => buffer;

public void Complete(int length)
{
this.length = length;
IsCompleted = true;

var prev = continuation ?? Interlocked.CompareExchange(ref continuation, Sentinel, null);
prev?.Invoke();
}

public Memory<byte> GetResult()
{
return buffer.Slice(0, length);
}

public Request GetAwaiter() => this;

public void OnCompleted(Action continuation)
{
if (this.continuation == Sentinel ||
Interlocked.CompareExchange(ref this.continuation, continuation, null) == Sentinel)
{
continuation.Invoke();
}
}

public void Reset()
{
continuation = null;
IsCompleted = false;
}

public void SetBuffer(Memory<byte> buffer)
{
this.buffer = buffer;
}
}
}
}
67 changes: 67 additions & 0 deletions Sally7/RequestExecutor/JobPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

internal class JobPool : IDisposable
{
private readonly Channel<int> _jobIdPool;
private readonly Request[] _requests;
private bool _disposed;

public JobPool(int maxNumberOfConcurrentRequests)
{
_jobIdPool = Channel.CreateBounded<int>(maxNumberOfConcurrentRequests);
_requests = new Request[maxNumberOfConcurrentRequests];

for (int i = 0; i < maxNumberOfConcurrentRequests; ++i)
{
if (!_jobIdPool.Writer.TryWrite(i + 1))
{
Sally7Exception.ThrowFailedToInitJobPool();
}

_requests[i] = new Request();
}
}

public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();

foreach (var request in _requests)
{
request.Dispose();
}
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);

public void ReturnJobId(int jobId)
{
if (!_jobIdPool.Writer.TryWrite(jobId) && !Volatile.Read(ref _disposed))
{
Sally7Exception.ThrowFailedToReturnJobIDToPool(jobId);
}
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId)
{
DisposableHelper.ThrowIf(Volatile.Read(ref _disposed), this);

return _requests[jobId - 1];
}

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Request req = GetRequest(jobId);
req.Reset();
req.SetBuffer(buffer);
}
}
Loading
Loading