Skip to content

Commit

Permalink
Add index and reindex request settings to speed up reindex (elastic#1…
Browse files Browse the repository at this point in the history
…19780)

- set slices:auto on the reindex request
- set refresh_interval: -1 on destination index before reindexing into it
- set number_of_replicas: 0 on destination index before reindexing into it
- reset refresh_interval and number_of_replicas to previous value or default after reindex

(cherry picked from commit c4024dc)
  • Loading branch information
parkertimmins committed Jan 10, 2025
1 parent 3b012c0 commit 997b946
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/changelog/119780.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119780
summary: Add index and reindex request settings to speed up reindex
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -155,7 +157,11 @@ public void testSettingsAddedBeforeReindex() throws Exception {

// update with a dynamic setting
var numReplicas = randomIntBetween(0, 10);
var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build();
var refreshInterval = randomIntBetween(1, 100) + "s";
var dynamicSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), refreshInterval)
.build();
indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet();

// call reindex
Expand All @@ -167,6 +173,7 @@ public void testSettingsAddedBeforeReindex() throws Exception {
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)));
assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)));
assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey()));
}

public void testMappingsAddedToDestIndex() throws Exception {
Expand Down Expand Up @@ -229,6 +236,38 @@ public void testReadOnlyAddedBack() {
removeReadOnly(destIndex);
}

public void testUpdateSettingsDefaultsRestored() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

// ESIntegTestCase creates a template random_index_template which contains a value for number_of_replicas.
// Since this test checks the behavior of default settings, there cannot be a value for number_of_replicas,
// so we delete the template within this method. This has no effect on other tests which will still
// have the template created during their setup.
assertAcked(
indicesAdmin().execute(TransportDeleteIndexTemplateAction.TYPE, new DeleteIndexTemplateRequest("random_index_template"))
);

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// call reindex
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
.actionGet()
.getDestIndex();

var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(sourceIndex, destIndex)).actionGet();
var destSettings = settingsResponse.getIndexToSettings().get(destIndex);

assertEquals(
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getDefault(destSettings),
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(destSettings)
);
assertEquals(
IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getDefault(destSettings),
IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.get(destSettings)
);
}

public void testSettingsAndMappingsFromTemplate() throws IOException {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
Expand Down Expand Up @@ -95,6 +97,7 @@ protected void doExecute(
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> copyOldSourceSettingsToDest(settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
Expand Down Expand Up @@ -147,6 +150,8 @@ private void createIndex(
var removeReadOnlyOverride = Settings.builder()
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_BLOCKS_WRITE)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.build();

var request = new CreateIndexFromSourceAction.Request(
Expand All @@ -168,6 +173,7 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene
reindexRequest.getSearchRequest().source().fetchSource(true);
reindexRequest.setDestIndex(destIndexName);
reindexRequest.setParentTask(parentTaskId);
reindexRequest.setSlices(0); // equivalent to slices=auto in rest api
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
}

Expand All @@ -186,6 +192,35 @@ private void addBlockIfFromSource(
}
}

private void copyOldSourceSettingsToDest(
Settings settingsBefore,
String destIndexName,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
logger.debug("Updating settings on destination index after reindex completes");

var settings = Settings.builder();
copySettingOrUnset(settingsBefore, settings, IndexMetadata.SETTING_NUMBER_OF_REPLICAS);
copySettingOrUnset(settingsBefore, settings, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey());

var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), destIndexName);
updateSettingsRequest.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName);
client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage));
}

private static void copySettingOrUnset(Settings settingsBefore, Settings.Builder builder, String setting) {
// if setting was explicitly added to the source index
if (settingsBefore.get(setting) != null) {
// copy it back to the dest index
builder.copy(setting, settingsBefore);
} else {
// otherwise, delete from dest index so that it loads from the settings default
builder.putNull(setting);
}
}

public static String generateDestIndexName(String sourceIndex) {
return "migrated-" + sourceIndex;
}
Expand Down

0 comments on commit 997b946

Please sign in to comment.