Reindex max_docs parameter name (#42942)
Previously, a reindex request had two different size specifications in the body:
* Outer level, determining the maximum number of documents to process
* Inside the source element, determining the scroll/batch size

The outer-level size has now been renamed to max_docs to avoid confusion and
clarify its semantics, with backwards compatibility and deprecation warnings
for using size.
Similarly, the size parameter has been renamed to max_docs for update-by-query
and delete-by-query to keep the three interfaces consistent.

Finally, all three endpoints now accept max_docs in both the body and the URL.

Relates #24344
henningandersen authored Jun 7, 2019
1 parent 5929803 commit dea935a
Showing 34 changed files with 824 additions and 90 deletions.
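
For reference, a minimal sketch (not part of the commit) of how the renamed limit is set from the high-level REST client request classes touched below; the index names and values are illustrative only:

[source,java]
--------------------------------------------------
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;

public class MaxDocsExample {
    public static void main(String[] args) {
        // Reindex: max_docs caps the total number of documents processed,
        // while the scroll/batch size stays a separate source-level setting.
        ReindexRequest reindex = new ReindexRequest();
        reindex.setSourceIndices("twitter");
        reindex.setDestIndex("new_twitter");
        reindex.setMaxDocs(1000);         // formerly setSize(1000)
        reindex.setSourceBatchSize(100);  // scroll/batch size, unchanged

        // Update-by-query and delete-by-query take the same cap.
        UpdateByQueryRequest update = new UpdateByQueryRequest("twitter");
        update.setMaxDocs(1000);
        update.setBatchSize(100);

        DeleteByQueryRequest delete = new DeleteByQueryRequest("twitter");
        delete.setMaxDocs(1000);
        delete.setBatchSize(100);
    }
}
--------------------------------------------------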
@@ -585,8 +585,8 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", updateByQueryRequest.getScrollTime());
}
if (updateByQueryRequest.getSize() > 0) {
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
if (updateByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
}
request.addParameters(params.asMap());
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
@@ -613,8 +613,8 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
}
if (deleteByQueryRequest.getSize() > 0) {
params.putParam("size", Integer.toString(deleteByQueryRequest.getSize()));
if (deleteByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(deleteByQueryRequest.getMaxDocs()));
}
request.addParameters(params.asMap());
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
@@ -436,7 +436,11 @@ public void testReindex() throws IOException {
reindexRequest.setDestRouting("=cat");
}
if (randomBoolean()) {
reindexRequest.setSize(randomIntBetween(100, 1000));
if (randomBoolean()) {
reindexRequest.setMaxDocs(randomIntBetween(100, 1000));
} else {
reindexRequest.setSize(randomIntBetween(100, 1000));
}
}
if (randomBoolean()) {
reindexRequest.setAbortOnVersionConflict(false);
@@ -488,8 +492,12 @@ public void testUpdateByQuery() throws IOException {
}
if (randomBoolean()) {
int size = randomIntBetween(100, 1000);
updateByQueryRequest.setSize(size);
expectedParams.put("size", Integer.toString(size));
if (randomBoolean()) {
updateByQueryRequest.setMaxDocs(size);
} else {
updateByQueryRequest.setSize(size);
}
expectedParams.put("max_docs", Integer.toString(size));
}
if (randomBoolean()) {
updateByQueryRequest.setAbortOnVersionConflict(false);
@@ -538,8 +546,12 @@ public void testDeleteByQuery() throws IOException {
}
if (randomBoolean()) {
int size = randomIntBetween(100, 1000);
deleteByQueryRequest.setSize(size);
expectedParams.put("size", Integer.toString(size));
if (randomBoolean()) {
deleteByQueryRequest.setMaxDocs(size);
} else {
deleteByQueryRequest.setSize(size);
}
expectedParams.put("max_docs", Integer.toString(size));
}
if (randomBoolean()) {
deleteByQueryRequest.setAbortOnVersionConflict(false);
@@ -824,9 +824,9 @@ public void testReindex() throws Exception {
// tag::reindex-request-conflicts
request.setConflicts("proceed"); // <1>
// end::reindex-request-conflicts
// tag::reindex-request-size
request.setSize(10); // <1>
// end::reindex-request-size
// tag::reindex-request-maxDocs
request.setMaxDocs(10); // <1>
// end::reindex-request-maxDocs
// tag::reindex-request-sourceSize
request.setSourceBatchSize(100); // <1>
// end::reindex-request-sourceSize
@@ -1026,9 +1026,9 @@ public void testUpdateByQuery() throws Exception {
// tag::update-by-query-request-query
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <1>
// end::update-by-query-request-query
// tag::update-by-query-request-size
request.setSize(10); // <1>
// end::update-by-query-request-size
// tag::update-by-query-request-maxDocs
request.setMaxDocs(10); // <1>
// end::update-by-query-request-maxDocs
// tag::update-by-query-request-scrollSize
request.setBatchSize(100); // <1>
// end::update-by-query-request-scrollSize
@@ -1148,9 +1148,9 @@ public void testDeleteByQuery() throws Exception {
// tag::delete-by-query-request-query
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <1>
// end::delete-by-query-request-query
// tag::delete-by-query-request-size
request.setSize(10); // <1>
// end::delete-by-query-request-size
// tag::delete-by-query-request-maxDocs
request.setMaxDocs(10); // <1>
// end::delete-by-query-request-maxDocs
// tag::delete-by-query-request-scrollSize
request.setBatchSize(100); // <1>
// end::delete-by-query-request-scrollSize
2 changes: 1 addition & 1 deletion docs/java-api/docs/update-by-query.asciidoc
@@ -51,7 +51,7 @@ otherwise modify the request for matching documents.
include-tagged::{client-reindex-tests}/ReindexDocumentationIT.java[update-by-query-size]
--------------------------------------------------

You can also combine `size` with sorting to limit the documents updated:
You can also combine `maxDocs` with sorting to limit the documents updated:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
4 changes: 2 additions & 2 deletions docs/java-rest/high-level/document/delete-by-query.asciidoc
@@ -39,11 +39,11 @@ include-tagged::{doc-tests-file}[{api}-request-query]
--------------------------------------------------
<1> Only copy documents which have field `user` set to `kimchy`

It’s also possible to limit the number of processed documents by setting size.
It’s also possible to limit the number of processed documents by setting `maxDocs`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-size]
include-tagged::{doc-tests-file}[{api}-request-maxDocs]
--------------------------------------------------
<1> Only copy 10 documents

6 changes: 3 additions & 3 deletions docs/java-rest/high-level/document/reindex.asciidoc
@@ -65,11 +65,11 @@ include-tagged::{doc-tests-file}[{api}-request-query]
--------------------------------------------------
<1> Only copy documents which have field `user` set to `kimchy`

It’s also possible to limit the number of processed documents by setting size.
It’s also possible to limit the number of processed documents by setting `maxDocs`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-size]
include-tagged::{doc-tests-file}[{api}-request-maxDocs]
--------------------------------------------------
<1> Only copy 10 documents

@@ -90,7 +90,7 @@ include-tagged::{doc-tests-file}[{api}-request-pipeline]
<1> set pipeline to `my_pipeline`

If you want a particular set of documents from the source index you’ll need to use sort. If possible, prefer a more
selective query to size and sort.
selective query to maxDocs and sort.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
4 changes: 2 additions & 2 deletions docs/java-rest/high-level/document/update-by-query.asciidoc
@@ -40,11 +40,11 @@ include-tagged::{doc-tests-file}[{api}-request-query]
--------------------------------------------------
<1> Only copy documents which have field `user` set to `kimchy`

It’s also possible to limit the number of processed documents by setting size.
It’s also possible to limit the number of processed documents by setting `maxDocs`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-size]
include-tagged::{doc-tests-file}[{api}-request-maxDocs]
--------------------------------------------------
<1> Only copy 10 documents

10 changes: 5 additions & 5 deletions docs/reference/docs/delete-by-query.asciidoc
@@ -571,11 +571,11 @@ sub-request proportionally.
* Due to the nature of `slices` each sub-request won't get a perfectly even
portion of the documents. All documents will be addressed, but some slices may
be larger than others. Expect larger slices to have a more even distribution.
* Parameters like `requests_per_second` and `size` on a request with `slices`
are distributed proportionally to each sub-request. Combine that with the point
above about distribution being uneven and you should conclude that the using
`size` with `slices` might not result in exactly `size` documents being
deleted.
* Parameters like `requests_per_second` and `max_docs` on a request with
`slices` are distributed proportionally to each sub-request. Combine that with
the point above about distribution being uneven and you should conclude that
using `max_docs` with `slices` might not result in exactly `max_docs` documents
being deleted.
* Each sub-request gets a slightly different snapshot of the source index
though these are all taken at approximately the same time.

20 changes: 10 additions & 10 deletions docs/reference/docs/reindex.asciidoc
@@ -190,14 +190,14 @@ not a good idea to rely on this behavior. Instead, make sure that IDs are unique
using a script.

It's also possible to limit the number of processed documents by setting
`size`. This will only copy a single document from `twitter` to
`max_docs`. This will only copy a single document from `twitter` to
`new_twitter`:

[source,js]
--------------------------------------------------
POST _reindex
{
"size": 1,
"max_docs": 1,
"source": {
"index": "twitter"
},
@@ -211,14 +211,14 @@

If you want a particular set of documents from the `twitter` index you'll
need to use `sort`. Sorting makes the scroll less efficient but in some contexts
it's worth it. If possible, prefer a more selective query to `size` and `sort`.
it's worth it. If possible, prefer a more selective query to `max_docs` and `sort`.
This will copy 10000 documents from `twitter` into `new_twitter`:

[source,js]
--------------------------------------------------
POST _reindex
{
"size": 10000,
"max_docs": 10000,
"source": {
"index": "twitter",
"sort": { "date": "desc" }
@@ -1111,11 +1111,11 @@ sub-request proportionally.
* Due to the nature of `slices` each sub-request won't get a perfectly even
portion of the documents. All documents will be addressed, but some slices may
be larger than others. Expect larger slices to have a more even distribution.
* Parameters like `requests_per_second` and `size` on a request with `slices`
are distributed proportionally to each sub-request. Combine that with the point
above about distribution being uneven and you should conclude that the using
`size` with `slices` might not result in exactly `size` documents being
reindexed.
* Parameters like `requests_per_second` and `max_docs` on a request with
`slices` are distributed proportionally to each sub-request. Combine that with
the point above about distribution being uneven and you should conclude that
using `max_docs` with `slices` might not result in exactly `max_docs` documents
being reindexed.
* Each sub-request gets a slightly different snapshot of the source index,
though these are all taken at approximately the same time.

@@ -1232,7 +1232,7 @@ to load only the existing data into the new index and rename any fields if neede
----------------------------------------------------------------
POST _reindex
{
"size": 10,
"max_docs": 10,
"source": {
"index": "twitter",
"query": {
10 changes: 5 additions & 5 deletions docs/reference/docs/update-by-query.asciidoc
@@ -602,11 +602,11 @@ sub-request proportionally.
* Due to the nature of `slices` each sub-request won't get a perfectly even
portion of the documents. All documents will be addressed, but some slices may
be larger than others. Expect larger slices to have a more even distribution.
* Parameters like `requests_per_second` and `size` on a request with `slices`
are distributed proportionally to each sub-request. Combine that with the point
above about distribution being uneven and you should conclude that the using
`size` with `slices` might not result in exactly `size` documents being
updated.
* Parameters like `requests_per_second` and `max_docs` on a request with
`slices` are distributed proportionally to each sub-request. Combine that with
the point above about distribution being uneven and you should conclude that
using `max_docs` with `slices` might not result in exactly `max_docs` documents
being updated.
* Each sub-request gets a slightly different snapshot of the source index
though these are all taken at approximately the same time.

@@ -75,7 +75,7 @@
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;

@@ -263,8 +263,8 @@ void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, Scrollabl
return;
}
long total = response.getTotalHits();
if (mainRequest.getSize() > 0) {
total = min(total, mainRequest.getSize());
if (mainRequest.getMaxDocs() > 0) {
total = min(total, mainRequest.getMaxDocs());
}
worker.setTotal(total);
AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() {
@@ -304,9 +304,9 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon
}
worker.countBatch();
List<? extends ScrollableHitSource.Hit> hits = response.getHits();
if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
// Truncate the hits if we have more than the request size
long remaining = max(0, mainRequest.getSize() - worker.getSuccessfullyProcessed());
if (mainRequest.getMaxDocs() != MAX_DOCS_ALL_MATCHES) {
// Truncate the hits if we have more than the request max docs
long remaining = max(0, mainRequest.getMaxDocs() - worker.getSuccessfullyProcessed());
if (remaining < hits.size()) {
hits = hits.subList(0, (int) remaining);
}
@@ -395,7 +395,7 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
return;
}

if (mainRequest.getSize() != SIZE_ALL_MATCHES && worker.getSuccessfullyProcessed() >= mainRequest.getSize()) {
if (mainRequest.getMaxDocs() != MAX_DOCS_ALL_MATCHES && worker.getSuccessfullyProcessed() >= mainRequest.getMaxDocs()) {
// We've processed all the requested docs.
refreshAndFinish(emptyList(), emptyList(), false);
return;
@@ -19,8 +19,8 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
@@ -105,6 +105,11 @@ protected Request setCommonOptions(RestRequest restRequest, Request request) {
if (requestsPerSecond != null) {
request.setRequestsPerSecond(requestsPerSecond);
}

if (restRequest.hasParam("max_docs")) {
setMaxDocsValidateIdentical(request, restRequest.paramAsInt("max_docs", -1));
}

return request;
}

@@ -170,4 +175,13 @@ public static Float parseRequestsPerSecond(RestRequest request) {
}
return requestsPerSecond;
}

static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest<?> request, int maxDocs) {
if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) {
throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" +
" and [" + maxDocs + "]");
} else {
request.setMaxDocs(maxDocs);
}
}
}
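
As a rough, standalone illustration of what this guard enforces when max_docs arrives more than once (for example, from both the URL and the body, or via the deprecated size field): repeating the same value is accepted, while two conflicting values are rejected. The helper below mirrors the logic above only so the sketch compiles on its own; it is not the Elasticsearch handler, and -1 is assumed to be the "no limit set yet" sentinel.

[source,java]
--------------------------------------------------
import org.elasticsearch.index.reindex.ReindexRequest;

public class MaxDocsGuardSketch {
    public static void main(String[] args) {
        ReindexRequest request = new ReindexRequest();

        apply(request, 5);   // first value is stored on the request
        apply(request, 5);   // an identical repeat (e.g. URL and body) is fine

        try {
            apply(request, 10);  // a conflicting value is rejected
        } catch (IllegalArgumentException e) {
            // [max_docs] set to two different values [5] and [10]
            System.out.println(e.getMessage());
        }
    }

    // Mirrors setMaxDocsValidateIdentical above; -1 is assumed to mean "unset".
    static void apply(ReindexRequest request, int maxDocs) {
        if (request.getMaxDocs() != -1 && request.getMaxDocs() != maxDocs) {
            throw new IllegalArgumentException("[max_docs] set to two different values ["
                + request.getMaxDocs() + "] and [" + maxDocs + "]");
        }
        request.setMaxDocs(maxDocs);
    }
}
--------------------------------------------------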
@@ -23,6 +23,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@@ -52,7 +53,7 @@ protected void parseInternalRequest(Request internal, RestRequest restRequest,
SearchRequest searchRequest = internal.getSearchRequest();

try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, internal::setSize);
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> setMaxDocsFromSearchSize(internal, size));
}

searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));
@@ -94,4 +95,9 @@ private XContentParser extractRequestSpecificFields(RestRequest restRequest,
parser.getDeprecationHandler(), BytesReference.bytes(builder.map(body)).streamInput());
}
}

private void setMaxDocsFromSearchSize(Request request, int size) {
LoggingDeprecationHandler.INSTANCE.usedDeprecatedName("size", "max_docs");
setMaxDocsValidateIdentical(request, size);
}
}
@@ -59,6 +59,7 @@ protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOExcept

Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);

@@ -97,7 +97,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq

PARSER.declareField(sourceParser::parse, new ParseField("source"), ValueType.OBJECT);
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ValueType.OBJECT);
PARSER.declareInt(ReindexRequest::setSize, new ParseField("size"));
PARSER.declareInt(RestReindexAction::setMaxDocsValidateIdentical, new ParseField("max_docs", "size"));
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"),
ValueType.OBJECT);
PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts"));
@@ -67,6 +67,7 @@ protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOExcept
Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("script", o -> internal.setScript(parseScript(o)));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);
