Skip to content

Commit

Permalink
Simplified DiskWriterQueue with blocking concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
ltetak committed Jan 25, 2024
1 parent 262415a commit 33f85d5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 53 deletions.
2 changes: 0 additions & 2 deletions LiteDB/Engine/Disk/DiskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable<PageBuffer> pages)
count++;
}

_queue.Value.Run();

return count;
}

Expand Down
73 changes: 22 additions & 51 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;
Expand All @@ -18,14 +17,17 @@ internal class DiskWriterQueue : IDisposable

// async thread controls
private Task _task;
private bool _shouldClose = false;

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();

private int _running = 0;
private readonly object _queueSync = new object();
private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false);
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

public DiskWriterQueue(Stream stream)
{
_stream = stream;
_task = Task.Run(ExecuteQueue);
}

/// <summary>
Expand All @@ -40,27 +42,11 @@ public DiskWriterQueue(Stream stream)
public void EnqueuePage(PageBuffer page)
{
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

_queue.Enqueue(page);
}

/// <summary>
/// If queue contains pages and are not running, starts run queue again now
/// </summary>
public void Run()
{
lock (_queue)
lock (_queueSync)
{
if (_queue.Count == 0) return;

var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 0)
{
// Schedule a new thread to process the pages in the queue.
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
_task = Task.Run(ExecuteQueue);
}
_queueIsEmpty.Reset();
_queue.Enqueue(page);
_queueHasItems.Set();
}
}

Expand All @@ -69,16 +55,7 @@ public void Run()
/// </summary>
public void Wait()
{
lock (_queue)
{
if (_task != null)
{
_task.Wait();
}

Run();
}

_queueIsEmpty.Wait();
ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
}

Expand All @@ -87,35 +64,25 @@ public void Wait()
/// </summary>
private void ExecuteQueue()
{
do
while (true)
{
if (_queue.TryDequeue(out var page))
{
WritePageToStream(page);
}

while (page == null)
else
{
_stream.FlushToDisk();
Volatile.Write(ref _running, 0);

if (!_queue.Any()) return;

// Another item was added to the queue after we detected it was empty.
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 1)
lock (_queueSync)
{
// A new thread was already scheduled for execution, this thread can return.
return;
if (_queue.Count > 0) continue;
_queueIsEmpty.Set();
}

// This thread will continue to process the queue as a new thread was not scheduled.
_queue.TryDequeue(out page);
WritePageToStream(page);
_queueHasItems.Wait();
if (_shouldClose) return;
}

} while (true);
}
}

private void WritePageToStream(PageBuffer page)
Expand All @@ -137,8 +104,12 @@ public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;

// run all items in queue before dispose
this.Wait();
_task?.Wait();
_task = null;
}
}
}

0 comments on commit 33f85d5

Please sign in to comment.