-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add index and reindex request settings to speed up reindex #119780
Changes from 3 commits
ea30dac
8eaa71d
12652fe
b66e40a
00847ca
e4d20b9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 119780 | ||
summary: Add index and reindex request settings to speed up reindex | ||
area: Data streams | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; | ||
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; | ||
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; | ||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.HandledTransportAction; | ||
import org.elasticsearch.action.support.IndicesOptions; | ||
|
@@ -25,6 +26,7 @@ | |
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.reindex.BulkByScrollResponse; | ||
import org.elasticsearch.index.reindex.ReindexAction; | ||
import org.elasticsearch.index.reindex.ReindexRequest; | ||
|
@@ -95,6 +97,7 @@ protected void doExecute( | |
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId)) | ||
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId)) | ||
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId)) | ||
.<AcknowledgedResponse>andThen(l -> updateSettings(settingsBefore, destIndexName, l, taskId)) | ||
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId)) | ||
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId)) | ||
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) | ||
|
@@ -147,6 +150,8 @@ private void createIndex( | |
var removeReadOnlyOverride = Settings.builder() | ||
.putNull(IndexMetadata.SETTING_READ_ONLY) | ||
.putNull(IndexMetadata.SETTING_BLOCKS_WRITE) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) | ||
.build(); | ||
|
||
var request = new CreateIndexFromSourceAction.Request( | ||
|
@@ -168,6 +173,7 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene | |
reindexRequest.getSearchRequest().source().fetchSource(true); | ||
reindexRequest.setDestIndex(destIndexName); | ||
reindexRequest.setParentTask(parentTaskId); | ||
reindexRequest.setSlices(0); // equivalent to slices=auto in rest api | ||
client.execute(ReindexAction.INSTANCE, reindexRequest, listener); | ||
} | ||
|
||
|
@@ -186,6 +192,35 @@ private void addBlockIfFromSource( | |
} | ||
} | ||
|
||
private void updateSettings( | ||
Settings settingsBefore, | ||
String destIndexName, | ||
ActionListener<AcknowledgedResponse> listener, | ||
TaskId parentTaskId | ||
) { | ||
logger.debug("Updating settings on destination index after reindex completes"); | ||
|
||
var settings = Settings.builder(); | ||
copySettingOrUnset(settingsBefore, settings, IndexMetadata.SETTING_NUMBER_OF_REPLICAS); | ||
copySettingOrUnset(settingsBefore, settings, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey()); | ||
|
||
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), destIndexName); | ||
updateSettingsRequest.setParentTask(parentTaskId); | ||
var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName); | ||
client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); | ||
} | ||
|
||
private static void copySettingOrUnset(Settings settingsBefore, Settings.Builder builder, String setting) { | ||
// if setting was explicitly added to the source index | ||
if (settingsBefore.get(setting) != null) { | ||
// copy it back to the dest index | ||
builder.copy(setting, settingsBefore); | ||
} else { | ||
// otherwise, delete from dest index so that it loads from the settings default | ||
builder.putNull(setting); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, we could get the current setting value using one of the methods that falls back to the default, then set this value on the dest index. But I prefer the current version as it avoids adding a settings explicitly to the dest index which had been unset (and using the default) on the source index. |
||
} | ||
} | ||
|
||
public static String generateDestIndexName(String sourceIndex) { | ||
return "migrated-" + sourceIndex; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit hacky, but not sure of a better way to force
number_of_replicas
to use the default value.