Add index.look_back_time setting for tsdb data streams
This change adds an `index.look_back_time` index setting that sets the `index.time_series.start_time` setting of the first backing index when a data stream is created.

This allows accepting older data during initial indexing without changing the `index.look_ahead_time` setting. That setting also controls `index.time_series.end_time`, so changing it would affect rollovers as well.

The default for `index.look_back_time` is `2h`, which means documents with an `@timestamp` up to 2 hours before the creation of the data stream can be indexed. This matches the behavior before this change, when `index.look_ahead_time` (also `2h` by default) was used to set the `index.time_series.start_time` of the first backing index.
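As a rough illustration of the calculation described above, the following standalone sketch derives the accepted `@timestamp` range of a first backing index from a creation time, a look-back interval, and a look-ahead interval. It is not the Elasticsearch implementation: the class `LookBackTimeSketch`, its nested `TimeRange` type, and the method `firstBackingIndexRange` are hypothetical names, truncating to whole seconds is an assumption standing in for `DataStream.getCanonicalTimestampBound`, and treating the start as inclusive and the end as exclusive is likewise an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class LookBackTimeSketch {

    /** Hypothetical holder for the accepted @timestamp range of a backing index. */
    public static final class TimeRange {
        public final Instant start;
        public final Instant end;

        TimeRange(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        /** Assumption: start is inclusive, end is exclusive. */
        public boolean accepts(Instant timestamp) {
            return !timestamp.isBefore(start) && timestamp.isBefore(end);
        }
    }

    /** Range of the first backing index, resolved when the data stream is created. */
    public static TimeRange firstBackingIndexRange(Instant resolvedAt, Duration lookBack, Duration lookAhead) {
        // Assumption: whole-second truncation approximates the canonical timestamp bound.
        Instant start = resolvedAt.minus(lookBack).truncatedTo(ChronoUnit.SECONDS);
        Instant end = resolvedAt.plus(lookAhead).truncatedTo(ChronoUnit.SECONDS);
        return new TimeRange(start, end);
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.parse("2023-08-16T12:00:00Z");
        Duration twoHours = Duration.ofHours(2);

        // Defaults: look back 2h, look ahead 2h.
        TimeRange defaults = firstBackingIndexRange(createdAt, twoHours, twoHours);
        // Wider look-back only; the look-ahead interval is untouched.
        TimeRange widened = firstBackingIndexRange(createdAt, Duration.ofHours(24), twoHours);

        Instant oldDocument = Instant.parse("2023-08-16T02:00:00Z"); // 10 hours before creation

        System.out.println(defaults.accepts(oldDocument));      // false
        System.out.println(widened.accepts(oldDocument));       // true
        System.out.println(widened.end.equals(defaults.end));   // true
    }
}
```

The last line of output reflects the motivation given above: widening the look-back interval moves only the start bound, while the end bound, which is derived from the look-ahead interval, stays where it was.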

Closes elastic#98463
martijnvg committed Aug 16, 2023
1 parent 171bcbb commit 554d54c
Showing 8 changed files with 142 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/reference/data-streams/set-up-tsds.asciidoc
@@ -177,6 +177,7 @@ Optionally, the index settings component template for a TSDS can include:

* Your lifecycle policy in the `index.lifecycle.name` index setting.
* The <<tsds-look-ahead-time,`index.look_ahead_time`>> index setting.
* The <<tsds-look-back-time,`index.look_back_time`>> index setting.
* Other index settings, such as <<index-codec,`index.codec`>>, for your TSDS's
backing indices.

9 changes: 9 additions & 0 deletions docs/reference/data-streams/tsds-index-settings.asciidoc
@@ -33,6 +33,15 @@ days). Only indices with an `index.mode` of `time_series` support this setting.
For more information, refer to <<tsds-look-ahead-time>>. Additionally this setting
can not be less than `time_series.poll_interval` cluster setting.

[[index-look-back-time]]
`index.look_back_time`::
(<<_static_index_settings,Static>>, <<time-units,time units>>)
Interval used to calculate the `index.time_series.start_time` for a TSDS's first
backing index when a tsdb data stream is created. Defaults to `2h` (2 hours).
Accepts `1m` (one minute) to `7d` (seven days). Only indices with an `index.mode`
of `time_series` support this setting. For more information,
refer to <<tsds-look-back-time>>.

[[index-routing-path]] `index.routing_path`::
(<<_static_index_settings,Static>>, string or array of strings) Plain `keyword`
fields used to route documents in a TSDS to index shards. Supports wildcards
16 changes: 16 additions & 0 deletions docs/reference/data-streams/tsds.asciidoc
@@ -253,6 +253,22 @@ value borders the `index.time_series.start_time` for the new write index. This
ensures the `@timestamp` ranges for neighboring backing indices always border
but never overlap.

[discrete]
[[tsds-look-back-time]]
==== Look-back time

Use the <<index-look-back-time,`index.look_back_time`>> index setting to
configure how far in the past you can add documents to an index. When you
create a data stream for a TSDS, {es} calculates the index's
`index.time_series.start_time` value as:

`now - index.look_back_time`

This setting is only used when a data stream gets created and controls
the `index.time_series.start_time` index setting of the first backing index.
Configuring this index setting can be useful to accept documents with `@timestamp`
field values that are older than 2 hours (the `index.look_back_time` default).

[discrete]
[[tsds-accepted-time-range]]
==== Accepted time range for adding data
@@ -10,7 +10,9 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.Before;

@@ -541,6 +543,69 @@ public void testUpdateComponentTemplateDoesNotFailIndexTemplateValidation() thro
client().performRequest(request);
}

public void testLookBackTime() throws IOException {
// Create template that uses index.look_back_time index setting:
String template = """
{
"index_patterns": ["test*"],
"template": {
"settings":{
"index": {
"look_back_time": "24h",
"number_of_replicas": 0,
"mode": "time_series"
}
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"field": {
"type": "keyword",
"time_series_dimension": true
}
}
}
},
"data_stream": {}
}""";
var putIndexTemplateRequest = new Request("PUT", "/_index_template/2");
putIndexTemplateRequest.setJsonEntity(template);
assertOK(client().performRequest(putIndexTemplateRequest));

// Create data stream:
var createDataStreamRequest = new Request("PUT", "/_data_stream/test123");
assertOK(client().performRequest(createDataStreamRequest));

// Check data stream has been created:
var getDataStreamsRequest = new Request("GET", "/_data_stream");
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
var dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("test123"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("2"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo("test123", 1));

// Check the backing index:
// 2023-08-15T04:35:50.000Z
var indices = getIndex(firstBackingIndex);
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("test123"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeFirstBackingIndex, notNullValue());
Instant now = Instant.now();
Instant startTime = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(startTimeFirstBackingIndex)).toInstant();
assertTrue(now.minus(24, ChronoUnit.HOURS).isAfter(startTime));
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeFirstBackingIndex, notNullValue());
}

private static Map<?, ?> getIndex(String indexName) throws IOException {
var getIndexRequest = new Request("GET", "/" + indexName + "?human");
var response = client().performRequest(getIndexRequest);
@@ -84,10 +84,11 @@ public Settings getAdditionalIndexSettings(
if (indexMode == IndexMode.TIME_SERIES) {
Settings.Builder builder = Settings.builder();
TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings);
TimeValue lookBackTime = DataStreamsPlugin.LOOK_BACK_TIME.get(allSettings);
final Instant start;
final Instant end;
if (dataStream == null || migrating) {
start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookAheadTime.getMillis()));
start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookBackTime.getMillis()));
end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis()));
} else {
IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
@@ -105,6 +105,14 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
Setting.Property.IndexScope,
Setting.Property.Dynamic
);
public static final Setting<TimeValue> LOOK_BACK_TIME = Setting.timeSetting(
"index.look_back_time",
TimeValue.timeValueHours(2),
TimeValue.timeValueMinutes(1),
TimeValue.timeValueDays(7),
Setting.Property.IndexScope,
Setting.Property.Dynamic
);
// The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this:
private final SetOnce<UpdateTimeSeriesRangeService> updateTimeSeriesRangeService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();
@@ -140,6 +148,7 @@ public List<Setting<?>> getSettings() {
List<Setting<?>> pluginSettings = new ArrayList<>();
pluginSettings.add(TIME_SERIES_POLL_INTERVAL);
pluginSettings.add(LOOK_AHEAD_TIME);
pluginSettings.add(LOOK_BACK_TIME);

if (DataStreamLifecycle.isFeatureEnabled()) {
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
@@ -38,6 +38,7 @@

public class DataStreamIndexSettingsProviderTests extends ESTestCase {

private static final TimeValue DEFAULT_LOOK_BACK_TIME = TimeValue.timeValueHours(2); // default
private static final TimeValue DEFAULT_LOOK_AHEAD_TIME = TimeValue.timeValueHours(2); // default

DataStreamIndexSettingsProvider provider;
@@ -83,7 +84,7 @@ public void testGetAdditionalIndexSettings() throws Exception {
List.of(new CompressedXContent(mapping))
);
assertThat(result.size(), equalTo(3));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("field3"));
}
@@ -235,10 +236,31 @@ public void testGetAdditionalIndexSettingsLookAheadTime() throws Exception {
List.of(new CompressedXContent("{}"))
);
assertThat(result.size(), equalTo(2));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis())));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis())));
}

public void testGetAdditionalIndexSettingsLookBackTime() throws Exception {
Metadata metadata = Metadata.EMPTY_METADATA;
String dataStreamName = "logs-app1";

Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
TimeValue lookBackTime = TimeValue.timeValueHours(12);
Settings settings = builder().put("index.mode", "time_series").put("index.look_back_time", lookBackTime.getStringRep()).build();
Settings result = provider.getAdditionalIndexSettings(
DataStream.getDefaultBackingIndexName(dataStreamName, 1),
dataStreamName,
true,
metadata,
now,
settings,
List.of(new CompressedXContent("{}"))
);
assertThat(result.size(), equalTo(2));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookBackTime.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
}

public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() throws Exception {
String dataStreamName = "logs-app1";
TimeValue lookAheadTime = TimeValue.timeValueHours(2);
@@ -358,7 +380,7 @@ public void testGetAdditionalIndexSettingsMigrateToTsdb() {
List.of()
);
assertThat(result.size(), equalTo(2));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
}

@@ -16,7 +16,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.After;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

@@ -52,7 +51,7 @@ public void testTimeSeriesPollIntervalSettingToHigh() {
assertThat(e.getMessage(), equalTo("failed to parse value [11m] for setting [time_series.poll_interval], must be <= [10m]"));
}

public void testLookAheadTimeSetting() throws IOException {
public void testLookAheadTimeSetting() {
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "10m").build();
updateIndexSettings(settings);
}
@@ -69,6 +68,18 @@ public void testLookAheadTimeSettingToHigh() {
assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_ahead_time], must be <= [7d]"));
}

public void testLookBackTimeSettingToLow() {
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "1s").build();
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [index.look_back_time], must be >= [1m]"));
}

public void testLookBackTimeSettingToHigh() {
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "8d").build();
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_back_time], must be <= [7d]"));
}

public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() {
{
var settings = Settings.builder()
@@ -99,7 +110,7 @@ public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() {
}
}

public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() throws IOException {
public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() {
var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m").build();
updateClusterSettings(clusterSettings);
var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "100m").build();
@@ -110,7 +121,7 @@ private void updateClusterSettings(Settings settings) {
clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(settings)).actionGet();
}

private void updateIndexSettings(Settings settings) throws IOException {
private void updateIndexSettings(Settings settings) {
try {
createIndex("test");
} catch (ResourceAlreadyExistsException e) {