From 3343c795a476c850a2e2e5724344eee499ba4fb6 Mon Sep 17 00:00:00 2001 From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com> Date: Wed, 6 Nov 2024 22:32:40 +0000 Subject: [PATCH] Fixed batch exception handling Fixes #575 --- .../ExecutionPlan/BaseDmlNode.cs | 141 ++++++++++-------- 1 file changed, 77 insertions(+), 64 deletions(-) diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs index 5e6b7e7f..ec69a71e 100644 --- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs +++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs @@ -60,6 +60,15 @@ public void Dispose() } } + class ParallelThreadState + { + public IOrganizationService Service { get; set; } + + public ExecuteMultipleRequest EMR { get; set; } + + public bool Error { get; set; } + } + /// /// The SQL string that the query was converted from /// @@ -439,79 +448,78 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions #endif Interlocked.Increment(ref threadCount); - return new { Service = service, EMR = default(ExecuteMultipleRequest) }; + return new ParallelThreadState { Service = service, EMR = default(ExecuteMultipleRequest), Error = false }; }, (entity, loopState, index, threadLocalState) => { - if (options.CancellationToken.IsCancellationRequested) + try { - loopState.Stop(); - return threadLocalState; - } + if (options.CancellationToken.IsCancellationRequested) + { + loopState.Stop(); + return threadLocalState; + } - var request = requestGenerator(entity); + var request = requestGenerator(entity); - if (BypassCustomPluginExecution) - request.Parameters["BypassCustomPluginExecution"] = true; + if (BypassCustomPluginExecution) + request.Parameters["BypassCustomPluginExecution"] = true; - if (BatchSize == 1) - { - var newCount = Interlocked.Increment(ref inProgressCount); - var progress = (double)newCount / entities.Count; + if (BatchSize == 1) + { + var newCount = Interlocked.Increment(ref inProgressCount); + var progress = (double)newCount / entities.Count; - if (threadCount < 2) - options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})..."); - else - options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)..."); + if (threadCount < 2) + options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})..."); + else + options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)..."); - while (true) - { - try + while (true) { - var response = dataSource.Execute(threadLocalState.Service, request); - Interlocked.Increment(ref count); + try + { + var response = dataSource.Execute(threadLocalState.Service, request); + Interlocked.Increment(ref count); - responseHandler?.Invoke(response); - break; - } - catch (FaultException ex) - { - if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables - ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds. - ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later. - ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52. + responseHandler?.Invoke(response); + break; + } + catch (FaultException ex) { - // In case throttling isn't handled by normal retry logic in the service client - var retryAfterSeconds = 2; + if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables + ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds. + ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later. + ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52. + { + // In case throttling isn't handled by normal retry logic in the service client + var retryAfterSeconds = 2; - if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _))) - retryAfterSeconds = Convert.ToInt32(retryAfter); + if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _))) + retryAfterSeconds = Convert.ToInt32(retryAfter); - Thread.Sleep(retryAfterSeconds * 1000); - continue; - } + Thread.Sleep(retryAfterSeconds * 1000); + continue; + } - if (FilterErrors(context, request, ex.Detail)) - { - if (ContinueOnError) - fault = fault ?? ex.Detail; - else - throw; - } + if (FilterErrors(context, request, ex.Detail)) + { + if (ContinueOnError) + fault = fault ?? ex.Detail; + else + throw; + } - Interlocked.Increment(ref errorCount); - break; + Interlocked.Increment(ref errorCount); + break; + } } } - } - else - { - if (threadLocalState.EMR == null) + else { - threadLocalState = new + if (threadLocalState.EMR == null) { - threadLocalState.Service, - EMR = new ExecuteMultipleRequest + threadLocalState.EMR = new ExecuteMultipleRequest { Requests = new OrganizationRequestCollection(), Settings = new ExecuteMultipleSettings @@ -519,25 +527,30 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions ContinueOnError = IgnoresSomeErrors, ReturnResponses = responseHandler != null } - } - }; - } + }; + } - threadLocalState.EMR.Requests.Add(request); + threadLocalState.EMR.Requests.Add(request); - if (threadLocalState.EMR.Requests.Count == BatchSize) - { - ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault); + if (threadLocalState.EMR.Requests.Count == BatchSize) + { + ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault); - threadLocalState = new { threadLocalState.Service, EMR = default(ExecuteMultipleRequest) }; + threadLocalState.EMR = null; + } } - } - return threadLocalState; + return threadLocalState; + } + catch + { + threadLocalState.Error = true; + throw; + } }, (threadLocalState) => { - if (threadLocalState.EMR != null) + if (threadLocalState.EMR != null && !threadLocalState.Error) ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault); Interlocked.Decrement(ref threadCount);