Provides Prioritised Delivery of Zero Stream Frames #718
Conversation
@@ -110,7 +110,7 @@
         new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
     this.keepAliveFramesAcceptor =
         keepAliveHandler.start(
-            keepAliveSupport, sendProcessor::onNext, this::tryTerminateOnKeepAlive);
+            keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);
We should tighten this up in the spec - whether it's all of stream 0 that is prioritised, or KEEPALIVE and LEASE only as you have implemented. I'll generally defer to @robertroeser for this, but I've put my thoughts on the issue.
Definitely. I have already created an issue for that!
   }

   @Override
   public int getBufferSize() {
-    return Queues.capacity(this.queue);
+    return Integer.MAX_VALUE;
Good catch
@@ -321,23 +346,29 @@ public void cancel() {

   @Override
   public T peek() {
     if (!priorityQueue.isEmpty()) {
This looks racy after the change (checking isEmpty before calling peek) - does Reactor promise this won't be concurrent code in practice?
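For illustration, a minimal sketch of the check-then-act window in question (hypothetical code, not the PR's exact class shape):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical two-queue peek: another consumer may drain priorityQueue
// between the isEmpty() check and the peek() call, so the guard proves
// nothing by the time peek() runs.
final class RacyPeek<T> {
  final Queue<T> priorityQueue = new ConcurrentLinkedQueue<>();
  final Queue<T> queue = new ConcurrentLinkedQueue<>();

  T peek() {
    if (!priorityQueue.isEmpty()) { // check
      return priorityQueue.peek();  // act: may return null if a concurrent
    }                               // poll() emptied the queue in between
    return queue.peek();
  }

  // Race-free alternative: a single read per queue, no separate emptiness check.
  T peekSafe() {
    T head = priorityQueue.peek();
    return head != null ? head : queue.peek();
  }
}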
I guess this code is never used in Reactor - it's just forced on us by the Queue API. But indeed, it is racy, and I guess there is nothing I can do about that.
I meant Reactor in the sense that this is a FluxProcessor and a Fuseable.QueueSubscription - are there guarantees within the Reactor framework that indicate the threading model is safe here? It sounds like you are saying it's known to be safe within our rsocket project.
So, by default it is:

interface QueueSubscription<T> extends Queue<T>, Subscription {

  String NOT_SUPPORTED_MESSAGE = "Although QueueSubscription extends Queue it is purely internal" +
      " and only guarantees support for poll/clear/size/isEmpty." +
      " Instances shouldn't be used/exposed as Queue outside of Reactor operators.";

  ...

  @Override
  @Nullable
  default T peek() {
    throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
  }
}
and as the error message says, only poll is supposed to be used. So I don't think it matters at all, and the best I can do is remove the peek operation entirely.
@simonbasle @smaldini can you please correct me if I'm wrong?
you are correct @OlegDokuka
Akka has a ControlAwareMailbox for this too.
Any news about this feature?
@linux-china why do you need this? It is flawed - and RSocket as a protocol has everything needed to guarantee the outgoing queue does not grow unbounded.
@mostroverkhov We use metadataPush to push critical information, such as configuration for a Spring Boot app, broker cluster changes, and app status changes. Metadata push is zero-stream-id based, and it is very good for pushing such critical messages so the app can respond ASAP. With prioritised delivery, this helps both metadataPush and keep-alive checking. For example, we now want to implement a token bucket to control message sending; if the stream-0 messages sit in the same queue, the requester and responder cannot exchange critical information via metadataPush.
Signalling additional capacity to a peer while ignoring existing enqueued messages (whose volume may be estimated from time-to-wire delay) removes the natural negative feedback loop that keeps endpoints stable - responses will start to time out, will likely be retried by the load balancer, and will overwhelm the remaining endpoints.
A keep-alive frame can have data attached to it. Most likely this data will be an RTT, as in the HTTP/2 PING frame. Having the RTT include both RSocket peers' outgoing-queue latencies in addition to network latency gives true information about the RSocket's health. A network-only RTT of 1 ms with prioritized keep-alives is not useful if 1000 ms (instead of a target of, e.g., 5 ms) is spent in outgoing queues - in such a case it is just false information: such an RSocket is not healthy, and I want to aggressively reduce its allowed request permits.
This is a downside of the current implementation and nothing more.
Funny enough, the same HTTP/2 spec says that
So I would rather say this PR makes even more sense than before.
@OlegDokuka you are conflating RSocket keep-alives with connection keep-alives, as pointed out by Steve Gury in rsocket/rsocket#280 (comment). As I said above, a network-only RTT is useless for load estimation, and the prioritized keep-alives introduced by this PR just mask the problem when you have a 1 ms keep-alive RTT but requests time out after 5 sec without even hitting the network.
@mostroverkhov Looking back into the history of the protocol and keepalive development, I found this discussion (rsocket/rsocket#8 (comment)), which states that keepalive is more about identifying connection and RSocket problems than about how many messages are enqueued at the application level. Per what @stevegury said, to measure queueing time it is better to use a simple request-response at the application-logic level. @stevegury correct me if I'm wrong. Also, looking over all the issues related to keepalive, it seems that in every case keepalive is mentioned in the context of the client and the connection, not in the context of the user's application (rsocket/rsocket#58 (comment)). @mostroverkhov feel free to open a ticket in the rsocket-spec repo if you have any concerns.
Motivation
In the current implementation, RSocketRequester sends all the data over the UnboundedProcessor, which in a nutshell is an MpScUnboundedArrayQueue and therefore has no prioritization mechanism. In general, there is no need for such functionality until it comes to delivering critical internal frames/payloads such as the KEEPALIVE or LEASE frames, which SHOULD be delivered as soon as possible. The problem comes when the UnboundedProcessor is overwhelmed by other packets, as the sketch below illustrates:
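For illustration, a self-contained sketch of the failure mode (hypothetical code, not from the PR - a plain FIFO queue stands in for the UnboundedProcessor's internal MpScUnboundedArrayQueue, and the frame strings are made up):

import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical demo: in a single FIFO send queue, a KEEPALIVE enqueued
// after a burst of data frames must wait behind every one of them.
public class FifoDelayDemo {
  public static void main(String[] args) {
    Queue<String> sendQueue = new ArrayDeque<>();

    // A burst of data frames enqueued by ongoing streams...
    for (int i = 0; i < 100_000; i++) {
      sendQueue.offer("DATA[" + i + "]");
    }

    // ...then the keep-alive timer fires; its frame joins the back of the line.
    sendQueue.offer("KEEPALIVE");

    // The drain loop (the network writer) reaches the KEEPALIVE last,
    // so it may hit the wire long after the keep-alive deadline.
    int drained = 0;
    while (!"KEEPALIVE".equals(sendQueue.poll())) {
      drained++;
    }
    System.out.println("KEEPALIVE was delayed behind " + drained + " frames");
  }
}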
The example shows how the client can quickly overwhelm its own queue, so all the other packets such as KEEPALIVE or LEASE are simply stacked at the very end and delivered with a significant delay. In particular, a delayed KEEPALIVE can cause an unwanted cancellation of a connection that is in fact alive.
Proposal
To make sure that all the critical frames are delivered as soon as possible, we can add a kind of priority channel - or, directly speaking, a separate MpScUnboundedArrayQueue inside the UnboundedProcessor - as a way to deliver Zero Stream frames with priority. In turn, under the hood, the Processor will be drained as in the following example:
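A minimal sketch of the intended drain order, with assumed field and method names (ConcurrentLinkedQueue stands in for MpScUnboundedArrayQueue to keep the snippet self-contained): every poll prefers the priority queue, so stream-0 frames jump ahead of any buffered data frames.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical two-queue sender: stream-0 frames (KEEPALIVE, LEASE,
// metadata push) go through the priority queue and are drained first.
final class PrioritizedSender<T> {
  private final Queue<T> priorityQueue = new ConcurrentLinkedQueue<>();
  private final Queue<T> queue = new ConcurrentLinkedQueue<>();

  void onNext(T frame) {
    queue.offer(frame);
  }

  void onNextPrioritized(T frame) {
    priorityQueue.offer(frame);
  }

  // Called by the drain loop: the priority queue is always emptied
  // before any regular frame is taken.
  T poll() {
    T frame = priorityQueue.poll();
    return frame != null ? frame : queue.poll();
  }
}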
Benchmarks
The benchmark has shown that the performance impact is insignificant (within a couple of percent for standalone UnboundedProcessor measurements), and no difference was observed for the standard end-to-end RSocket test.