Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf improvement on queue/table bindings in high throughput scenarios (V1) #1657

Merged
merged 4 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class DefaultStorageAccountProvider : IStorageAccountProvider
private readonly IStorageCredentialsValidator _storageCredentialsValidator;
private readonly IStorageAccountParser _storageAccountParser;
private readonly IServiceProvider _services;
private readonly ConcurrentDictionary<string, IStorageAccount> _accounts = new ConcurrentDictionary<string, IStorageAccount>();
private readonly ConcurrentDictionary<string, Task<IStorageAccount>> _accounts = new ConcurrentDictionary<string, Task<IStorageAccount>>();

private IStorageAccount _dashboardAccount;
private bool _dashboardAccountSet;
Expand Down Expand Up @@ -188,17 +188,8 @@ private async Task<IStorageAccount> CreateAndValidateAccountAsync(string connect
return account;
}

public async Task<IStorageAccount> TryGetAccountAsync(string connectionStringName, CancellationToken cancellationToken)
{
IStorageAccount account;
if (!_accounts.TryGetValue(connectionStringName, out account))
{
// in rare cases createAndValidateAccountAsync could be called multiple times for the same account
account = await CreateAndValidateAccountAsync(connectionStringName, cancellationToken);
_accounts.AddOrUpdate(connectionStringName, (cs) => account, (cs, a) => account);
}
return account;
}
public Task<IStorageAccount> TryGetAccountAsync(string connectionStringName, CancellationToken cancellationToken) =>
_accounts.GetOrAdd(connectionStringName, s => CreateAndValidateAccountAsync(s, cancellationToken));

private IStorageAccount ParseAccount(string connectionStringName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ private ParameterDescriptor ToWriteParameterDescriptorForCollector(QueueAttribut

private ParameterDescriptor ToParameterDescriptorForCollector(QueueAttribute attr, ParameterInfo parameter, INameResolver nameResolver, FileAccess access)
{
Task<IStorageAccount> t = Task.Run(() =>
_accountProvider.GetStorageAccountAsync(attr, CancellationToken.None, nameResolver));
IStorageAccount account = t.GetAwaiter().GetResult();
// Avoid using the sync over async pattern (Async().GetAwaiter().GetResult()) whenever possible
IStorageAccount account = _accountProvider.GetStorageAccountAsync(attr, CancellationToken.None, nameResolver).GetAwaiter().GetResult();

string accountName = account.Credentials.AccountName;

Expand Down Expand Up @@ -184,7 +183,19 @@ private IAsyncCollector<IStorageQueueMessage> BuildFromQueueAttribute(QueueAttri

internal IStorageQueue GetQueue(QueueAttribute attrResolved)
{
var account = Task.Run(() => _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None)).GetAwaiter().GetResult();
// Avoid using the sync over async pattern (Async().GetAwaiter().GetResult()) whenever possible
var account = _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None).GetAwaiter().GetResult();
return GetQueue(attrResolved, account);
}

internal async Task<IStorageQueue> GetQueueAsync(QueueAttribute attrResolved)
{
var account = await _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None);
return GetQueue(attrResolved, account);
}

internal static IStorageQueue GetQueue(QueueAttribute attrResolved, IStorageAccount account)
{
var client = account.CreateQueueClient();

string queueName = attrResolved.QueueName.ToLowerInvariant();
Expand All @@ -209,7 +220,7 @@ async Task<IStorageQueue> IAsyncConverter<QueueAttribute, IStorageQueue>.Convert
QueueAttribute attrResolved,
CancellationToken cancellation)
{
IStorageQueue queue = _bindingProvider.GetQueue(attrResolved);
IStorageQueue queue = await _bindingProvider.GetQueueAsync(attrResolved);
await queue.CreateIfNotExistsAsync(CancellationToken.None);
return queue;
}
Expand Down Expand Up @@ -258,4 +269,4 @@ public QueueAsyncCollector(IStorageQueue queue, IMessageEnqueuedWatcher messageE
}
}
}
}
}
25 changes: 18 additions & 7 deletions src/Microsoft.Azure.WebJobs.Host/Tables/TableExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,27 @@ public void Initialize(ExtensionConfigContext context)
// Get the storage table from the attribute.
private IStorageTable GetTable(TableAttribute attribute)
{
var account = Task.Run(() => this._accountProvider.GetStorageAccountAsync(attribute, CancellationToken.None)).GetAwaiter().GetResult();
// Avoid using the sync over async pattern (Async().GetAwaiter().GetResult()) whenever possible
var account = this._accountProvider.GetStorageAccountAsync(attribute, CancellationToken.None).GetAwaiter().GetResult();
return GetTable(attribute, account);
}

private async Task<IStorageTable> GetTableAsync(TableAttribute attribute)
{
var account = await this._accountProvider.GetStorageAccountAsync(attribute, CancellationToken.None);
return GetTable(attribute, account);
}

private static IStorageTable GetTable(TableAttribute attribute, IStorageAccount account)
{
var tableClient = account.CreateTableClient();
return tableClient.GetTableReference(attribute.TableName);
}

private ParameterDescriptor ToParameterDescriptorForCollector(TableAttribute attribute, ParameterInfo parameter, INameResolver nameResolver)
{
Task<IStorageAccount> t = Task.Run(() =>
_accountProvider.GetStorageAccountAsync(attribute, CancellationToken.None, nameResolver));
IStorageAccount account = t.GetAwaiter().GetResult();
// Avoid using the sync over async pattern (Async().GetAwaiter().GetResult()) whenever possible
IStorageAccount account = _accountProvider.GetStorageAccountAsync(attribute, CancellationToken.None, nameResolver).GetAwaiter().GetResult();
string accountName = account.Credentials.AccountName;

return new TableParameterDescriptor
Expand Down Expand Up @@ -224,15 +235,15 @@ IStorageTable IConverter<TableAttribute, IStorageTable>.Convert(TableAttribute a

async Task<CloudTable> IAsyncConverter<TableAttribute, CloudTable>.ConvertAsync(TableAttribute attribute, CancellationToken cancellation)
{
var table = _bindingProvider.GetTable(attribute);
var table = await _bindingProvider.GetTableAsync(attribute);
await table.CreateIfNotExistsAsync(CancellationToken.None);

return table.SdkObject;
}

async Task<JObject> IAsyncConverter<TableAttribute, JObject>.ConvertAsync(TableAttribute attribute, CancellationToken cancellation)
{
var table = _bindingProvider.GetTable(attribute);
var table = await _bindingProvider.GetTableAsync(attribute);

IStorageTableOperation retrieve = table.CreateRetrieveOperation<DynamicTableEntity>(
attribute.PartitionKey, attribute.RowKey);
Expand All @@ -252,7 +263,7 @@ async Task<JObject> IAsyncConverter<TableAttribute, JObject>.ConvertAsync(TableA
// Used as an alternative to binding to IQueryable.
async Task<JArray> IAsyncConverter<TableAttribute, JArray>.ConvertAsync(TableAttribute attribute, CancellationToken cancellation)
{
var table = _bindingProvider.GetTable(attribute).SdkObject;
var table = (await _bindingProvider.GetTableAsync(attribute)).SdkObject;

string finalQuery = attribute.Filter;
if (!string.IsNullOrEmpty(attribute.PartitionKey))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,24 @@ private static void FunctionBody(CancellationToken token)
[Fact]
public void WebJobsShutdown_WhenUsingHostCall_TriggersCancellationToken()
{
using (WebJobsShutdownContext shutdownContext = new WebJobsShutdownContext())
using (JobHost host = new JobHost(_hostConfiguration))
// Run test in multithreaded environment
var oldContext = SynchronizationContext.Current;
try
{
_invokeInFunction = () => { shutdownContext.NotifyShutdown(); };
SynchronizationContext.SetSynchronizationContext(null);
using (WebJobsShutdownContext shutdownContext = new WebJobsShutdownContext())
using (JobHost host = new JobHost(_hostConfiguration))
{
_invokeInFunction = () => { shutdownContext.NotifyShutdown(); };

Task callTask = InvokeNoAutomaticTriggerFunction(host);
Task callTask = InvokeNoAutomaticTriggerFunction(host);

EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: true);
EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: true);
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(oldContext);
}
}

Expand All @@ -132,15 +142,25 @@ public void WebJobsShutdown_WhenUsingTriggeredFunction_TriggersCancellationToken
[Fact]
public void Stop_WhenUsingHostCall_DoesNotTriggerCancellationToken()
{
using (JobHost host = new JobHost(_hostConfiguration))
// Run test in multithreaded environment
var oldContext = SynchronizationContext.Current;
try
{
host.Start();
SynchronizationContext.SetSynchronizationContext(null);
using (JobHost host = new JobHost(_hostConfiguration))
{
host.Start();

Task callTask = InvokeNoAutomaticTriggerFunction(host);
Task callTask = InvokeNoAutomaticTriggerFunction(host);

host.Stop();
host.Stop();

EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: false);
EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: false);
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(oldContext);
}
}

Expand All @@ -161,13 +181,22 @@ public void Stop_WhenUsingTriggeredFunction_TriggersCancellationToken()
public void Dispose_WhenUsingHostCall_TriggersCancellationToken()
{
Task callTask;
// Run test in multithreaded environment
var oldContext = SynchronizationContext.Current;
try
{
SynchronizationContext.SetSynchronizationContext(null);
using (JobHost host = new JobHost(_hostConfiguration))
{
callTask = InvokeNoAutomaticTriggerFunction(host);
}

using (JobHost host = new JobHost(_hostConfiguration))
EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: true);
}
finally
{
callTask = InvokeNoAutomaticTriggerFunction(host);
SynchronizationContext.SetSynchronizationContext(oldContext);
}

EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: true);
}

[Fact]
Expand All @@ -184,29 +213,50 @@ public void Dispose_WhenUsingTriggeredFunction_TriggersCancellationToken()
[Fact]
public void CallCancellationToken_WhenUsingHostCall_TriggersCancellationToken()
{
using (CancellationTokenSource tokenSource = new CancellationTokenSource())
using (JobHost host = new JobHost(_hostConfiguration))
// Run test in multithreaded environment
var oldContext = SynchronizationContext.Current;
try
{
_invokeInFunction = () => { tokenSource.Cancel(); };
SynchronizationContext.SetSynchronizationContext(null);
using (CancellationTokenSource tokenSource = new CancellationTokenSource())
using (JobHost host = new JobHost(_hostConfiguration))
{
_invokeInFunction = () => { tokenSource.Cancel(); };

Task callTask = InvokeNoAutomaticTriggerFunction(host, tokenSource.Token);
Task callTask = InvokeNoAutomaticTriggerFunction(host, tokenSource.Token);

EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: true);
EvaluateNoAutomaticTriggerCancellation(callTask, expectedCancellation: true);
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(oldContext);
}
}

[Fact]
public void CallCancellationToken_WhenUsingTriggeredFunction_DoesNotTriggerCancellationToken()
{
using (CancellationTokenSource tokenSource = new CancellationTokenSource())
using (JobHost host = new JobHost(_hostConfiguration))
// Run test in multithreaded environment
var oldContext = SynchronizationContext.Current;
try
{
_invokeInFunction = () => { tokenSource.Cancel(); };
SynchronizationContext.SetSynchronizationContext(null);

using (CancellationTokenSource tokenSource = new CancellationTokenSource())
using (JobHost host = new JobHost(_hostConfiguration))
{
_invokeInFunction = () => { tokenSource.Cancel(); };

PrepareHostForTrigger(host, startHost: false);
Assert.True(host.StartAsync(tokenSource.Token).WaitUntilCompleted(DefaultTimeout));
PrepareHostForTrigger(host, startHost: false);
Assert.True(host.StartAsync(tokenSource.Token).WaitUntilCompleted(DefaultTimeout));

EvaluateTriggeredCancellation(expectedCancellation: false);
EvaluateTriggeredCancellation(expectedCancellation: false);
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(oldContext);
}
}

Expand Down