-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor StyxHostHttpClient #338
Changes from 2 commits
a558747
49fce32
af38453
2352c98
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,76 +15,41 @@ | |
*/ | ||
package com.hotels.styx.client; | ||
|
||
import com.hotels.styx.api.Id; | ||
import com.hotels.styx.api.LiveHttpRequest; | ||
import com.hotels.styx.api.LiveHttpResponse; | ||
import com.hotels.styx.api.ResponseEventListener; | ||
import com.hotels.styx.api.exceptions.NoAvailableHostsException; | ||
import com.hotels.styx.client.connectionpool.ConnectionPool; | ||
import rx.Observable; | ||
|
||
import java.util.Optional; | ||
|
||
import static java.util.Objects.requireNonNull; | ||
|
||
/** | ||
* Encapsulates a single connection to remote server which we can use to send the messages. | ||
*/ | ||
class Transport { | ||
private final Id appId; | ||
private final CharSequence originIdHeaderName; | ||
|
||
public Transport(Id appId, CharSequence originIdHeaderName) { | ||
this.appId = requireNonNull(appId); | ||
this.originIdHeaderName = requireNonNull(originIdHeaderName); | ||
} | ||
|
||
public HttpTransaction send(LiveHttpRequest request, Optional<ConnectionPool> origin, Id originId) { | ||
Observable<Connection> connection = connection(request, origin); | ||
public HttpTransaction send(LiveHttpRequest request, ConnectionPool connectionPool) { | ||
Observable<Connection> connection = connectionPool.borrowConnection(); | ||
|
||
return new HttpTransaction() { | ||
@Override | ||
public Observable<LiveHttpResponse> response() { | ||
return connection.flatMap(connection -> { | ||
Observable<LiveHttpResponse> responseObservable = connection.write(request) | ||
.map(response -> addOriginId(originId, response)); | ||
Observable<LiveHttpResponse> responseObservable = connection.write(request); | ||
|
||
return ResponseEventListener.from(responseObservable) | ||
.whenCancelled(() -> closeIfConnected(origin, connection)) | ||
.whenResponseError(cause -> closeIfConnected(origin, connection)) | ||
.whenContentError(cause -> closeIfConnected(origin, connection)) | ||
.whenCompleted(() -> returnIfConnected(origin, connection)) | ||
.whenCancelled(() -> closeIfConnected(connectionPool, connection)) | ||
.whenResponseError(cause -> closeIfConnected(connectionPool, connection)) | ||
.whenContentError(cause -> closeIfConnected(connectionPool, connection)) | ||
.whenCompleted(() -> returnIfConnected(connectionPool, connection)) | ||
.apply(); | ||
}); | ||
} | ||
|
||
private synchronized void closeIfConnected(Optional<ConnectionPool> connectionPool, Connection connection) { | ||
if (connection != null && connectionPool.isPresent()) { | ||
connectionPool.get().closeConnection(connection); | ||
} | ||
private synchronized void closeIfConnected(ConnectionPool connectionPool, Connection connection) { | ||
mikkokar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
connectionPool.closeConnection(connection); | ||
} | ||
|
||
private synchronized void returnIfConnected(Optional<ConnectionPool> connectionPool, Connection connection) { | ||
if (connection != null && connectionPool.isPresent()) { | ||
connectionPool.get().returnConnection(connection); | ||
} | ||
private synchronized void returnIfConnected(ConnectionPool connectionPool, Connection connection) { | ||
mikkokar marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we ever need to synchronize on something different than the current object? In that case, do we actually need to set 'closeIfConnected' and/or 'returnIfConnected' as synchronized methods or should we try to ensure they are idempotent? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey David. There is no longer need to synchronise on anything. I realised myself the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking about it, perhaps I should remove these methods altogether. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, sure, I see your commit now right before my comment. You can remove them but I don't see too much of a difference between both options to be honest... I don't dislike the 'extra indirection' myself. |
||
connectionPool.returnConnection(connection); | ||
} | ||
}; | ||
} | ||
|
||
private Observable<Connection> connection(LiveHttpRequest request, Optional<ConnectionPool> origin) { | ||
return origin | ||
.map(ConnectionPool::borrowConnection) | ||
.orElseGet(() -> { | ||
// Aggregates an empty body: | ||
request.consume(); | ||
return Observable.error(new NoAvailableHostsException(appId)); | ||
}); | ||
} | ||
|
||
private LiveHttpResponse addOriginId(Id originId, LiveHttpResponse response) { | ||
return response.newBuilder() | ||
.header(originIdHeaderName, originId) | ||
.build(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we confirmed that the connections are never null? It would make sense, but thought it worth checking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A
null
value wouldn't be emitted under any circumstances (that would be a bug elsewhere in styx).Note that the Reactive Streams specification specifically forbids
null
values as events (Rule 2.13 https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md). Although we are still in RxJava 1.0, our intention is to fully migrate to reactive-streams compatible implementation (Flux), ensuring it cannot be null.