Skip to content

Commit

Permalink
Add prefer_v2_templates parameter to Reindex (#56253)
Browse files Browse the repository at this point in the history
* prefer_v2_templates for reindex
  • Loading branch information
probakowski authored May 6, 2020
1 parent 32269f1 commit 7ca47f5
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,11 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime());
}

if (reindexRequest.preferV2Templates() != null) {
params.putParam("prefer_v2_templates", reindexRequest.preferV2Templates().toString());
}

request.addParameters(params.asMap());
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,20 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.client.indices.PutIndexTemplateV2Request;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateV2;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -50,6 +56,9 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -113,6 +122,44 @@ public void testReindex() throws IOException {
}
}

public void testReindexPreferV2Templates() throws Exception {
//we don't care about warnings here
RequestOptions options = RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> false).build();

IndexRequest indexRequest = new IndexRequest("sourcev2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
RestHighLevelClient client = highLevelClient();
assertEquals(RestStatus.CREATED, client.index(indexRequest, options).status());

PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("v1", Collections.singletonList("target*"));
assertTrue(client.indices().putTemplate(putIndexTemplateRequest, options).isAcknowledged());

AliasMetadata alias = AliasMetadata.builder("alias").build();
Template template = new Template(null, null, Map.of("alias", alias));
List<String> pattern = Collections.singletonList("target*");
IndexTemplateV2 indexTemplate = new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>());
PutIndexTemplateV2Request putTemplateRequest = new PutIndexTemplateV2Request().name("v2").indexTemplate(indexTemplate);
assertTrue(client.indices().putIndexTemplate(putTemplateRequest, options).isAcknowledged());

ReindexRequest reindexRequest = new ReindexRequest()
.preferV2Templates(true)
.setSourceIndices("sourcev2")
.setDestIndex("target1")
.setRefresh(true);
assertEquals(1, client.reindex(reindexRequest, options).getStatus().getSuccessfullyProcessed());

GetAliasesResponse aliases = client.indices().getAlias(new GetAliasesRequest().indices("target1"), options);
assertEquals(RestStatus.OK, aliases.status());
assertEquals(Collections.singletonMap("target1", Collections.singleton(alias)), aliases.getAliases());

reindexRequest.setDestIndex("target2").preferV2Templates(false);
assertEquals(1, client.reindex(reindexRequest, options).getStatus().getSuccessfullyProcessed());

aliases = client.indices().getAlias(new GetAliasesRequest().indices("target2"), options);
assertEquals(RestStatus.OK, aliases.status());
assertEquals(Collections.singletonMap("target2", Collections.emptySet()), aliases.getAliases());
}

public void testReindexTask() throws Exception {
final String sourceIndex = "source123";
final String destinationIndex = "dest2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected boolean accept(ScrollableHitSource.Hit doc) {
return true;
}

private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
BulkRequest bulkRequest = new BulkRequest();
for (ScrollableHitSource.Hit doc : docs) {
if (accept(doc)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
Expand Down Expand Up @@ -217,6 +218,11 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
return super.buildScriptApplier();
}

@Override
protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
return super.buildBulk(docs).preferV2Templates(mainRequest.preferV2Templates());
}

@Override
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
IndexRequest index = new IndexRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ protected ReindexRequest buildRequest(RestRequest request) throws IOException {
if (request.hasParam("scroll")) {
internal.setScroll(parseTimeValue(request.param("scroll"), "scroll"));
}

if (request.hasParam("prefer_v2_templates")) {
internal.preferV2Templates(Boolean.parseBoolean(request.param("prefer_v2_templates")));
}

return internal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
"max_docs":{
"type":"number",
"description":"Maximum number of documents to process (default: all documents)"
},
"prefer_v2_templates": {
"type": "boolean",
"description": "favor V2 templates instead of V1 templates during index creation"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -69,6 +71,8 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ

private RemoteInfo remoteInfo;

private Boolean preferV2Templates;

public ReindexRequest() {
this(new SearchRequest(), new IndexRequest(), true);
}
Expand All @@ -86,6 +90,9 @@ public ReindexRequest(StreamInput in) throws IOException {
super(in);
destination = new IndexRequest(in);
remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
preferV2Templates = in.readOptionalBoolean();
}
}

@Override
Expand Down Expand Up @@ -250,6 +257,16 @@ public RemoteInfo getRemoteInfo() {
return remoteInfo;
}

public ReindexRequest preferV2Templates(@Nullable Boolean preferV2Templates) {
this.preferV2Templates = preferV2Templates;
return this;
}

@Nullable
public Boolean preferV2Templates() {
return this.preferV2Templates;
}

@Override
public ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) {
ReindexRequest sliced = doForSlice(new ReindexRequest(slice, destination, false), slicingTask, totalSlices);
Expand All @@ -262,6 +279,9 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
destination.writeTo(out);
out.writeOptionalWriteable(remoteInfo);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalBoolean(preferV2Templates);
}
}

@Override
Expand Down

0 comments on commit 7ca47f5

Please sign in to comment.