Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-DataFrame] Dataframe REST cleanups #39451

Merged
merged 6 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public final class DataFrameField {
// common strings
public static final String TASK_NAME = "data_frame/transforms";
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH + "transforms/{id}/";
public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";

// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class DataFrameMessages {
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
"Failed to start persistent task, configuration has been cleaned up: [{0}]";
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";

public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

import java.io.IOException;
import java.util.Map;

public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: All these tests seem like they could just be yml tests.


private static boolean indicesCreated = false;

// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}

@Before
public void createIndexes() throws IOException {

// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}

createReviewsIndex();
indicesCreated = true;
}

public void testGetAndGetStats() throws Exception {
createPivotReviewsTransform("pivot_1", "pivot_reviews_1", null);
createPivotReviewsTransform("pivot_2", "pivot_reviews_2", null);

startAndWaitForTransform("pivot_1", "pivot_reviews_1");
startAndWaitForTransform("pivot_2", "pivot_reviews_2");

// check all the different ways to retrieve all stats
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all/_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "*/_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));

// only pivot_1
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "pivot_1/_stats")));
assertEquals(1, XContentMapValues.extractValue("count", stats));

// check all the different ways to retrieve all transforms
Map<String, Object> transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT)));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all")));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "*")));
assertEquals(2, XContentMapValues.extractValue("count", transforms));

// only pivot_1
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "pivot_1")));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.elasticsearch.xpack.dataframe.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

Expand All @@ -17,7 +16,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -255,35 +253,6 @@ public void testPreviewTransform() throws Exception {
});
}

private void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));

// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
}

private void waitForDataFrameGeneration(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
}, 30, TimeUnit.SECONDS);
}

private static int getDataFrameGeneration(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));

Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
}

private void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}

private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -143,6 +144,28 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
assertTrue(indexExists(dataFrameIndex));
}

protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));

// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
}

void waitForDataFrameGeneration(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
}, 30, TimeUnit.SECONDS);
}

void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}

@SuppressWarnings("unchecked")
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
Response response = adminClient().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all"));
Expand Down Expand Up @@ -221,4 +244,11 @@ protected static void wipeIndices() throws IOException {
}
}
}

static int getDataFrameGeneration(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));

Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public RestDeleteDataFrameTransformAction(Settings settings, RestController cont

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
if (restRequest.hasContent()) {
throw new IllegalArgumentException("delete data frame transforms requests can not have a request body");
}

String id = restRequest.param(DataFrameField.ID.getPreferredName());
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class RestGetDataFrameTransformsAction extends BaseRestHandler {

public RestGetDataFrameTransformsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS, this);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class RestGetDataFrameTransformsStatsAction extends BaseRestHandler {

public RestGetDataFrameTransformsStatsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS + "_stats", this);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID + "_stats", this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,16 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
private static ConstructingObjectParser<DataFrameTransformConfig, String> createParser(boolean lenient) {
ConstructingObjectParser<DataFrameTransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient,
(args, optionalId) -> {
String id = args[0] != null ? (String) args[0] : optionalId;
String id = (String) args[0];

// if the id has been specified in the body and the path, they must match
if (id == null) {
id = optionalId;
} else if (optionalId != null && id.equals(optionalId) == false) {
throw new IllegalArgumentException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_INCONSISTENT_ID, id, optionalId));
}

String source = (String) args[1];
String dest = (String) args[2];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected boolean supportsUnknownFields() {

@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
return new Request(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.rest.action;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class RestDeleteDataFrameTransformActionTests extends ESTestCase {

public void testBodyRejection() throws Exception {
final RestDeleteDataFrameTransformAction handler = new RestDeleteDataFrameTransformAction(Settings.EMPTY,
mock(RestController.class));
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.field("id", "my_id");
}
builder.endObject();
final FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(builder.toString()), XContentType.JSON)
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> handler.prepareRequest(request, mock(NodeClient.class)));
assertThat(e.getMessage(), equalTo("delete data frame transforms requests can not have a request body"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT

public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(),
PivotConfigTests.randomPivotConfig());
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomDataFrameTransformConfig() {
Expand All @@ -46,6 +45,16 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig() {
PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) {
return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(),
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
if (randomBoolean()) {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
Expand Down Expand Up @@ -74,7 +83,7 @@ protected DataFrameTransformConfig doParseInstance(XContentParser parser) throws

@Override
protected DataFrameTransformConfig createTestInstance() {
return runWithHeaders ? randomDataFrameTransformConfig() : randomDataFrameTransformConfigWithoutHeaders();
return runWithHeaders ? randomDataFrameTransformConfig(transformId) : randomDataFrameTransformConfigWithoutHeaders(transformId);
}

@Override
Expand Down Expand Up @@ -143,6 +152,33 @@ public void testPreventHeaderInjection() throws IOException {
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
}

public void testSetIdInBody() throws IOException {
String pivotTransform = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : \"src\","
+ " \"dest\" : \"dest\","
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";

DataFrameTransformConfig dataFrameTransformConfig = createDataFrameTransformConfigFromString(pivotTransform, "body_id");
assertEquals("body_id", dataFrameTransformConfig.getId());

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "other_id"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also assert that the error message is as expected here. (For example this will detect if the body ID and URL ID are the wrong way around when the error message is formed.)


assertEquals("Inconsistent id; 'body_id' specified in the body differs from 'other_id' specified as a URL argument",
ex.getCause().getMessage());
}

private DataFrameTransformConfig createDataFrameTransformConfigFromString(String json, String id) throws IOException {
final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
Expand Down