From 4c16c0339a3b60a2002ddf4a1bbb54aede47ab02 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Fri, 30 Jun 2023 15:41:16 +0800 Subject: [PATCH 1/4] [GIE Compiler] unify timeout in gremlin stack --- .../compiler/conf/ir.compiler.properties | 3 ++ .../common/client/ExecutionClient.java | 6 ++- .../common/client/HttpExecutionClient.java | 8 ++- .../common/client/RpcExecutionClient.java | 17 +++--- .../common/config/FrontendConfig.java | 3 ++ .../common/config/QueryTimeoutConfig.java | 54 +++++++++++++++++++ .../cypher/executor/GraphPlanExecution.java | 10 +++- .../cypher/executor/GraphQueryExecutor.java | 9 +++- .../cypher/result/CypherRecordProcessor.java | 7 ++- .../processor/IrTestOpProcessor.java | 4 +- .../processor/IrStandardOpProcessor.java | 26 ++++----- .../gremlin/service/IrGremlinServer.java | 1 + .../com/alibaba/pegasus/ClientExample.java | 5 +- .../java/com/alibaba/pegasus/RpcClient.java | 8 ++- 14 files changed, 123 insertions(+), 38 deletions(-) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java diff --git a/interactive_engine/compiler/conf/ir.compiler.properties b/interactive_engine/compiler/conf/ir.compiler.properties index 743d33519bba..4885f4732f1a 100644 --- a/interactive_engine/compiler/conf/ir.compiler.properties +++ b/interactive_engine/compiler/conf/ir.compiler.properties @@ -32,3 +32,6 @@ graph.planner: {"isOn":true,"opt":"RBO","rules":["FilterMatchRule"]} # neo4j.bolt.server.disabled: true # set neo4j server port if neo4j server is enabled # neo4j.bolt.server.port: 7687 + +# set timeout in system config, can be overridden by session config per query +# query.execution.timeout.ms: 3000000 diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java index 130b64568d6a..2fe515709b8d 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java @@ -20,6 +20,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionRequest; import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; /** * client to submit request to remote engine service @@ -33,7 +34,10 @@ public ExecutionClient(ChannelFetcher channelFetcher) { this.channelFetcher = channelFetcher; } - public abstract void submit(ExecutionRequest request, ExecutionResponseListener listener) + public abstract void submit( + ExecutionRequest request, + ExecutionResponseListener listener, + QueryTimeoutConfig timeoutConfig) throws Exception; public abstract void close() throws Exception; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index feaece1e25be..c6c4fc86c716 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.HiactorConfig; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.common.ir.tools.LogicalPlan; import com.alibaba.graphscope.gaia.proto.IrResult; import com.google.common.collect.Lists; @@ -36,6 +37,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * http client to send request to hqps engine service @@ -58,7 +60,10 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher channelFetch } @Override - public void submit(ExecutionRequest request, ExecutionResponseListener listener) + public void submit( + ExecutionRequest request, + ExecutionResponseListener listener, + QueryTimeoutConfig timeoutConfig) throws Exception { List responseFutures = Lists.newArrayList(); for (URI httpURI : channelFetcher.fetch()) { @@ -73,6 +78,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) CompletableFuture> responseFuture = httpClient .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) + .orTimeout(timeoutConfig.getChannelTimeoutMS(), TimeUnit.MILLISECONDS) .whenComplete( (bytes, exception) -> { if (exception != null) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java index 1833c2fb98b4..83eda4c3de2e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.PegasusConfig; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.pegasus.RpcChannel; import com.alibaba.pegasus.RpcClient; @@ -46,14 +47,13 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher channe } @Override - public void submit(ExecutionRequest request, ExecutionResponseListener listener) + public void submit( + ExecutionRequest request, + ExecutionResponseListener listener, + QueryTimeoutConfig timeoutConfig) throws Exception { if (rpcClientRef.get() == null) { - rpcClientRef.compareAndSet( - null, - new RpcClient( - PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(graphConfig), - channelFetcher.fetch())); + rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch())); } RpcClient rpcClient = rpcClientRef.get(); PegasusClient.JobRequest jobRequest = @@ -68,7 +68,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) .setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(graphConfig)) .setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(graphConfig)) .setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(graphConfig)) - .setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(graphConfig)) + .setTimeLimit(timeoutConfig.getEngineTimeoutMS()) .setAll( com.alibaba.pegasus.service.protocol.PegasusClient.Empty .newBuilder() @@ -97,7 +97,8 @@ public void finish() { public void error(Status status) { listener.onError(status.asException()); } - }); + }, + timeoutConfig.getChannelTimeoutMS()); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java index 6eb5ba5acca8..b5470e07ed15 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java @@ -29,5 +29,8 @@ public class FrontendConfig { public static final Config NEO4J_BOLT_SERVER_PORT = Config.intConfig("neo4j.bolt.server.port", 7687); + public static final Config QUERY_EXECUTION_TIMEOUT_MS = + Config.intConfig("query.execution.timeout.ms", 3000000); + public static final Config ENGINE_TYPE = Config.stringConfig("engine.type", "pegasus"); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java new file mode 100644 index 000000000000..e89946bedc1d --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.config; + +public class QueryTimeoutConfig { + private final long executionTimeoutMS; + private final long channelTimeoutMS; + private final long engineTimeoutMS; + private static final double GRADUAL_FACTOR = 0.1d; + + public QueryTimeoutConfig(long executionTimeoutMS) { + this.executionTimeoutMS = executionTimeoutMS; + this.channelTimeoutMS = (long) (executionTimeoutMS * (1 - GRADUAL_FACTOR)); + this.engineTimeoutMS = (long) (executionTimeoutMS * (1 - 2 * GRADUAL_FACTOR)); + } + + public long getExecutionTimeoutMS() { + return executionTimeoutMS; + } + + public long getChannelTimeoutMS() { + return channelTimeoutMS; + } + + public long getEngineTimeoutMS() { + return engineTimeoutMS; + } + + @Override + public String toString() { + return "QueryTimeoutConfig{" + + "executionTimeoutMS=" + + executionTimeoutMS + + ", channelTimeoutMS=" + + channelTimeoutMS + + ", engineTimeoutMS=" + + engineTimeoutMS + + '}'; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphPlanExecution.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphPlanExecution.java index 8bbee4c4505a..0aa4a8d74e19 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphPlanExecution.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphPlanExecution.java @@ -18,6 +18,7 @@ import com.alibaba.graphscope.common.client.ExecutionClient; import com.alibaba.graphscope.common.client.type.ExecutionRequest; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.common.ir.tools.AliasInference; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.common.ir.tools.LogicalPlan; @@ -41,10 +42,15 @@ public class GraphPlanExecution implements StatementResults.SubscribableExecution { private final ExecutionClient client; private final GraphPlanner.Summary planSummary; + private final QueryTimeoutConfig timeoutConfig; - public GraphPlanExecution(ExecutionClient client, GraphPlanner.Summary planSummary) { + public GraphPlanExecution( + ExecutionClient client, + GraphPlanner.Summary planSummary, + QueryTimeoutConfig timeoutConfig) { this.client = client; this.planSummary = planSummary; + this.timeoutConfig = timeoutConfig; } @Override @@ -60,7 +66,7 @@ public QueryExecution subscribe(QuerySubscriber querySubscriber) { new CypherRecordProcessor( new CypherRecordParser(getOutputType(planSummary.getLogicalPlan())), querySubscriber); - this.client.submit(request, recordProcessor); + this.client.submit(request, recordProcessor, timeoutConfig); return recordProcessor; } catch (Exception e) { throw new RuntimeException(e); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index c00e9faef8fc..e1dbd80dc4b1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -19,6 +19,7 @@ import com.alibaba.graphscope.common.antlr4.Antlr4Parser; import com.alibaba.graphscope.common.client.ExecutionClient; import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.common.ir.runtime.PhysicalBuilder; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; @@ -53,6 +54,7 @@ public class GraphQueryExecutor extends FabricExecutor { private final ExecutionClient client; private final GraphPlanner graphPlanner; + private final FabricConfig fabricConfig; public GraphQueryExecutor( FabricConfig config, @@ -75,6 +77,7 @@ public GraphQueryExecutor( internalLog, statementLifecycles, fabricWorkerExecutor); + this.fabricConfig = config; this.graphConfig = graphConfig; this.antlr4Parser = antlr4Parser; this.graphPlanner = graphPlanner; @@ -118,7 +121,7 @@ public StatementResult run( } QuerySubject querySubject = new QuerySubject.BasicQuerySubject(); StatementResults.SubscribableExecution execution = - new GraphPlanExecution(this.client, planSummary); + new GraphPlanExecution(this.client, planSummary, getQueryTimeoutConfig()); metaQueryCallback.afterExec(irMeta); StatementResult result = StatementResults.connectVia(execution, querySubject); return result; @@ -131,4 +134,8 @@ public StatementResult run( } } } + + private QueryTimeoutConfig getQueryTimeoutConfig() { + return new QueryTimeoutConfig(fabricConfig.getTransactionTimeout().toMillis()); + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java index dc3b40f3de98..edb454c88fa7 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java @@ -96,11 +96,14 @@ public void request(long l) throws Exception { } @Override - public void cancel() {} + public void cancel() { + this.recordIterator.close(); + } @Override public boolean await() throws Exception { - return this.recordIterator.hasNext(); + boolean hasNext = this.recordIterator.hasNext(); + return hasNext; } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java index 5d1e349a812b..f7c9a69b2fc2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java @@ -18,6 +18,7 @@ import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.store.IrMeta; @@ -101,7 +102,8 @@ public ThrowingConsumer select(Context ctx) { ctx, traversal, testGraph, this.configs), jobId, script, - irMeta); + irMeta, + new QueryTimeoutConfig(ctx.getRequestTimeout())); metaQueryCallback.afterExec(irMeta); }); return op; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index fa95a3b4b00d..514a77689d45 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -29,6 +29,7 @@ import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.PegasusConfig; +import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.common.intermediate.InterOpCollection; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; @@ -58,7 +59,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies; import org.apache.tinkerpop.gremlin.server.Context; -import org.apache.tinkerpop.gremlin.server.Settings; import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor; import org.apache.tinkerpop.gremlin.server.op.OpProcessorException; import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor; @@ -105,8 +105,7 @@ public IrStandardOpProcessor( this.graph = graph; this.g = g; this.configs = configs; - this.rpcClient = - new RpcClient(PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(configs), fetcher.fetch()); + this.rpcClient = new RpcClient(fetcher.fetch()); this.metaQueryCallback = metaQueryCallback; this.graphPlanner = graphPlanner; } @@ -260,15 +259,10 @@ protected GremlinExecutor.LifeCycle createLifeCycle( long jobId, String script, IrMeta irMeta) { - final RequestMessage msg = ctx.getRequestMessage(); - final Settings settings = ctx.getSettings(); - final Map args = msg.getArgs(); - long seto = - args.containsKey("evaluationTimeout") - ? ((Number) args.get("evaluationTimeout")).longValue() - : settings.getEvaluationTimeout(); + QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout()); + logger.info("query execution timeout is {}", timeoutConfig); return GremlinExecutor.LifeCycle.build() - .evaluationTimeoutOverride(seto) + .evaluationTimeoutOverride(timeoutConfig.getExecutionTimeoutMS()) .beforeEval( b -> { try { @@ -296,7 +290,8 @@ protected GremlinExecutor.LifeCycle createLifeCycle( new GremlinResultProcessor(ctx, traversal), jobId, script, - irMeta); + irMeta, + timeoutConfig); } } catch (Exception e) { throw new RuntimeException(e); @@ -311,7 +306,8 @@ protected void processTraversal( ResultProcessor resultProcessor, long jobId, String script, - IrMeta irMeta) + IrMeta irMeta, + QueryTimeoutConfig timeoutConfig) throws InvalidProtocolBufferException, IOException, RuntimeException { InterOpCollection opCollection = (new InterOpCollectionBuilder(traversal)).build(); // fuse order with limit to topK @@ -342,11 +338,11 @@ protected void processTraversal( .setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(configs)) .setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(configs)) .setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(configs)) - .setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(configs)) + .setTimeLimit(timeoutConfig.getEngineTimeoutMS()) .setAll(PegasusClient.Empty.newBuilder().build()) .build(); request = request.toBuilder().setConf(jobConfig).build(); - this.rpcClient.submit(request, resultProcessor); + this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS()); } public static void applyStrategies(Traversal traversal) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java index ce12ec3e2461..4a33ab6a5712 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java @@ -72,6 +72,7 @@ public IrGremlinServer( if (port >= 0) { this.settings.port = port; } + this.settings.evaluationTimeout = FrontendConfig.QUERY_EXECUTION_TIMEOUT_MS.get(configs); this.graph = TinkerFactory.createModern(); this.g = this.graph.traversal(IrCustomizedTraversalSource.class); } diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java index 5e05a311f697..f19c63fa30fd 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java @@ -99,7 +99,7 @@ public static void main(String[] args) throws Exception { List channels = new ArrayList<>(); channels.add(rpcChannel0); channels.add(rpcChannel1); - RpcClient rpcClient = new RpcClient(600000, channels); + RpcClient rpcClient = new RpcClient(channels); logger.info("Will try to send request"); JobConfig confPb = @@ -148,7 +148,8 @@ public void finish() { public void error(Status status) { resultIterator.fail(status.getCause()); } - }); + }, + 600000); rpcClient.shutdown(); } } diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java index 088552af7cbd..2cf8d5a6e479 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java @@ -39,10 +39,8 @@ public class RpcClient { private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); private final List channels; private final List serviceStubs; - private final long rpcTimeout; - public RpcClient(long grpcTimeout, List channels) { - this.rpcTimeout = grpcTimeout; + public RpcClient(List channels) { this.channels = Objects.requireNonNull(channels); this.serviceStubs = channels.stream() @@ -50,13 +48,13 @@ public RpcClient(long grpcTimeout, List channels) { .collect(Collectors.toList()); } - public void submit(JobRequest jobRequest, ResultProcessor processor) { + public void submit(JobRequest jobRequest, ResultProcessor processor, long rpcTimeoutMS) { AtomicInteger counter = new AtomicInteger(this.channels.size()); AtomicBoolean finished = new AtomicBoolean(false); serviceStubs.forEach( asyncStub -> { asyncStub - .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS) + .withDeadlineAfter(rpcTimeoutMS, TimeUnit.MILLISECONDS) .submit( jobRequest, new JobResponseObserver(processor, finished, counter)); From 7901dd7776bd0849e28c6cb41305c400a5ee88a8 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Sun, 2 Jul 2023 20:12:10 +0800 Subject: [PATCH 2/4] [GIE Compiler] minor fix --- .../compiler/src/main/antlr4/GremlinGS.g4 | 11 ++++++++++- .../common/config/QueryTimeoutConfig.java | 17 ++++++++++------- .../cypher/result/CypherRecordProcessor.java | 3 +-- .../antlr4/GraphTraversalSourceVisitor.java | 4 ++-- .../gremlin/antlr4/TraversalMethodVisitor.java | 4 +++- .../plugin/processor/IrStandardOpProcessor.java | 1 - 6 files changed, 26 insertions(+), 14 deletions(-) diff --git a/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 b/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 index 5a91a53cc65c..be94785038ce 100644 --- a/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 +++ b/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 @@ -25,8 +25,10 @@ query ; // g +// g.with(ARGS_EVAL_TIMEOUT, 2000L) +// g.with(Tokens.ARGS_EVAL_TIMEOUT, 2000L) traversalSource - : TRAVERSAL_ROOT + : TRAVERSAL_ROOT (DOT traversalMethod_with) ? ; // g.rootTraversal() @@ -165,8 +167,15 @@ traversalMethod_bothE // with('PATH_OPT', 'SIMPLE' | 'ARBITRARY') // with('RESULT_OPT', 'ALL_V' | 'END_V') // with('UNTIL', expression) +// with('ARGS_EVAL_TIMEOUT', 2000L) // set evaluation timeout to 2 seconds +// with('Tokens.ARGS_EVAL_TIMEOUT', 2000L) // set evaluation timeout to 2 seconds traversalMethod_with : 'with' LPAREN stringLiteral COMMA stringLiteral RPAREN + | 'with' LPAREN evaluationTimeoutKey COMMA integerLiteral RPAREN + ; + +evaluationTimeoutKey + : 'ARGS_EVAL_TIMEOUT' | 'Tokens.ARGS_EVAL_TIMEOUT' ; // outV() diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java index e89946bedc1d..887c647fc101 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java @@ -17,15 +17,18 @@ package com.alibaba.graphscope.common.config; public class QueryTimeoutConfig { - private final long executionTimeoutMS; - private final long channelTimeoutMS; + // timeout in milliseconds for engine execution private final long engineTimeoutMS; - private static final double GRADUAL_FACTOR = 0.1d; + // timeout in milliseconds for channel communication + private final long channelTimeoutMS; + // timeout in milliseconds for total query execution + private final long executionTimeoutMS; + private static final double GRADUAL_FACTOR = 0.0d; - public QueryTimeoutConfig(long executionTimeoutMS) { - this.executionTimeoutMS = executionTimeoutMS; - this.channelTimeoutMS = (long) (executionTimeoutMS * (1 - GRADUAL_FACTOR)); - this.engineTimeoutMS = (long) (executionTimeoutMS * (1 - 2 * GRADUAL_FACTOR)); + public QueryTimeoutConfig(long engineTimeoutMS) { + this.engineTimeoutMS = engineTimeoutMS; + this.channelTimeoutMS = (long) (engineTimeoutMS * (1 + GRADUAL_FACTOR)); + this.executionTimeoutMS = (long) (engineTimeoutMS * (1 + 2 * GRADUAL_FACTOR)); } public long getExecutionTimeoutMS() { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java index edb454c88fa7..34051c931d2f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java @@ -102,8 +102,7 @@ public void cancel() { @Override public boolean await() throws Exception { - boolean hasNext = this.recordIterator.hasNext(); - return hasNext; + return this.recordIterator.hasNext(); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/GraphTraversalSourceVisitor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/GraphTraversalSourceVisitor.java index 89b1ae238d89..4d2929281f2a 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/GraphTraversalSourceVisitor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/GraphTraversalSourceVisitor.java @@ -31,9 +31,9 @@ public GraphTraversalSourceVisitor(GraphTraversalSource g) { @Override public GraphTraversalSource visitTraversalSource(GremlinGSParser.TraversalSourceContext ctx) { - if (ctx.getChildCount() != 1) { + if (ctx.getChildCount() > 3) { throw new UnsupportedEvalException( - ctx.getClass(), "supported pattern of source is [g]"); + ctx.getClass(), "supported pattern of source is [g] or [g.with(..)]"); } return g; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java index 6e3bf903cc1b..697bd9aebee3 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java @@ -756,7 +756,9 @@ public Traversal visitTraversalMethod_with(GremlinGSParser.TraversalMethod_withC Step endStep = graphTraversal.asAdmin().getEndStep(); if (!(endStep instanceof PathExpandStep)) { throw new UnsupportedEvalException( - ctx.getClass(), "with should follow path expand, i.e. out('1..2').with(..)"); + ctx.getClass(), + "with should follow source or path expand, i.e. g.with(..) or" + + " out('1..2').with(..)"); } String optKey = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral(0)); String optValue = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral(1)); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 514a77689d45..66756491fd16 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -260,7 +260,6 @@ protected GremlinExecutor.LifeCycle createLifeCycle( String script, IrMeta irMeta) { QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout()); - logger.info("query execution timeout is {}", timeoutConfig); return GremlinExecutor.LifeCycle.build() .evaluationTimeoutOverride(timeoutConfig.getExecutionTimeoutMS()) .beforeEval( From 1dff84c4743998aba03d379d4d1c5f9d922873b5 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Mon, 3 Jul 2023 13:05:45 +0800 Subject: [PATCH 3/4] [GIE Compiler] support with('evaluationTimeout', ..) in gremlin grammar --- .../compiler/src/main/antlr4/GremlinGS.g4 | 10 ++++++++-- .../gremlin/antlr4/TraversalMethodVisitor.java | 5 +++-- .../plugin/processor/IrStandardOpProcessor.java | 4 ++++ .../result/processor/AbstractResultProcessor.java | 4 ++++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 b/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 index be94785038ce..a0fc2e78aa7e 100644 --- a/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 +++ b/interactive_engine/compiler/src/main/antlr4/GremlinGS.g4 @@ -27,6 +27,7 @@ query // g // g.with(ARGS_EVAL_TIMEOUT, 2000L) // g.with(Tokens.ARGS_EVAL_TIMEOUT, 2000L) +// g.with('evaluationTimeout', 2000L) traversalSource : TRAVERSAL_ROOT (DOT traversalMethod_with) ? ; @@ -169,15 +170,20 @@ traversalMethod_bothE // with('UNTIL', expression) // with('ARGS_EVAL_TIMEOUT', 2000L) // set evaluation timeout to 2 seconds // with('Tokens.ARGS_EVAL_TIMEOUT', 2000L) // set evaluation timeout to 2 seconds +// with('evaluationTimeout', 2000L) // set evaluation timeout to 2 seconds traversalMethod_with - : 'with' LPAREN stringLiteral COMMA stringLiteral RPAREN - | 'with' LPAREN evaluationTimeoutKey COMMA integerLiteral RPAREN + : 'with' LPAREN stringLiteral COMMA genericLiteral RPAREN + | 'with' LPAREN evaluationTimeoutKey COMMA evaluationTimeoutValue RPAREN ; evaluationTimeoutKey : 'ARGS_EVAL_TIMEOUT' | 'Tokens.ARGS_EVAL_TIMEOUT' ; +evaluationTimeoutValue + : integerLiteral + ; + // outV() traversalMethod_outV : 'outV' LPAREN RPAREN diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java index 697bd9aebee3..e806ae45579e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/antlr4/TraversalMethodVisitor.java @@ -760,8 +760,9 @@ public Traversal visitTraversalMethod_with(GremlinGSParser.TraversalMethod_withC "with should follow source or path expand, i.e. g.with(..) or" + " out('1..2').with(..)"); } - String optKey = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral(0)); - String optValue = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral(1)); + String optKey = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral()); + Object optValue = + GenericLiteralVisitor.getInstance().visitGenericLiteral(ctx.genericLiteral()); return graphTraversal.with(optKey, optValue); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 66756491fd16..82c1309a5c86 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -40,6 +40,7 @@ import com.alibaba.graphscope.gremlin.plugin.strategy.ExpandFusionStepStrategy; import com.alibaba.graphscope.gremlin.plugin.strategy.RemoveUselessStepStrategy; import com.alibaba.graphscope.gremlin.plugin.strategy.ScanFusionStepStrategy; +import com.alibaba.graphscope.gremlin.result.processor.AbstractResultProcessor; import com.alibaba.graphscope.gremlin.result.processor.GremlinResultProcessor; import com.alibaba.pegasus.RpcClient; import com.alibaba.pegasus.intf.ResultProcessor; @@ -149,6 +150,9 @@ protected void evalOpInternal( elapsed / 1000000.0f, startTime); if (t != null) { + if (v instanceof AbstractResultProcessor) { + ((AbstractResultProcessor) v).cancel(); + } Optional possibleTemporaryException = determineIfTemporaryException(t); if (possibleTemporaryException.isPresent()) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index 56630bebf540..2906b1557412 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -118,6 +118,10 @@ public void error(Status status) { } } + public synchronized void cancel() { + this.locked = true; + } + protected abstract void aggregateResults(); protected void writeResultList( From 3eea7e3f205b1c3f31d8a1fe3463335e33b5bf95 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Mon, 3 Jul 2023 13:07:35 +0800 Subject: [PATCH 4/4] [GIE Compiler] minor fix --- .../alibaba/graphscope/common/config/QueryTimeoutConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java index 887c647fc101..e6be8481862f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/QueryTimeoutConfig.java @@ -23,7 +23,7 @@ public class QueryTimeoutConfig { private final long channelTimeoutMS; // timeout in milliseconds for total query execution private final long executionTimeoutMS; - private static final double GRADUAL_FACTOR = 0.0d; + private static final double GRADUAL_FACTOR = 0.1d; public QueryTimeoutConfig(long engineTimeoutMS) { this.engineTimeoutMS = engineTimeoutMS;