Skip to content

Commit

Permalink
Make Publish Operator Release RingBuffer
Browse files Browse the repository at this point in the history
- it was retaining the RxRingBuffer reference between subscribes which meant it was never released to the object pool
  • Loading branch information
benjchristensen committed Dec 24, 2014
1 parent 4a08644 commit e39599c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
30 changes: 21 additions & 9 deletions src/main/java/rx/internal/operators/OperatorPublish.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void call() {
public void connect(Action1<? super Subscription> connection) {
// each time we connect we create a new Subscription
boolean shouldSubscribe = false;

// subscription is the state of whether we are connected or not
OriginSubscriber<T> origin = requestHandler.state.getOrigin();
if (origin == null) {
Expand All @@ -113,7 +113,7 @@ public void connect(Action1<? super Subscription> connection) {
connection.call(Subscriptions.create(new Action0() {
@Override
public void call() {
Subscription s = requestHandler.state.getOrigin();
OriginSubscriber<T> s = requestHandler.state.getOrigin();
requestHandler.state.setOrigin(null);
if (s != null) {
s.unsubscribe();
Expand All @@ -135,9 +135,11 @@ private static class OriginSubscriber<T> extends Subscriber<T> {
private final RequestHandler<T> requestHandler;
private final AtomicLong originOutstanding = new AtomicLong();
private final long THRESHOLD = RxRingBuffer.SIZE / 4;
private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();

OriginSubscriber(RequestHandler<T> requestHandler) {
this.requestHandler = requestHandler;
add(buffer);
}

@Override
Expand Down Expand Up @@ -199,6 +201,8 @@ public void onNext(T t) {
* with a complicated state machine so I'm sticking with mutex locks and just trying to make sure the work done while holding the
* lock is small (such as never emitting data).
*
* This does however mean we can't rely on a reference to State being consistent. For example, it can end up with a null OriginSubscriber.
*
* @param <T>
*/
private static class State<T> {
Expand Down Expand Up @@ -288,7 +292,7 @@ private long resetAfterSubscriberUpdate() {

private static class RequestHandler<T> {
private final NotificationLite<T> notifier = NotificationLite.instance();
private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();

private final State<T> state = new State<T>();
@SuppressWarnings("unused")
volatile long wip;
Expand All @@ -297,16 +301,24 @@ private static class RequestHandler<T> {

public void requestFromChildSubscriber(Subscriber<? super T> subscriber, Long request) {
state.requestFromSubscriber(subscriber, request);
drainQueue();
OriginSubscriber<T> originSubscriber = state.getOrigin();
if(originSubscriber != null) {
drainQueue(originSubscriber);
}
}

public void emit(Object t) throws MissingBackpressureException {
OriginSubscriber<T> originSubscriber = state.getOrigin();
if(originSubscriber == null) {
// unsubscribed so break ... we are done
return;
}
if (notifier.isCompleted(t)) {
buffer.onCompleted();
originSubscriber.buffer.onCompleted();
} else {
buffer.onNext(notifier.getValue(t));
originSubscriber.buffer.onNext(notifier.getValue(t));
}
drainQueue();
drainQueue(originSubscriber);
}

private void requestMoreAfterEmission(int emitted) {
Expand All @@ -319,7 +331,7 @@ private void requestMoreAfterEmission(int emitted) {
}
}

public void drainQueue() {
public void drainQueue(OriginSubscriber<T> originSubscriber) {
if (WIP.getAndIncrement(this) == 0) {
int emitted = 0;
do {
Expand All @@ -338,7 +350,7 @@ public void drainQueue() {
if (!shouldEmit) {
break;
}
Object o = buffer.poll();
Object o = originSubscriber.buffer.poll();
if (o == null) {
// nothing in buffer so increment outstanding back again
state.incrementOutstandingAfterFailedEmit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void call(Integer l) {
s2.unsubscribe(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
s1.unsubscribe();

System.out.println("onNext: " + nextCount.get());
System.out.println("onNext Count: " + nextCount.get());

// it will emit twice because it is synchronous
assertEquals(nextCount.get(), receivedCount.get() * 2);
Expand Down

0 comments on commit e39599c

Please sign in to comment.