Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor StyxHostHttpClient #338

Merged
merged 4 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import static com.hotels.styx.client.OriginsInventory.OriginState.ACTIVE;
import static com.hotels.styx.client.OriginsInventory.OriginState.DISABLED;
import static com.hotels.styx.client.OriginsInventory.OriginState.INACTIVE;
import static com.hotels.styx.client.StyxHeaderConfig.ORIGIN_ID_DEFAULT;
import static com.hotels.styx.client.connectionpool.ConnectionPools.simplePoolFactory;
import static com.hotels.styx.common.StyxFutures.await;
import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -542,7 +541,7 @@ public OriginsInventory build() {
await(originHealthMonitor.start());

if (hostClientFactory == null) {
hostClientFactory = (ConnectionPool connectionPool) -> StyxHostHttpClient.create(appId, connectionPool.getOrigin().id(), ORIGIN_ID_DEFAULT, connectionPool);
hostClientFactory = (ConnectionPool connectionPool) -> StyxHostHttpClient.create(connectionPool);
}

OriginsInventory originsInventory = new OriginsInventory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING;
import static com.hotels.styx.api.extension.service.StickySessionConfig.stickySessionDisabled;
import static com.hotels.styx.client.StyxHeaderConfig.ORIGIN_ID_DEFAULT;
import static com.hotels.styx.client.stickysession.StickySessionCookie.newStickySessionCookie;
import static io.netty.handler.codec.http.HttpMethod.HEAD;
import static java.util.Collections.emptyList;
Expand All @@ -74,6 +75,7 @@ public final class StyxBackendServiceClient implements BackendServiceClient {
private final boolean contentValidation;
private final String originsRestrictionCookieName;
private final StickySessionConfig stickySessionConfig;
private final CharSequence originIdHeader;

private StyxBackendServiceClient(Builder builder) {
this.id = requireNonNull(builder.backendServiceId);
Expand All @@ -93,6 +95,7 @@ private StyxBackendServiceClient(Builder builder) {
this.metricsRegistry = builder.metricsRegistry;
this.contentValidation = builder.contentValidation;
this.originsRestrictionCookieName = builder.originsRestrictionCookieName;
this.originIdHeader = builder.originIdHeader;
}

@Override
Expand Down Expand Up @@ -146,8 +149,8 @@ private Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<R
newPreviousOrigins.add(remoteHost.get());

return ResponseEventListener.from(
toObservable(host.hostClient().handle(request, HttpInterceptorContext.create()))
.map(response -> addStickySessionIdentifier(response, host.origin())))
toObservable(host.hostClient().handle(request, HttpInterceptorContext.create()))
.map(response -> addStickySessionIdentifier(response, host.origin())))
.whenResponseError(cause -> logError(request, cause))
.whenCancelled(() -> originStatsFactory.originStats(host.origin()).requestCancelled())
.apply()
Expand All @@ -157,13 +160,20 @@ private Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<R
.onErrorResumeNext(cause -> {
RetryPolicyContext retryContext = new RetryPolicyContext(this.id, attempt + 1, cause, request, previousOrigins);
return retry(request, retryContext, newPreviousOrigins, attempt + 1, cause);
});
})
.map(response -> addOriginId(host.id(), response));
} else {
RetryPolicyContext retryContext = new RetryPolicyContext(this.id, attempt + 1, null, request, previousOrigins);
return retry(request, retryContext, previousOrigins, attempt + 1, new NoAvailableHostsException(this.id));
}
}

private LiveHttpResponse addOriginId(Id originId, LiveHttpResponse response) {
return response.newBuilder()
.header(originIdHeader, originId)
.build();
}

Observable<LiveHttpResponse> retry(LiveHttpRequest request, RetryPolicyContext retryContext, List<RemoteHost> previousOrigins, int attempt, Throwable cause) {
LoadBalancer.Preferences lbContext = new LoadBalancer.Preferences() {
@Override
Expand Down Expand Up @@ -274,8 +284,6 @@ private void recordErrorStatusMetrics(LiveHttpResponse response) {
}

private Optional<RemoteHost> selectOrigin(LiveHttpRequest rewrittenRequest) {


LoadBalancer.Preferences preferences = new LoadBalancer.Preferences() {
@Override
public Optional<String> preferredOrigins() {
Expand Down Expand Up @@ -337,6 +345,7 @@ public static class Builder {
private OriginStatsFactory originStatsFactory;
private String originsRestrictionCookieName;
private StickySessionConfig stickySessionConfig = stickySessionDisabled();
private CharSequence originIdHeader = ORIGIN_ID_DEFAULT;

public Builder(Id backendServiceId) {
this.backendServiceId = requireNonNull(backendServiceId);
Expand Down Expand Up @@ -383,12 +392,16 @@ public Builder originsRestrictionCookieName(String originsRestrictionCookieName)
return this;
}

public Builder originIdHeader(CharSequence originIdHeader) {
this.originIdHeader = requireNonNull(originIdHeader);
return this;
}

public StyxBackendServiceClient build() {
if (originStatsFactory == null) {
originStatsFactory = new OriginStatsFactory(metricsRegistry);
}
return new StyxBackendServiceClient(this);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,33 @@

import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.Id;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancingMetric;
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancingMetricSupplier;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import rx.Observable;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* A Styx HTTP Client for proxying to an individual origin host.
*/
public class StyxHostHttpClient implements BackendServiceClient, LoadBalancingMetricSupplier {
private final Transport transport;
private final Id originId;
private final ConnectionPool pool;

public StyxHostHttpClient(Id originId, ConnectionPool pool, Transport transport) {
this.originId = requireNonNull(originId);
this.pool = requireNonNull(pool);
StyxHostHttpClient(ConnectionPool pool, Transport transport) {
this.transport = requireNonNull(transport);
this.pool = requireNonNull(pool);
}

public static StyxHostHttpClient create(Id appId, Id originId, CharSequence headerName, ConnectionPool pool) {
return new StyxHostHttpClient(originId, pool, new Transport(appId, headerName));
public static StyxHostHttpClient create(ConnectionPool pool) {
return new StyxHostHttpClient(pool, new Transport());
}

@Override
public Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
return transport
.send(request, Optional.of(pool), originId)
.send(request, pool)
.response();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,41 @@
*/
package com.hotels.styx.client;

import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.ResponseEventListener;
import com.hotels.styx.api.exceptions.NoAvailableHostsException;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import rx.Observable;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* Encapsulates a single connection to remote server which we can use to send the messages.
*/
class Transport {
private final Id appId;
private final CharSequence originIdHeaderName;

public Transport(Id appId, CharSequence originIdHeaderName) {
this.appId = requireNonNull(appId);
this.originIdHeaderName = requireNonNull(originIdHeaderName);
}

public HttpTransaction send(LiveHttpRequest request, Optional<ConnectionPool> origin, Id originId) {
Observable<Connection> connection = connection(request, origin);
public HttpTransaction send(LiveHttpRequest request, ConnectionPool connectionPool) {
Observable<Connection> connection = connectionPool.borrowConnection();

return new HttpTransaction() {
@Override
public Observable<LiveHttpResponse> response() {
return connection.flatMap(connection -> {
Observable<LiveHttpResponse> responseObservable = connection.write(request)
.map(response -> addOriginId(originId, response));
Observable<LiveHttpResponse> responseObservable = connection.write(request);

return ResponseEventListener.from(responseObservable)
.whenCancelled(() -> closeIfConnected(origin, connection))
.whenResponseError(cause -> closeIfConnected(origin, connection))
.whenContentError(cause -> closeIfConnected(origin, connection))
.whenCompleted(() -> returnIfConnected(origin, connection))
.whenCancelled(() -> closeIfConnected(connectionPool, connection))
.whenResponseError(cause -> closeIfConnected(connectionPool, connection))
.whenContentError(cause -> closeIfConnected(connectionPool, connection))
.whenCompleted(() -> returnIfConnected(connectionPool, connection))
.apply();
});
}

private synchronized void closeIfConnected(Optional<ConnectionPool> connectionPool, Connection connection) {
if (connection != null && connectionPool.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we confirmed that the connections are never null? It would make sense, but thought it worth checking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A null value wouldn't be emitted under any circumstances (that would be a bug elsewhere in styx).

Note that the Reactive Streams specification specifically forbids null values as events (Rule 2.13 https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md). Although we are still in RxJava 1.0, our intention is to fully migrate to reactive-streams compatible implementation (Flux), ensuring it cannot be null.

connectionPool.get().closeConnection(connection);
}
private synchronized void closeIfConnected(ConnectionPool connectionPool, Connection connection) {
connectionPool.closeConnection(connection);
}

private synchronized void returnIfConnected(Optional<ConnectionPool> connectionPool, Connection connection) {
if (connection != null && connectionPool.isPresent()) {
connectionPool.get().returnConnection(connection);
}
private synchronized void returnIfConnected(ConnectionPool connectionPool, Connection connection) {
Copy link
Contributor

@dvlato dvlato Nov 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we ever need to synchronize on something different than the current object? In that case, do we actually need to set 'closeIfConnected' and/or 'returnIfConnected' as synchronized methods or should we try to ensure they are idempotent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey David. There is no longer need to synchronise on anything. I realised myself the synchronized modifiers were left in the methods. They should have been removed already. Perhaps you have commented an outdated version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it, perhaps I should remove these methods altogether.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, sure, I see your commit now right before my comment. You can remove them but I don't see too much of a difference between both options to be honest... I don't dislike the 'extra indirection' myself.

connectionPool.returnConnection(connection);
}
};
}

private Observable<Connection> connection(LiveHttpRequest request, Optional<ConnectionPool> origin) {
return origin
.map(ConnectionPool::borrowConnection)
.orElseGet(() -> {
// Aggregates an empty body:
request.consume();
return Observable.error(new NoAvailableHostsException(appId));
});
}

private LiveHttpResponse addOriginId(Id originId, LiveHttpResponse response) {
return response.newBuilder()
.header(originIdHeaderName, originId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package com.hotels.styx.client;

import com.hotels.styx.api.HttpRequest;
import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.hotels.styx.api.LiveHttpRequest;

import java.util.Optional;
import rx.Observable;

import static com.hotels.styx.api.LiveHttpResponse.response;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
Expand All @@ -43,21 +42,24 @@ public void setUp() {
public void sendsRequestUsingTransport() {
ConnectionPool pool = mock(ConnectionPool.class);
Transport transport = mock(Transport.class);
when(transport.send(any(LiveHttpRequest.class), any(Optional.class), any(Id.class))).thenReturn(mock(HttpTransaction.class));
HttpTransaction transaction = mock(HttpTransaction.class);

when(transport.send(any(LiveHttpRequest.class), any(ConnectionPool.class))).thenReturn(transaction);
when(transaction.response()).thenReturn(Observable.just(response().build()));

new StyxHostHttpClient(Id.id("app-01"), pool, transport)
new StyxHostHttpClient(pool, transport)
.sendRequest(request);

verify(transport).send(eq(request), eq(Optional.of(pool)), eq(Id.id("app-01")));
verify(transport).send(eq(request), eq(pool));
}

@Test
public void closesTheConnectionPool() {
ConnectionPool pool = mock(ConnectionPool.class);
Transport transport = mock(Transport.class);
when(transport.send(any(LiveHttpRequest.class), any(Optional.class), any(Id.class))).thenReturn(mock(HttpTransaction.class));
when(transport.send(any(LiveHttpRequest.class), any(ConnectionPool.class))).thenReturn(mock(HttpTransaction.class));

StyxHostHttpClient hostClient = new StyxHostHttpClient(Id.id("app-01"), pool, transport);
StyxHostHttpClient hostClient = new StyxHostHttpClient(pool, transport);

hostClient.close();

Expand Down
Loading