Skip to content

Commit

Permalink
Improve how reindex data stream index action handles api blocks (elas…
Browse files Browse the repository at this point in the history
…tic#120084)

- fail quickly if index has read or metadata block
- add write block, but proceed if already has read-only or read-only-allow-delete
- filter write, read-only, and read-only-allow-delete from dest index
- results in dest index having no blocks
  • Loading branch information
parkertimmins committed Jan 14, 2025
1 parent cf63a73 commit f7f1db6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 25 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/120084.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120084
summary: Improve how reindex data stream index action handles api blocks
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -223,16 +224,46 @@ public void testMappingsAddedToDestIndex() throws Exception {
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
}

public void testReadOnlyAddedBack() {
public void testFailIfMetadataBlockSet() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet();

try {
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet();
} catch (ElasticsearchException e) {
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
}

cleanupMetadataBlocks(sourceIndex);
}

public void testFailIfReadBlockSet() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet();

try {
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet();
} catch (ElasticsearchException e) {
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
}

cleanupMetadataBlocks(sourceIndex);
}

public void testReadOnlyBlocksNotAddedBack() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

// Create source index with read-only and/or block-writes
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
boolean isReadOnly = randomBoolean();
boolean isBlockWrites = randomBoolean();
var settings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY, isReadOnly)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, isBlockWrites)
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, randomBoolean())
.put(IndexMetadata.SETTING_BLOCKS_WRITE, randomBoolean())
.build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet();

Expand All @@ -241,13 +272,13 @@ public void testReadOnlyAddedBack() {
.actionGet()
.getDestIndex();

// assert read-only settings added back to dest index
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(isReadOnly, Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY)));
assertEquals(isBlockWrites, Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
assertFalse(Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY)));
assertFalse(Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
assertFalse(Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));

removeReadOnly(sourceIndex);
removeReadOnly(destIndex);
cleanupMetadataBlocks(sourceIndex);
cleanupMetadataBlocks(destIndex);
}

public void testUpdateSettingsDefaultsRestored() {
Expand Down Expand Up @@ -426,10 +457,11 @@ public void testTsdbStartEndSet() throws Exception {
// TODO check other IndexMetadata fields that need to be fixed after the fact
// TODO what happens if don't have necessary perms for a given index?

private static void removeReadOnly(String index) {
private static void cleanupMetadataBlocks(String index) {
var settings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY, false)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, false)
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
.build();
assertAcked(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)).actionGet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;

public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
Expand Down Expand Up @@ -93,13 +92,22 @@ protected void doExecute(
);
}

if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_READ, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a read block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_METADATA, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a metadata block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}

SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> copyOldSourceSettingsToDest(settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
.addListener(listener);
}
Expand All @@ -120,7 +128,8 @@ public void onResponse(AddIndexBlockResponse response) {
@Override
public void onFailure(Exception e) {
if (e instanceof ClusterBlockException || e.getCause() instanceof ClusterBlockException) {
// It's fine if block-writes is already set
// Could fail with a cluster block exception if read-only or read-only-allow-delete is already set
// In this case, we can proceed
listener.onResponse(null);
} else {
listener.onFailure(e);
Expand All @@ -146,10 +155,12 @@ private void createIndex(
) {
logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName());

// override read-only settings if they exist
var removeReadOnlyOverride = Settings.builder()
// remove read-only settings if they exist
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_WRITE)
// settings to optimize reindex
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.build();
Expand Down Expand Up @@ -192,22 +203,29 @@ private void addBlockIfFromSource(
}
}

private void updateSettings(
String index,
Settings.Builder settings,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), index);
updateSettingsRequest.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", index);
client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage));
}

private void copyOldSourceSettingsToDest(
Settings settingsBefore,
String destIndexName,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
logger.debug("Updating settings on destination index after reindex completes");

var settings = Settings.builder();
copySettingOrUnset(settingsBefore, settings, IndexMetadata.SETTING_NUMBER_OF_REPLICAS);
copySettingOrUnset(settingsBefore, settings, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey());

var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), destIndexName);
updateSettingsRequest.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName);
client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage));
updateSettings(destIndexName, settings, listener, parentTaskId);
}

private static void copySettingOrUnset(Settings settingsBefore, Settings.Builder builder, String setting) {
Expand Down

0 comments on commit f7f1db6

Please sign in to comment.