diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 035c20c1600bf..d21abfc11c754 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -1,26 +1,33 @@ --- -"Test stubs": +"Create data stream": - skip: - version: " - 7.6.99" - reason: only available in 7.7+ + version: "all" + reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/54022" - do: indices.create_data_stream: - name: data-stream2 + name: simple-data-stream1 body: timestamp_field: "@timestamp" - is_true: acknowledged + - do: + indices.create_data_stream: + name: simple-data-stream2 + body: + timestamp_field: "@timestamp2" + - is_true: acknowledged + - do: indices.get_data_streams: {} - - match: { 0.name: my_data_stream1 } + - match: { 0.name: simple-data-stream1 } - match: { 0.timestamp_field: '@timestamp' } - - match: { 0.indices: ['my_data_stream1-000000'] } - - match: { 1.name: my_data_stream2 } - - match: { 1.timestamp_field: '@timestamp' } + - match: { 0.indices: [] } + - match: { 1.name: simple-data-stream2 } + - match: { 1.timestamp_field: '@timestamp2' } - match: { 1.indices: [] } - do: indices.delete_data_stream: - name: data-stream2 + name: simple-data-stream2 - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index df6e829a28af4..37f190691f33a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -18,30 +18,42 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; import java.util.Objects; public class CreateDataStreamAction extends ActionType { + private static final Logger logger = LogManager.getLogger(CreateDataStreamAction.class); + public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction(); public static final String NAME = "indices:admin/data_stream/create"; @@ -64,7 +76,14 @@ public void setTimestampFieldName(String timestampFieldName) { @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (Strings.hasText(name) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + if (Strings.hasText(timestampFieldName) == false) { + validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException); + } + return validationException; } public Request(StreamInput in) throws IOException { @@ -116,7 +135,41 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - listener.onResponse(new AcknowledgedResponse(true)); + clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]", + new ClusterStateUpdateTask(Priority.HIGH) { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return createDataStream(currentState, request); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } + + static ClusterState createDataStream(ClusterState currentState, Request request) { + if (currentState.metaData().dataStreams().containsKey(request.name)) { + throw new IllegalArgumentException("data_stream [" + request.name + "] already exists"); + } + + MetaData.Builder builder = MetaData.builder(currentState.metaData()).put( + new DataStream(request.name, request.timestampFieldName, Collections.emptyList())); + + logger.info("adding data stream [{}]", request.name); + return ClusterState.builder(currentState).metaData(builder).build(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 20a2ba4aa2cd6..91444ef64320b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -18,30 +18,44 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; public class DeleteDataStreamAction extends ActionType { + private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class); + public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction(); public static final String NAME = "indices:admin/data_stream/delete"; @@ -59,7 +73,11 @@ public Request(String name) { @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (Strings.hasText(name) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + return validationException; } public Request(StreamInput in) throws IOException { @@ -108,7 +126,51 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - listener.onResponse(new AcknowledgedResponse(true)); + clusterService.submitStateUpdateTask("remove-data-stream [" + request.name + "]", new ClusterStateUpdateTask(Priority.HIGH) { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return removeDataStream(currentState, request); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } + + static ClusterState removeDataStream(ClusterState currentState, Request request) { + Set dataStreams = new HashSet<>(); + for (String dataStreamName : currentState.metaData().dataStreams().keySet()) { + if (Regex.simpleMatch(request.name, dataStreamName)) { + dataStreams.add(dataStreamName); + } + } + if (dataStreams.isEmpty()) { + // if a match-all pattern was specified and no data streams were found because none exist, do not + // fail with data stream missing exception + if (Regex.isMatchAllPattern(request.name)) { + return currentState; + } + throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found"); + } + MetaData.Builder metaData = MetaData.builder(currentState.metaData()); + for (String dataStreamName : dataStreams) { + logger.info("removing data stream [{}]", dataStreamName); + metaData.removeDataStream(dataStreamName); + } + return ClusterState.builder(currentState).metaData(metaData).build(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java index 1549f056e811f..ed2411401aeb6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; @@ -41,8 +42,10 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; public class GetDataStreamsAction extends ActionType { @@ -154,11 +157,30 @@ protected Response read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - List dataStreams = List.of( - new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")), - new DataStream("my_data_stream2", "@timestamp", List.of()) - ); - listener.onResponse(new Response(dataStreams)); + listener.onResponse(new Response(getDataStreams(state, request))); + } + + static List getDataStreams(ClusterState clusterState, Request request) { + Map dataStreams = clusterState.metaData().dataStreams(); + + // return all data streams if no name was specified + if (request.names.length == 0) { + return new ArrayList<>(dataStreams.values()); + } + + final List results = new ArrayList<>(); + for (String name : request.names) { + if (Regex.isSimpleMatchPattern(name)) { + for (Map.Entry entry : dataStreams.entrySet()) { + if (Regex.simpleMatch(name, entry.getKey())) { + results.add(entry.getValue()); + } + } + } else if (dataStreams.containsKey(name)) { + results.add(dataStreams.get(name)); + } + } + return results; } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index f52b32f3ccf67..6dfe07e5c20aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata; +import org.elasticsearch.cluster.metadata.DataStreamMetadata; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateV2Metadata; @@ -133,6 +134,7 @@ public static List getNamedWriteables() { ComponentTemplateMetadata::readDiffFrom); registerMetaDataCustom(entries, IndexTemplateV2Metadata.TYPE, IndexTemplateV2Metadata::new, IndexTemplateV2Metadata::readDiffFrom); + registerMetaDataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom); // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); return entries; @@ -155,6 +157,8 @@ public static List getNamedXWriteables() { ComponentTemplateMetadata::fromXContent)); entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexTemplateV2Metadata.TYPE), IndexTemplateV2Metadata::fromXContent)); + entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(DataStreamMetadata.TYPE), + DataStreamMetadata::fromXContent)); return entries; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java new file mode 100644 index 0000000000000..5731c1353f989 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java @@ -0,0 +1,188 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Custom {@link MetaData} implementation for storing a map of {@link DataStream}s and their names. + */ +public class DataStreamMetadata implements MetaData.Custom { + + public static final String TYPE = "data_stream"; + private static final ParseField DATA_STREAM = new ParseField("data_stream"); + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE, false, + a -> new DataStreamMetadata((Map) a[0])); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map dataStreams = new HashMap<>(); + while (p.nextToken() != XContentParser.Token.END_OBJECT) { + String name = p.currentName(); + dataStreams.put(name, DataStream.fromXContent(p)); + } + return dataStreams; + }, DATA_STREAM); + } + + private final Map dataStreams; + + public DataStreamMetadata(Map dataStreams) { + this.dataStreams = dataStreams; + } + + public DataStreamMetadata(StreamInput in) throws IOException { + this.dataStreams = in.readMap(StreamInput::readString, DataStream::new); + } + + public Map dataStreams() { + return this.dataStreams; + } + + @Override + public Diff diff(MetaData.Custom before) { + return new DataStreamMetadata.DataStreamMetadataDiff((DataStreamMetadata) before, this); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return new DataStreamMetadata.DataStreamMetadataDiff(in); + } + + @Override + public EnumSet context() { + return MetaData.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_8_0_0; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.dataStreams, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); + } + + public static DataStreamMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(DATA_STREAM.getPreferredName()); + for (Map.Entry dataStream : dataStreams.entrySet()) { + builder.field(dataStream.getKey(), dataStream.getValue()); + } + builder.endObject(); + return builder; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public int hashCode() { + return Objects.hash(this.dataStreams); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + DataStreamMetadata other = (DataStreamMetadata) obj; + return Objects.equals(this.dataStreams, other.dataStreams); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class Builder { + + private final Map dataStreams = new HashMap<>(); + + public Builder putDataStream(DataStream dataStream) { + dataStreams.put(dataStream.getName(), dataStream); + return this; + } + + public DataStreamMetadata build() { + return new DataStreamMetadata(dataStreams); + } + } + + static class DataStreamMetadataDiff implements NamedDiff { + + final Diff> dataStreamDiff; + + DataStreamMetadataDiff(DataStreamMetadata before, DataStreamMetadata after) { + this.dataStreamDiff = DiffableUtils.diff(before.dataStreams, after.dataStreams, + DiffableUtils.getStringKeySerializer()); + } + + DataStreamMetadataDiff(StreamInput in) throws IOException { + this.dataStreamDiff = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), + DataStream::new, DataStream::readDiffFrom); + } + + @Override + public MetaData.Custom apply(MetaData.Custom part) { + return new DataStreamMetadata(dataStreamDiff.apply(((DataStreamMetadata) part).dataStreams)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + dataStreamDiff.writeTo(out); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 8471364b07dea..73ab3ccab7176 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -674,6 +674,12 @@ public Map templatesV2() { .orElse(Collections.emptyMap()); } + public Map dataStreams() { + return Optional.ofNullable((DataStreamMetadata) this.custom(DataStreamMetadata.TYPE)) + .map(DataStreamMetadata::dataStreams) + .orElse(Collections.emptyMap()); + } + public ImmutableOpenMap customs() { return this.customs; } @@ -1121,6 +1127,32 @@ public Builder removeIndexTemplate(String name) { return this; } + public Builder dataStreams(Map dataStreams) { + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(dataStreams)); + return this; + } + + public Builder put(DataStream dataStream) { + Objects.requireNonNull(dataStream, "it is invalid to add a null data stream"); + Map existingDataStreams = + Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)) + .map(dsmd -> new HashMap<>(dsmd.dataStreams())) + .orElse(new HashMap<>()); + existingDataStreams.put(dataStream.getName(), dataStream); + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams)); + return this; + } + + public Builder removeDataStream(String name) { + Map existingDataStreams = + Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)) + .map(dsmd -> new HashMap<>(dsmd.dataStreams())) + .orElse(new HashMap<>()); + existingDataStreams.remove(name); + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams)); + return this; + } + public Custom getCustom(String type) { return customs.get(type); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index d6a846c205fb3..6945811c8df23 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -18,10 +18,21 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { @Override @@ -35,4 +46,40 @@ protected Request createTestInstance() { request.setTimestampFieldName(randomAlphaOfLength(8)); return request; } + + public void testValidateRequest() { + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream"); + req.setTimestampFieldName("my-timestamp-field"); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testValidateRequestWithoutTimestampField() { + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream"); + ActionRequestValidationException e = req.validate(); + assertNotNull(e); + assertThat(e.validationErrors().size(), equalTo(1)); + assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing")); + } + + public void testCreateDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); + ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req); + assertThat(newState.metaData().dataStreams().size(), equalTo(1)); + assertThat(newState.metaData().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + } + + public void testCreateDuplicateDataStream() { + final String dataStreamName = "my-data-stream"; + DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build(); + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> CreateDataStreamAction.TransportAction.createDataStream(cs, req)); + assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists")); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index f460065699795..42a961d828f4e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -18,10 +18,22 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { @Override @@ -33,4 +45,38 @@ protected Writeable.Reader instanceReader() { protected Request createTestInstance() { return new Request(randomAlphaOfLength(8)); } + + public void testValidateRequest() { + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request("my-data-stream"); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testValidateRequestWithoutName() { + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(""); + ActionRequestValidationException e = req.validate(); + assertNotNull(e); + assertThat(e.validationErrors().size(), equalTo(1)); + assertThat(e.validationErrors().get(0), containsString("name is missing")); + } + + public void testDeleteDataStream() { + final String dataStreamName = "my-data-stream"; + DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build(); + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + + ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(cs, req); + assertThat(newState.metaData().dataStreams().size(), equalTo(0)); + } + + public void testDeleteNonexistentDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, + () -> DeleteDataStreamAction.TransportAction.removeDataStream(cs, req)); + assertThat(e.getMessage(), containsString("data_streams matching [" + dataStreamName + "] not found")); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java index 062bdef629cc9..1caa8ce568883 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -18,10 +18,21 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { @Override @@ -33,4 +44,29 @@ protected Writeable.Reader instanceReader() { protected Request createTestInstance() { return new Request(generateRandomStringArray(8, 8, false)); } + + public void testValidateRequest() { + GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{}); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testGetDataStreams() { + final String dataStreamName = "my-data-stream"; + DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build(); + GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{dataStreamName}); + List dataStreams = GetDataStreamsAction.TransportAction.getDataStreams(cs, req); + assertThat(dataStreams.size(), equalTo(1)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamName)); + } + + public void testGetNonexistentDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{dataStreamName}); + List dataStreams = GetDataStreamsAction.TransportAction.getDataStreams(cs, req); + assertThat(dataStreams.size(), equalTo(0)); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java new file mode 100644 index 0000000000000..7ef4aad1a1802 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.test.AbstractNamedWriteableTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class DataStreamMetadataTests extends AbstractNamedWriteableTestCase { + + @Override + protected DataStreamMetadata createTestInstance() { + if (randomBoolean()) { + return new DataStreamMetadata(Collections.emptyMap()); + } + Map dataStreams = new HashMap<>(); + for (int i = 0; i < randomIntBetween(1, 5); i++) { + dataStreams.put(randomAlphaOfLength(5), DataStreamTests.randomInstance()); + } + return new DataStreamMetadata(dataStreams); + } + + @Override + protected DataStreamMetadata mutateInstance(DataStreamMetadata instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Collections.singletonList(new NamedWriteableRegistry.Entry(DataStreamMetadata.class, + DataStreamMetadata.TYPE, DataStreamMetadata::new))); + } + + @Override + protected Class categoryClass() { + return DataStreamMetadata.class; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 072165ab098ef..8e2cae06290ce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -28,6 +28,15 @@ public class DataStreamTests extends AbstractSerializingTestCase { + public static DataStream randomInstance() { + int numIndices = randomIntBetween(0, 128); + List indices = new ArrayList<>(numIndices); + for (int i = 0; i < numIndices; i++) { + indices.add(randomAlphaOfLength(10)); + } + return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + } + @Override protected DataStream doParseInstance(XContentParser parser) throws IOException { return DataStream.fromXContent(parser); @@ -40,11 +49,7 @@ protected Writeable.Reader instanceReader() { @Override protected DataStream createTestInstance() { - int numIndices = randomIntBetween(0, 128); - List indices = new ArrayList<>(numIndices); - for (int i = 0; i < numIndices; i++) { - indices.add(randomAlphaOfLength(10)); - } - return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + return randomInstance(); } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java index bd2e48a20c2e4..e3099ab8f9de3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java @@ -939,6 +939,7 @@ public static MetaData randomMetaData() { .version(randomNonNegativeLong()) .put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance()) .put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance()) + .put(DataStreamTests.randomInstance()) .build(); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java index d5e76c97e1ad4..d2a95cb4fc905 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java @@ -151,6 +151,8 @@ public void testSimpleJsonFromAndTo() throws IOException { .putAlias(newAliasMetaDataBuilder("alias-bar1")) .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) + .put(new DataStream("data-stream1", "@timestamp", Collections.emptyList())) + .put(new DataStream("data-stream2", "@timestamp2", Collections.emptyList())) .build(); String metaDataSource = MetaData.Builder.toXContent(metaData); @@ -323,6 +325,16 @@ public void testSimpleJsonFromAndTo() throws IOException { equalTo(new Template(Settings.builder().put("setting", "value").build(), new CompressedXContent("{\"baz\":\"eggplant\"}"), Collections.singletonMap("alias", AliasMetaData.builder("alias").build())))); + + // data streams + assertNotNull(parsedMetaData.dataStreams().get("data-stream1")); + assertThat(parsedMetaData.dataStreams().get("data-stream1").getName(), is("data-stream1")); + assertThat(parsedMetaData.dataStreams().get("data-stream1").getTimeStampField(), is("@timestamp")); + assertThat(parsedMetaData.dataStreams().get("data-stream1").getIndices(), is(Collections.emptyList())); + assertNotNull(parsedMetaData.dataStreams().get("data-stream2")); + assertThat(parsedMetaData.dataStreams().get("data-stream2").getName(), is("data-stream2")); + assertThat(parsedMetaData.dataStreams().get("data-stream2").getTimeStampField(), is("@timestamp2")); + assertThat(parsedMetaData.dataStreams().get("data-stream2").getIndices(), is(Collections.emptyList())); } private static final String MAPPING_SOURCE1 = "{\"mapping1\":{\"text1\":{\"type\":\"string\"}}}";