Skip to content

Commit

Permalink
2.x: Fix truncation bugs in replay() and ReplaySubject/Processor (#6602)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jul 30, 2019
1 parent 70f25df commit a0290b0
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ final void removeFirst() {
}

setFirst(head);
// correct the tail if all items have been removed
head = get();
if (head.get() == null) {
tail = head;
}
}
/**
* Arranges the given node is the new head from now on.
Expand Down Expand Up @@ -1015,7 +1020,7 @@ void truncate() {
int e = 0;
for (;;) {
if (next != null) {
if (size > limit) {
if (size > limit && size > 1) { // never truncate the very last item just added
e++;
size--;
prev = next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ final void trimHead() {
}

setFirst(head);
// correct the tail if all items have been removed
head = get();
if (head.get() == null) {
tail = head;
}
}
/**
* Arranges the given node is the new head from now on.
Expand Down Expand Up @@ -839,7 +844,7 @@ void truncate() {
int e = 0;
for (;;) {
if (next != null) {
if (size > limit) {
if (size > limit && size > 1) { // never truncate the very last item just added
e++;
size--;
prev = next;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,10 @@ void trim() {
TimedNode<T> h = head;

for (;;) {
if (size <= 1) {
head = h;
break;
}
TimedNode<T> next = h.get();
if (next == null) {
head = h;
Expand All @@ -1082,6 +1086,7 @@ void trim() {
}

h = next;
size--;
}

}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,10 @@ void trim() {
TimedNode<Object> h = head;

for (;;) {
if (size <= 1) {
head = h;
break;
}
TimedNode<Object> next = h.get();
if (next == null) {
head = h;
Expand All @@ -1083,6 +1087,7 @@ void trim() {
}

h = next;
size--;
}

}
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/io/reactivex/TimesteppingScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.concurrent.TimeUnit;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;

/**
* Basic scheduler that produces an ever increasing {@link #now(TimeUnit)} value.
* Use this scheduler only as a time source!
*/
public class TimesteppingScheduler extends Scheduler {

final class TimesteppingWorker extends Worker {
@Override
public void dispose() {
}

@Override
public boolean isDisposed() {
return false;
}

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run.run();
return Disposables.disposed();
}

@Override
public long now(TimeUnit unit) {
return time++;
}
}

long time;

@Override
public Worker createWorker() {
return new TimesteppingWorker();
}

@Override
public long now(TimeUnit unit) {
return time++;
}
}
75 changes: 75 additions & 0 deletions src/test/java/io/reactivex/processors/ReplayProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1750,4 +1750,79 @@ public void accept(byte[] v) throws Exception {
+ " -> " + after.get() / 1024.0 / 1024.0);
}
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange() {
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);

TestSubscriber<Integer> ts = rp.test();

rp.onNext(1);
rp.cleanupBuffer();
rp.onComplete();

ts.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange2() {
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);

TestSubscriber<Integer> ts = rp.test();

rp.onNext(1);
rp.cleanupBuffer();
rp.onNext(2);
rp.cleanupBuffer();
rp.onComplete();

ts.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange3() {
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);

TestSubscriber<Integer> ts = rp.test();

rp.onNext(1);
rp.onNext(2);
rp.onComplete();

ts.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange4() {
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10);

TestSubscriber<Integer> ts = rp.test();

rp.onNext(1);
rp.onNext(2);
rp.onComplete();

ts.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeRemoveCorrectNumberOfOld() {
TestScheduler scheduler = new TestScheduler();
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2);

rp.onNext(1);
rp.onNext(2);
rp.onNext(3);

scheduler.advanceTimeBy(2, TimeUnit.SECONDS);

rp.onNext(4);
rp.onNext(5);

rp.test().assertValuesOnly(4, 5);
}
}
75 changes: 75 additions & 0 deletions src/test/java/io/reactivex/subjects/ReplaySubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1342,4 +1342,79 @@ public void accept(byte[] v) throws Exception {
+ " -> " + after.get() / 1024.0 / 1024.0);
}
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange() {
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);

TestObserver<Integer> to = rs.test();

rs.onNext(1);
rs.cleanupBuffer();
rs.onComplete();

to.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange2() {
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);

TestObserver<Integer> to = rs.test();

rs.onNext(1);
rs.cleanupBuffer();
rs.onNext(2);
rs.cleanupBuffer();
rs.onComplete();

to.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange3() {
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);

TestObserver<Integer> to = rs.test();

rs.onNext(1);
rs.onNext(2);
rs.onComplete();

to.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeNoTerminalTruncationOnTimechange4() {
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10);

TestObserver<Integer> to = rs.test();

rs.onNext(1);
rs.onNext(2);
rs.onComplete();

to.assertNoErrors()
.assertComplete();
}

@Test
public void timeAndSizeRemoveCorrectNumberOfOld() {
TestScheduler scheduler = new TestScheduler();
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2);

rs.onNext(1);
rs.onNext(2);
rs.onNext(3); // remove 1 due to maxSize, size == 2

scheduler.advanceTimeBy(2, TimeUnit.SECONDS);

rs.onNext(4); // remove 2 due to maxSize, remove 3 due to age, size == 1
rs.onNext(5); // size == 2

rs.test().assertValuesOnly(4, 5);
}
}

0 comments on commit a0290b0

Please sign in to comment.