Skip to content

Commit

Permalink
Add partitioning_precision_strategy session property
Browse files Browse the repository at this point in the history
  • Loading branch information
aweisberg authored and arhimondr committed Jan 16, 2020
1 parent 652ca6a commit 9a68dfe
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -138,6 +139,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning";
public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy";
public static final String LIST_BUILT_IN_FUNCTIONS_ONLY = "list_built_in_functions_only";
public static final String PARTITIONING_PRECISION_STRATEGY = "partitioning_precision_strategy";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -684,7 +686,19 @@ public SystemSessionProperties(
LIST_BUILT_IN_FUNCTIONS_ONLY,
"Only List built-in functions in SHOW FUNCTIONS",
featuresConfig.isListBuiltInFunctionsOnly(),
false));
false),
new PropertyMetadata<>(
PARTITIONING_PRECISION_STRATEGY,
format("The strategy to use to pick when to repartition. Options are %s",
Stream.of(PartitioningPrecisionStrategy.values())
.map(PartitioningPrecisionStrategy::name)
.collect(joining(","))),
VARCHAR,
PartitioningPrecisionStrategy.class,
featuresConfig.getPartitioningPrecisionStrategy(),
false,
value -> PartitioningPrecisionStrategy.valueOf(((String) value).toUpperCase()),
PartitioningPrecisionStrategy::name));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -1164,4 +1178,10 @@ public static boolean isListBuiltInFunctionsOnly(Session session)
{
return session.getSystemProperty(LIST_BUILT_IN_FUNCTIONS_ONLY, Boolean.class);
}

/**
 * Reports whether the session's {@code partitioning_precision_strategy}
 * system property is set to {@link PartitioningPrecisionStrategy#PREFER_EXACT_PARTITIONING}.
 *
 * @param session the session whose system property is read
 * @return {@code true} only when the configured strategy is {@code PREFER_EXACT_PARTITIONING}
 */
public static boolean isExactPartitioningPreferred(Session session)
{
    PartitioningPrecisionStrategy configuredStrategy =
            session.getSystemProperty(PARTITIONING_PRECISION_STRATEGY, PartitioningPrecisionStrategy.class);
    // Enum constants are singletons, so an identity comparison is sufficient.
    return PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING == configuredStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ public class FeaturesConfig

private boolean listBuiltInFunctionsOnly = true;

private PartitioningPrecisionStrategy partitioningPrecisionStrategy = PartitioningPrecisionStrategy.AUTOMATIC;

/**
 * Strategies for deciding when data should be repartitioned.
 * The value is carried by the {@code partitioningPrecisionStrategy} field of this config.
 */
public enum PartitioningPrecisionStrategy
{
// Let Presto decide when to repartition
AUTOMATIC,
// Prefer exact partitioning until Presto becomes smarter with respect to picking when to repartition
PREFER_EXACT_PARTITIONING
}

public enum JoinReorderingStrategy
{
NONE,
Expand Down Expand Up @@ -1146,4 +1156,17 @@ public FeaturesConfig setListBuiltInFunctionsOnly(boolean listBuiltInFunctionsOn
this.listBuiltInFunctionsOnly = listBuiltInFunctionsOnly;
return this;
}

/**
 * Returns the currently configured partitioning precision strategy.
 *
 * @return the value held in the {@code partitioningPrecisionStrategy} field
 */
public PartitioningPrecisionStrategy getPartitioningPrecisionStrategy()
{
    return this.partitioningPrecisionStrategy;
}

/**
 * Sets the partitioning precision strategy, bound to the
 * {@code partitioning-precision-strategy} configuration key.
 *
 * @param partitioningPrecisionStrategy the strategy to store
 * @return this config instance, so setters can be chained
 */
@Config("partitioning-precision-strategy")
// The listed options must match the PartitioningPrecisionStrategy constant names exactly.
@ConfigDescription("Set strategy used to determine whether to repartition (AUTOMATIC, PREFER_EXACT_PARTITIONING)")
public FeaturesConfig setPartitioningPrecisionStrategy(PartitioningPrecisionStrategy partitioningPrecisionStrategy)
{
    this.partitioningPrecisionStrategy = partitioningPrecisionStrategy;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.concurrent.Immutable;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -241,6 +242,26 @@ public boolean isPartitionedOn(Collection<VariableReferenceExpression> columns,
return true;
}

/**
 * Checks whether the non-constant partitioning arguments are exactly the given columns.
 * <p>
 * Arguments that are {@code ConstantExpression}s, or variables listed in
 * {@code knownConstants}, are ignored. Any argument that is neither a constant nor a
 * {@code VariableReferenceExpression} makes the check fail immediately.
 *
 * @param columns the columns the partitioning is expected to match (compared as a set)
 * @param knownConstants variables known to be constant; these are excluded from the comparison
 * @return {@code true} if the set of remaining partitioning variables equals the set of {@code columns}
 */
public boolean isPartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> knownConstants)
{
    Set<VariableReferenceExpression> partitioningVariables = new HashSet<>();
    for (RowExpression argument : arguments) {
        // Literal constants never affect which partition a row lands in.
        if (argument instanceof ConstantExpression) {
            continue;
        }
        if (argument instanceof VariableReferenceExpression) {
            // Variables known to be constant are ignored, just like literal constants.
            if (!knownConstants.contains(argument)) {
                partitioningVariables.add((VariableReferenceExpression) argument);
            }
        }
        else {
            // Partitioning on an arbitrary expression cannot be matched against plain columns.
            return false;
        }
    }
    Set<VariableReferenceExpression> requestedColumns = ImmutableSet.copyOf(columns);
    return requestedColumns.equals(partitioningVariables);
}

public boolean isEffectivelySinglePartition(Set<VariableReferenceExpression> knownConstants)
{
return isPartitionedOn(ImmutableSet.of(), knownConstants);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,34 @@ public boolean isNullsAndAnyReplicated()
return global.isNullsAndAnyReplicated();
}

public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns)
public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean exactly)
{
return isStreamPartitionedOn(columns, false);
return isStreamPartitionedOn(columns, false, exactly);
}

public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated)
public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated, boolean exactly)
{
return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
if (exactly) {
return global.isStreamPartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated);
}
else {
return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
}
}

public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns)
public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean exactly)
{
return isNodePartitionedOn(columns, false);
return isNodePartitionedOn(columns, false, exactly);
}

public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated)
public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated, boolean exactly)
{
return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
if (exactly) {
return global.isNodePartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated);
}
else {
return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
}
}

@Deprecated
Expand Down Expand Up @@ -471,6 +481,11 @@ private boolean isNodePartitionedOn(Collection<VariableReferenceExpression> colu
return nodePartitioning.isPresent() && nodePartitioning.get().isPartitionedOn(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

/**
 * Checks whether a node partitioning is present, is partitioned exactly on the given
 * columns (ignoring the given constants), and this object's {@code nullsAndAnyReplicated}
 * flag equals the requested one.
 *
 * @param columns the columns the node partitioning must match exactly
 * @param constants variables treated as constant by the exact-match check
 * @param nullsAndAnyReplicated the required value of the {@code nullsAndAnyReplicated} flag
 * @return {@code true} only if all three conditions hold
 */
private boolean isNodePartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
{
    if (!nodePartitioning.isPresent()) {
        return false;
    }
    if (!nodePartitioning.get().isPartitionedOnExactly(columns, constants)) {
        return false;
    }
    return this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

private boolean isCompatibleTablePartitioningWith(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session)
{
return nodePartitioning.isPresent() && nodePartitioning.get().isCompatibleWith(partitioning, metadata, session) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
Expand Down Expand Up @@ -531,6 +546,11 @@ private boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> co
return streamPartitioning.isPresent() && streamPartitioning.get().isPartitionedOn(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

/**
 * Checks whether a stream partitioning is present, is partitioned exactly on the given
 * columns (ignoring the given constants), and this object's {@code nullsAndAnyReplicated}
 * flag equals the requested one.
 *
 * @param columns the columns the stream partitioning must match exactly
 * @param constants variables treated as constant by the exact-match check
 * @param nullsAndAnyReplicated the required value of the {@code nullsAndAnyReplicated} flag
 * @return {@code true} only if all three conditions hold
 */
private boolean isStreamPartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
{
    if (!streamPartitioning.isPresent()) {
        return false;
    }
    if (!streamPartitioning.get().isPartitionedOnExactly(columns, constants)) {
        return false;
    }
    return this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

/**
* @return true if all the data will effectively land in a single stream
*/
Expand Down
Loading

0 comments on commit 9a68dfe

Please sign in to comment.