Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prefer_v2_templates parameter to Reindex #56253

Merged
merged 6 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,11 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime());
}

if (reindexRequest.preferV2Templates() != null) {
params.putParam("prefer_v2_templates", reindexRequest.preferV2Templates().toString());
}

request.addParameters(params.asMap());
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,20 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.client.indices.PutIndexTemplateV2Request;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateV2;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -50,6 +56,9 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -113,6 +122,44 @@ public void testReindex() throws IOException {
}
}

public void testReindexPreferV2Templates() throws Exception {
//we don't care about warnings here
RequestOptions options = RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> false).build();

IndexRequest indexRequest = new IndexRequest("sourcev2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
RestHighLevelClient client = highLevelClient();
assertEquals(RestStatus.CREATED, client.index(indexRequest, options).status());

PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("v1", Collections.singletonList("target*"));
assertTrue(client.indices().putTemplate(putIndexTemplateRequest, options).isAcknowledged());

AliasMetadata alias = AliasMetadata.builder("alias").build();
Template template = new Template(null, null, Map.of("alias", alias));
List<String> pattern = Collections.singletonList("target*");
IndexTemplateV2 indexTemplate = new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>());
PutIndexTemplateV2Request putTemplateRequest = new PutIndexTemplateV2Request().name("v2").indexTemplate(indexTemplate);
assertTrue(client.indices().putIndexTemplate(putTemplateRequest, options).isAcknowledged());

ReindexRequest reindexRequest = new ReindexRequest()
.preferV2Templates(true)
.setSourceIndices("sourcev2")
.setDestIndex("target1")
.setRefresh(true);
assertEquals(1, client.reindex(reindexRequest, options).getStatus().getSuccessfullyProcessed());

GetAliasesResponse aliases = client.indices().getAlias(new GetAliasesRequest().indices("target1"), options);
assertEquals(RestStatus.OK, aliases.status());
assertEquals(Collections.singletonMap("target1", Collections.singleton(alias)), aliases.getAliases());

reindexRequest.setDestIndex("target2").preferV2Templates(false);
assertEquals(1, client.reindex(reindexRequest, options).getStatus().getSuccessfullyProcessed());

aliases = client.indices().getAlias(new GetAliasesRequest().indices("target2"), options);
assertEquals(RestStatus.OK, aliases.status());
assertEquals(Collections.singletonMap("target2", Collections.emptySet()), aliases.getAliases());
}

public void testReindexTask() throws Exception {
final String sourceIndex = "source123";
final String destinationIndex = "dest2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected boolean accept(ScrollableHitSource.Hit doc) {
return true;
}

private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
BulkRequest bulkRequest = new BulkRequest();
for (ScrollableHitSource.Hit doc : docs) {
if (accept(doc)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
Expand Down Expand Up @@ -217,6 +218,11 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
return super.buildScriptApplier();
}

@Override
protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
return super.buildBulk(docs).preferV2Templates(mainRequest.preferV2Templates());
}

@Override
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
IndexRequest index = new IndexRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ protected ReindexRequest buildRequest(RestRequest request) throws IOException {
if (request.hasParam("scroll")) {
internal.setScroll(parseTimeValue(request.param("scroll"), "scroll"));
}

if (request.hasParam("prefer_v2_templates")) {
internal.preferV2Templates(Boolean.parseBoolean(request.param("prefer_v2_templates")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for 8.0 we want this to default to true if it's not specified by the user (this is just a reminder to change this after backport)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should change it here or simply pass null down the road and let create index request pick up correct default (for making it DRY and using only single place to set defaults)

}

return internal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
"max_docs":{
"type":"number",
"description":"Maximum number of documents to process (default: all documents)"
},
"prefer_v2_templates": {
"type": "boolean",
"description": "favor V2 templates instead of V1 templates during index creation"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -69,6 +71,8 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ

private RemoteInfo remoteInfo;

private Boolean preferV2Templates;

public ReindexRequest() {
this(new SearchRequest(), new IndexRequest(), true);
}
Expand All @@ -86,6 +90,9 @@ public ReindexRequest(StreamInput in) throws IOException {
super(in);
destination = new IndexRequest(in);
remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
preferV2Templates = in.readOptionalBoolean();
}
}

@Override
Expand Down Expand Up @@ -250,6 +257,16 @@ public RemoteInfo getRemoteInfo() {
return remoteInfo;
}

public ReindexRequest preferV2Templates(@Nullable Boolean preferV2Templates) {
this.preferV2Templates = preferV2Templates;
return this;
}

@Nullable
public Boolean preferV2Templates() {
return this.preferV2Templates;
}

@Override
public ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) {
ReindexRequest sliced = doForSlice(new ReindexRequest(slice, destination, false), slicingTask, totalSlices);
Expand All @@ -262,6 +279,9 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
destination.writeTo(out);
out.writeOptionalWriteable(remoteInfo);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalBoolean(preferV2Templates);
}
}

@Override
Expand Down