Skip to content

Commit

Permalink
API for adding and removing indices from a data stream (#79279)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann authored Oct 19, 2021
1 parent 88250c2 commit 2e05e3e
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"indices.modify_data_stream":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Modifies a data stream"
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept": [ "application/json"],
"content_type": ["application/json"]
},
"url":{
"paths":[
{
"path":"/_data_stream/_modify",
"methods":["POST"]
}
]
},
"params":{
},
"body":{
"description":"The data stream modifications",
"required":true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.explain.ExplainAction;
Expand Down Expand Up @@ -370,6 +372,7 @@
import org.elasticsearch.rest.action.cat.RestTasksAction;
import org.elasticsearch.rest.action.cat.RestTemplatesAction;
import org.elasticsearch.rest.action.cat.RestThreadPoolAction;
import org.elasticsearch.rest.action.datastreams.RestModifyDataStreamsAction;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestDeleteAction;
import org.elasticsearch.rest.action.document.RestGetAction;
Expand Down Expand Up @@ -599,6 +602,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class);
actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class);

//Data streams
actions.register(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamsTransportAction.class);

//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
Expand Down Expand Up @@ -763,6 +769,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

registerHandler.accept(new RestReloadSecureSettingsAction());

// Data streams
registerHandler.accept(new RestModifyDataStreamsAction());

// Scripts API
registerHandler.accept(new RestGetStoredScriptAction());
registerHandler.accept(new RestPutStoredScriptAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.datastreams;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class ModifyDataStreamsAction extends ActionType<AcknowledgedResponse> {

public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction();
public static final String NAME = "indices:admin/data_stream/modify";

private ModifyDataStreamsAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static final class Request
extends AcknowledgedRequest<Request>
implements IndicesRequest, ToXContentObject {

// relevant only for authorizing the request, so require every specified
// index to exist, expand wildcards only to open indices, prohibit
// wildcard expressions that resolve to zero indices, and do not attempt
// to resolve expressions as aliases
private static final IndicesOptions INDICES_OPTIONS =
IndicesOptions.fromOptions(false, false, true, false, true, false, true, false);

private final List<DataStreamAction> actions;

public Request(StreamInput in) throws IOException {
super(in);
actions = in.readList(DataStreamAction::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(actions);
}

public Request(List<DataStreamAction> actions) {
this.actions = Collections.unmodifiableList(actions);
}

public List<DataStreamAction> getActions() {
return actions;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("actions");
for (DataStreamAction action : actions) {
action.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public ActionRequestValidationException validate() {
if (actions.isEmpty()) {
return addValidationError("must specify at least one data stream modification action", null);
}
return null;
}

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
"data_stream_actions",
args -> new Request(((List<DataStreamAction>) args[0]))
);
static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), DataStreamAction.PARSER, new ParseField("actions"));
}

@Override
public String[] indices() {
return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new);
}

@Override
public IndicesOptions indicesOptions() {
return INDICES_OPTIONS;
}

@Override
public boolean includeDataStreams() {
return true;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Arrays.equals(actions.toArray(), other.actions.toArray());
}

@Override
public int hashCode() {
return Objects.hash(actions);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.datastreams;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class ModifyDataStreamsTransportAction extends AcknowledgedTransportMasterNodeAction<
ModifyDataStreamsAction.Request> {

private final MetadataDataStreamsService metadataDataStreamsService;

@Inject
public ModifyDataStreamsTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
MetadataDataStreamsService metadataDataStreamsService
) {
super(
ModifyDataStreamsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
ModifyDataStreamsAction.Request::new,
indexNameExpressionResolver,
ThreadPool.Names.SAME
);
this.metadataDataStreamsService = metadataDataStreamsService;
}

@Override
protected void masterOperation(
Task task,
ModifyDataStreamsAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
metadataDataStreamsService.modifyDataStream(request, listener);
}

@Override
protected ClusterBlockException checkBlock(ModifyDataStreamsAction.Request request, ClusterState state) {
ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
if (globalBlock != null) {
return globalBlock;
}
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, request));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public DataStream removeBackingIndex(Index index) {
index.getName(), name
));
}
if (generation == (backingIndexPosition + 1)) {
if (indices.size() == (backingIndexPosition + 1)) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"cannot remove backing index [%s] of data stream [%s] because it is the write index",
Expand Down
Loading

0 comments on commit 2e05e3e

Please sign in to comment.