Skip to content

Commit

Permalink
Refactor StyxHostHttpClient (#338)
Browse files Browse the repository at this point in the history
* Move functionality from StyxHostHttpClient to StyxBackendServiceClient:
- Origin ID header attachment
- Application name in the exceptions

* Remove unnecessary Optional<ConnectionPool> from Transport.

* Remove unnecessary NoAvailableHostsException mapping from StyxBackendServiceClient.
  • Loading branch information
mikkokar authored Nov 12, 2018
1 parent eee66d9 commit b744a77
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import static com.hotels.styx.client.OriginsInventory.OriginState.ACTIVE;
import static com.hotels.styx.client.OriginsInventory.OriginState.DISABLED;
import static com.hotels.styx.client.OriginsInventory.OriginState.INACTIVE;
import static com.hotels.styx.client.StyxHeaderConfig.ORIGIN_ID_DEFAULT;
import static com.hotels.styx.client.connectionpool.ConnectionPools.simplePoolFactory;
import static com.hotels.styx.common.StyxFutures.await;
import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -542,7 +541,7 @@ public OriginsInventory build() {
await(originHealthMonitor.start());

if (hostClientFactory == null) {
hostClientFactory = (ConnectionPool connectionPool) -> StyxHostHttpClient.create(appId, connectionPool.getOrigin().id(), ORIGIN_ID_DEFAULT, connectionPool);
hostClientFactory = (ConnectionPool connectionPool) -> StyxHostHttpClient.create(connectionPool);
}

OriginsInventory originsInventory = new OriginsInventory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING;
import static com.hotels.styx.api.extension.service.StickySessionConfig.stickySessionDisabled;
import static com.hotels.styx.client.StyxHeaderConfig.ORIGIN_ID_DEFAULT;
import static com.hotels.styx.client.stickysession.StickySessionCookie.newStickySessionCookie;
import static io.netty.handler.codec.http.HttpMethod.HEAD;
import static java.util.Collections.emptyList;
Expand All @@ -74,6 +75,7 @@ public final class StyxBackendServiceClient implements BackendServiceClient {
private final boolean contentValidation;
private final String originsRestrictionCookieName;
private final StickySessionConfig stickySessionConfig;
private final CharSequence originIdHeader;

private StyxBackendServiceClient(Builder builder) {
this.id = requireNonNull(builder.backendServiceId);
Expand All @@ -93,6 +95,7 @@ private StyxBackendServiceClient(Builder builder) {
this.metricsRegistry = builder.metricsRegistry;
this.contentValidation = builder.contentValidation;
this.originsRestrictionCookieName = builder.originsRestrictionCookieName;
this.originIdHeader = builder.originIdHeader;
}

@Override
Expand Down Expand Up @@ -146,8 +149,8 @@ private Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<R
newPreviousOrigins.add(remoteHost.get());

return ResponseEventListener.from(
toObservable(host.hostClient().handle(request, HttpInterceptorContext.create()))
.map(response -> addStickySessionIdentifier(response, host.origin())))
toObservable(host.hostClient().handle(request, HttpInterceptorContext.create()))
.map(response -> addStickySessionIdentifier(response, host.origin())))
.whenResponseError(cause -> logError(request, cause))
.whenCancelled(() -> originStatsFactory.originStats(host.origin()).requestCancelled())
.apply()
Expand All @@ -157,13 +160,20 @@ private Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<R
.onErrorResumeNext(cause -> {
RetryPolicyContext retryContext = new RetryPolicyContext(this.id, attempt + 1, cause, request, previousOrigins);
return retry(request, retryContext, newPreviousOrigins, attempt + 1, cause);
});
})
.map(response -> addOriginId(host.id(), response));
} else {
RetryPolicyContext retryContext = new RetryPolicyContext(this.id, attempt + 1, null, request, previousOrigins);
return retry(request, retryContext, previousOrigins, attempt + 1, new NoAvailableHostsException(this.id));
}
}

private LiveHttpResponse addOriginId(Id originId, LiveHttpResponse response) {
return response.newBuilder()
.header(originIdHeader, originId)
.build();
}

Observable<LiveHttpResponse> retry(LiveHttpRequest request, RetryPolicyContext retryContext, List<RemoteHost> previousOrigins, int attempt, Throwable cause) {
LoadBalancer.Preferences lbContext = new LoadBalancer.Preferences() {
@Override
Expand Down Expand Up @@ -274,8 +284,6 @@ private void recordErrorStatusMetrics(LiveHttpResponse response) {
}

private Optional<RemoteHost> selectOrigin(LiveHttpRequest rewrittenRequest) {


LoadBalancer.Preferences preferences = new LoadBalancer.Preferences() {
@Override
public Optional<String> preferredOrigins() {
Expand Down Expand Up @@ -337,6 +345,7 @@ public static class Builder {
private OriginStatsFactory originStatsFactory;
private String originsRestrictionCookieName;
private StickySessionConfig stickySessionConfig = stickySessionDisabled();
private CharSequence originIdHeader = ORIGIN_ID_DEFAULT;

public Builder(Id backendServiceId) {
this.backendServiceId = requireNonNull(backendServiceId);
Expand Down Expand Up @@ -383,12 +392,16 @@ public Builder originsRestrictionCookieName(String originsRestrictionCookieName)
return this;
}

public Builder originIdHeader(CharSequence originIdHeader) {
this.originIdHeader = requireNonNull(originIdHeader);
return this;
}

public StyxBackendServiceClient build() {
if (originStatsFactory == null) {
originStatsFactory = new OriginStatsFactory(metricsRegistry);
}
return new StyxBackendServiceClient(this);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,33 @@

import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.Id;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancingMetric;
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancingMetricSupplier;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import rx.Observable;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* A Styx HTTP Client for proxying to an individual origin host.
*/
public class StyxHostHttpClient implements BackendServiceClient, LoadBalancingMetricSupplier {
private final Transport transport;
private final Id originId;
private final ConnectionPool pool;

public StyxHostHttpClient(Id originId, ConnectionPool pool, Transport transport) {
this.originId = requireNonNull(originId);
this.pool = requireNonNull(pool);
StyxHostHttpClient(ConnectionPool pool, Transport transport) {
this.transport = requireNonNull(transport);
this.pool = requireNonNull(pool);
}

public static StyxHostHttpClient create(Id appId, Id originId, CharSequence headerName, ConnectionPool pool) {
return new StyxHostHttpClient(originId, pool, new Transport(appId, headerName));
public static StyxHostHttpClient create(ConnectionPool pool) {
return new StyxHostHttpClient(pool, new Transport());
}

@Override
public Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
return transport
.send(request, Optional.of(pool), originId)
.send(request, pool)
.response();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,41 @@
*/
package com.hotels.styx.client;

import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.ResponseEventListener;
import com.hotels.styx.api.exceptions.NoAvailableHostsException;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import rx.Observable;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* Encapsulates a single connection to remote server which we can use to send the messages.
*/
class Transport {
private final Id appId;
private final CharSequence originIdHeaderName;

public Transport(Id appId, CharSequence originIdHeaderName) {
this.appId = requireNonNull(appId);
this.originIdHeaderName = requireNonNull(originIdHeaderName);
}

public HttpTransaction send(LiveHttpRequest request, Optional<ConnectionPool> origin, Id originId) {
Observable<Connection> connection = connection(request, origin);
public HttpTransaction send(LiveHttpRequest request, ConnectionPool connectionPool) {
Observable<Connection> connection = connectionPool.borrowConnection();

return new HttpTransaction() {
@Override
public Observable<LiveHttpResponse> response() {
return connection.flatMap(connection -> {
Observable<LiveHttpResponse> responseObservable = connection.write(request)
.map(response -> addOriginId(originId, response));
Observable<LiveHttpResponse> responseObservable = connection.write(request);

return ResponseEventListener.from(responseObservable)
.whenCancelled(() -> closeIfConnected(origin, connection))
.whenResponseError(cause -> closeIfConnected(origin, connection))
.whenContentError(cause -> closeIfConnected(origin, connection))
.whenCompleted(() -> returnIfConnected(origin, connection))
.whenCancelled(() -> closeIfConnected(connectionPool, connection))
.whenResponseError(cause -> closeIfConnected(connectionPool, connection))
.whenContentError(cause -> closeIfConnected(connectionPool, connection))
.whenCompleted(() -> returnIfConnected(connectionPool, connection))
.apply();
});
}

private synchronized void closeIfConnected(Optional<ConnectionPool> connectionPool, Connection connection) {
if (connection != null && connectionPool.isPresent()) {
connectionPool.get().closeConnection(connection);
}
private void closeIfConnected(ConnectionPool connectionPool, Connection connection) {
connectionPool.closeConnection(connection);
}

private synchronized void returnIfConnected(Optional<ConnectionPool> connectionPool, Connection connection) {
if (connection != null && connectionPool.isPresent()) {
connectionPool.get().returnConnection(connection);
}
private void returnIfConnected(ConnectionPool connectionPool, Connection connection) {
connectionPool.returnConnection(connection);
}
};
}

private Observable<Connection> connection(LiveHttpRequest request, Optional<ConnectionPool> origin) {
return origin
.map(ConnectionPool::borrowConnection)
.orElseGet(() -> {
// Aggregates an empty body:
request.consume();
return Observable.error(new NoAvailableHostsException(appId));
});
}

private LiveHttpResponse addOriginId(Id originId, LiveHttpResponse response) {
return response.newBuilder()
.header(originIdHeaderName, originId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package com.hotels.styx.client;

import com.hotels.styx.api.HttpRequest;
import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.hotels.styx.api.LiveHttpRequest;

import java.util.Optional;
import rx.Observable;

import static com.hotels.styx.api.LiveHttpResponse.response;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
Expand All @@ -43,21 +42,24 @@ public void setUp() {
public void sendsRequestUsingTransport() {
ConnectionPool pool = mock(ConnectionPool.class);
Transport transport = mock(Transport.class);
when(transport.send(any(LiveHttpRequest.class), any(Optional.class), any(Id.class))).thenReturn(mock(HttpTransaction.class));
HttpTransaction transaction = mock(HttpTransaction.class);

when(transport.send(any(LiveHttpRequest.class), any(ConnectionPool.class))).thenReturn(transaction);
when(transaction.response()).thenReturn(Observable.just(response().build()));

new StyxHostHttpClient(Id.id("app-01"), pool, transport)
new StyxHostHttpClient(pool, transport)
.sendRequest(request);

verify(transport).send(eq(request), eq(Optional.of(pool)), eq(Id.id("app-01")));
verify(transport).send(eq(request), eq(pool));
}

@Test
public void closesTheConnectionPool() {
ConnectionPool pool = mock(ConnectionPool.class);
Transport transport = mock(Transport.class);
when(transport.send(any(LiveHttpRequest.class), any(Optional.class), any(Id.class))).thenReturn(mock(HttpTransaction.class));
when(transport.send(any(LiveHttpRequest.class), any(ConnectionPool.class))).thenReturn(mock(HttpTransaction.class));

StyxHostHttpClient hostClient = new StyxHostHttpClient(Id.id("app-01"), pool, transport);
StyxHostHttpClient hostClient = new StyxHostHttpClient(pool, transport);

hostClient.close();

Expand Down
Loading

0 comments on commit b744a77

Please sign in to comment.