Skip to content

Commit

Permalink
HLRC: reindex API with wait_for_completion false (elastic#35202)
Browse files Browse the repository at this point in the history
Extend High Level Rest Client Reindex API to support requests with
wait_for_completion=false. This method will return a TaskSubmissionResult with task identifier as string and results can be queried with Task API

refers: elastic#27205
  • Loading branch information
pgomulka committed Nov 13, 2018
1 parent e4c987c commit 2773b8b
Show file tree
Hide file tree
Showing 10 changed files with 400 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,18 @@ static Request rankEval(RankEvalRequest rankEvalRequest) throws IOException {
}

static Request reindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, true);
}

static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, false);
}

private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withWaitForCompletion(waitForCompletion)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
Expand Down Expand Up @@ -897,11 +906,8 @@ Params withDetailed(boolean detailed) {
return this;
}

Params withWaitForCompletion(boolean waitForCompletion) {
if (waitForCompletion) {
return putParam("wait_for_completion", Boolean.TRUE.toString());
}
return this;
Params withWaitForCompletion(Boolean waitForCompletion) {
return putParam("wait_for_completion", waitForCompletion.toString());
}

Params withNodes(String[] nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -476,6 +477,20 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request
);
}

/**
* Submits a reindex task.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
* @param reindexRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final TaskSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
reindexRequest, RequestConverters::submitReindex, options, TaskSubmissionResponse::fromXContent, emptySet()
);
}

/**
* Asynchronously executes a reindex request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.tasks;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

public class TaskSubmissionResponse extends ActionResponse {

private static final ParseField TASK = new ParseField("task");

public static final ConstructingObjectParser<TaskSubmissionResponse, Void> PARSER = new ConstructingObjectParser<>(
"task_submission_response",
true, a -> new TaskSubmissionResponse((String) a[0]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK);
}

private final String task;

TaskSubmissionResponse(String task) {
this.task = task;
}

/**
* Get the task id
*
* @return the id of the reindex task.
*/
public String getTask() {
return task;
}

@Override
public int hashCode() {
return Objects.hash(task);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
TaskSubmissionResponse that = (TaskSubmissionResponse) other;
return Objects.equals(task, that.task);
}

public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -706,111 +704,6 @@ public void testBulk() throws IOException {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}

public void testReindex() throws Exception {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// test1: create one doc in dest
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test2: create 1 and update 1
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(2, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test reindex rethrottling
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);

// this following settings are supposed to halt reindexing after first document
reindexRequest.setSourceBatchSize(1);
reindexRequest.setRequestsPerSecond(0.00001f);
final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {

@Override
public void onResponse(BulkByScrollResponse response) {
reindexTaskFinished.countDown();
}

@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});

TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
assertEquals(Float.toString(requestsPerSecond),
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
reindexTaskFinished.await(2, TimeUnit.SECONDS);

// any rethrottling after the reindex is done performed with the same taskId should result in a failure
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertTrue(response.getTasks().isEmpty());
assertFalse(response.getNodeFailures().isEmpty());
assertEquals(1, response.getNodeFailures().size());
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
response.getNodeFailures().get(0).getCause().getMessage());
}
}

private TaskId findTaskToRethrottle(String actionName) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;
import java.util.function.BooleanSupplier;

public class ReindexIT extends ESRestHighLevelClientTestCase {

public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// reindex one document with id 1 from source to destination
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);

BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);

assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
}

public void testReindexTask() throws IOException, InterruptedException {
final String sourceIndex = "source123";
final String destinationIndex = "dest2";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);

TaskSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask());
awaitBusy(hasUpgradeCompleted);
}
}

private BooleanSupplier checkCompletionStatus(String taskId) {
return () -> {
try {
Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId));
return (boolean) entityAsMap(response).get("completed");
} catch (IOException e) {
fail(e.getMessage());
return false;
}
};
}
}
Loading

0 comments on commit 2773b8b

Please sign in to comment.