Skip to content

Commit

Permalink
[ML-DataFrame] Dataframe REST cleanups (#39451)
Browse files Browse the repository at this point in the history
fix a couple of odd behaviors of data frame transforms REST API's:

 -  check if id from body and id from URL match if both are specified
 -  do not allow a body for delete
 -  allow get and stats without specifying an id
  • Loading branch information
Hendrik Muhs authored Feb 28, 2019
1 parent 6b5efb8 commit f24fba7
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public final class DataFrameField {
// common strings
public static final String TASK_NAME = "data_frame/transforms";
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH + "transforms/{id}/";
public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";

// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class DataFrameMessages {
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
"Failed to start persistent task, configuration has been cleaned up: [{0}]";
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";

public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

import java.io.IOException;
import java.util.Map;

public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {

private static boolean indicesCreated = false;

// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}

@Before
public void createIndexes() throws IOException {

// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}

createReviewsIndex();
indicesCreated = true;
}

public void testGetAndGetStats() throws Exception {
createPivotReviewsTransform("pivot_1", "pivot_reviews_1", null);
createPivotReviewsTransform("pivot_2", "pivot_reviews_2", null);

startAndWaitForTransform("pivot_1", "pivot_reviews_1");
startAndWaitForTransform("pivot_2", "pivot_reviews_2");

// check all the different ways to retrieve all stats
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all/_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "*/_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));

// only pivot_1
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "pivot_1/_stats")));
assertEquals(1, XContentMapValues.extractValue("count", stats));

// check all the different ways to retrieve all transforms
Map<String, Object> transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT)));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all")));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "*")));
assertEquals(2, XContentMapValues.extractValue("count", transforms));

// only pivot_1
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "pivot_1")));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.elasticsearch.xpack.dataframe.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

Expand All @@ -17,7 +16,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -255,35 +253,6 @@ public void testPreviewTransform() throws Exception {
});
}

private void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));

// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
}

private void waitForDataFrameGeneration(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
}, 30, TimeUnit.SECONDS);
}

private static int getDataFrameGeneration(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));

Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
}

private void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}

private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -143,6 +144,28 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
assertTrue(indexExists(dataFrameIndex));
}

protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));

// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
}

void waitForDataFrameGeneration(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
}, 30, TimeUnit.SECONDS);
}

void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}

@SuppressWarnings("unchecked")
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
Response response = adminClient().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all"));
Expand Down Expand Up @@ -221,4 +244,11 @@ protected static void wipeIndices() throws IOException {
}
}
}

static int getDataFrameGeneration(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));

Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public RestDeleteDataFrameTransformAction(Settings settings, RestController cont

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
if (restRequest.hasContent()) {
throw new IllegalArgumentException("delete data frame transforms requests can not have a request body");
}

String id = restRequest.param(DataFrameField.ID.getPreferredName());
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class RestGetDataFrameTransformsAction extends BaseRestHandler {

public RestGetDataFrameTransformsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS, this);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class RestGetDataFrameTransformsStatsAction extends BaseRestHandler {

public RestGetDataFrameTransformsStatsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS + "_stats", this);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID + "_stats", this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,16 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
private static ConstructingObjectParser<DataFrameTransformConfig, String> createParser(boolean lenient) {
ConstructingObjectParser<DataFrameTransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient,
(args, optionalId) -> {
String id = args[0] != null ? (String) args[0] : optionalId;
String id = (String) args[0];

// if the id has been specified in the body and the path, they must match
if (id == null) {
id = optionalId;
} else if (optionalId != null && id.equals(optionalId) == false) {
throw new IllegalArgumentException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_INCONSISTENT_ID, id, optionalId));
}

String source = (String) args[1];
String dest = (String) args[2];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected boolean supportsUnknownFields() {

@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
return new Request(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.rest.action;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class RestDeleteDataFrameTransformActionTests extends ESTestCase {

public void testBodyRejection() throws Exception {
final RestDeleteDataFrameTransformAction handler = new RestDeleteDataFrameTransformAction(Settings.EMPTY,
mock(RestController.class));
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.field("id", "my_id");
}
builder.endObject();
final FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(builder.toString()), XContentType.JSON)
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> handler.prepareRequest(request, mock(NodeClient.class)));
assertThat(e.getMessage(), equalTo("delete data frame transforms requests can not have a request body"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT

public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(),
PivotConfigTests.randomPivotConfig());
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomDataFrameTransformConfig() {
Expand All @@ -46,6 +45,16 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig() {
PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) {
return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(),
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}

public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
if (randomBoolean()) {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
Expand Down Expand Up @@ -74,7 +83,7 @@ protected DataFrameTransformConfig doParseInstance(XContentParser parser) throws

@Override
protected DataFrameTransformConfig createTestInstance() {
return runWithHeaders ? randomDataFrameTransformConfig() : randomDataFrameTransformConfigWithoutHeaders();
return runWithHeaders ? randomDataFrameTransformConfig(transformId) : randomDataFrameTransformConfigWithoutHeaders(transformId);
}

@Override
Expand Down Expand Up @@ -143,6 +152,33 @@ public void testPreventHeaderInjection() throws IOException {
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
}

public void testSetIdInBody() throws IOException {
String pivotTransform = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : \"src\","
+ " \"dest\" : \"dest\","
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";

DataFrameTransformConfig dataFrameTransformConfig = createDataFrameTransformConfigFromString(pivotTransform, "body_id");
assertEquals("body_id", dataFrameTransformConfig.getId());

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "other_id"));

assertEquals("Inconsistent id; 'body_id' specified in the body differs from 'other_id' specified as a URL argument",
ex.getCause().getMessage());
}

private DataFrameTransformConfig createDataFrameTransformConfigFromString(String json, String id) throws IOException {
final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
Expand Down

0 comments on commit f24fba7

Please sign in to comment.