Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk: Refactors async task handling and reduces allocations #2440

Merged
merged 19 commits into from
May 6, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(

await this.ValidateOperationAsync(operation, itemRequestOptions, cancellationToken);

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, NoOpTrace.Singleton, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
operation.AttachContext(context);
Expand Down Expand Up @@ -183,7 +183,7 @@ private async Task ReBatchAsync(
{
using (ITrace retryTrace = trace.StartChild("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Info))
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, retryTrace, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
Expand All @@ -192,13 +192,18 @@ private async Task ReBatchAsync(

private async Task<string> ResolvePartitionKeyRangeIdAsync(
ItemBatchOperation operation,
ITrace trace,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
PartitionKeyDefinition partitionKeyDefinition = await this.cosmosContainer.GetPartitionKeyDefinitionAsync(cancellationToken);
ContainerProperties cachedContainerPropertiesAsync = await this.cosmosContainer.GetCachedContainerPropertiesAsync(
ealsur marked this conversation as resolved.
Show resolved Hide resolved
forceRefresh: false,
trace: trace,
cancellationToken: cancellationToken);
PartitionKeyDefinition partitionKeyDefinition = cachedContainerPropertiesAsync?.PartitionKey;
CollectionRoutingMap collectionRoutingMap = await this.cosmosContainer.GetRoutingMapAsync(cancellationToken);

Debug.Assert(operation.RequestOptions?.Properties?.TryGetValue(WFConstants.BackendHeaders.EffectivePartitionKeyString, out object epkObj) == null, "EPK is not supported");
Debug.Assert(operation.RequestOptions?.Properties?.TryGetValue(WFConstants.BackendHeaders.EffectivePartitionKeyString, out object _) == null, "EPK is not supported");
Documents.Routing.PartitionKeyInternal partitionKeyInternal = await this.GetPartitionKeyInternalAsync(operation, cancellationToken);
operation.PartitionKeyJson = partitionKeyInternal.ToJsonString();
string effectivePartitionKeyString = partitionKeyInternal.GetEffectivePartitionKeyString(partitionKeyDefinition);
Expand Down
15 changes: 9 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncStreamer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,16 @@ public void Dispose()
private void ResetTimer()
{
this.currentTimer = this.timerWheel.CreateTimer(BatchAsyncStreamer.batchTimeout);
this.timerTask = this.currentTimer.StartTimerAsync().ContinueWith((task) =>
this.timerTask = this.GetTimerTaskAsync();
}

private async Task GetTimerTaskAsync()
{
await this.currentTimer.StartTimerAsync();
if (!this.cancellationTokenSource.IsCancellationRequested)
{
if (task.IsCompleted)
{
this.DispatchTimer();
}
}, this.cancellationTokenSource.Token);
this.DispatchTimer();
}
}

private void StartCongestionControlTimer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class ItemBatchOperationContext : IDisposable

private readonly IDocumentClientRetryPolicy retryPolicy;

private readonly TaskCompletionSource<TransactionalBatchOperationResult> taskCompletionSource = new TaskCompletionSource<TransactionalBatchOperationResult>();
private readonly TaskCompletionSource<TransactionalBatchOperationResult> taskCompletionSource = new TaskCompletionSource<TransactionalBatchOperationResult>(TaskCreationOptions.RunContinuationsAsynchronously);

public ItemBatchOperationContext(
string partitionKeyRangeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public override async Task InitializeAsync()

public override async Task AddOrUpdateLeaseAsync(DocumentServiceLease lease)
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

if (!this.currentlyOwnedPartitions.TryAdd(lease.CurrentLeaseToken, tcs))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal TimerWheelTimerCore(

this.timerWheel = timerWheel ?? throw new ArgumentNullException(nameof(timerWheel));
this.Timeout = timeoutPeriod;
this.taskCompletionSource = new TaskCompletionSource<object>();
this.taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
this.memberLock = new object();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ public class MockedItemBenchmarkHelper
/// </summary>
public MockedItemBenchmarkHelper(
bool useCustomSerializer = false,
bool includeDiagnosticsToString = false)
bool includeDiagnosticsToString = false,
bool useBulk = false)
{
this.TestClient = MockDocumentClient.CreateMockCosmosClient(useCustomSerializer);
this.TestClient = MockDocumentClient.CreateMockCosmosClient(useCustomSerializer, (builder) => builder.WithBulkExecution(useBulk));
this.TestContainer = this.TestClient.GetDatabase("myDB").GetContainer("myColl");
this.IncludeDiagnosticsToString = includeDiagnosticsToString;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// ----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ----------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Performance.Tests.Benchmarks
{
using System.Threading.Tasks;

public interface IItemBulkBenchmark
{
public Task CreateItem();

public Task UpsertItem();

public Task ReadItem();

public Task UpdateItem();

public Task DeleteItem();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// ----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ----------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Performance.Tests.Benchmarks
{
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;

[Config(typeof(SdkBenchmarkConfiguration))]
public class MockedItemBulkBenchmark : IItemBulkBenchmark
{
public static readonly IItemBulkBenchmark[] IterParameters = new IItemBulkBenchmark[]
{
new MockedItemStreamBulkBenchmark(),
new MockedItemOfTBulkBenchmark() { BenchmarkHelper = new MockedItemBenchmarkHelper(useBulk: true) },
new MockedItemOfTBulkBenchmark() { BenchmarkHelper = new MockedItemBenchmarkHelper(useCustomSerializer: true, useBulk: true) },
new MockedItemOfTBulkBenchmark() { BenchmarkHelper = new MockedItemBenchmarkHelper(useCustomSerializer: false, includeDiagnosticsToString: true, useBulk: true) },
};

[Params(ScenarioType.Stream, ScenarioType.OfT, ScenarioType.OfTWithDiagnosticsToString, ScenarioType.OfTCustom)]
public ScenarioType Type
{
get;
set;
}

private IItemBulkBenchmark CurrentBenchmark => MockedItemBulkBenchmark.IterParameters[(int)this.Type];

[Benchmark]
[BenchmarkCategory("GateBenchmark")]
public async Task CreateItem()
{
await this.CurrentBenchmark.CreateItem();
}

[Benchmark]
[BenchmarkCategory("GateBenchmark")]
public async Task DeleteItem()
{
await this.CurrentBenchmark.DeleteItem();
}

[Benchmark]
[BenchmarkCategory("GateBenchmark")]
public async Task ReadItem()
{
await this.CurrentBenchmark.ReadItem();
}

[Benchmark]
[BenchmarkCategory("GateBenchmark")]
public async Task UpdateItem()
{
await this.CurrentBenchmark.UpdateItem();
}

[Benchmark]
[BenchmarkCategory("GateBenchmark")]
public async Task UpsertItem()
{
await this.CurrentBenchmark.UpsertItem();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// ----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ----------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Performance.Tests.Benchmarks
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class MockedItemOfTBulkBenchmark : IItemBulkBenchmark
{
public MockedItemBenchmarkHelper BenchmarkHelper { get; set; }

public async Task CreateItem()
{
ItemResponse<ToDoActivity> response = await this.BenchmarkHelper.TestContainer.CreateItemAsync<ToDoActivity>(
ealsur marked this conversation as resolved.
Show resolved Hide resolved
this.BenchmarkHelper.TestItem,
MockedItemBenchmarkHelper.ExistingPartitionId);

if ((int)response.StatusCode > 300 || response.Resource == null)
{
throw new Exception();
}

this.BenchmarkHelper.IncludeDiagnosticToStringHelper(response.Diagnostics);
}

public async Task UpsertItem()
{
ItemResponse<ToDoActivity> response = await this.BenchmarkHelper.TestContainer.UpsertItemAsync<ToDoActivity>(
this.BenchmarkHelper.TestItem,
MockedItemBenchmarkHelper.ExistingPartitionId);

if ((int)response.StatusCode > 300 || response.Resource == null)
{
throw new Exception();
}

this.BenchmarkHelper.IncludeDiagnosticToStringHelper(response.Diagnostics);
}

public async Task ReadItem()
{
ItemResponse<ToDoActivity> response = await this.BenchmarkHelper.TestContainer.ReadItemAsync<ToDoActivity>(
MockedItemBenchmarkHelper.ExistingItemId,
MockedItemBenchmarkHelper.ExistingPartitionId);

if ((int)response.StatusCode > 300 || response.Resource == null)
{
throw new Exception();
}

this.BenchmarkHelper.IncludeDiagnosticToStringHelper(response.Diagnostics);
}

public async Task UpdateItem()
{
ItemResponse<ToDoActivity> response = await this.BenchmarkHelper.TestContainer.ReplaceItemAsync<ToDoActivity>(
this.BenchmarkHelper.TestItem,
MockedItemBenchmarkHelper.ExistingItemId,
MockedItemBenchmarkHelper.ExistingPartitionId);

if ((int)response.StatusCode > 300 || response.Resource == null)
{
throw new Exception();
}

this.BenchmarkHelper.IncludeDiagnosticToStringHelper(response.Diagnostics);
}

public async Task DeleteItem()
{
ItemResponse<ToDoActivity> response = await this.BenchmarkHelper.TestContainer.DeleteItemAsync<ToDoActivity>(
MockedItemBenchmarkHelper.ExistingItemId,
MockedItemBenchmarkHelper.ExistingPartitionId);

if ((int)response.StatusCode > 300 || response.Resource == null)
{
throw new Exception();
}

this.BenchmarkHelper.IncludeDiagnosticToStringHelper(response.Diagnostics);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// ----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ----------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Performance.Tests.Benchmarks
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class MockedItemStreamBulkBenchmark : IItemBulkBenchmark
{
private readonly MockedItemBenchmarkHelper benchmarkHelper;

public MockedItemStreamBulkBenchmark()
{
this.benchmarkHelper = new MockedItemBenchmarkHelper(useBulk: true);
}

public async Task CreateItem()
{
using (MemoryStream ms = this.benchmarkHelper.GetItemPayloadAsStream())
using (ResponseMessage response = await this.benchmarkHelper.TestContainer.CreateItemStreamAsync(
ms,
new Cosmos.PartitionKey(MockedItemBenchmarkHelper.ExistingItemId)))
{
if ((int)response.StatusCode > 300 || response.Content == null)
{
throw new Exception();
}
}
}

public async Task UpsertItem()
{
using (MemoryStream ms = this.benchmarkHelper.GetItemPayloadAsStream())
using (ResponseMessage response = await this.benchmarkHelper.TestContainer.UpsertItemStreamAsync(
ms,
new Cosmos.PartitionKey(MockedItemBenchmarkHelper.ExistingItemId)))
{
if ((int)response.StatusCode > 300 || response.Content == null)
{
throw new Exception();
}
}
}

public async Task ReadItem()
{
using (ResponseMessage response = await this.benchmarkHelper.TestContainer.ReadItemStreamAsync(
MockedItemBenchmarkHelper.ExistingItemId,
new Cosmos.PartitionKey(MockedItemBenchmarkHelper.ExistingItemId)))
{
if (response.StatusCode == System.Net.HttpStatusCode.NotFound || response.Content == null)
{
throw new Exception();
}
}
}

public async Task ReadItemWithDiagnosticToString()
{
using (ResponseMessage response = await this.benchmarkHelper.TestContainer.ReadItemStreamAsync(
MockedItemBenchmarkHelper.ExistingItemId,
new Cosmos.PartitionKey(MockedItemBenchmarkHelper.ExistingItemId)))
{
if (response.StatusCode == System.Net.HttpStatusCode.NotFound || response.Content == null)
{
throw new Exception();
}

string diagnostics = response.Diagnostics.ToString();
if (string.IsNullOrEmpty(diagnostics))
{
throw new Exception();
}
}
}

public async Task UpdateItem()
{
using (MemoryStream ms = this.benchmarkHelper.GetItemPayloadAsStream())
using (ResponseMessage response = await this.benchmarkHelper.TestContainer.ReplaceItemStreamAsync(
ms,
MockedItemBenchmarkHelper.ExistingItemId,
new Cosmos.PartitionKey(MockedItemBenchmarkHelper.ExistingItemId)))
{
if (response.StatusCode == System.Net.HttpStatusCode.NotFound || response.Content == null)
{
throw new Exception();
}
}
}

public async Task DeleteItem()
{
using (ResponseMessage response = await this.benchmarkHelper.TestContainer.DeleteItemStreamAsync(
MockedItemBenchmarkHelper.ExistingItemId,
new Cosmos.PartitionKey(MockedItemBenchmarkHelper.ExistingItemId)))
{
if (response.StatusCode == System.Net.HttpStatusCode.NotFound)
{
throw new Exception();
}
}
}
}
}
Loading