Skip to content

Commit

Permalink
Improved stats feedback for large DML operations
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Nov 17, 2024
1 parent de62dc1 commit 1be3e81
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
112 changes: 112 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,17 @@ class ParallelThreadState
private int _successCount;
private int _errorCount;
private int _threadCount;
private int _maxThreadCount;
private int _pausedThreadCount;
private int _batchExecutionCount;
private ParallelOptions _parallelOptions;
private ConcurrentQueue<OrganizationRequest> _retryQueue;
private ConcurrentDictionary<ParallelThreadState, DateTime> _delayedUntil;
private OrganizationServiceFault _fault;
private int _serviceProtectionLimitHits;
private int[] _threadCountHistory;
private int[] _rpmHistory;
private float[] _batchSizeHistory;

/// <summary>
/// The SQL string that the query was converted from
Expand Down Expand Up @@ -148,12 +155,50 @@ class ParallelThreadState
/// <summary>
/// The maximum degree of parallelism to apply to this operation
/// </summary>
[DisplayName("Max Degree of Parallelism")]
[Description("The maximum number of operations that will be performed in parallel")]
public abstract int MaxDOP { get; set; }

[Category("Statistics")]
[DisplayName("Actual Degree of Parallelism")]
[BrowsableInEstimatedPlan(false)]
[Description("The number of threads that were running each minute during the operation")]
[TypeConverter(typeof(MiniChartConverter))]
#if !NETCOREAPP
[Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
#endif
public float[] ActualDOP => _threadCountHistory.Select(i => (float)i).ToArray();

[Category("Statistics")]
[DisplayName("Records Per Minute")]
[BrowsableInEstimatedPlan(false)]
[Description("The number of records that were processed each minute during the operation")]
[TypeConverter(typeof(MiniChartConverter))]
#if !NETCOREAPP
[Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
#endif
public float[] RPM => _rpmHistory.Select(i => (float)i).ToArray();

[Category("Statistics")]
[DisplayName("Actual Batch Size")]
[BrowsableInEstimatedPlan(false)]
[Description("The average number of records that were processed per batch each minute during the operation")]
[TypeConverter(typeof(MiniChartConverter))]
#if !NETCOREAPP
[Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
#endif
public float[] ActualBatchSize => _batchSizeHistory;

[Category("Statistics")]
[DisplayName("Service Protection Limit Hits")]
[BrowsableInEstimatedPlan(false)]
[Description("The number of times execution was paused due to service protection limits")]
public int ServiceProtectionLimitHits => _serviceProtectionLimitHits;

/// <summary>
/// The number of requests that will be submitted in a single batch
/// </summary>
[DisplayName("Max Batch Size")]
[Description("The number of requests that will be submitted in a single batch")]
public abstract int BatchSize { get; set; }

Expand Down Expand Up @@ -458,13 +503,18 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
_successCount = 0;
_errorCount = 0;
_threadCount = 0;
_batchExecutionCount = 0;

#if NETCOREAPP
var svc = dataSource.Connection as ServiceClient;
#else
var svc = dataSource.Connection as CrmServiceClient;
#endif

var threadCountHistory = new List<int>();
var rpmHistory = new List<int>();
var batchSizeHistory = new List<float>();

var maxDop = MaxDOP;

if (!ParallelismHelper.CanParallelise(dataSource.Connection))
Expand All @@ -474,6 +524,48 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
svc = null;

var useAffinityCookie = maxDop == 1 || _entityCount < 100;
var completed = new CancellationTokenSource();

// Set up one background thread to monitor the performance for debugging
var performanceMonitor = Task.Factory.StartNew(async () =>
{
var lastSuccess = 0;
var lastBatchCount = 0;

while (!completed.Token.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromMinutes(1), completed.Token);
}
catch (TaskCanceledException)
{
break;
}

threadCountHistory.Add(_maxThreadCount);
var prevSuccess = Interlocked.Exchange(ref lastSuccess, _successCount);
var prevBatchCount = Interlocked.Exchange(ref lastBatchCount, _batchExecutionCount);
var recordCount = lastSuccess - prevSuccess;
var batchCount = lastBatchCount - prevBatchCount;
rpmHistory.Add(recordCount);
if (batchCount == 0)
batchSizeHistory.Add(0);
else
batchSizeHistory.Add((float)recordCount / batchCount);
_maxThreadCount = _threadCount;
lastSuccess = _successCount;
}

threadCountHistory.Add(_maxThreadCount);
var finalRecordCount = _successCount - lastSuccess;
var finalBatchCount = _batchExecutionCount - lastBatchCount;
rpmHistory.Add(finalRecordCount);
if (finalBatchCount == 0)
batchSizeHistory.Add(0);
else
batchSizeHistory.Add((float)finalRecordCount / finalBatchCount);
});

try
{
Expand Down Expand Up @@ -507,6 +599,8 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
#endif
Interlocked.Increment(ref _threadCount);

_maxThreadCount = Math.Max(_maxThreadCount, _threadCount);

return new ParallelThreadState
{
Service = service,
Expand Down Expand Up @@ -657,6 +751,15 @@ await UpdateNextBatchSize(threadLocalState, async () =>
else
throw ex;
}
finally
{
completed.Cancel();
performanceMonitor.ConfigureAwait(false).GetAwaiter().GetResult();

_threadCountHistory = threadCountHistory.ToArray();
_rpmHistory = rpmHistory.ToArray();
_batchSizeHistory = batchSizeHistory.ToArray();
}

recordsAffected = _successCount;
message = $"({_successCount:N0} {GetDisplayName(_successCount, meta)} {operationNames.CompletedLowercase})";
Expand All @@ -675,6 +778,9 @@ private async Task UpdateNextBatchSize(ParallelThreadState threadLocalState, Fun
// Adjust the batch size based on the time taken to try and keep the total time around 10sec
var multiplier = TimeSpan.FromSeconds(10).TotalMilliseconds / timer.Duration.TotalMilliseconds;
threadLocalState.NextBatchSize = Math.Max(1, Math.Min(BatchSize, (int)(threadLocalState.NextBatchSize * multiplier)));

// Update the statistics of the number of batches we have executed
Interlocked.Increment(ref _batchExecutionCount);
}

private bool IsRetryableFault(OrganizationServiceFault fault)
Expand Down Expand Up @@ -733,10 +839,13 @@ private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, Operation
retryDelay = TimeSpan.FromMinutes(1);

_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
if (Interlocked.Increment(ref _pausedThreadCount) == 1)
Interlocked.Increment(ref _serviceProtectionLimitHits);
ShowProgress(options, operationNames, meta);

loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
Interlocked.Decrement(ref _pausedThreadCount);
_delayedUntil.TryRemove(threadState, out _);
ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
Expand Down Expand Up @@ -910,10 +1019,13 @@ private async Task<bool> ProcessBatch(ExecuteMultipleRequest req, OperationNames
retryDelay = TimeSpan.FromMinutes(1);

_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
if (Interlocked.Increment(ref _pausedThreadCount) == 1)
Interlocked.Increment(ref _serviceProtectionLimitHits);
ShowProgress(options, operationNames, meta);

loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
Interlocked.Decrement(ref _pausedThreadCount);
_delayedUntil.TryRemove(threadState, out _);
ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
Expand Down
48 changes: 48 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Drawing;
using System.Drawing.Design;
using System.Globalization;
using System.Linq;
using System.Reflection;
Expand Down Expand Up @@ -161,6 +163,10 @@ public override TypeConverter Converter
{
get
{
var propTypeConverterAttr = _prop?.GetCustomAttribute<TypeConverterAttribute>();
if (propTypeConverterAttr != null)
return (TypeConverter)Activator.CreateInstance(Type.GetType(propTypeConverterAttr.ConverterTypeName));

var type = _value?.GetType() ?? _prop.PropertyType;

if ((type.IsClass || type.IsInterface) && type != typeof(string))
Expand Down Expand Up @@ -425,4 +431,46 @@ class DictionaryKeyAttribute : Attribute
class DictionaryValueAttribute : Attribute
{
}

#if !NETCOREAPP
class MiniChartEditor : UITypeEditor
{
public override bool GetPaintValueSupported(ITypeDescriptorContext context)
{
return true;
}

public override void PaintValue(PaintValueEventArgs e)
{
var values = (float[])((ExecutionPlanNodeTypeDescriptor)e.Value).GetPropertyOwner(null);
var brush = Brushes.DarkBlue;
var barWidth = (float)e.Bounds.Width / values.Length;
var maxValue = values.Max();

for (var i = 0; i < values.Length && maxValue > 0; i++)
{
var height = e.Bounds.Height * values[i] / maxValue;
e.Graphics.FillRectangle(brush, e.Bounds.X + i * barWidth, e.Bounds.Bottom - height, barWidth, height);
}
}
}
#endif

class MiniChartConverter : DataCollectionConverter
{
public override object ConvertTo(ITypeDescriptorContext context, CultureInfo culture, object value, Type destinationType)
{
if (value is ICustomTypeDescriptor desc)
value = desc.GetPropertyOwner(null);

var values = (float[])value;
var maxValue = values.Max();
var minValue = values.Min();

if (minValue == maxValue)
return minValue.ToString();
else
return $"{minValue} - {maxValue}";
}
}
}

0 comments on commit 1be3e81

Please sign in to comment.