Skip to content

Commit

Permalink
LLRC: Make warning behavior pluggable per request (elastic#36345)
Browse files Browse the repository at this point in the history
This allows you to plug the behavior that the LLRC uses to handle
warnings on a per request basis.

We entertained the idea of allowing you to set the warnings behavior to
strict mode on a per request basis but that wouldn't allow the high
level rest client to fail when it sees an unexpected warning.

We also entertained the idea of adding a list of "required warnings" to
the `RequestOptions` but that won't work well with failures that occur
*sometimes* like those we see in mixed clusters.

Adding a list of "allowed warnings" to the `RequestOptions` would work
for mixed clusters but it'd leave many of the assertions in our tests
weaker than we'd like.

This behavior plugging implementation allows us to make a "required
warnings" option when we need it and an "allowed warnings" behavior when
we need it.

I don't think this behavior is going to be commonly used by used outside
of the Elasticsearch build, but I expect they'll be a few commendably
paranoid folks who could use this behavior.
  • Loading branch information
nik9000 authored Dec 10, 2018
1 parent f79e602 commit 9626e70
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,37 @@
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;


import java.util.ArrayList;

/**
* The portion of an HTTP request to Elasticsearch that can be
* manipulated without changing Elasticsearch's behavior.
*/
public final class RequestOptions {
/**
* Default request options.
*/
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT).build();
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();

private final List<Header> headers;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private final WarningsHandler warningsHandler;

private RequestOptions(Builder builder) {
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
this.warningsHandler = builder.warningsHandler;
}

/**
* Create a builder that contains these options but can be modified.
*/
public Builder toBuilder() {
return new Builder(headers, httpAsyncResponseConsumerFactory);
return new Builder(headers, httpAsyncResponseConsumerFactory, warningsHandler);
}

/**
Expand All @@ -68,12 +74,35 @@ public HttpAsyncResponseConsumerFactory getHttpAsyncResponseConsumerFactory() {
return httpAsyncResponseConsumerFactory;
}

/**
* How this request should handle warnings. If null (the default) then
* this request will default to the behavior dictacted by
* {@link RestClientBuilder#setStrictDeprecationMode}.
* <p>
* This can be set to {@link WarningsHandler#PERMISSIVE} if the client
* should ignore all warnings which is the same behavior as setting
* strictDeprecationMode to true. It can be set to
* {@link WarningsHandler#STRICT} if the client should fail if there are
* any warnings which is the same behavior as settings
* strictDeprecationMode to false.
* <p>
* It can also be set to a custom implementation of
* {@linkplain WarningsHandler} to permit only certain warnings or to
* fail the request if the warnings returned don't
* <strong>exactly</strong> match some set.
*/
public WarningsHandler getWarningsHandler() {
return warningsHandler;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append("RequestOptions{");
boolean comma = false;
if (headers.size() > 0) {
b.append(", headers=");
b.append("headers=");
comma = true;
for (int h = 0; h < headers.size(); h++) {
if (h != 0) {
b.append(',');
Expand All @@ -82,7 +111,14 @@ public String toString() {
}
}
if (httpAsyncResponseConsumerFactory != HttpAsyncResponseConsumerFactory.DEFAULT) {
b.append(", consumerFactory=").append(httpAsyncResponseConsumerFactory);
if (comma) b.append(", ");
comma = true;
b.append("consumerFactory=").append(httpAsyncResponseConsumerFactory);
}
if (warningsHandler != null) {
if (comma) b.append(", ");
comma = true;
b.append("warningsHandler=").append(warningsHandler);
}
return b.append('}').toString();
}
Expand All @@ -98,21 +134,30 @@ public boolean equals(Object obj) {

RequestOptions other = (RequestOptions) obj;
return headers.equals(other.headers)
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory);
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory)
&& Objects.equals(warningsHandler, other.warningsHandler);
}

@Override
public int hashCode() {
return Objects.hash(headers, httpAsyncResponseConsumerFactory);
return Objects.hash(headers, httpAsyncResponseConsumerFactory, warningsHandler);
}

/**
* Builds {@link RequestOptions}. Get one by calling
* {@link RequestOptions#toBuilder} on {@link RequestOptions#DEFAULT} or
* any other {@linkplain RequestOptions}.
*/
public static class Builder {
private final List<Header> headers;
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private WarningsHandler warningsHandler;

private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
WarningsHandler warningsHandler) {
this.headers = new ArrayList<>(headers);
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
this.warningsHandler = warningsHandler;
}

/**
Expand Down Expand Up @@ -141,6 +186,27 @@ public void setHttpAsyncResponseConsumerFactory(HttpAsyncResponseConsumerFactory
this.httpAsyncResponseConsumerFactory =
Objects.requireNonNull(httpAsyncResponseConsumerFactory, "httpAsyncResponseConsumerFactory cannot be null");
}

/**
* How this request should handle warnings. If null (the default) then
* this request will default to the behavior dictacted by
* {@link RestClientBuilder#setStrictDeprecationMode}.
* <p>
* This can be set to {@link WarningsHandler#PERMISSIVE} if the client
* should ignore all warnings which is the same behavior as setting
* strictDeprecationMode to true. It can be set to
* {@link WarningsHandler#STRICT} if the client should fail if there are
* any warnings which is the same behavior as settings
* strictDeprecationMode to false.
* <p>
* It can also be set to a custom implementation of
* {@linkplain WarningsHandler} to permit only certain warnings or to
* fail the request if the warnings returned don't
* <strong>exactly</strong> match some set.
*/
public void setWarningsHandler(WarningsHandler warningsHandler) {
this.warningsHandler = warningsHandler;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class RestClient implements Closeable {
private final FailureListener failureListener;
private final NodeSelector nodeSelector;
private volatile NodeTuple<List<Node>> nodeTuple;
private final boolean strictDeprecationMode;
private final WarningsHandler warningsHandler;

RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) {
Expand All @@ -120,7 +120,7 @@ public class RestClient implements Closeable {
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
this.strictDeprecationMode = strictDeprecationMode;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
setNodes(nodes);
}

Expand Down Expand Up @@ -275,11 +275,13 @@ void performRequestAsyncNoCatch(Request request, ResponseListener listener) thro
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
long startTime = System.nanoTime();
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
}

private void performRequestAsync(final long startTime, final NodeTuple<Iterator<Node>> nodeTuple, final HttpRequestBase request,
final Set<Integer> ignoreErrorCodes,
final WarningsHandler thisWarningsHandler,
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
final FailureTrackingResponseListener listener) {
final Node node = nodeTuple.nodes.next();
Expand All @@ -298,7 +300,7 @@ public void completed(HttpResponse httpResponse) {
Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
onResponse(node);
if (strictDeprecationMode && response.hasWarnings()) {
if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
listener.onDefinitiveFailure(new ResponseException(response));
} else {
listener.onSuccess(response);
Expand Down Expand Up @@ -343,7 +345,8 @@ private void retryIfPossible(Exception exception) {
} else {
listener.trackFailure(exception);
request.reset();
performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes, httpAsyncResponseConsumerFactory, listener);
performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes,
thisWarningsHandler, httpAsyncResponseConsumerFactory, listener);
}
} else {
listener.onDefinitiveFailure(exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import java.util.List;

/**
* Called if there are warnings to determine if those warnings should fail the
* request.
*/
public interface WarningsHandler {
boolean warningsShouldFailRequest(List<String> warnings);

WarningsHandler PERMISSIVE = new WarningsHandler() {
@Override
public boolean warningsShouldFailRequest(List<String> warnings) {
return false;
}

@Override
public String toString() {
return "permissive";
}
};
WarningsHandler STRICT = new WarningsHandler() {
@Override
public boolean warningsShouldFailRequest(List<String> warnings) {
return false == warnings.isEmpty();
}

@Override
public String toString() {
return "strict";
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ static RequestOptions.Builder randomBuilder() {
builder.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(1));
}

if (randomBoolean()) {
builder.setWarningsHandler(randomBoolean() ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE);
}

return builder;
}

Expand All @@ -127,14 +131,23 @@ private static RequestOptions copy(RequestOptions options) {

private static RequestOptions mutate(RequestOptions options) {
RequestOptions.Builder mutant = options.toBuilder();
int mutationType = between(0, 1);
int mutationType = between(0, 2);
switch (mutationType) {
case 0:
mutant.addHeader("extra", "m");
return mutant.build();
case 1:
mutant.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(5));
return mutant.build();
case 2:
mutant.setWarningsHandler(new WarningsHandler() {
@Override
public boolean warningsShouldFailRequest(List<String> warnings) {
fail("never called");
return false;
}
});
return mutant.build();
default:
throw new UnsupportedOperationException("Unknown mutation type [" + mutationType + "]");
}
Expand Down
Loading

0 comments on commit 9626e70

Please sign in to comment.