-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Styx Eventual class for asynchronous events. #309
Styx Eventual class for asynchronous events. #309
Conversation
2. Transport: Add FSM to drive the connection pool returns and cancellations. 3. Add new Requests class. 4. Generalise streaming response event FSM into a separate ResponseEventListener class. 5. Update StyxBackendServiceClient.
components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java
Show resolved
Hide resolved
}); | ||
} | ||
|
||
default void drop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this call drop
on the body?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because the drop
on the message still consumes the byte stream arriving from the TCP connection. It just drops each byte chunk that is received while retaining the publisher chain between styx server and styx client.
This message on the other hand drops the message. It is saying that we want to drop this message as it is and we are not interested in consuming the content.
As a result the underlying TCP connection gets closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this method must be used before any of the content is read?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove this from the API for now, and make a note of this issue.
Basically the only way to "read" content before calling this method is to subscribe with a 3rd partly subscriber like so:
flux = Flux.from(response.body())
.subscribe(buf -> doSomething(buf))
So now it would be illegal to call drop
while this subscription is ongoing. This is because it causes a 2nd subscription to the live data stream which is not supported.
components/api/src/test/java/com/hotels/styx/api/EventualTest.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void releasesIfRequestIsUnsubscribedAfterHeaders() { | ||
public void returnsConnectionBackToPool_delayedResponseError() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another test that does not follow naming conventions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should discuss and clarify our naming conventions. Could you clarify what aspect doesn't conform to naming conventions?
In the meanwhile I will rename this test to: returnsConnectionBackToPoolDueToCancelledHeaders
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java identifiers use camel case only, not underscores (except in static final fields, AKA constants). This is not specific to us, just a common Java convention we follow.
verify(pool).closeConnection(any(Connection.class)); | ||
} | ||
|
||
@Test | ||
public void terminatesConnection_bodyIsUnsubscribed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename to returnsConnectionBackToPoolDueToDelayedResponseError
.
private Runnable onCompletedAction; | ||
|
||
private final StateMachine<State> fsm = new StateMachine.Builder<State>() | ||
.initialState(State.INITIAL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These State
instances can be statically imported to make the code more readable.
} | ||
|
||
public ResponseEventListener whenCancelled(Runnable action) { | ||
this.cancelAction = action; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These "when" methods should do null checks.
.onInappropriateEvent((state, event) -> state) | ||
.build(); | ||
|
||
ResponseEventListener(Observable<HttpResponse> publisher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see lots of things that could be private
, and fields that could be final
in this class, such as the constructor and in the inner classes further down. Please resolve them all.
components/common/src/test/java/com/hotels/styx/api/RequestsTest.java
Outdated
Show resolved
Hide resolved
components/common/src/test/java/com/hotels/styx/api/ResponseEventListenerTest.java
Outdated
Show resolved
Hide resolved
components/common/src/test/java/com/hotels/styx/api/ResponseEventListenerTest.java
Outdated
Show resolved
Hide resolved
components/proxy/src/main/java/com/hotels/styx/routing/handlers/StandardHttpPipeline.java
Outdated
Show resolved
Hide resolved
system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/OriginCancellationMetrics.scala
Show resolved
Hide resolved
system-tests/example-styx-plugin/src/main/java/loadtest/plugins/AsyncRequestPluginFactory.java
Outdated
Show resolved
Hide resolved
components/common/src/main/java/com/hotels/styx/api/ResponseEventListener.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
return fromRxObservable(toRxObservable(client.handle(request, this.context)) | ||
.compose(StandardHttpPipeline::sendErrorOnDoubleSubscription)); | ||
return new Eventual<>(toPublisher(RxReactiveStreams.toObservable(client.handle(request, this.context)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I reading this correctly? You are transforming from publisher to observable and then back to publisher?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It first turns the Eventual
from client.handle
into an rx.Observable
, so that sendErrorOnDoubleSubscription
can be composed in. After that the composed rx.Observable
is converted back to publisher.
This needs to be tidied up but that could be a separate enhancement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be written line this, to show which parts happen when:
return new Eventual<>(
toPublisher(
toObservable(client.handle(request, this.context))
.compose(StandardHttpPipeline::sendErrorOnDoubleSubscription)
)
);
Replaces
StyxObservable
withEventual
class, that asynchronously publishes a single event only.A main difference between a Future and an Eventual supports deferred actions like Reactor Mono (in fact it is just a wrapper around Mono). That is, where as CompletableFuture action starts running straight away, an Eventual needs an explicit subscription to start the action.
The codebase still needs some tidying up, but you should get an impression how the final API will look like.