Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queryGroupId to search workload tasks at co-ordinator and data node level #14708

Merged
Merged
Show file tree
Hide file tree
Changes from 86 commits
Commits
Show all changes
94 commits
Select commit Hold shift + click to select a range
a6ea59c
add logic to add headers to Task
kaushalmahi12 Jul 9, 2024
bc30528
add logic to add queryGroupId to task headers
kaushalmahi12 Jul 10, 2024
f0768b0
remove redundant code
kaushalmahi12 Jul 10, 2024
668a167
add changelog entry
kaushalmahi12 Jul 10, 2024
b31388e
address comments
kaushalmahi12 Jul 22, 2024
ea02672
fix precommit
kaushalmahi12 Jul 22, 2024
4920301
Add UTs for RemoteIndexMetadataManager (#14660)
shiv0408 Jul 9, 2024
f4d7983
Fix match_phrase_prefix_query not working on text field with multiple…
gaobinlong Jul 9, 2024
fa284e3
Offline calculation of total shard per node and caching it for weight…
RS146BIJAY Jul 9, 2024
8f5ef47
[bug fix] validate lower bound for top n size (#14587)
ansjcy Jul 9, 2024
b6d4c40
Create SystemIndexRegistry with helper method matchesSystemIndex (#14…
cwperks Jul 9, 2024
bb8e8c9
Refactor Grok validate pattern to iterative approach (#14206)
sandeshkr419 Jul 9, 2024
bddfa89
Bump opentelemetry from 1.39.0 to 1.40.0 (#14674)
reta Jul 9, 2024
4f02eb4
Bump jackson from 2.17.1 to 2.17.2 (#14687)
reta Jul 9, 2024
7574c48
Add release notes for release 1.3.18 (#14699)
zelinh Jul 9, 2024
241c00a
Bump reactor from 3.5.19 to 3.5.20 (#14697)
reta Jul 10, 2024
56c989f
Add unit tests for read flow of RemoteClusterStateService and bug fix…
shiv0408 Jul 10, 2024
4bc621f
Update version check for the bug fix of match_phrase_prefix_query not…
gaobinlong Jul 10, 2024
c9b075b
Remove unnecessary cast to int from test (#14696)
lukas-vlcek Jul 10, 2024
69dc9e2
print reason why parent task was cancelled (#14604)
kkewwei Jul 10, 2024
f21eb0a
Use set of shard routing for shard in unassigned shard batch check. (…
SwethaGuptha Jul 11, 2024
e1fbbe5
Add versioning for UploadedIndexMetadata (#14677)
soosinha Jul 11, 2024
0a40af0
Fix: update help output for _cat (#14722)
ahmedsobeh Jul 11, 2024
0afb641
Fix hdfs-fixture kerb-admin & hadoop-minicluster dependencies are not…
reta Jul 11, 2024
ead19ab
Update to Gradle 8.9 (#14574)
reta Jul 11, 2024
7c86cec
Fix hdfs-fixture hadoop-minicluster dependencies are not being update…
reta Jul 11, 2024
04f9ea1
Add `strict_allow_templates` dynamic mapping option (#14555)
gaobinlong Jul 11, 2024
e8da561
Bump net.minidev:json-smart from 2.5.0 to 2.5.1 in /plugins/repositor…
dependabot[bot] Jul 15, 2024
0205d9b
remove query insights plugin from core (#14743)
ansjcy Jul 15, 2024
c49659b
Add `strict_allow_templates` dynamic mapping option (#14555) (#14737)…
reta Jul 15, 2024
4127725
Fix create or update alias API doesn't throw exception for unsupporte…
gaobinlong Jul 15, 2024
8682859
Remove query categorization from core (#14759)
deshsidd Jul 16, 2024
0d70e36
Add changes to propagate queryGroupId across child requests and nodes…
kaushalmahi12 Jul 16, 2024
4b56657
Add consumers to remote store based index settings (#14764)
shourya035 Jul 16, 2024
10b187e
Add matchesPluginSystemIndexPattern to SystemIndexRegistry (#14750)
cwperks Jul 16, 2024
ef49713
SPI for loading ABC templates (#14659)
mgodwan Jul 16, 2024
2eb7a92
Fix bulk upsert ignores the default_pipeline and final_pipeline when …
gaobinlong Jul 16, 2024
09aa997
Fix flaky test due to node being used across all tests (#14787)
mgodwan Jul 17, 2024
763354c
Star Tree Implementation [OnHeap] (#14512)
sarthakaggarwal97 Jul 17, 2024
94aa7c0
Add Gao Binlong as maintainer (#14796)
reta Jul 17, 2024
28e516a
Clear ehcache disk cache files during initialization (#14738)
sgup432 Jul 18, 2024
1bf20c5
Refactor remote-routing-table service inline with remote state interf…
Arpit-Bandejiya Jul 18, 2024
2332fe3
Set version to 2.15 for determining metadata during migration to remo…
skumawat2025 Jul 18, 2024
e1bd0ad
Fix bulk upsert ignores the default_pipeline and final_pipeline when …
reta Jul 18, 2024
c4317f8
Fix create or update alias API doesn't throw exception for unsupporte…
reta Jul 18, 2024
acd5b51
Change RCSS info logs to debug (#14814)
shiv0408 Jul 18, 2024
eff7595
[Bugfix] Fix NPE in ReplicaShardAllocator (#13993) (#14385)
DaniilRoman Jul 18, 2024
8b97cd5
Run performance benchmark on pull requests (#14760)
rishabh6788 Jul 18, 2024
2caebaf
fix constant_keyword field type (#14807)
kkewwei Jul 18, 2024
ecb65cf
[Remote Store Migration] Reconcile remote store based index settings …
shourya035 Jul 19, 2024
2e31c78
Add prefix mode verification setting for repository verification (#14…
ashking94 Jul 19, 2024
9d69875
add length check on comment body for benchmark workflow (#14834)
rishabh6788 Jul 19, 2024
0267660
Add restore-from-snapshot test procedure for snapshot run benchmark c…
rishabh6788 Jul 19, 2024
c4164c5
Fix env variable name typo (#14843)
rishabh6788 Jul 19, 2024
9acd749
Use circuit breaker in InternalHistogram when adding empty buckets (#…
bowenlan-amzn Jul 19, 2024
23eac8d
[Remote State] Create interface RemoteEntitiesManager (#14671)
shiv0408 Jul 22, 2024
cb51734
Optimise TransportNodesAction to not send DiscoveryNodes for NodeStat…
Pranshu-S Jul 22, 2024
6165336
Enabling term version check on local state for all ClusterManager Rea…
rajiv-kv Jul 22, 2024
a6c97b6
Reduce logging in DEBUG for MasterService:run (#14795)
sumitasr Jul 22, 2024
f21de7d
Add SplitResponseProcessor to Search Pipelines (#14800)
dbwiddis Jul 22, 2024
d8a47ec
Add integration tests for RemoteRoutingTable Service. (#14631)
shailendra0811 Jul 22, 2024
566de75
Add SortResponseProcessor to Search Pipelines (#14785)
dbwiddis Jul 22, 2024
a49fc8c
Fix allowUnmappedFields, mapUnmappedFieldAsString settings to be appl…
imyp92 Jul 22, 2024
7829554
Bump com.microsoft.azure:msal4j from 1.16.0 to 1.16.1 in /plugins/rep…
dependabot[bot] Jul 22, 2024
071c265
Bump com.gradle.develocity from 3.17.5 to 3.17.6 (#14856)
dependabot[bot] Jul 22, 2024
0080832
Bump org.jline:jline in /test/fixtures/hdfs-fixture (#14859)
dependabot[bot] Jul 22, 2024
920f86a
Use Lucene provided Persian stem (#14847)
ebraminio Jul 22, 2024
824fab2
Bump actions/checkout from 2 to 4 (#14858)
dependabot[bot] Jul 22, 2024
a293d52
Deprecate batch_size parameter on bulk API (#14725)
chishui Jul 22, 2024
2e13b79
Add perms for remote snapshot cache eviction on scripted query (#14411)
finnegancarroll Jul 22, 2024
c926996
add transport interceptor to populate queryGroupId in task headers
kaushalmahi12 Jul 23, 2024
61b0032
Add rest, transport layer changes for Hot to warm tiering - dedicated…
neetikasinghal Jul 23, 2024
f188e91
Create listener to refresh search thread resource usage (#14832)
deshsidd Jul 23, 2024
f1235e0
Caching avg total bytes and avg free bytes inside ClusterInfo (#14851)
RS146BIJAY Jul 23, 2024
57f1cbc
Use default value when index.number_of_replicas is null (#14812)
chishui Jul 23, 2024
69bf700
[Remote Routing Table] Implement write and read flow for shard diff f…
shailendra0811 Jul 23, 2024
8369771
Optimized ClusterStatsIndices to precomute shard stats (#14426)
Pranshu-S Jul 23, 2024
ffc885e
Fix constraint bug which allows more primary shards than average prim…
gbbafna Jul 23, 2024
90ad0ab
Optmising AwarenessAllocationDecider for hashmap.get call (#14761)
RS146BIJAY Jul 23, 2024
1b00b5d
update comment
kaushalmahi12 Jul 23, 2024
2d52a2d
Fix IngestServiceTests.testBulkRequestExecutionWithFailures (#14918)
andrross Jul 23, 2024
b3fd0f8
Merge branch 'main' into feature/sandbox-qgTaskHeaders
kaushalmahi12 Jul 23, 2024
fe66e9d
add queryGroupTask
kaushalmahi12 Jul 24, 2024
d5d26e9
remove unnecessary imports
kaushalmahi12 Jul 24, 2024
3eb07e7
add QueryGroupTask tests
kaushalmahi12 Jul 24, 2024
c77c06e
rename WLM transport request handler
kaushalmahi12 Jul 24, 2024
9d12a95
Merge branch 'main' into feature/sandbox-qgTaskHeaders
kaushalmahi12 Jul 25, 2024
f5415b5
add CHANGELOG entry
kaushalmahi12 Jul 25, 2024
c108a50
fix ut
kaushalmahi12 Jul 25, 2024
2605a35
Merge branch 'main' into feature/sandbox-qgTaskHeaders
kaushalmahi12 Jul 29, 2024
31ca5cc
Merge branch 'main' into feature/sandbox-qgTaskHeaders
kaushalmahi12 Jul 29, 2024
4940dcf
address comments
kaushalmahi12 Jul 30, 2024
191a860
fix UT to remove the verify for final method
kaushalmahi12 Jul 30, 2024
cdd61bf
apply spotless
kaushalmahi12 Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- [Workload Management] Add QueryGroup schema ([13669](https://github.com/opensearch-project/OpenSearch/pull/13669))
- [Workload Management] Add queryGroupId to Task headers ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add `strict_allow_templates` dynamic mapping option ([#14555](https://github.com/opensearch-project/OpenSearch/pull/14555))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;
import org.opensearch.wlm.QueryGroupTask;

import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -50,7 +50,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
public class SearchShardTask extends QueryGroupTask implements SearchBackpressureTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;
import org.opensearch.wlm.QueryGroupTask;

import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -49,7 +49,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
public class SearchTask extends QueryGroupTask implements SearchBackpressureTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -442,6 +443,12 @@ private void executeRequest(
);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

// At this point either the QUERY_GROUP_ID header will be present in ThreadContext either via ActionFilter
// or HTTP header (HTTP header will be deprecated once ActionFilter is implemented)
if (task instanceof QueryGroupTask) {
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
}

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
try {
Expand Down
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@
import org.opensearch.transport.TransportService;
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1047,14 +1048,21 @@ protected Node(
admissionControlService
);

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(p -> p.getSecureSettingFactory(settings))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

List<TransportInterceptor> transportInterceptors = List.of(admissionControlTransportInterceptor);
List<TransportInterceptor> transportInterceptors = List.of(
admissionControlTransportInterceptor,
workloadManagementTransportInterceptor
);
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class),
Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/wlm/QueryGroupConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import java.util.function.Supplier;

/**
* This class will hold all the QueryGroup related constants
*/
public class QueryGroupConstants {
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
public static final String QUERY_GROUP_ID_HEADER = "queryGroupId";
public static final Supplier<String> DEFAULT_QUERY_GROUP_ID_SUPPLIER = () -> "DEFAULT_QUERY_GROUP";
}
74 changes: 74 additions & 0 deletions server/src/main/java/org/opensearch/wlm/QueryGroupTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.tasks.CancellableTask;

import java.util.Map;

import static org.opensearch.search.SearchService.NO_TIMEOUT;

/**
* Base class to define QueryGroup tasks
*/
public class QueryGroupTask extends CancellableTask {
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger logger = LogManager.getLogger(QueryGroupTask.class);
private String queryGroupId;

public QueryGroupTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
}

public QueryGroupTask(
long id,
String type,
String action,
String description,
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval
) {
super(id, type, action, description, parentTaskId, headers, cancelAfterTimeInterval);
}

/**
* This method should always be called after calling setQueryGroupId at least once on this object
* @return task queryGroupId
*/
public String getQueryGroupId() {
if (queryGroupId == null) {
logger.warn("QueryGroup _id can't be null, It should be set before accessing it. This is abnormal behaviour ");
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
}
return queryGroupId;
}

/**
* sets the queryGroupId from threadContext into the task itself,
* This method was defined since the queryGroupId can only be evaluated after task creation
* @param threadContext current threadContext
*/
public void setQueryGroupId(final ThreadContext threadContext) {
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
this.queryGroupId = QueryGroupConstants.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get();
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

if (threadContext != null && threadContext.getHeader(QueryGroupConstants.QUERY_GROUP_ID_HEADER) != null) {
this.queryGroupId = threadContext.getHeader(QueryGroupConstants.QUERY_GROUP_ID_HEADER);
}
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;

/**
* This class is used to intercept search traffic requests and populate the queryGroupId header in task headers
*/
public class WorkloadManagementTransportInterceptor implements TransportInterceptor {
private final ThreadPool threadPool;

public WorkloadManagementTransportInterceptor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler
) {
return new RequestHandler<T>(threadPool, actualHandler);
}

/**
* This class is mainly used to populate the queryGroupId header
* @param <T> T is Search related request
*/
public static class RequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {

private final ThreadPool threadPool;
TransportRequestHandler<T> actualHandler;

public RequestHandler(ThreadPool threadPool, TransportRequestHandler<T> actualHandler) {
this.threadPool = threadPool;
this.actualHandler = actualHandler;
}

@Override
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
if (isSearchWorkloadRequest(task)) {
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
}
actualHandler.messageReceived(request, channel, task);
}

boolean isSearchWorkloadRequest(Task task) {
return task instanceof QueryGroupTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;

public class QueryGroupTaskTests extends OpenSearchTestCase {
private ThreadPool threadPool;
private QueryGroupTask sut;

public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getTestName());
sut = new QueryGroupTask(123, "transport", "Search", "test task", null, Collections.emptyMap());
}

public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdown();
}

public void testSuccessfulSetQueryGroupId() {
sut.setQueryGroupId(threadPool.getThreadContext());
assertEquals(QueryGroupConstants.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get(), sut.getQueryGroupId());

threadPool.getThreadContext().putHeader(QueryGroupConstants.QUERY_GROUP_ID_HEADER, "akfanglkaglknag2332");

sut.setQueryGroupId(threadPool.getThreadContext());
assertEquals("akfanglkaglknag2332", sut.getQueryGroupId());
}

public void testUnsuccessfulSetGroupId() {
assertThrows(IllegalStateException.class, () -> sut.getQueryGroupId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor.RequestHandler;

import static org.opensearch.threadpool.ThreadPool.Names.SAME;

public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestCase {

private ThreadPool threadPool;
private WorkloadManagementTransportInterceptor sut;

public void setUp() throws Exception {
threadPool = new TestThreadPool(getTestName());
sut = new WorkloadManagementTransportInterceptor(threadPool);
}

public void tearDown() throws Exception {
threadPool.shutdown();
}

public void testInterceptHandler() {
TransportRequestHandler<TransportRequest> requestHandler = sut.interceptHandler("Search", SAME, false, null);
assertTrue(requestHandler instanceof RequestHandler);
}
}
Loading
Loading