Skip to content

Commit

Permalink
Clearer progress messages for multi-threaded DML operations
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Nov 7, 2023
1 parent d4c8592 commit a6b0237
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
{
var inProgressCount = 0;
var count = 0;
var errorCount = 0;
var threadCount = 0;

#if NETCOREAPP
var svc = dataSource.Connection as ServiceClient;
Expand Down Expand Up @@ -580,6 +582,7 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
if (!useAffinityCookie && service is CrmServiceClient crmService)
crmService.EnableAffinityCookie = false;
#endif
Interlocked.Increment(ref threadCount);

return new { Service = service, EMR = default(ExecuteMultipleRequest) };
},
Expand All @@ -600,7 +603,11 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
{
var newCount = Interlocked.Increment(ref inProgressCount);
var progress = (double)newCount / entities.Count;
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");

if (threadCount < 2)
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
else
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");

try
{
Expand Down Expand Up @@ -643,7 +650,7 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio

if (threadLocalState.EMR.Requests.Count == BatchSize)
{
ProcessBatch(threadLocalState.EMR, ref count, ref inProgressCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, responseHandler, ref fault);
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, responseHandler, ref fault);

threadLocalState = new { threadLocalState.Service, EMR = default(ExecuteMultipleRequest) };
}
Expand All @@ -654,7 +661,9 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
(threadLocalState) =>
{
if (threadLocalState.EMR != null)
ProcessBatch(threadLocalState.EMR, ref count, ref inProgressCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, responseHandler, ref fault);
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, responseHandler, ref fault);

Interlocked.Decrement(ref threadCount);

if (threadLocalState.Service != dataSource.Connection && threadLocalState.Service is IDisposable disposableClient)
disposableClient.Dispose();
Expand Down Expand Up @@ -694,11 +703,12 @@ protected class BulkApiErrorDetail
public int StatusCode { get; set; }
}

private void ProcessBatch(ExecuteMultipleRequest req, ref int count, ref int inProgressCount, List<Entity> entities, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, Action<OrganizationResponse> responseHandler, ref OrganizationServiceFault fault)
private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int count, ref int inProgressCount, ref int errorCount, List<Entity> entities, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, Action<OrganizationResponse> responseHandler, ref OrganizationServiceFault fault)
{
var newCount = Interlocked.Add(ref inProgressCount, req.Requests.Count);
var progress = (double)newCount / entities.Count;
options.Progress(progress, $"{operationNames.InProgressUppercase} {GetDisplayName(0, meta)} {newCount + 1 - req.Requests.Count:N0} - {newCount:N0} of {entities.Count:N0}...");
var threadCountMessage = threadCount < 2 ? "" : $" ({threadCount:N0} threads)";
options.Progress(progress, $"{operationNames.InProgressUppercase} {GetDisplayName(0, meta)} {count + errorCount + 1:N0} - {newCount:N0} of {entities.Count:N0}{threadCountMessage}...");
var resp = ExecuteMultiple(dataSource, org, meta, req);

if (responseHandler != null)
Expand All @@ -715,6 +725,7 @@ private void ProcessBatch(ExecuteMultipleRequest req, ref int count, ref int inP
.ToList();

Interlocked.Add(ref count, req.Requests.Count - errorResponses.Count);
Interlocked.Add(ref errorCount, errorResponses.Count);

var error = errorResponses.FirstOrDefault(item => FilterErrors(item.Fault));

Expand Down

0 comments on commit a6b0237

Please sign in to comment.