Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GIE Compiler] Unify Gremlin Timeout Configurations #2953

Merged
merged 5 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions interactive_engine/compiler/conf/ir.compiler.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ graph.planner: {"isOn":true,"opt":"RBO","rules":["FilterMatchRule"]}
# neo4j.bolt.server.disabled: true
# set neo4j server port if neo4j server is enabled
# neo4j.bolt.server.port: 7687

# set timeout in system config, can be overridden by session config per query
# query.execution.timeout.ms: 3000000
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;

/**
* client to submit request to remote engine service
Expand All @@ -33,7 +34,10 @@ public ExecutionClient(ChannelFetcher<C> channelFetcher) {
this.channelFetcher = channelFetcher;
}

public abstract void submit(ExecutionRequest request, ExecutionResponseListener listener)
public abstract void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
throws Exception;

public abstract void close() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.HiactorConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.tools.LogicalPlan;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.google.common.collect.Lists;
Expand All @@ -36,6 +37,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* http client to send request to hqps engine service
Expand All @@ -58,7 +60,10 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher<URI> channelFetch
}

@Override
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
public void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
throws Exception {
List<CompletableFuture> responseFutures = Lists.newArrayList();
for (URI httpURI : channelFetcher.fetch()) {
Expand All @@ -73,6 +78,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener)
CompletableFuture<HttpResponse<byte[]>> responseFuture =
httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.orTimeout(timeoutConfig.getChannelTimeoutMS(), TimeUnit.MILLISECONDS)
.whenComplete(
(bytes, exception) -> {
if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
Expand All @@ -46,14 +47,13 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channe
}

@Override
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
public void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
throws Exception {
if (rpcClientRef.get() == null) {
rpcClientRef.compareAndSet(
null,
new RpcClient(
PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(graphConfig),
channelFetcher.fetch()));
rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch()));
}
RpcClient rpcClient = rpcClientRef.get();
PegasusClient.JobRequest jobRequest =
Expand All @@ -68,7 +68,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener)
.setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(graphConfig))
.setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(graphConfig))
.setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(graphConfig))
.setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(graphConfig))
.setTimeLimit(timeoutConfig.getEngineTimeoutMS())
.setAll(
com.alibaba.pegasus.service.protocol.PegasusClient.Empty
.newBuilder()
Expand Down Expand Up @@ -97,7 +97,8 @@ public void finish() {
public void error(Status status) {
listener.onError(status.asException());
}
});
},
timeoutConfig.getChannelTimeoutMS());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ public class FrontendConfig {
public static final Config<Integer> NEO4J_BOLT_SERVER_PORT =
Config.intConfig("neo4j.bolt.server.port", 7687);

public static final Config<Integer> QUERY_EXECUTION_TIMEOUT_MS =
Config.intConfig("query.execution.timeout.ms", 3000000);

public static final Config<String> ENGINE_TYPE = Config.stringConfig("engine.type", "pegasus");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.config;

public class QueryTimeoutConfig {
private final long executionTimeoutMS;
private final long channelTimeoutMS;
private final long engineTimeoutMS;
private static final double GRADUAL_FACTOR = 0.1d;

public QueryTimeoutConfig(long executionTimeoutMS) {
this.executionTimeoutMS = executionTimeoutMS;
this.channelTimeoutMS = (long) (executionTimeoutMS * (1 - GRADUAL_FACTOR));
this.engineTimeoutMS = (long) (executionTimeoutMS * (1 - 2 * GRADUAL_FACTOR));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the three time align. Should be fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

public long getExecutionTimeoutMS() {
return executionTimeoutMS;
}

public long getChannelTimeoutMS() {
return channelTimeoutMS;
}

public long getEngineTimeoutMS() {
return engineTimeoutMS;
}

@Override
public String toString() {
return "QueryTimeoutConfig{"
+ "executionTimeoutMS="
+ executionTimeoutMS
+ ", channelTimeoutMS="
+ channelTimeoutMS
+ ", engineTimeoutMS="
+ engineTimeoutMS
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.tools.AliasInference;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.ir.tools.LogicalPlan;
Expand All @@ -41,10 +42,15 @@
public class GraphPlanExecution<C> implements StatementResults.SubscribableExecution {
private final ExecutionClient<C> client;
private final GraphPlanner.Summary planSummary;
private final QueryTimeoutConfig timeoutConfig;

public GraphPlanExecution(ExecutionClient<C> client, GraphPlanner.Summary planSummary) {
public GraphPlanExecution(
ExecutionClient<C> client,
GraphPlanner.Summary planSummary,
QueryTimeoutConfig timeoutConfig) {
this.client = client;
this.planSummary = planSummary;
this.timeoutConfig = timeoutConfig;
}

@Override
Expand All @@ -60,7 +66,7 @@ public QueryExecution subscribe(QuerySubscriber querySubscriber) {
new CypherRecordProcessor(
new CypherRecordParser(getOutputType(planSummary.getLogicalPlan())),
querySubscriber);
this.client.submit(request, recordProcessor);
this.client.submit(request, recordProcessor, timeoutConfig);
return recordProcessor;
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.graphscope.common.antlr4.Antlr4Parser;
import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.runtime.PhysicalBuilder;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class GraphQueryExecutor extends FabricExecutor {
private final ExecutionClient client;

private final GraphPlanner graphPlanner;
private final FabricConfig fabricConfig;

public GraphQueryExecutor(
FabricConfig config,
Expand All @@ -75,6 +77,7 @@ public GraphQueryExecutor(
internalLog,
statementLifecycles,
fabricWorkerExecutor);
this.fabricConfig = config;
this.graphConfig = graphConfig;
this.antlr4Parser = antlr4Parser;
this.graphPlanner = graphPlanner;
Expand Down Expand Up @@ -118,7 +121,7 @@ public StatementResult run(
}
QuerySubject querySubject = new QuerySubject.BasicQuerySubject();
StatementResults.SubscribableExecution execution =
new GraphPlanExecution(this.client, planSummary);
new GraphPlanExecution(this.client, planSummary, getQueryTimeoutConfig());
metaQueryCallback.afterExec(irMeta);
StatementResult result = StatementResults.connectVia(execution, querySubject);
return result;
Expand All @@ -131,4 +134,8 @@ public StatementResult run(
}
}
}

private QueryTimeoutConfig getQueryTimeoutConfig() {
return new QueryTimeoutConfig(fabricConfig.getTransactionTimeout().toMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ public void request(long l) throws Exception {
}

@Override
public void cancel() {}
public void cancel() {
this.recordIterator.close();
}

@Override
public boolean await() throws Exception {
return this.recordIterator.hasNext();
boolean hasNext = this.recordIterator.hasNext();
return hasNext;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
import com.alibaba.graphscope.common.store.IrMeta;
Expand Down Expand Up @@ -101,7 +102,8 @@ public ThrowingConsumer<Context> select(Context ctx) {
ctx, traversal, testGraph, this.configs),
jobId,
script,
irMeta);
irMeta,
new QueryTimeoutConfig(ctx.getRequestTimeout()));
metaQueryCallback.afterExec(irMeta);
});
return op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.intermediate.InterOpCollection;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
Expand Down Expand Up @@ -105,8 +105,7 @@ public IrStandardOpProcessor(
this.graph = graph;
this.g = g;
this.configs = configs;
this.rpcClient =
new RpcClient(PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(configs), fetcher.fetch());
this.rpcClient = new RpcClient(fetcher.fetch());
this.metaQueryCallback = metaQueryCallback;
this.graphPlanner = graphPlanner;
}
Expand Down Expand Up @@ -260,15 +259,10 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
long jobId,
String script,
IrMeta irMeta) {
final RequestMessage msg = ctx.getRequestMessage();
final Settings settings = ctx.getSettings();
final Map<String, Object> args = msg.getArgs();
long seto =
args.containsKey("evaluationTimeout")
? ((Number) args.get("evaluationTimeout")).longValue()
: settings.getEvaluationTimeout();
QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
logger.info("query execution timeout is {}", timeoutConfig);
return GremlinExecutor.LifeCycle.build()
.evaluationTimeoutOverride(seto)
.evaluationTimeoutOverride(timeoutConfig.getExecutionTimeoutMS())
.beforeEval(
b -> {
try {
Expand Down Expand Up @@ -296,7 +290,8 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
new GremlinResultProcessor(ctx, traversal),
jobId,
script,
irMeta);
irMeta,
timeoutConfig);
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -311,7 +306,8 @@ protected void processTraversal(
ResultProcessor resultProcessor,
long jobId,
String script,
IrMeta irMeta)
IrMeta irMeta,
QueryTimeoutConfig timeoutConfig)
throws InvalidProtocolBufferException, IOException, RuntimeException {
InterOpCollection opCollection = (new InterOpCollectionBuilder(traversal)).build();
// fuse order with limit to topK
Expand Down Expand Up @@ -342,11 +338,11 @@ protected void processTraversal(
.setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(configs))
.setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(configs))
.setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(configs))
.setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(configs))
.setTimeLimit(timeoutConfig.getEngineTimeoutMS())
.setAll(PegasusClient.Empty.newBuilder().build())
.build();
request = request.toBuilder().setConf(jobConfig).build();
this.rpcClient.submit(request, resultProcessor);
this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS());
}

public static void applyStrategies(Traversal traversal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public IrGremlinServer(
if (port >= 0) {
this.settings.port = port;
}
this.settings.evaluationTimeout = FrontendConfig.QUERY_EXECUTION_TIMEOUT_MS.get(configs);
this.graph = TinkerFactory.createModern();
this.g = this.graph.traversal(IrCustomizedTraversalSource.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static void main(String[] args) throws Exception {
List<RpcChannel> channels = new ArrayList<>();
channels.add(rpcChannel0);
channels.add(rpcChannel1);
RpcClient rpcClient = new RpcClient(600000, channels);
RpcClient rpcClient = new RpcClient(channels);

logger.info("Will try to send request");
JobConfig confPb =
Expand Down Expand Up @@ -148,7 +148,8 @@ public void finish() {
public void error(Status status) {
resultIterator.fail(status.getCause());
}
});
},
600000);
rpcClient.shutdown();
}
}
Loading