Skip to content

Commit

Permalink
Fix partitioned aggregate error handling for dense data
Browse files Browse the repository at this point in the history
Fixes #387
  • Loading branch information
MarkMpn committed Nov 18, 2023
1 parent 595fcd4 commit e04bf75
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ protected override IEnumerable<Entity> ExecuteInternal(NodeExecutionContext cont
var minKey = GetMinMaxKey(fetchXmlNode, context, false);
var maxKey = GetMinMaxKey(fetchXmlNode, context, true);

if (minKey.IsNull || maxKey.IsNull || minKey == maxKey)
throw new QueryExecutionException("Cannot partition query");
if (minKey.IsNull || maxKey.IsNull || minKey >= maxKey)
throw new PartitionOverflowException();

// Add the filter to the FetchXML to partition the results
fetchXmlNode.Entity.AddItem(new filter
Expand All @@ -128,9 +128,6 @@ protected override IEnumerable<Entity> ExecuteInternal(NodeExecutionContext cont
partitionParameterTypes[kvp.Key] = kvp.Value;
}

if (minKey > maxKey)
throw new QueryExecutionException("Cannot partition query");

// Split recursively, add up values below & above split value if query returns successfully, or re-split on error
// Range is > MinValue AND <= MaxValue, so start from just before first record to ensure the first record is counted
var fullRange = new Partition
Expand Down Expand Up @@ -167,7 +164,7 @@ protected override IEnumerable<Entity> ExecuteInternal(NodeExecutionContext cont
{
var partitioner = Partitioner.Create(_queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner,
new ParallelOptions { MaxDegreeOfParallelism = maxDop },
new ParallelOptions { MaxDegreeOfParallelism = maxDop, CancellationToken = context.Options.CancellationToken },
() =>
{
var ds = new Dictionary<string, DataSource>
Expand Down Expand Up @@ -284,7 +281,10 @@ private void SplitPartition(Partition partition)
// Fail if we get stuck on a particularly dense partition. If there's > 50K records in a 10 second window we probably
// won't be able to split it successfully
if (partition.MaxValue.Value < partition.MinValue.Value.AddSeconds(10))
{
_queue.CompleteAdding();
throw new PartitionOverflowException();
}

// Start splitting partitions in half. Once we've done that a few times and are still hitting the 50K limit, start
// pre-emptively splitting into smaller chunks
Expand Down

0 comments on commit e04bf75

Please sign in to comment.