Skip to content

Commit

Permalink
[ML-DataFrame] Ensure latest index template exists before indexing do…
Browse files Browse the repository at this point in the history
…cs (#46553)

When upgrading data nodes to a newer version before
master nodes there was a risk that a transform running
on an upgraded data node would index a document into
the new transforms internal index before its index
template was created.  This would cause the index to
be created with entirely dynamic mappings.

This change introduces a check before indexing any
internal transforms document to ensure that the required
index template exists and create it if it doesn't.

Fixes #46501
  • Loading branch information
droberts195 authored Sep 11, 2019
1 parent d0a7bbc commit 4207fcf
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@
package org.elasticsearch.xpack.transform.persistence;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.transform.DataFrameField;
Expand All @@ -27,6 +37,8 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.transform.DataFrameField.TRANSFORM_ID;

public final class DataFrameInternalIndex {
Expand Down Expand Up @@ -130,6 +142,10 @@ private static XContentBuilder auditMappings() throws IOException {

public static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder();
return mappings(builder);
}

public static XContentBuilder mappings(XContentBuilder builder) throws IOException {
builder.startObject();

builder.startObject(MapperService.SINGLE_MAPPING_NAME);
Expand Down Expand Up @@ -301,6 +317,46 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw
.endObject();
}

public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(LATEST_INDEX_VERSIONED_NAME);
}

/**
* This method should be called before any document is indexed that relies on the
* existence of the latest index template to create the internal index. The
* reason is that the standard template upgrader only runs when the master node
* is upgraded to the newer version. If data nodes are upgraded before master
* nodes and transforms get assigned to those data nodes then without this check
* the data nodes will index documents into the internal index before the necessary
* index template is present and this will result in an index with completely
* dynamic mappings being created (which is very bad).
*/
public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client,
ActionListener<Void> listener) {

// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestVersionedIndexTemplate(clusterService.state())) {
listener.onResponse(null);
return;
}

// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetaData indexTemplateMetaData = getIndexTemplateMetaData();
BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(LATEST_INDEX_VERSIONED_NAME)
.patterns(indexTemplateMetaData.patterns())
.version(indexTemplateMetaData.version())
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
innerListener, client.admin().indices()::putTemplate);
} catch (IOException e) {
listener.onFailure(e);
}
}

private DataFrameInternalIndex() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final DataFrameAuditor auditor;
private volatile int numFailureRetries;

Expand All @@ -81,6 +82,7 @@ public DataFrameTransformPersistentTasksExecutor(Client client,
this.schedulerEngine = schedulerEngine;
this.auditor = auditor;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.numFailureRetries = DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
Expand Down Expand Up @@ -144,7 +146,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
);

// <5> load next checkpoint
// <7> load next checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(
nextCheckpoint -> {

Expand All @@ -171,7 +173,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
}
);

// <4> load last checkpoint
// <6> load last checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(
lastCheckpoint -> {
indexerBuilder.setLastCheckpoint(lastCheckpoint);
Expand All @@ -188,7 +190,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
}
);

// <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// <5> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<Tuple<DataFrameTransformStoredDoc, SeqNoPrimaryTermAndIndex>> transformStatsActionListener = ActionListener.wrap(
Expand Down Expand Up @@ -230,7 +232,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
}
);

// <2> set fieldmappings for the indexer, get the previous stats (if they exist)
// <4> set fieldmappings for the indexer, get the previous stats (if they exist)
ActionListener<Map<String, String>> getFieldMappingsListener = ActionListener.wrap(
fieldMappings -> {
indexerBuilder.setFieldMappings(fieldMappings);
Expand All @@ -244,7 +246,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
}
);

// <1> Validate the transform, assigning it to the indexer, and get the field mappings
// <3> Validate the transform, assigning it to the indexer, and get the field mappings
ActionListener<DataFrameTransformConfig> getTransformConfigListener = ActionListener.wrap(
config -> {
if (config.isValid()) {
Expand All @@ -261,8 +263,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
markAsFailed(buildTask, msg);
}
);
// <0> Get the transform config
transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener);

// <2> Get the transform config
ActionListener<Void> templateCheckListener = ActionListener.wrap(
aVoid -> transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener),
error -> {
String msg = "Failed to create internal index mappings";
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
);

// <1> Check the internal index template is installed
DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
}

private static IndexerState currentIndexerState(DataFrameTransformState previousState) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.persistence;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class DataFrameInternalIndexTests extends ESTestCase {

public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;

static {
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
try {
mapBuilder.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
MetaData.Builder metaBuilder = MetaData.builder();
metaBuilder.templates(mapBuilder.build());
ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();
}

public void testHaveLatestVersionedIndexTemplate() {

assertTrue(DataFrameInternalIndex.haveLatestVersionedIndexTemplate(STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE));
assertFalse(DataFrameInternalIndex.haveLatestVersionedIndexTemplate(ClusterState.EMPTY_STATE));
}

public void testInstallLatestVersionedIndexTemplateIfRequired_GivenNotRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);

Client client = mock(Client.class);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verifyNoMoreInteractions(client);
}

public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
verify(client, times(1)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(1)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.xpack.transform.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.transform.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.transform.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.transform.persistence.DataFrameInternalIndexTests;
import org.elasticsearch.xpack.transform.persistence.DataFrameTransformsConfigManager;

import java.util.ArrayList;
Expand Down Expand Up @@ -106,6 +107,7 @@ public void testNodeVersionAssignment() {
Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(cSettings);
when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client,
transformsConfigManager,
dataFrameTransformsCheckpointService, mock(SchedulerEngine.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,26 @@ setup:
- do:
data_frame.delete_data_frame_transform:
transform_id: "mixed-simple-continuous-transform"

---
"Test index mappings for latest internal index":
- do:
data_frame.put_data_frame_transform:
transform_id: "upgraded-simple-transform"
defer_validation: true
body: >
{
"source": { "index": "dataframe-transform-airline-data" },
"dest": { "index": "upgraded-simple-transform-idx" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }

- do:
indices.get_mapping:
index: .data-frame-internal-2
- match: { \.data-frame-internal-2.mappings.dynamic: "false" }
- match: { \.data-frame-internal-2.mappings.properties.id.type: "keyword" }

0 comments on commit 4207fcf

Please sign in to comment.