Skip to content

Commit

Permalink
Bulk: Add TimerWheel to Bulk to improve latency (#1640)
Browse files Browse the repository at this point in the history
* using timerwheel

* tests
  • Loading branch information
ealsur authored and kirankumarkolli committed Jul 11, 2020
1 parent 248888c commit 6f76ae6
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 60 deletions.
24 changes: 7 additions & 17 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ namespace Microsoft.Azure.Cosmos
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

Expand All @@ -28,16 +26,16 @@ namespace Microsoft.Azure.Cosmos
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimerInSeconds = 1;
private const int MinimumDispatchTimerInSeconds = 1;
private const int TimerWheelBucketCount = 20;
private readonly static TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50);

private readonly ContainerInternal cosmosContainer;
private readonly CosmosClientContext cosmosClientContext;
private readonly int maxServerRequestBodyLength;
private readonly int maxServerRequestOperationCount;
private readonly int dispatchTimerInSeconds;
private readonly ConcurrentDictionary<string, BatchAsyncStreamer> streamersByPartitionKeyRange = new ConcurrentDictionary<string, BatchAsyncStreamer>();
private readonly ConcurrentDictionary<string, SemaphoreSlim> limitersByPartitionkeyRange = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly TimerPool timerPool;
private readonly TimerWheel timerWheel;
private readonly RetryOptions retryOptions;
private readonly int defaultMaxDegreeOfConcurrency = 50;

Expand All @@ -52,8 +50,7 @@ public BatchAsyncContainerExecutor(
ContainerInternal cosmosContainer,
CosmosClientContext cosmosClientContext,
int maxServerRequestOperationCount,
int maxServerRequestBodyLength,
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimerInSeconds)
int maxServerRequestBodyLength)
{
if (cosmosContainer == null)
{
Expand All @@ -70,17 +67,11 @@ public BatchAsyncContainerExecutor(
throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength));
}

if (dispatchTimerInSeconds < 1)
{
throw new ArgumentOutOfRangeException(nameof(dispatchTimerInSeconds));
}

this.cosmosContainer = cosmosContainer;
this.cosmosClientContext = cosmosClientContext;
this.maxServerRequestBodyLength = maxServerRequestBodyLength;
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = new TimerPool(BatchAsyncContainerExecutor.MinimumDispatchTimerInSeconds);
this.timerWheel = TimerWheel.CreateTimerWheel(BatchAsyncContainerExecutor.TimerWheelResolution, BatchAsyncContainerExecutor.TimerWheelBucketCount);
this.retryOptions = cosmosClientContext.ClientOptions.GetConnectionPolicy().RetryOptions;
}

Expand Down Expand Up @@ -116,7 +107,7 @@ public void Dispose()
limiter.Value.Dispose();
}

this.timerPool.Dispose();
this.timerWheel.Dispose();
}

internal virtual async Task ValidateOperationAsync(
Expand Down Expand Up @@ -272,8 +263,7 @@ private BatchAsyncStreamer GetOrAddStreamerForPartitionKeyRange(string partition
BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(
this.maxServerRequestOperationCount,
this.maxServerRequestBodyLength,
this.dispatchTimerInSeconds,
this.timerPool,
this.timerWheel,
limiter,
this.defaultMaxDegreeOfConcurrency,
this.cosmosClientContext.SerializerCore,
Expand Down
32 changes: 14 additions & 18 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncStreamer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

/// <summary>
/// Handles operation queueing and dispatching.
Expand All @@ -20,26 +19,27 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncBatcher"/>
internal class BatchAsyncStreamer : IDisposable
{
private static readonly TimeSpan congestionControllerDelay = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan batchTimeout = TimeSpan.FromMilliseconds(100);

private readonly object dispatchLimiter = new object();
private readonly int maxBatchOperationCount;
private readonly int maxBatchByteSize;
private readonly BatchAsyncBatcherExecuteDelegate executor;
private readonly BatchAsyncBatcherRetryDelegate retrier;
private readonly int dispatchTimerInSeconds;
private readonly CosmosSerializerCore serializerCore;
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

private readonly int congestionIncreaseFactor = 1;
private readonly int congestionControllerDelayInSeconds = 1;
private readonly int congestionDecreaseFactor = 5;
private readonly int maxDegreeOfConcurrency;
private readonly TimerWheel timerWheel;

private volatile BatchAsyncBatcher currentBatcher;
private TimerPool timerPool;
private PooledTimer currentTimer;
private TimerWheelTimer currentTimer;
private Task timerTask;

private PooledTimer congestionControlTimer;
private TimerWheelTimer congestionControlTimer;
private Task congestionControlTask;
private SemaphoreSlim limiter;

Expand All @@ -51,8 +51,7 @@ internal class BatchAsyncStreamer : IDisposable
public BatchAsyncStreamer(
int maxBatchOperationCount,
int maxBatchByteSize,
int dispatchTimerInSeconds,
TimerPool timerPool,
TimerWheel timerWheel,
SemaphoreSlim limiter,
int maxDegreeOfConcurrency,
CosmosSerializerCore serializerCore,
Expand All @@ -69,11 +68,6 @@ public BatchAsyncStreamer(
throw new ArgumentOutOfRangeException(nameof(maxBatchByteSize));
}

if (dispatchTimerInSeconds < 1)
{
throw new ArgumentOutOfRangeException(nameof(dispatchTimerInSeconds));
}

if (executor == null)
{
throw new ArgumentNullException(nameof(executor));
Expand Down Expand Up @@ -103,8 +97,7 @@ public BatchAsyncStreamer(
this.maxBatchByteSize = maxBatchByteSize;
this.executor = executor;
this.retrier = retrier;
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = timerPool;
this.timerWheel = timerWheel;
this.serializerCore = serializerCore;
this.currentBatcher = this.CreateBatchAsyncBatcher();
this.ResetTimer();
Expand Down Expand Up @@ -155,16 +148,19 @@ public void Dispose()

private void ResetTimer()
{
this.currentTimer = this.timerPool.GetPooledTimer(this.dispatchTimerInSeconds);
this.currentTimer = this.timerWheel.CreateTimer(BatchAsyncStreamer.batchTimeout);
this.timerTask = this.currentTimer.StartTimerAsync().ContinueWith((task) =>
{
this.DispatchTimer();
if (task.IsCompleted)
{
this.DispatchTimer();
}
}, this.cancellationTokenSource.Token);
}

private void StartCongestionControlTimer()
{
this.congestionControlTimer = this.timerPool.GetPooledTimer(this.congestionControllerDelayInSeconds);
this.congestionControlTimer = this.timerWheel.CreateTimer(BatchAsyncStreamer.congestionControllerDelay);
this.congestionControlTask = this.congestionControlTimer.StartTimerAsync().ContinueWith(async (task) =>
{
await this.RunCongestionControlAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task RetryOnSplit()
},
string.Empty);
mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(routingMap));
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes, 1);
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes);
TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation);

Mock.Get(mockContainer.Object)
Expand Down Expand Up @@ -117,7 +117,7 @@ public async Task RetryOnNameStale()
},
string.Empty);
mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(routingMap));
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes, 1);
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes);
TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation);

Mock.Get(mockContainer.Object)
Expand Down Expand Up @@ -177,7 +177,7 @@ public async Task RetryOn429()
},
string.Empty);
mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(routingMap));
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes, 1);
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes);
TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation);

Mock.Get(mockContainer.Object)
Expand Down Expand Up @@ -236,7 +236,7 @@ public async Task DoesNotRecalculatePartitionKeyRangeOnNoSplits()
},
string.Empty);
mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(routingMap));
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes, 1);
BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes);
TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation);

Mock.Get(mockContainer.Object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ namespace Microsoft.Azure.Cosmos.Tests
[TestClass]
public class BatchAsyncStreamerTests
{
private const int DispatchTimerInSeconds = 5;
private const int MaxBatchByteSize = 100000;
private const int defaultMaxDegreeOfConcurrency = 10;
private static Exception expectedException = new Exception();
private ItemBatchOperation ItemBatchOperation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null, "0");
private TimerPool TimerPool = new TimerPool(1);
private TimerWheel TimerWheel = TimerWheel.CreateTimerWheel(TimeSpan.FromMilliseconds(50), 20);
private SemaphoreSlim limiter = new SemaphoreSlim(1, defaultMaxDegreeOfConcurrency);

// Executor just returns a reponse matching the Id with Etag
Expand Down Expand Up @@ -79,50 +78,41 @@ private BatchAsyncBatcherExecuteDelegate Executor
[DataRow(-1)]
public void ValidatesSize(int size)
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(size, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, this.Retrier);
}

[DataTestMethod]
[ExpectedException(typeof(ArgumentOutOfRangeException))]
[DataRow(0)]
[DataRow(-1)]
public void ValidatesDispatchTimer(int dispatchTimerInSeconds)
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, dispatchTimerInSeconds, this.TimerPool, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(size, MaxBatchByteSize, this.TimerWheel, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, this.Retrier);
}

[TestMethod]
[ExpectedException(typeof(ArgumentNullException))]
public void ValidatesExecutor()
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, this.limiter, 1, MockCosmosUtil.Serializer, null, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, this.TimerWheel, this.limiter, 1, MockCosmosUtil.Serializer, null, this.Retrier);
}

[TestMethod]
[ExpectedException(typeof(ArgumentNullException))]
public void ValidatesRetrier()
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, null);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, this.TimerWheel, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, null);
}

[TestMethod]
[ExpectedException(typeof(ArgumentNullException))]
public void ValidatesSerializer()
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, this.limiter, 1, null, this.Executor, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, this.TimerWheel, this.limiter, 1, null, this.Executor, this.Retrier);
}

[TestMethod]
[ExpectedException(typeof(ArgumentNullException))]
public void ValidatesLimiter()
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, null, 1, null, this.Executor, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(1, MaxBatchByteSize, this.TimerWheel, null, 1, null, this.Executor, this.Retrier);
}

[TestMethod]
public async Task ExceptionsOnBatchBubbleUpAsync()
{
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, this.limiter, 1, MockCosmosUtil.Serializer, this.ExecutorWithFailure, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, this.TimerWheel, this.limiter, 1, MockCosmosUtil.Serializer, this.ExecutorWithFailure, this.Retrier);
ItemBatchOperationContext context = AttachContext(this.ItemBatchOperation);
batchAsyncStreamer.Add(this.ItemBatchOperation);
Exception capturedException = await Assert.ThrowsExceptionAsync<Exception>(() => context.OperationTask);
Expand All @@ -133,7 +123,7 @@ public async Task ExceptionsOnBatchBubbleUpAsync()
public async Task TimerDispatchesAsync()
{
// Bigger batch size than the amount of operations, timer should dispatch
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, this.TimerWheel, this.limiter, 1, MockCosmosUtil.Serializer, this.Executor, this.Retrier);
ItemBatchOperationContext context = AttachContext(this.ItemBatchOperation);
batchAsyncStreamer.Add(this.ItemBatchOperation);
TransactionalBatchOperationResult result = await context.OperationTask;
Expand All @@ -145,7 +135,7 @@ public async Task TimerDispatchesAsync()
public async Task ValidatesCongestionControlAsync()
{
SemaphoreSlim newLimiter = new SemaphoreSlim(1, defaultMaxDegreeOfConcurrency);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, 1, this.TimerPool, newLimiter, defaultMaxDegreeOfConcurrency, MockCosmosUtil.Serializer, this.Executor, this.Retrier);
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, this.TimerWheel, newLimiter, defaultMaxDegreeOfConcurrency, MockCosmosUtil.Serializer, this.Executor, this.Retrier);

Assert.AreEqual(newLimiter.CurrentCount, 1);

Expand Down Expand Up @@ -173,8 +163,7 @@ public async Task DispatchesAsync()
BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(
2,
MaxBatchByteSize,
DispatchTimerInSeconds,
this.TimerPool,
this.TimerWheel,
this.limiter,
1,
MockCosmosUtil.Serializer,
Expand Down

0 comments on commit 6f76ae6

Please sign in to comment.