Skip to content

Commit

Permalink
More intuitive thread scaling & retries
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Nov 14, 2024
1 parent eca9902 commit 2531aa1
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 84 deletions.
178 changes: 122 additions & 56 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,35 @@ public void Dispose()
}
}

class ParallelLoopState : DynamicParallelLoopState
{
private int _successfulExecution;

public override void RequestReduceThreadCount()
{
if (Interlocked.CompareExchange(ref _successfulExecution, 0, 0) == 0)
return;

base.RequestReduceThreadCount();
}

public override bool ResetReduceThreadCount()
{
if (base.ResetReduceThreadCount())
{
Interlocked.Exchange(ref _successfulExecution, 0);
return true;
}

return false;
}

public void IncrementSuccessfulExecution()
{
Interlocked.Increment(ref _successfulExecution);
}
}

class ParallelThreadState
{
public IOrganizationService Service { get; set; }
Expand Down Expand Up @@ -456,7 +485,7 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
_retryQueue = new ConcurrentQueue<OrganizationRequest>();
_delayedUntil = new ConcurrentDictionary<ParallelThreadState, DateTime>();

new DynamicParallel<Entity, ParallelThreadState>(
new DynamicParallel<Entity, ParallelThreadState, ParallelLoopState>(
entities,
_parallelOptions,
() =>
Expand Down Expand Up @@ -488,32 +517,28 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
},
async (entity, loopState, threadLocalState) =>
{
var executed = false;

try
{
if (options.CancellationToken.IsCancellationRequested)
{
loopState.Stop();
return DynamicParallelVote.DecreaseThreads;
return;
}

// TODO: Take any requests from the retry queue and execute them first

// Generate the request to insert/update/delete this record
var request = requestGenerator(entity);

if (BypassCustomPluginExecution)
request.Parameters["BypassCustomPluginExecution"] = true;

if (threadLocalState.NextBatchSize == 1)
{
DynamicParallelVote? vote = null;

await UpdateNextBatchSize(threadLocalState, async () =>
{
vote = await ExecuteSingleRequest(request, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
executed = await ExecuteSingleRequest(request, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
});

if (vote != null)
return vote;
}
else
{
Expand All @@ -533,35 +558,75 @@ await UpdateNextBatchSize(threadLocalState, async () =>
threadLocalState.EMR.Requests.Add(request);

if (threadLocalState.EMR.Requests.Count < threadLocalState.NextBatchSize)
return null;

DynamicParallelVote? vote = null;
return;

await UpdateNextBatchSize(threadLocalState, async () =>
{
vote = await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
executed = await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
});

threadLocalState.EMR = null;

if (vote != null)
return vote;
}

return DynamicParallelVote.IncreaseThreads;
}
catch
{
threadLocalState.Error = true;
throw;
}

// Take any requests from the retry queue and execute them
while (executed && _retryQueue.TryDequeue(out var retryReq))
{
if (options.CancellationToken.IsCancellationRequested)
{
loopState.Stop();
return;
}

if (loopState.IsStopped)
return;

if (retryReq is ExecuteMultipleRequest emr)
executed = await ProcessBatch(emr, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
else
executed = await ExecuteSingleRequest(retryReq, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
}

if (executed)
loopState.RequestIncreaseThreadCount();
},
async (threadLocalState) =>
async (loopState, threadLocalState) =>
{
// If we've got a partial batch, execute it now
if (threadLocalState.EMR != null && !threadLocalState.Error)
await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);

// If this is the final thread, execute any remaining requests in the retry queue
if (!loopState.IsStopped &&
!options.CancellationToken.IsCancellationRequested &&
Interlocked.CompareExchange(ref _threadCount, 1, 1) == 1)
{
// Take any requests from the retry queue and execute them
while (_retryQueue.TryDequeue(out var retryReq))
{
if (options.CancellationToken.IsCancellationRequested)
{
loopState.Stop();
return;
}

if (loopState.IsStopped)
return;

if (retryReq is ExecuteMultipleRequest emr)
await ProcessBatch(emr, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
else
await ExecuteSingleRequest(retryReq, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
}
}

Interlocked.Decrement(ref _threadCount);
ShowProgress(options, operationNames, meta);

if (threadLocalState.Service != dataSource.Connection && threadLocalState.Service is IDisposable disposableClient)
disposableClient.Dispose();
Expand Down Expand Up @@ -632,20 +697,22 @@ protected class BulkApiErrorDetail
public int StatusCode { get; set; }
}

private async Task<DynamicParallelVote?> ExecuteSingleRequest(OrganizationRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState)
private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState, ParallelLoopState loopState)
{
var newCount = Interlocked.Increment(ref _inProgressCount);
ShowProgress(options, newCount, operationNames, meta);
Interlocked.Increment(ref _inProgressCount);

for (var retry = 0; ; retry++)
{
try
{
ShowProgress(options, operationNames, meta);
var response = dataSource.Execute(org, req);
Interlocked.Increment(ref _successCount);

loopState.IncrementSuccessfulExecution();

responseHandler?.Invoke(response);
break;
return true;
}
catch (FaultException<OrganizationServiceFault> ex)
{
Expand All @@ -654,7 +721,7 @@ protected class BulkApiErrorDetail
// Handle service protection limit retries ourselves to manage multi-threading
// Wait for the recommended retry time, then add the request to the queue for retrying
// Terminate this thread so we don't continue to overload the server
newCount = Interlocked.Decrement(ref _inProgressCount);
Interlocked.Decrement(ref _inProgressCount);

// The server can report too-long delays. Wait the full 5 minutes to start with,
// then reduce to 2 minutes then 1 minute
Expand All @@ -666,14 +733,18 @@ protected class BulkApiErrorDetail
retryDelay = TimeSpan.FromMinutes(1);

_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
ShowProgress(options, newCount, operationNames, meta);
ShowProgress(options, operationNames, meta);

loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
_delayedUntil.TryRemove(threadState, out _);
ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
return DynamicParallelVote.DecreaseThreads;
return false;
}

loopState.IncrementSuccessfulExecution();

if (IsRetryableFault(ex?.Detail))
{
// Retry the request after a short delay
Expand All @@ -690,21 +761,19 @@ protected class BulkApiErrorDetail
}

Interlocked.Increment(ref _errorCount);
break;
return true;
}
}

return null;
}

private void ShowProgress(IQueryExecutionOptions options, int newCount, OperationNames operationNames, EntityMetadata meta)
private void ShowProgress(IQueryExecutionOptions options, OperationNames operationNames, EntityMetadata meta)
{
var progress = (double)newCount / _entityCount;
var progress = (double)_inProgressCount / _entityCount;
var threadCountMessage = _threadCount < 2 ? "" : $" ({_threadCount:N0} threads)";
var operationName = operationNames.InProgressUppercase;

var delayedUntilValues = _delayedUntil.Values.ToArray();
if (delayedUntilValues.Length == _threadCount)
if (delayedUntilValues.Length > 0 && delayedUntilValues.Length == _threadCount)
{
operationName = operationNames.CompletedUppercase;
threadCountMessage += $" (paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
Expand All @@ -714,23 +783,25 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
threadCountMessage += $" ({delayedUntilValues.Length:N0} threads paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
}

if (_successCount + _errorCount + 1 >= newCount)
options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {newCount:N0} of {_entityCount:N0}{threadCountMessage}...");
if (_successCount + _errorCount + 1 >= _inProgressCount)
options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_inProgressCount:N0} of {_entityCount:N0}{threadCountMessage}...");
else
options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_successCount + _errorCount + 1:N0} - {newCount:N0} of {_entityCount:N0}{threadCountMessage}...");
options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_successCount + _errorCount + 1:N0} - {_inProgressCount:N0} of {_entityCount:N0}{threadCountMessage}...");
}

private async Task<DynamicParallelVote?> ProcessBatch(ExecuteMultipleRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState)
private async Task<bool> ProcessBatch(ExecuteMultipleRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState, ParallelLoopState loopState)
{
var newCount = Interlocked.Add(ref _inProgressCount, req.Requests.Count);
ShowProgress(options, newCount, operationNames, meta);
Interlocked.Add(ref _inProgressCount, req.Requests.Count);

for (var retry = 0; !options.CancellationToken.IsCancellationRequested; retry++)
{
try
{
ShowProgress(options, operationNames, meta);
var resp = ExecuteMultiple(dataSource, org, meta, req);

loopState.IncrementSuccessfulExecution();

if (responseHandler != null)
{
foreach (var item in resp.Responses)
Expand Down Expand Up @@ -782,12 +853,7 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
retryReq.Requests.Add(req.Requests[errorItem.RequestIndex]);

// Wait and retry
var retryDelay = TimeSpan.FromSeconds(2);

if (retryableErrors[0].Fault.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryDelay is TimeSpan ts)
retryDelay = ts;

await Task.Delay(retryDelay, options.CancellationToken);
await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
req = retryReq;
continue;
}
Expand All @@ -813,12 +879,7 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
retryReq.Requests.Add(req.Requests[i]);

// Wait and retry
var retryDelay = TimeSpan.FromSeconds(2);

if (firstRetryableError.Fault.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
retryDelay = ts;

await Task.Delay(retryDelay, options.CancellationToken);
await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
req = retryReq;
continue;
}
Expand All @@ -829,12 +890,15 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
catch (FaultException<OrganizationServiceFault> ex)
{
if (!ex.IsThrottlingException(out var retryDelay))
{
loopState.IncrementSuccessfulExecution();
throw;
}

// Handle service protection limit retries ourselves to manage multi-threading
// Wait for the recommended retry time, then add the request to the queue for retrying
// Terminate this thread so we don't continue to overload the server
newCount = Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
Interlocked.Add(ref _inProgressCount, -req.Requests.Count);

// The server can report too-long delays. Wait the full 5 minutes to start with,
// then reduce to 2 minutes then 1 minute
Expand All @@ -846,16 +910,18 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
retryDelay = TimeSpan.FromMinutes(1);

_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
ShowProgress(options, newCount, operationNames, meta);
ShowProgress(options, operationNames, meta);

loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
_delayedUntil.TryRemove(threadState, out _);
ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
return DynamicParallelVote.DecreaseThreads;
return false;
}
}

return null;
return true;
}

protected virtual bool FilterErrors(NodeExecutionContext context, OrganizationRequest request, OrganizationServiceFault fault)
Expand Down
Loading

0 comments on commit 2531aa1

Please sign in to comment.