Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Styx Eventual class for asynchronous events. #309

Merged
merged 10 commits into from
Oct 18, 2018

Conversation

mikkokar
Copy link
Contributor

Replaces StyxObservable with Eventual class, that asynchronously publishes a single event only.

A main difference between a Future and an Eventual supports deferred actions like Reactor Mono (in fact it is just a wrapper around Mono). That is, where as CompletableFuture action starts running straight away, an Eventual needs an explicit subscription to start the action.

The codebase still needs some tidying up, but you should get an impression how the final API will look like.

2. Transport: Add FSM to drive the connection pool returns and cancellations.
3. Add new Requests class.
4. Generalise streaming response event FSM into a separate ResponseEventListener
class.
5. Update StyxBackendServiceClient.
});
}

default void drop() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this call drop on the body?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because the drop on the message still consumes the byte stream arriving from the TCP connection. It just drops each byte chunk that is received while retaining the publisher chain between styx server and styx client.

This message on the other hand drops the message. It is saying that we want to drop this message as it is and we are not interested in consuming the content.

As a result the underlying TCP connection gets closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this method must be used before any of the content is read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this from the API for now, and make a note of this issue.

Basically the only way to "read" content before calling this method is to subscribe with a 3rd partly subscriber like so:

        flux = Flux.from(response.body())
                .subscribe(buf -> doSomething(buf))

So now it would be illegal to call drop while this subscription is ongoing. This is because it causes a 2nd subscription to the live data stream which is not supported.

}

@Test
public void releasesIfRequestIsUnsubscribedAfterHeaders() {
public void returnsConnectionBackToPool_delayedResponseError() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another test that does not follow naming conventions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should discuss and clarify our naming conventions. Could you clarify what aspect doesn't conform to naming conventions?

In the meanwhile I will rename this test to: returnsConnectionBackToPoolDueToCancelledHeaders.

Copy link
Contributor

@kvosper kvosper Oct 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java identifiers use camel case only, not underscores (except in static final fields, AKA constants). This is not specific to us, just a common Java convention we follow.

verify(pool).closeConnection(any(Connection.class));
}

@Test
public void terminatesConnection_bodyIsUnsubscribed() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will rename to returnsConnectionBackToPoolDueToDelayedResponseError.

private Runnable onCompletedAction;

private final StateMachine<State> fsm = new StateMachine.Builder<State>()
.initialState(State.INITIAL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These State instances can be statically imported to make the code more readable.

}

public ResponseEventListener whenCancelled(Runnable action) {
this.cancelAction = action;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These "when" methods should do null checks.

.onInappropriateEvent((state, event) -> state)
.build();

ResponseEventListener(Observable<HttpResponse> publisher) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see lots of things that could be private, and fields that could be final in this class, such as the constructor and in the inner classes further down. Please resolve them all.

}
}
return fromRxObservable(toRxObservable(client.handle(request, this.context))
.compose(StandardHttpPipeline::sendErrorOnDoubleSubscription));
return new Eventual<>(toPublisher(RxReactiveStreams.toObservable(client.handle(request, this.context))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I reading this correctly? You are transforming from publisher to observable and then back to publisher?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It first turns the Eventual from client.handle into an rx.Observable, so that sendErrorOnDoubleSubscription can be composed in. After that the composed rx.Observable is converted back to publisher.

This needs to be tidied up but that could be a separate enhancement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be written line this, to show which parts happen when:

return new Eventual<>(
                    toPublisher(
                            toObservable(client.handle(request, this.context))
                                    .compose(StandardHttpPipeline::sendErrorOnDoubleSubscription)
                    )
            );

@mikkokar mikkokar changed the base branch from master to feature-styx-1.0-api October 18, 2018 07:59
@mikkokar mikkokar merged commit c8b581f into ExpediaGroup:feature-styx-1.0-api Oct 18, 2018
@mikkokar mikkokar deleted the styx-future branch October 18, 2018 08:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants