OnSubscribeRedo - fix race conditions #2930
Conversation
```diff
@@ -235,8 +235,9 @@ public void onError(Throwable e) {
             @Override
             public void onNext(T v) {
                 if (!done) {
-                    if (consumerCapacity.get() != Long.MAX_VALUE) {
-                        consumerCapacity.decrementAndGet();
+                    long cap = consumerCapacity.get();
```
Generally, one should decrement the request/capacity after the value has been emitted, because otherwise if the value becomes zero, it may open a window for a concurrent emission that is kicked off by a transition from 0 to some value. Could you see if that helps with #2863 ?
Unfortunately it still fails, now with less frequency over the 5000 attempts than before.
I'll adjust the PR anyway to reflect what you suggest.
Awesome, for once I'm happy to see a CI failure! Till now #2863 hasn't been repeatable except on my laptop, but now it has reappeared in the Travis build.
If I put in this, I get the missing emission problem:

```java
long cc = consumerCapacity.get();
if (cc < Long.MAX_VALUE) {
    consumerCapacity.compareAndSet(cc, cc - 1);
}
if (rnd.nextDouble() < 0.25) {
    try {
        Thread.sleep(1);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
child.onNext(v);
```

If I swap the operations, the test passes for me again. Could you also update the helper method in the test?

```java
static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
    StringBuilder sb = new StringBuilder();
    Object prev = null;
    int cnt = 0;
    for (Object curr : it) {
        if (sb.length() > 0) {
            if (!curr.equals(prev)) {
                if (cnt > 1) {
                    sb.append(" x ").append(cnt);
                    cnt = 1;
                }
                sb.append(", ");
                sb.append(curr);
            } else {
                cnt++;
            }
        } else {
            sb.append(curr);
            cnt++;
        }
        prev = curr;
    }
    if (cnt > 1) {
        sb.append(" x ").append(cnt);
    }
    return sb;
}
```
@akarnokd while you were putting your last comment up, I synchronized the reads and writes of consumerCapacity as in the attached commit and the failures have stopped. It doesn't explain it yet, but it seems consumerCapacity is important here. I'll add your changes and revert the synchronization once you've had a quick look.
I thought as much, given our previous conversations, and figured that if this is the solution you'd have suggestions for doing it properly. Anyway, we probably still need to figure out the why, I suppose.
```java
child.onNext(v);
long cap = consumerCapacity.get();
if (cap != Long.MAX_VALUE) {
    consumerCapacity.compareAndSet(cap, cap - 1);
```
The problem with this is that if the CAS fails, the 'production' of the value is not accounted for and the capacity seems to rise without limit. The options are: 1) add a loop that makes sure the decrement actually happens, or 2) switch back to decrementAndGet and don't worry about the request getting to max concurrently.
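For reference, a minimal sketch of option 1, assuming `consumerCapacity` is the operator's `AtomicLong`; the wrapper class and method name here are illustrative, not part of the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

final class CapacityHelper {
    // Option 1: loop the CAS so a lost race never drops the decrement, while still
    // leaving Long.MAX_VALUE (the "unbounded" marker) untouched.
    static void produced(AtomicLong consumerCapacity) {
        for (;;) {
            long current = consumerCapacity.get();
            if (current == Long.MAX_VALUE) {
                return; // unbounded mode, no accounting needed
            }
            if (consumerCapacity.compareAndSet(current, current - 1)) {
                return; // decrement applied exactly once
            }
            // CAS lost to a concurrent request(); re-read and retry
        }
    }
}
```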
Yep, of course it would need a loop if we went for the CAS option. I can chuck one in, your call.

I'm confused by this …
I've committed the CAS loop. We can drop it as an optimization later based on perf results if we want, or of course now if you think …
Can you devise a benchmark that checks the throughput on a non-throwing source? If I remember correctly, the resumeBoundary is there because an exception can happen just after the capacity reaches zero, but we don't immediately resubscribe until an actual request comes in.
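In case it helps, a minimal JMH sketch along those lines; the class and method names are made up here (not taken from the existing perf sources), and it assumes RxJava 1.x and JMH on the classpath:

```java
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

import rx.Observable;

@State(Scope.Thread)
public class RedoThroughputPerf {

    // The source completes normally, so retry() never actually resubscribes;
    // this measures the overhead the redo machinery adds on the happy path.
    final Observable<Integer> source = Observable.range(1, 1000).retry();

    @Benchmark
    public Integer retryOverNonThrowingRange() {
        return source.toBlocking().last();
    }
}
```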
I've added some more comments to the code as my understanding improves. I have also marked three places with //TODO, two of which I think are potential race conditions and the third is just a marker for some of the usual optimizations done in … Another addition in the last commit is a check to ensure that … I'm sure @akarnokd will have the red pen out on this stuff, then if any of the race conditions stand scrutiny I'll fix them.
Unrelated test failure.
I've ruled out one race condition (and left comments explaining why all is OK).
I'd say if the change makes your test failure go away, that should be enough for now. I'd defer the full rewrite to 2.0.
Righto, I'll concentrate on the fix for the #2863 test failure. Part of that is probably ruling out surprises like decrementing …
Hmm, I just realized that if we're not emitting a MissingBackpressureException then I should still decrement, otherwise the accounting is stuffed when more requests come through. I'll avoid the MissingBackpressureException for now, but it should probably be part of some future milestone (2.0?).
Sorry to say, no progress made. I've protected all reads and writes to …
I've pared this PR right down (after the unsuccessful hunt for the cause of #2863) so that it addresses just the race conditions that I can see and can confirm. The changes are:
#2863 is NOT fixed by these changes.
I applied some random sleep around the resumeBoundary and these changes worked for me (on top of your changes):

```java
if (!isLocked.get() && !child.isUnsubscribed()) {
    if (consumerCapacity.get() > 0) {
        worker.schedule(subscribeToSource);
    }
}
```

and in the child's producer:

```java
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
Producer producer = currentProducer.get();
if (producer != null) {
    producer.request(n);
} else if (c == 0) {
    worker.schedule(subscribeToSource);
}
```

So if there is capacity available, the source will be restarted. If there is no capacity remaining, the first 0 -> n transition will restart the source.
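A simplified sketch of what the capped add relies on (the real helper is `BackpressureUtils.getAndAddRequest`; this is an illustration, not the actual RxJava source). The key point for the snippet above is that the previous value is returned, so `c == 0` identifies the 0 -> n transition exactly once:

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestHelper {
    // Adds n to requested, capping at Long.MAX_VALUE, and returns the PREVIOUS value.
    static long getAndAddRequest(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: clamp to "unbounded"
            }
            if (requested.compareAndSet(current, next)) {
                return current; // 0 here means we just crossed 0 -> n
            }
        }
    }
}
```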
I'll try it out, thanks! I'll also include a …
Nice simplification. Still getting …
3 out of 3 failures on laptop, 1 out of 25 failures on a fast desktop, running all tests in …
|
Apparently, the … Edit: forgot to mention: re-add resumeBoundary as well.
This is as far as I got. No failures when everything is in the synch blocks. Then I started moving out the baddies for synchronization, being calls to …

```java
    @Override
    public void onNext(Object t) {
        if (!isLocked.get() && !child.isUnsubscribed()) {
            final boolean scheduleNow;
            synchronized (consumerCapacity) {
                if (consumerCapacity.get() > 0) {
                    scheduleNow = true;
                } else {
                    scheduleNow = false;
                    resumeBoundary.compareAndSet(false, true);
                }
            }
            if (scheduleNow)
                worker.schedule(subscribeToSource);
        }
    }

    @Override
    public void setProducer(Producer producer) {
        producer.request(Long.MAX_VALUE);
    }
});
}
});
child.setProducer(new Producer() {
    @Override
    public void request(final long n) {
        final Producer producer;
        final boolean requestNow;
        final boolean scheduleNow;
        synchronized (consumerCapacity) {
            BackpressureUtils.getAndAddRequest(consumerCapacity, n);
            producer = currentProducer.get();
            if (producer != null) {
                requestNow = true;
                scheduleNow = false;
            } else {
                requestNow = false;
                scheduleNow = resumeBoundary.compareAndSet(true, false);
            }
        }
        if (requestNow)
            producer.request(n);
        else if (scheduleNow)
            worker.schedule(subscribeToSource);
    }
})
```
When the test fails, how many elements are missing from the output? Does it print `beginningEveryTime x 256`?
Yep:
|
I'm rewriting just the retry(n) variant to see if there is a general logical error with request accounting or whether just OnSubscribeRedo has problems.
Three hours without failure on three machines (i7, i7 and an i5 laptop). Can you try this commit @akarnokd?
Took 5 hours but got a failure on laptop only:

This is going to make testing difficult!
```java
if (u < 0) {
    u = Long.MAX_VALUE;
}
u -= mprod;
```
@akarnokd What about not doing this subtraction if u is Long.MAX_VALUE?
Yes. Without that check, the retry may land on MAX_VALUE - 1 on restart and not take the fast path in certain sources.
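A sketch of the guarded accounting being discussed, using the names from the diff above (`u` for the outstanding request count, `mprod` for the number of items produced); the wrapper class and method are hypothetical:

```java
final class ProducedArithmetic {
    // Returns the remaining capacity after mprod items were produced, leaving the
    // Long.MAX_VALUE "unbounded" marker intact so a restart can take the fast path.
    static long remainingAfterProduced(long u, long mprod) {
        if (u < 0) {
            u = Long.MAX_VALUE; // an overflowed request total means "unbounded"
        }
        if (u != Long.MAX_VALUE) {
            u -= mprod; // only account production when bounded
        }
        return u;
    }
}
```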
Using jacoco as per #2937, I noticed that the subscriber to …
I've added the fix to …
The fix to …
Added more concurrency protection to …
Got this on the 74th run on laptop:
|
Added another race condition fix for …
54th run on desktop:
|
Despite the last commit, 22nd run on desktop:
|
Try disabling transparent hugepages.
With the purpose of making the test fail more often?
On the contrary. Linux transparent hugepages (on by default) are known to introduce latency spikes, maybe so extreme that our test times out just because of that.
The catch is that we have a low-probability (for the test as it stands) race condition somewhere. If I make the test fail less often then we lose the ability to find that race condition. Which timeout are you thinking of? The individual timeouts for the tests are commented out; are there scheduler timeouts?
I think there is no race condition, but your Linux system gets overwhelmed by the test and it simply times out. This is why I suggested looking at JVisualVM's thread graph to detect gaps in the execution that can be attributed to the system.
Inspecting with JVisualVM is tricky because I have only got failures to happen using the Gradle command line, which means the JVM restarts every thirty seconds or so. I've dug around pretty much everywhere now and may dig around a bit more in the future, but perhaps we have achieved enough with this operator for it to be accepted back into the code base. The hunt certainly unearthed a few race conditions in …

I can rebase the commits and remove the updates to OperatorObserveOn from this commit so that it's ready for merge once you and Ben and whoever else has reviewed it. It will be good to get the fixes in #2929 merged as well, of course, as otherwise there will be some test flakiness on this commit. Is that a reasonable plan or would you like to keep digging?
…re of OperatorRetry.testRetryWithBackpressureParallel
I've rebased the commits. Ready for a hopefully final review.
I'm quite a bit lost; I don't know what worked and what didn't, or whether the changes worked on your system or not. If the test fails with the outer pool of 1 thread, which should pose the least amount of work, then my suspicion is that there is something wrong with Java on Linux or with Linux itself. Perhaps this is just another case of the recent futex bug.
The futex stuff is interesting, I'll have a close look at that soon. One thing I'm wondering is whether we should break the retries when a …
I did a little test and see that the sort of … The futex stuff shouldn't be an issue because it turned up in Linux kernel 3.14 and I'm running 3.2.0-80 on my laptop and 3.13.0-49 on my desktop. I'll turn off transparent hugepages on my home desktop and see if the error happens.
Turned off transparent hugepages and got a failure on the 18th run of this command:

The run times are below. You'll notice that the runtime is not significantly larger for the final failing test, which may rule out some types of OS behaviour being involved. I think we can rule out the futex bug because I run either Ubuntu 12.04 or 14.04:
|
I suggest closing this PR and starting with a fresh new PR and perhaps a new discussion.
Continuing issue in #2997.
While searching for the cause of #2863 I bumped into this race condition (which doesn't fix #2863):

If a request is made between L238 and L239 then `consumerCapacity` may become `Long.MAX_VALUE` on arriving at L239, in which case we don't wish to decrement it. To fix, used `compareAndSet`.

What is interesting about this fix is that in the test loop of 5000 in `OperatorRetryTest` I see many more occurrences of the failure on average (3 -> 50), presumably because the extra time to perform the `compareAndSet` action has expanded the window for the race condition causing the failures.