Skip to content

Commit

Permalink
Fix infinite retries under particular conditions (#2041) (#2049)
Browse files Browse the repository at this point in the history
Motivation:

When the new retrying http filter is applied under some particular conditions, the retry logic yields infinite attempts.

Modifications:

- When the filter is appended conditionally, capture the instance to lazy inject LB/SD state.
- Change the default max-total-retries to guard better LB/SD error flow.
- Additional input validation for durations.

Result:

Safe retry behavior.
  • Loading branch information
tkountis authored Jan 11, 2022
1 parent eb72c09 commit 6e3667f
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;

import javax.annotation.Nullable;

/**
* API introduced to help transition 0.41 core to 0.42.
* @deprecated DO NOT USE.
*/
@FunctionalInterface
@Deprecated
public interface ContextAwareStreamingHttpClientFilterFactory
extends StreamingHttpClientFilterFactory {
StreamingHttpClientFilter create(FilterableStreamingHttpClient client,
@Nullable Publisher<Object> lbEventStream,
@Nullable Completable sdStatus);

@Override
default StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
return create(client, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;

import java.util.function.Predicate;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

Expand All @@ -37,7 +41,9 @@ public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strat
}

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client,
@Nullable final Publisher<Object> lbEventStream,
@Nullable final Completable sdStatus) {
return original.create(address, client);
}
};
Expand Down Expand Up @@ -108,7 +114,7 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect
return connection -> new ConditionalHttpConnectionFilter(predicate, original.create(connection), connection);
}

static StreamingHttpClientFilterFactory toConditionalClientFilterFactory(
static ContextAwareStreamingHttpClientFilterFactory toConditionalClientFilterFactory(
final Predicate<StreamingHttpRequest> predicate, final StreamingHttpClientFilterFactory original) {
requireNonNull(predicate);
requireNonNull(original);
Expand All @@ -122,12 +128,26 @@ public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strat
}

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client,
@Nullable final Publisher<Object> lbEventStream,
@Nullable final Completable sdStatus) {
if (original instanceof ContextAwareStreamingHttpClientFilterFactory) {
return new ConditionalHttpClientFilter(predicate,
((ContextAwareStreamingHttpClientFilterFactory) original).create(client,
lbEventStream, sdStatus), client);
}
return new ConditionalHttpClientFilter(predicate, original.create(client), client);
}
};
}
return client -> new ConditionalHttpClientFilter(predicate, original.create(client), client);

if (original instanceof ContextAwareStreamingHttpClientFilterFactory) {
return (client, lbEventStream, sdStatus) -> new ConditionalHttpClientFilter(predicate,
((ContextAwareStreamingHttpClientFilterFactory) original).create(client,
lbEventStream, sdStatus), client);
}

return (client, __, ___) -> new ConditionalHttpClientFilter(predicate, original.create(client), client);
}

static <U> MultiAddressHttpClientFilterFactory<U> toMultiAddressConditionalFilterFactory(
Expand Down Expand Up @@ -164,7 +184,7 @@ interface StrategyInfluencingStreamingConnectionFilterFactory
}

interface StrategyInfluencingStreamingClientFilterFactory
extends StreamingHttpClientFilterFactory, HttpExecutionStrategyInfluencer {
extends ContextAwareStreamingHttpClientFilterFactory, HttpExecutionStrategyInfluencer {
}

interface StrategyInfluencingMultiAddressHttpClientFilterFactory<U>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.http.api.ContextAwareStreamingHttpClientFilterFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
Expand All @@ -47,10 +48,11 @@
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.SingleAddressHttpClientSecurityConfigurator;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.ContextAwareRetryingHttpClientFilter;
import io.servicetalk.logging.api.LogLevel;
import io.servicetalk.transport.api.ClientSslConfig;
import io.servicetalk.transport.api.HostAndPort;
Expand All @@ -65,6 +67,7 @@
import java.util.Collection;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.netty.util.NetUtil.toSocketAddressString;
Expand Down Expand Up @@ -404,21 +407,30 @@ private static ContextAwareStreamingHttpClientFilterFactory appendFilter0(
if (appendClientFilterFactory instanceof RetryingHttpRequesterFilter) {
if (currClientFilterFactory == null) {
return (client, lbEventStream, sdStatus) -> {
final RetryingHttpRequesterFilter.ContextAwareRetryingHttpClientFilter filter =
(RetryingHttpRequesterFilter.ContextAwareRetryingHttpClientFilter)
final ContextAwareRetryingHttpClientFilter filter =
(ContextAwareRetryingHttpClientFilter)
appendClientFilterFactory.create(client);
filter.inject(lbEventStream, sdStatus);
return filter;
};
} else {
return (client, lbEventStream, sdStatus) -> {
final RetryingHttpRequesterFilter.ContextAwareRetryingHttpClientFilter filter =
(RetryingHttpRequesterFilter.ContextAwareRetryingHttpClientFilter)
final ContextAwareRetryingHttpClientFilter filter =
(ContextAwareRetryingHttpClientFilter)
appendClientFilterFactory.create(client);
filter.inject(lbEventStream, sdStatus);
return currClientFilterFactory.create(filter, lbEventStream, sdStatus);
};
}
} else if (appendClientFilterFactory instanceof ContextAwareStreamingHttpClientFilterFactory) {
if (currClientFilterFactory == null) {
return ((ContextAwareStreamingHttpClientFilterFactory) appendClientFilterFactory);
} else {
return (client, lbEventStream, sdError) ->
currClientFilterFactory.create(
((ContextAwareStreamingHttpClientFilterFactory) appendClientFilterFactory)
.create(client, lbEventStream, sdError), lbEventStream, sdError);
}
} else {
if (currClientFilterFactory == null) {
return (client, lbEventStream, sdError) -> appendClientFilterFactory.create(client);
Expand Down Expand Up @@ -578,6 +590,31 @@ public DefaultSingleAddressHttpClientBuilder<U, R> unresolvedAddressToHost(
return this;
}

public DefaultSingleAddressHttpClientBuilder<U, R> appendClientFilter(Predicate<StreamingHttpRequest> predicate,
StreamingHttpClientFilterFactory factory) {
if (factory instanceof RetryingHttpRequesterFilter) {
if (retryingHttpRequesterFilter != null) {
throw new IllegalStateException("Retrying HTTP requester filter was already found in " +
"the filter chain, only a single instance of that is allowed.");
}
if (customAutoRetry()) {
throw new IllegalStateException("Custom AutoRetryStrategyProvider was already configured and its " +
"functionality intersects with io.servicetalk.http.netty.RetryingHttpRequesterFilter, " +
"only a single instance of these is allowed.");
}
retryingHttpRequesterFilter = (RetryingHttpRequesterFilter) factory;
return (DefaultSingleAddressHttpClientBuilder<U, R>) super.appendClientFilter(predicate,
(ContextAwareStreamingHttpClientFilterFactory) (client, lbEventStream, sdStatus) -> {
ContextAwareRetryingHttpClientFilter filter =
(ContextAwareRetryingHttpClientFilter) factory.create(client);
filter.inject(lbEventStream, sdStatus);
return filter;
});
}

return (DefaultSingleAddressHttpClientBuilder<U, R>) super.appendClientFilter(predicate, factory);
}

@Override
public DefaultSingleAddressHttpClientBuilder<U, R> appendClientFilter(
final StreamingHttpClientFilterFactory factory) {
Expand Down Expand Up @@ -920,16 +957,4 @@ private static HttpHeadersFactory headersFactory(@Nullable HttpHeadersFactory fa
return factory;
}
}

@FunctionalInterface
interface ContextAwareStreamingHttpClientFilterFactory extends StreamingHttpClientFilterFactory {
StreamingHttpClientFilter create(FilterableStreamingHttpClient client,
@Nullable Publisher<Object> lbEventStream,
@Nullable Completable sdStatus);

@Override
default StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
return create(client, null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.forUnknownHostAndPort;
import static io.servicetalk.transport.netty.internal.BuilderUtils.toResolvedInetSocketAddress;
import static java.util.function.Function.identity;

/**
Expand Down
Loading

0 comments on commit 6e3667f

Please sign in to comment.