Skip to content

Commit

Permalink
Introduce concurrent queues
Browse files Browse the repository at this point in the history
  • Loading branch information
schoims committed Dec 8, 2022
1 parent b860eb7 commit a1cb21d
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions iothub/device/src/Transport/Mqtt/SimpleWorkQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
Expand All @@ -20,14 +20,14 @@ namespace Microsoft.Azure.Devices.Client.Transport.Mqtt
internal class SimpleWorkQueue<TWork>
{
private readonly Func<IChannelHandlerContext, TWork, Task> _workerAsync;
private readonly Queue<TWork> _backlogQueue;
private readonly ConcurrentQueue<TWork> _backlogQueue;
private readonly TaskCompletionSource _completionSource;

public SimpleWorkQueue(Func<IChannelHandlerContext, TWork, Task> workerAsync)
{
_workerAsync = workerAsync;
_completionSource = new TaskCompletionSource();
_backlogQueue = new Queue<TWork>();
_backlogQueue = new ConcurrentQueue<TWork>();
}

protected States State { get; set; }
Expand Down Expand Up @@ -106,9 +106,8 @@ public virtual void Abort(Exception exception)
case States.FinalProcessing:
State = States.Aborted;

while (_backlogQueue.Any())
while (_backlogQueue.TryDequeue(out TWork workItem))
{
TWork workItem = _backlogQueue.Dequeue();
ReferenceCountUtil.Release(workItem);

var cancellableWorkItem = workItem as ICancellable;
Expand Down Expand Up @@ -141,9 +140,9 @@ private async void StartWorkQueueProcessingAsync(IChannelHandlerContext context)
try
{
while (_backlogQueue.Any()
&& State != States.Aborted)
&& State != States.Aborted
&& _backlogQueue.TryDequeue(out TWork workItem))
{
TWork workItem = _backlogQueue.Dequeue();
await DoWorkAsync(context, workItem).ConfigureAwait(false);
}

Expand Down

0 comments on commit a1cb21d

Please sign in to comment.