Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add native expression optimizer #23331

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
21 changes: 21 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
<module>presto-singlestore</module>
<module>presto-hana</module>
<module>presto-openapi</module>
<module>presto-native-plugin</module>
</modules>

<dependencyManagement>
Expand Down Expand Up @@ -697,6 +698,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-benchmark</artifactId>
Expand Down Expand Up @@ -892,6 +900,19 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-plugin</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-execution</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.hive</groupId>
<artifactId>hive-dwrf</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms";
public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns";
public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values";
public static final String DELEGATING_ROW_EXPRESSION_OPTIMIZER_ENABLED = "delegating_row_expression_optimizer_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -2038,6 +2039,11 @@ public SystemSessionProperties(
booleanProperty(INLINE_PROJECTIONS_ON_VALUES,
"Whether to evaluate project node on values node",
featuresConfig.getInlineProjectionsOnValues(),
false),
booleanProperty(
DELEGATING_ROW_EXPRESSION_OPTIMIZER_ENABLED,
"Enable delegating row optimizer",
featuresConfig.isDelegatingRowExpressionOptimizerEnabled(),
false));
}

Expand Down Expand Up @@ -3370,4 +3376,9 @@ public static boolean isInlineProjectionsOnValues(Session session)
{
return session.getSystemProperty(INLINE_PROJECTIONS_ON_VALUES, Boolean.class);
}

public static boolean isDelegatingRowExpressionOptimizerEnabled(Session session)
{
return session.getSystemProperty(DELEGATING_ROW_EXPRESSION_OPTIMIZER_ENABLED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
*/
package com.facebook.presto.connector;

import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

import static com.facebook.presto.spi.StandardErrorCode.NO_CPP_SIDECARS;
import static java.util.Objects.requireNonNull;

public class ConnectorAwareNodeManager
Expand Down Expand Up @@ -58,6 +61,16 @@ public Node getCurrentNode()
return nodeManager.getCurrentNode();
}

@Override
public Node getSidecarNode()
{
Set<InternalNode> coordinatorSidecars = nodeManager.getCoordinatorSidecars();
if (coordinatorSidecars.isEmpty()) {
throw new PrestoException(NO_CPP_SIDECARS, "Expected exactly one coordinator sidecar, but found none");
}
return coordinatorSidecars.iterator().next();
}

@Override
public String getEnvironment()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.NO_CPP_SIDECARS;
import static com.facebook.presto.spi.StandardErrorCode.TOO_MANY_SIDECARS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.lang.String.format;
Expand Down Expand Up @@ -183,11 +182,7 @@ public boolean hasRequiredCoordinators()
*/
public boolean hasRequiredCoordinatorSidecars()
{
if (currentCoordinatorSidecarCount > 1) {
throw new PrestoException(TOO_MANY_SIDECARS,
format("Expected a single active coordinator sidecar. Found %s active coordinator sidecars", currentCoordinatorSidecarCount));
}
return currentCoordinatorSidecarCount == 1;
return currentCoordinatorSidecarCount > 0;
}

/**
Expand Down Expand Up @@ -257,7 +252,7 @@ public synchronized ListenableFuture<?> waitForMinimumCoordinators()

public synchronized ListenableFuture<?> waitForMinimumCoordinatorSidecars()
{
if (currentCoordinatorSidecarCount == 1 || !isCoordinatorSidecarEnabled) {
if (currentCoordinatorSidecarCount > 0 || !isCoordinatorSidecarEnabled) {
return immediateFuture(null);
}

Expand Down Expand Up @@ -309,7 +304,6 @@ private synchronized void updateAllNodes(AllNodes allNodes)
Set<Node> activeNodes = new HashSet<>(allNodes.getActiveNodes());
activeNodes.removeAll(allNodes.getActiveCoordinators());
activeNodes.removeAll(allNodes.getActiveResourceManagers());
activeNodes.removeAll(allNodes.getActiveCoordinatorSidecars());
currentWorkerCount = activeNodes.size();
}
currentCoordinatorCount = allNodes.getActiveCoordinators().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
public class HandleJsonModule
implements Module
{
private final HandleResolver handleResolver;

public HandleJsonModule()
{
this(null);
}

public HandleJsonModule(HandleResolver handleResolver)
{
this.handleResolver = handleResolver;
}

@Override
public void configure(Binder binder)
{
Expand All @@ -38,6 +50,11 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class);

binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
if (handleResolver == null) {
binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
}
else {
binder.bind(HandleResolver.class).toInstance(handleResolver);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nodeManager;

import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;

import java.util.Set;

import static com.facebook.presto.spi.StandardErrorCode.NO_CPP_SIDECARS;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;

/**
* This class simplifies managing Presto's cluster nodes,
* focusing on active workers and coordinators without tying to specific connectors.
*/
public class PluginNodeManager
implements NodeManager
{
private final InternalNodeManager nodeManager;
private final String environment;

@Inject
public PluginNodeManager(InternalNodeManager nodeManager)
{
this.nodeManager = nodeManager;
this.environment = "test";
}

public PluginNodeManager(InternalNodeManager nodeManager, String environment)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.environment = requireNonNull(environment, "environment is null");
}

@Override
public Set<Node> getAllNodes()
{
return ImmutableSet.<Node>builder()
.addAll(getWorkerNodes())
.addAll(nodeManager.getCoordinators())
.build();
}

@Override
public Set<Node> getWorkerNodes()
{
//Retrieves all active worker nodes, excluding coordinators, resource managers, and catalog servers.
return nodeManager.getAllNodes().getActiveNodes().stream()
.filter(node -> !node.isResourceManager() && !node.isCoordinator() && !node.isCatalogServer())
.collect(toImmutableSet());
}

@Override
public Node getCurrentNode()
{
return nodeManager.getCurrentNode();
}

@Override
public Node getSidecarNode()
{
Set<InternalNode> coordinatorSidecars = nodeManager.getCoordinatorSidecars();
if (coordinatorSidecars.isEmpty()) {
throw new PrestoException(NO_CPP_SIDECARS, "Expected exactly one coordinator sidecar, but found none");
}
return coordinatorSidecars.iterator().next();
}

@Override
public String getEnvironment()
{
return environment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
import com.facebook.presto.spi.security.SystemAccessControlFactory;
import com.facebook.presto.spi.session.SessionPropertyConfigurationManagerFactory;
import com.facebook.presto.spi.sql.planner.ExpressionOptimizerFactory;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.storage.TempStorageFactory;
import com.facebook.presto.spi.tracing.TracerProvider;
import com.facebook.presto.spi.ttl.ClusterTtlProviderFactory;
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.tracing.TracerProviderManager;
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager;
Expand Down Expand Up @@ -131,6 +133,7 @@ public class PluginManager
private final AnalyzerProviderManager analyzerProviderManager;
private final QueryPreparerProviderManager queryPreparerProviderManager;
private final NodeStatusNotificationManager nodeStatusNotificationManager;
private final ExpressionOptimizerManager expressionOptimizerManager;

@Inject
public PluginManager(
Expand All @@ -152,7 +155,8 @@ public PluginManager(
ClusterTtlProviderManager clusterTtlProviderManager,
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
TracerProviderManager tracerProviderManager,
NodeStatusNotificationManager nodeStatusNotificationManager)
NodeStatusNotificationManager nodeStatusNotificationManager,
ExpressionOptimizerManager expressionOptimizerManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -184,6 +188,7 @@ public PluginManager(
this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
this.expressionOptimizerManager = requireNonNull(expressionOptimizerManager, "expressionManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -356,6 +361,11 @@ public void installCoordinatorPlugin(CoordinatorPlugin plugin)
log.info("Registering function namespace manager %s", functionNamespaceManagerFactory.getName());
metadata.getFunctionAndTypeManager().addFunctionNamespaceFactory(functionNamespaceManagerFactory);
}

for (ExpressionOptimizerFactory batchRowExpressionInterpreterProvider : plugin.getRowExpressionInterpreterServiceFactories()) {
log.info("Registering batch row expression interpreter provider %s", batchRowExpressionInterpreterProvider.getName());
expressionOptimizerManager.addExpressionOptimizerFactory(batchRowExpressionInterpreterProvider);
}
}

private URLClassLoader buildClassLoader(String plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.facebook.presto.server.security.PasswordAuthenticatorManager;
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.storage.TempStorageModule;
Expand Down Expand Up @@ -177,6 +178,7 @@ public void run()
injector.getInstance(TracerProviderManager.class).loadTracerProvider();
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
injector.getInstance(ExpressionOptimizerManager.class).loadExpressions();
startAssociatedProcesses(injector);

injector.getInstance(Announcer.class).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,13 @@
import com.facebook.presto.spi.ConnectorTypeSerde;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.RowExpressionSerde;
import com.facebook.presto.spi.analyzer.ViewDefinition;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.PredicateCompiler;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spiller.FileSingleStreamSpillerFactory;
import com.facebook.presto.spiller.GenericPartitioningSpillerFactory;
Expand Down Expand Up @@ -186,6 +188,8 @@
import com.facebook.presto.sql.analyzer.MetadataExtractorMBean;
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.expressions.JsonCodecRowExpressionSerde;
import com.facebook.presto.sql.gen.ExpressionCompiler;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
Expand Down Expand Up @@ -346,6 +350,10 @@ else if (serverConfig.isCoordinator()) {
binder.bind(SystemSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(SessionPropertyDefaults.class).in(Scopes.SINGLETON);

// expression manager
binder.bind(ExpressionOptimizerManager.class).in(Scopes.SINGLETON);
binder.bind(RowExpressionSerde.class).to(JsonCodecRowExpressionSerde.class).in(Scopes.SINGLETON);

// schema properties
binder.bind(SchemaPropertyManager.class).in(Scopes.SINGLETON);

Expand Down Expand Up @@ -538,6 +546,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
jsonCodecBinder(binder).bindJsonCodec(SqlInvokedFunction.class);
jsonCodecBinder(binder).bindJsonCodec(TaskSource.class);
jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class);
jsonCodecBinder(binder).bindJsonCodec(RowExpression.class);
smileCodecBinder(binder).bindSmileCodec(TaskStatus.class);
smileCodecBinder(binder).bindSmileCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
Expand Down
Loading
Loading