Skip to content

Commit

Permalink
Propogate version in reindex from remote search (elastic#42412)
Browse files Browse the repository at this point in the history
This is related to elastic#31908. In order to use the external version in a
reindex from remote request, the search request must be configured to
request the version (as it is not returned by default). This commit
modifies the search request to request the version. Additionally, it
modifies our current reindex from remote tests to randomly use the
external version_type.
  • Loading branch information
Tim-Brooks committed Jun 6, 2019
1 parent 2de919e commit 9e36add
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 41 deletions.
2 changes: 1 addition & 1 deletion modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ forbiddenPatterns {
exclude '**/*.p12'
}

// Support for testing reindex-from-remote against old Elaticsearch versions
// Support for testing reindex-from-remote against old Elasticsearch versions
configurations {
oldesFixture
es2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,13 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
request.addParameter("scroll", keepAlive.getStringRep());
}
request.addParameter("size", Integer.toString(searchRequest.source().size()));
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
/*
* Passing `null` here just add the `version` request parameter
* without any value. This way of requesting the version works
* for all supported versions of Elasticsearch.
*/
request.addParameter("version", null);

if (searchRequest.source().version() == null || searchRequest.source().version() == false) {
request.addParameter("version", Boolean.FALSE.toString());
} else {
request.addParameter("version", Boolean.TRUE.toString());
}

if (searchRequest.source().sorts() != null) {
boolean useScan = false;
// Detect if we should use search_type=scan rather than a sort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,35 @@ public void testReindexFromRemote() throws IOException {
Map<?, ?> http = (Map<?, ?>) nodeInfo.get("http");
String remote = "http://"+ http.get("publish_address");
Request request = new Request("POST", "/_reindex");
request.setJsonEntity(
if (randomBoolean()) {
request.setJsonEntity(
"{\n" +
" \"source\":{\n" +
" \"index\":\"test\",\n" +
" \"remote\":{\n" +
" \"host\":\"" + remote + "\"\n" +
" }\n" +
" }\n," +
" \"dest\":{\n" +
" \"index\":\"des\"\n" +
" }\n" +
"}");
" \"source\":{\n" +
" \"index\":\"test\",\n" +
" \"remote\":{\n" +
" \"host\":\"" + remote + "\"\n" +
" }\n" +
" }\n," +
" \"dest\":{\n" +
" \"index\":\"des\"\n" +
" }\n" +
"}");
} else {
// Test with external version_type
request.setJsonEntity(
"{\n" +
" \"source\":{\n" +
" \"index\":\"test\",\n" +
" \"remote\":{\n" +
" \"host\":\"" + remote + "\"\n" +
" }\n" +
" }\n," +
" \"dest\":{\n" +
" \"index\":\"des\",\n" +
" \"version_type\": \"external\"\n" +
" }\n" +
"}");
}
Map<String, Object> response = entityAsMap(client().performRequest(request));
assertThat(response, hasEntry("total", count));
assertThat(response, hasEntry("created", count));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,38 @@ private void oldEsTestCase(String portPropertyName, String requestsPerSecond) th
}

Request reindex = new Request("POST", "/_reindex");
reindex.setJsonEntity(
if (randomBoolean()) {
// Reindex using the external version_type
reindex.setJsonEntity(
"{\n"
+ " \"source\":{\n"
+ " \"index\": \"test\",\n"
+ " \"size\": 1,\n"
+ " \"remote\": {\n"
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
+ " }\n"
+ " },\n"
+ " \"dest\": {\n"
+ " \"index\": \"test\"\n"
+ " }\n"
+ "}");
+ " \"source\":{\n"
+ " \"index\": \"test\",\n"
+ " \"size\": 1,\n"
+ " \"remote\": {\n"
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
+ " }\n"
+ " },\n"
+ " \"dest\": {\n"
+ " \"index\": \"test\",\n"
+ " \"version_type\": \"external\"\n"
+ " }\n"
+ "}");
} else {
// Reindex using the default internal version_type
reindex.setJsonEntity(
"{\n"
+ " \"source\":{\n"
+ " \"index\": \"test\",\n"
+ " \"size\": 1,\n"
+ " \"remote\": {\n"
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
+ " }\n"
+ " },\n"
+ " \"dest\": {\n"
+ " \"index\": \"test\"\n"
+ " }\n"
+ "}");
}
reindex.addParameter("refresh", "true");
reindex.addParameter("pretty", "true");
if (requestsPerSecond != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testInitialSearchParamsFields() {
// Test request without any fields
Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id));
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
not(either(hasKey("stored_fields")).or(hasKey("fields"))));

// Test stored_fields for versions that support it
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
Expand All @@ -162,14 +162,14 @@ public void testInitialSearchParamsFields() {
searchRequest.source().storedField("_source").storedField("_id");
remoteVersion = Version.fromId(between(0, 2000099 - 1));
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));

// But only versions before 1.0 force _source to be in the list
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
searchRequest.source().storedField("_id");
remoteVersion = Version.fromId(between(1000099, 2000099 - 1));
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
hasEntry("fields", "_id,_parent,_routing,_ttl"));
hasEntry("fields", "_id,_parent,_routing,_ttl"));
}

public void testInitialSearchParamsMisc() {
Expand All @@ -189,7 +189,7 @@ public void testInitialSearchParamsMisc() {
fetchVersion = randomBoolean();
searchRequest.source().version(fetchVersion);
}

Map<String, String> params = initialSearch(searchRequest, query, remoteVersion).getParameters();

if (scroll == null) {
Expand All @@ -198,7 +198,12 @@ public void testInitialSearchParamsMisc() {
assertScroll(remoteVersion, params, scroll);
}
assertThat(params, hasEntry("size", Integer.toString(size)));
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
if (fetchVersion != null) {
assertThat(params, fetchVersion ? hasEntry("version", Boolean.TRUE.toString()) :
hasEntry("version", Boolean.FALSE.toString()));
} else {
assertThat(params, hasEntry("version", Boolean.FALSE.toString()));
}
}

private void assertScroll(Version remoteVersion, Map<String, String> params, TimeValue requested) {
Expand All @@ -225,22 +230,22 @@ public void testInitialSearchEntity() throws IOException {
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
assertEquals("{\"query\":" + query + ",\"_source\":true}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
} else {
assertEquals("{\"query\":" + query + "}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
}

// Source filtering is included if set up
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
searchRequest.source().fetchSource(new String[]{"in1", "in2"}, new String[]{"out"});
entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));

// Invalid XContent fails
RuntimeException e = expectThrows(RuntimeException.class,
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
e = expectThrows(RuntimeException.class, () -> initialSearch(searchRequest, new BytesArray("{"), remoteVersion));
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));
Expand Down

0 comments on commit 9e36add

Please sign in to comment.