diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 522e00a37d74..90dbcecf3706 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -72,11 +72,13 @@ import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SubqueryGuardrailHelper; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; +import org.apache.druid.utils.JvmUtils; import org.hamcrest.core.IsInstanceOf; import org.joda.time.Interval; import org.junit.Assert; @@ -388,7 +390,7 @@ public void emit(Event event) serverConfig, null, new CacheConfig(), - null, + new SubqueryGuardrailHelper(null, JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(), 1), new SubqueryCountStatsProvider() ); diff --git a/server/pom.xml b/server/pom.xml index c2b8269cf2a3..e3748b8c5994 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -443,6 +443,8 @@ org/apache/druid/server/QueryResponse.class org/apache/druid/curator/CuratorModule.class + + org/apache/druid/server/SubqueryGuardrailHelperProvider.class diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 0e0868813625..80c6d0a86af0 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -58,7 +58,6 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinableFactory; @@ -120,7 +119,7 @@ public ClientQuerySegmentWalker( ServerConfig serverConfig, Cache cache, CacheConfig cacheConfig, - LookupExtractorFactoryContainerProvider lookupManager, + SubqueryGuardrailHelper subqueryGuardrailHelper, SubqueryCountStatsProvider subqueryStatsProvider ) { @@ -134,11 +133,7 @@ public ClientQuerySegmentWalker( this.serverConfig = serverConfig; this.cache = cache; this.cacheConfig = cacheConfig; - this.subqueryGuardrailHelper = new SubqueryGuardrailHelper( - lookupManager, - Runtime.getRuntime().maxMemory(), - serverConfig.getNumThreads() - ); + this.subqueryGuardrailHelper = subqueryGuardrailHelper; this.subqueryStatsProvider = subqueryStatsProvider; } @@ -154,7 +149,7 @@ public ClientQuerySegmentWalker( ServerConfig serverConfig, Cache cache, CacheConfig cacheConfig, - LookupExtractorFactoryContainerProvider lookupManager, + SubqueryGuardrailHelper subqueryGuardrailHelper, SubqueryCountStatsProvider subqueryStatsProvider ) { @@ -169,7 +164,7 @@ public ClientQuerySegmentWalker( serverConfig, cache, cacheConfig, - lookupManager, + subqueryGuardrailHelper, subqueryStatsProvider ); } diff --git a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java index 88845ef955ea..faa97a802e60 100644 --- a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java +++ b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java @@ -48,11 +48,11 @@ public class SubqueryGuardrailHelper public SubqueryGuardrailHelper( final LookupExtractorFactoryContainerProvider lookupManager, final long maxMemoryInJvm, - final int brokerNumHttpConnections + final int maxConcurrentQueries ) { final DateTime start = DateTimes.nowUtc(); - autoLimitBytes = computeLimitBytesForAuto(lookupManager, maxMemoryInJvm, brokerNumHttpConnections); + autoLimitBytes = computeLimitBytesForAuto(lookupManager, maxMemoryInJvm, maxConcurrentQueries); final long startupTimeMs = new Interval(start, DateTimes.nowUtc()).toDurationMillis(); log.info("Took [%d] ms to initialize the SubqueryGuardrailHelper.", startupTimeMs); @@ -114,13 +114,13 @@ public long convertSubqueryLimitStringToLong(final String maxSubqueryLimit) private static long computeLimitBytesForAuto( final LookupExtractorFactoryContainerProvider lookupManager, final long maxMemoryInJvm, - final int brokerNumHttpConnections + final int maxConcurrentQueries ) { long memoryInJvmWithoutLookups = maxMemoryInJvm - computeLookupFootprint(lookupManager); long memoryInJvmForSubqueryResultsInlining = (long) (memoryInJvmWithoutLookups * SUBQUERY_MEMORY_BYTES_FRACTION); - long memoryInJvmForSubqueryResultsInliningPerQuery = memoryInJvmForSubqueryResultsInlining - / brokerNumHttpConnections; + long memoryInJvmForSubqueryResultsInliningPerQuery = + memoryInJvmForSubqueryResultsInlining / maxConcurrentQueries; return Math.max(memoryInJvmForSubqueryResultsInliningPerQuery, 1L); } diff --git a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java new file mode 100644 index 000000000000..ddbc0790084b --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelperProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.utils.JvmUtils; + +public class SubqueryGuardrailHelperProvider implements Provider +{ + private final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider; + private final ServerConfig serverConfig; + private final QuerySchedulerConfig querySchedulerConfig; + + @Inject + public SubqueryGuardrailHelperProvider( + LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider, + ServerConfig serverConfig, + QuerySchedulerConfig querySchedulerConfig + ) + { + this.lookupExtractorFactoryContainerProvider = lookupExtractorFactoryContainerProvider; + this.serverConfig = serverConfig; + this.querySchedulerConfig = querySchedulerConfig; + } + + @Override + @LazySingleton + public SubqueryGuardrailHelper get() + { + final int maxConcurrentQueries; + + if (querySchedulerConfig.getNumThreads() > 0) { + maxConcurrentQueries = Math.min( + querySchedulerConfig.getNumThreads(), + serverConfig.getNumThreads() + ); + } else { + maxConcurrentQueries = serverConfig.getNumThreads(); + } + + return new SubqueryGuardrailHelper( + lookupExtractorFactoryContainerProvider, + JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(), + maxConcurrentQueries + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index fcdc894f61fa..7180b03983fa 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -1439,8 +1439,7 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable getModules() binder.bind(BrokerQueryResource.class).in(LazySingleton.class); Jerseys.addResource(binder, BrokerQueryResource.class); + binder.bind(SubqueryGuardrailHelper.class).toProvider(SubqueryGuardrailHelperProvider.class); binder.bind(QueryCountStatsProvider.class).to(BrokerQueryResource.class).in(LazySingleton.class); binder.bind(SubqueryCountStatsProvider.class).toInstance(new SubqueryCountStatsProvider()); Jerseys.addResource(binder, BrokerResource.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index a47bbcf95779..b43a65159567 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -116,8 +116,7 @@ public SpecificSegmentsQuerySegmentWalker( ), conglomerate, joinableFactoryWrapper.getJoinableFactory(), - new ServerConfig(), - LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER + new ServerConfig() ); }