Cluster state and CRUD operations for data streams #53877

Merged: 9 commits, Mar 24, 2020
Changes from 2 commits
@@ -13,12 +13,9 @@

- do:
indices.get_data_streams: {}
- match: { 0.name: my_data_stream1 }
- match: { 0.name: data-stream2 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: ['my_data_stream1-000000'] }
- match: { 1.name: my_data_stream2 }
- match: { 1.timestamp_field: '@timestamp' }
- match: { 1.indices: [] }
- match: { 0.indices: [] }

- do:
indices.delete_data_stream:
@@ -1040,7 +1040,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.ingest.IngestProcessorException.class,
org.elasticsearch.ingest.IngestProcessorException::new,
157,
Version.V_7_5_0);
Version.V_7_5_0),
DATA_STREAM_MISSING_EXCEPTION(
org.elasticsearch.indices.DataStreamMissingException.class,
org.elasticsearch.indices.DataStreamMissingException::new,
158,
Version.V_7_7_0);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
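The DataStreamMissingException registered above is new in this PR but is not part of the hunk shown. A minimal sketch of such a class, assuming it follows the pattern of other ResourceNotFoundException subclasses (the StreamInput constructor is what the handle's constructor reference points at):

    package org.elasticsearch.indices;

    import org.elasticsearch.ResourceNotFoundException;
    import org.elasticsearch.common.io.stream.StreamInput;

    import java.io.IOException;

    // Hypothetical sketch: thrown when an operation targets a data stream that does not exist.
    public class DataStreamMissingException extends ResourceNotFoundException {

        public DataStreamMissingException(String name) {
            super("data_stream [" + name + "] missing");
        }

        // Wire constructor referenced by the handle registration above (id 158, wired from Version.V_7_7_0).
        public DataStreamMissingException(StreamInput in) throws IOException {
            super(in);
        }
    }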
@@ -18,30 +18,42 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(CreateDataStreamAction.class);

public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
public static final String NAME = "indices:admin/data_stream/create";

@@ -64,7 +76,14 @@ public void setTimestampFieldName(String timestampFieldName) {

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
if (Strings.hasText(timestampFieldName) == false) {
validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException);
}
return validationException;
}

public Request(StreamInput in) throws IOException {
@@ -116,7 +135,37 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
listener.onResponse(new AcknowledgedResponse(true));
clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]",
new ClusterStateUpdateTask(Priority.HIGH) {

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (currentState.metaData().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}

MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(request.name,
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));

logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metaData(builder).build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

@Override
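With the validation above, a malformed request now fails on the coordinating node before any cluster state update task is submitted, a duplicate name fails with IllegalArgumentException inside execute, and the acknowledgement is only sent from clusterStateProcessed once the new state has been applied on the master. A hypothetical usage sketch of the validation (a Request(String name) constructor is assumed here; it is elided from the hunk):

    import org.elasticsearch.action.ActionRequestValidationException;
    import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;

    public class CreateDataStreamValidationExample {
        public static void main(String[] args) {
            // Assumed constructor: Request(String name); not shown in this hunk.
            CreateDataStreamAction.Request ok = new CreateDataStreamAction.Request("logs-foo");
            ok.setTimestampFieldName("@timestamp");
            System.out.println(ok.validate()); // null: both name and timestamp field are present

            CreateDataStreamAction.Request bad = new CreateDataStreamAction.Request("");
            ActionRequestValidationException e = bad.validate();
            // Collects both errors: "name is missing" and "timestamp field name is missing"
            System.out.println(e.validationErrors());
        }
    }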
@@ -18,6 +18,8 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
@@ -26,22 +28,32 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.DataStreamMissingException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class);

public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction();
public static final String NAME = "indices:admin/data_stream/delete";

@@ -108,7 +120,47 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
listener.onResponse(new AcknowledgedResponse(true));
clusterService.submitStateUpdateTask("remove-data-stream [" + request.name + "]", new ClusterStateUpdateTask(Priority.HIGH) {

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) {
Set<String> dataStreams = new HashSet<>();
for (String dataStreamName : currentState.metaData().dataStreams().keySet()) {
if (Regex.simpleMatch(request.name, dataStreamName)) {
dataStreams.add(dataStreamName);
}
}
if (dataStreams.isEmpty()) {
// if a match-all pattern was specified and no data streams were found because none exist, do not
// fail with data stream missing exception
if (Regex.isMatchAllPattern(request.name)) {
return currentState;
}
throw new DataStreamMissingException(request.name);
}
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
for (String dataStreamName : dataStreams) {
logger.info("removing data stream [{}]", dataStreamName);
metaData.removeDataStream(dataStreamName);
}
return ClusterState.builder(currentState).metaData(metaData).build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

@Override
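Deletion resolves the request name against existing data streams with simple wildcard matching, and only a match-all pattern may resolve to nothing without an error. A small, self-contained illustration of the Regex helpers used above (both exist in org.elasticsearch.common.regex.Regex):

    import org.elasticsearch.common.regex.Regex;

    public class DeleteDataStreamWildcardExample {
        public static void main(String[] args) {
            // "*" with no existing data streams: the cluster state is returned unchanged.
            System.out.println(Regex.isMatchAllPattern("*"));               // true
            // A wildcard pattern removes every data stream it matches.
            System.out.println(Regex.simpleMatch("logs-*", "logs-foo"));    // true
            // A name or pattern that matches nothing (and is not match-all) leads to DataStreamMissingException.
            System.out.println(Regex.simpleMatch("logs-*", "metrics-bar")); // false
        }
    }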
@@ -34,15 +34,18 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetDataStreamsAction extends ActionType<GetDataStreamsAction.Response> {
@@ -154,11 +157,29 @@ protected Response read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
List<DataStream> dataStreams = List.of(
new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")),
new DataStream("my_data_stream2", "@timestamp", List.of())
);
listener.onResponse(new Response(dataStreams));

Map<String, DataStream> dataStreams = state.metaData().dataStreams();

// return all data streams if no name was specified
if (request.names.length == 0) {
listener.onResponse(new Response(new ArrayList<>(dataStreams.values())));
return;
}

final List<DataStream> results = new ArrayList<>();
for (String name : request.names) {
if (Regex.isSimpleMatchPattern(name)) {
for (Map.Entry<String, DataStream> entry : dataStreams.entrySet()) {
if (Regex.simpleMatch(name, entry.getKey())) {
results.add(entry.getValue());
}
}
} else if (dataStreams.containsKey(name)) {
results.add(dataStreams.get(name));
}
}

listener.onResponse(new Response(results));
}

@Override
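Note the asymmetry with the delete path: here a concrete name that matches no data stream is simply left out of the response rather than raising DataStreamMissingException, and an empty names array returns every data stream currently in the cluster state.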
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata;
import org.elasticsearch.cluster.metadata.DataStreamMetadata;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateV2Metadata;
@@ -133,6 +134,7 @@ public static List<Entry> getNamedWriteables() {
ComponentTemplateMetadata::readDiffFrom);
registerMetaDataCustom(entries, IndexTemplateV2Metadata.TYPE, IndexTemplateV2Metadata::new,
IndexTemplateV2Metadata::readDiffFrom);
registerMetaDataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
@@ -155,6 +157,8 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
ComponentTemplateMetadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexTemplateV2Metadata.TYPE),
IndexTemplateV2Metadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(DataStreamMetadata.TYPE),
DataStreamMetadata::fromXContent));
return entries;
}
