diff --git a/CHANGELOG.md b/CHANGELOG.md index d07223fffd14b..03a65c552886f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362)) - Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378)) - [Workload Management] Enhance rejection mechanism in workload management ([#16417](https://github.com/opensearch-project/OpenSearch/pull/16417)) +- [Workload Management] Fixing Create/Update QueryGroup TransportActions to execute from non-cluster manager nodes ([16422](https://github.com/opensearch-project/OpenSearch/pull/16422)) ### Security diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java index d92283391dd3b..1ce04faa7ccc1 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java @@ -8,8 +8,8 @@ package org.opensearch.plugin.wlm.action; -import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.UUIDs; import org.opensearch.core.common.io.stream.StreamInput; @@ -33,7 +33,7 @@ * * @opensearch.experimental */ -public class CreateQueryGroupRequest extends ActionRequest { +public class CreateQueryGroupRequest extends ClusterManagerNodeRequest { private final QueryGroup queryGroup; /** diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java index 190ff17261bb4..dff9c429d63b0 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java @@ -9,43 +9,82 @@ package org.opensearch.plugin.wlm.action; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; -import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; + +import static org.opensearch.threadpool.ThreadPool.Names.SAME; + /** * Transport action to create QueryGroup * * @opensearch.experimental */ -public class TransportCreateQueryGroupAction extends HandledTransportAction { +public class TransportCreateQueryGroupAction extends TransportClusterManagerNodeAction { private final QueryGroupPersistenceService queryGroupPersistenceService; /** * Constructor for TransportCreateQueryGroupAction * - * @param actionName - action name + * @param threadPool - {@link ThreadPool} object * @param transportService - a {@link TransportService} object * @param actionFilters - a {@link ActionFilters} object + * @param indexNameExpressionResolver - {@link IndexNameExpressionResolver} object * @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object */ @Inject public TransportCreateQueryGroupAction( - String actionName, + ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, QueryGroupPersistenceService queryGroupPersistenceService ) { - super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new); + super( + CreateQueryGroupAction.NAME, + transportService, + queryGroupPersistenceService.getClusterService(), + threadPool, + actionFilters, + CreateQueryGroupRequest::new, + indexNameExpressionResolver + ); this.queryGroupPersistenceService = queryGroupPersistenceService; } @Override - protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener listener) { + protected void clusterManagerOperation( + CreateQueryGroupRequest request, + ClusterState clusterState, + ActionListener listener + ) { queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener); } + + @Override + protected String executor() { + return SAME; + } + + @Override + protected CreateQueryGroupResponse read(StreamInput in) throws IOException { + return new CreateQueryGroupResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(CreateQueryGroupRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java index a6aa2da8fdc08..09a0da7086b36 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java @@ -9,43 +9,81 @@ package org.opensearch.plugin.wlm.action; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; -import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; + +import static org.opensearch.threadpool.ThreadPool.Names.SAME; + /** * Transport action to update QueryGroup * * @opensearch.experimental */ -public class TransportUpdateQueryGroupAction extends HandledTransportAction { +public class TransportUpdateQueryGroupAction extends TransportClusterManagerNodeAction { private final QueryGroupPersistenceService queryGroupPersistenceService; /** * Constructor for TransportUpdateQueryGroupAction * - * @param actionName - action name + * @param threadPool - {@link ThreadPool} object * @param transportService - a {@link TransportService} object * @param actionFilters - a {@link ActionFilters} object + * @param indexNameExpressionResolver - {@link IndexNameExpressionResolver} object * @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object */ @Inject public TransportUpdateQueryGroupAction( - String actionName, + ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, QueryGroupPersistenceService queryGroupPersistenceService ) { - super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new); + super( + UpdateQueryGroupAction.NAME, + transportService, + queryGroupPersistenceService.getClusterService(), + threadPool, + actionFilters, + UpdateQueryGroupRequest::new, + indexNameExpressionResolver + ); this.queryGroupPersistenceService = queryGroupPersistenceService; } @Override - protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener listener) { + protected void clusterManagerOperation( + UpdateQueryGroupRequest request, + ClusterState clusterState, + ActionListener listener + ) { queryGroupPersistenceService.updateInClusterStateMetadata(request, listener); } + + @Override + protected String executor() { + return SAME; + } + + @Override + protected UpdateQueryGroupResponse read(StreamInput in) throws IOException { + return new UpdateQueryGroupResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(UpdateQueryGroupRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java index 048b599f095fd..18af58289be13 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java @@ -8,8 +8,8 @@ package org.opensearch.plugin.wlm.action; -import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -23,7 +23,7 @@ * * @opensearch.experimental */ -public class UpdateQueryGroupRequest extends ActionRequest { +public class UpdateQueryGroupRequest extends ClusterManagerNodeRequest { private final String name; private final MutableQueryGroupFragment mutableQueryGroupFragment;