From 9f394817fc9650c412efa4a0d2716730742f8ab5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 30 Aug 2015 18:32:57 +0200 Subject: [PATCH] Operator buffer with boundary and open-close, fixes to the timed variants. Added QueueDrain and QueueDrainSubscriber for common queue-drain operations. Not applied outside the buffer()s as of now. --- src/main/java/io/reactivex/Observable.java | 14 +- src/main/java/io/reactivex/Try.java | 17 +- .../disposables/SetCompositeResource.java | 2 +- .../operators/OperatorBufferBoundary.java | 307 ++++++++++++++++++ .../OperatorBufferExactBoundary.java | 216 ++++++++++++ .../operators/OperatorBufferTimed.java | 210 ++++++------ .../subscribers/QueueDrainSubscriber.java | 246 ++++++++++++++ .../subscriptions/EmptySubscription.java | 8 + .../reactivex/internal/util/QueueDrain.java | 167 ++++++++++ 9 files changed, 1054 insertions(+), 133 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/OperatorBufferBoundary.java create mode 100644 src/main/java/io/reactivex/internal/operators/OperatorBufferExactBoundary.java create mode 100644 src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java create mode 100644 src/main/java/io/reactivex/internal/util/QueueDrain.java diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 94a950d071..282c0d9087 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -1878,9 +1878,12 @@ public final > Observable Observable bufferOpenings, Function> bufferClosingSelector, Supplier bufferSupplier) { - // TODO - throw new UnsupportedOperationException(); + Objects.requireNonNull(bufferOpenings); + Objects.requireNonNull(bufferClosingSelector); + Objects.requireNonNull(bufferSupplier); + return lift(new OperatorBufferBoundary<>(bufferOpenings, bufferClosingSelector, bufferSupplier)); } + public final Observable> buffer(Observable boundary) { /* * XXX: javac complains if this is not manually cast, Eclipse is fine @@ -1888,9 +1891,10 @@ public final Observable> buffer(Observable boundary) { return buffer(boundary, (Supplier>)ArrayList::new); } - public final > Observable> buffer(Observable boundary, Supplier bufferSupplier) { - // TODO - throw new UnsupportedOperationException(); + public final > Observable buffer(Observable boundary, Supplier bufferSupplier) { + Objects.requireNonNull(boundary); + Objects.requireNonNull(bufferSupplier); + return lift(new OperatorBufferExactBoundary<>(boundary, bufferSupplier)); } public final Observable> buffer(Observable boundary, int initialCapacity) { diff --git a/src/main/java/io/reactivex/Try.java b/src/main/java/io/reactivex/Try.java index 2084b53389..c86d81043b 100644 --- a/src/main/java/io/reactivex/Try.java +++ b/src/main/java/io/reactivex/Try.java @@ -34,8 +34,8 @@ private Try(T value, Throwable error) { /** * Constructs a Try instance by wrapping the given value. * - * @param value - * @return + * @param value the value to wrap + * @return the created Try instance */ public static Try ofValue(T value) { // TODO ? Objects.requireNonNull(value); @@ -47,17 +47,16 @@ public static Try ofValue(T value) { * *

Null Throwables are replaced by NullPointerException instance in this Try. * - * @param e - * @return + * @param e the exception to wrap + * @return the new Try instance holding the exception */ public static Try ofError(Throwable e) { - // TODO ? Objects.requireNonNull(e); return new Try<>(null, e != null ? e : new NullPointerException()); } /** * Returns the value or null if the value is actually null or if this Try holds an error instead. - * @return + * @return the value contained * @see #hasValue() */ public T value() { @@ -67,7 +66,7 @@ public T value() { /** * Returns the error or null if this Try holds a value instead. * - * @return + * @return the Throwable contained or null * */ public Throwable error() { @@ -76,7 +75,7 @@ public Throwable error() { /** * Returns true if this Try holds an error. - * @return + * @return true if this Try holds an error */ public boolean hasError() { return error != null; @@ -84,7 +83,7 @@ public boolean hasError() { /** * Returns true if this Try holds a value. - * @return + * @return true if this Try holds a value */ public boolean hasValue() { return error == null; diff --git a/src/main/java/io/reactivex/internal/disposables/SetCompositeResource.java b/src/main/java/io/reactivex/internal/disposables/SetCompositeResource.java index ddc192e8e3..664d98ed76 100644 --- a/src/main/java/io/reactivex/internal/disposables/SetCompositeResource.java +++ b/src/main/java/io/reactivex/internal/disposables/SetCompositeResource.java @@ -84,7 +84,7 @@ public boolean add(T newResource) { * Removes the given resource from this composite and calls the disposer if the resource * was indeed in the composite. * @param resource the resource to remove, not-null (not verified) - * @return + * @return true if the resource was removed, false otherwise */ @Override public boolean remove(T resource) { diff --git a/src/main/java/io/reactivex/internal/operators/OperatorBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/OperatorBufferBoundary.java new file mode 100644 index 0000000000..c7a0c7c91a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/OperatorBufferBoundary.java @@ -0,0 +1,307 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators; + +import java.util.*; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.*; + +import org.reactivestreams.*; + +import io.reactivex.Observable.Operator; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.SetCompositeResource; +import io.reactivex.internal.queue.MpscLinkedQueue; +import io.reactivex.internal.subscribers.*; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.SerializedSubscriber; + +public final class OperatorBufferBoundary, Open, Close> implements Operator { + final Supplier bufferSupplier; + final Publisher bufferOpen; + final Function> bufferClose; + + public OperatorBufferBoundary(Publisher bufferOpen, + Function> bufferClose, Supplier bufferSupplier) { + this.bufferOpen = bufferOpen; + this.bufferClose = bufferClose; + this.bufferSupplier = bufferSupplier; + } + + @Override + public Subscriber apply(Subscriber t) { + return new BufferBoundarySubscriber<>( + new SerializedSubscriber<>(t), + bufferOpen, bufferClose, bufferSupplier + ); + } + + static final class BufferBoundarySubscriber, Open, Close> + extends QueueDrainSubscriber implements Subscription, Disposable { + final Publisher bufferOpen; + final Function> bufferClose; + final Supplier bufferSupplier; + final SetCompositeResource resources; + + Subscription s; + + final List buffers; + + volatile int windows; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WINDOWS = + AtomicIntegerFieldUpdater.newUpdater(BufferBoundarySubscriber.class, "windows"); + + public BufferBoundarySubscriber(Subscriber actual, + Publisher bufferOpen, + Function> bufferClose, + Supplier bufferSupplier) { + super(actual, new MpscLinkedQueue<>()); + this.bufferOpen = bufferOpen; + this.bufferClose = bufferClose; + this.bufferSupplier = bufferSupplier; + this.buffers = new LinkedList<>(); + this.resources = new SetCompositeResource<>(Disposable::dispose); + } + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validateSubscription(this.s, s)) { + return; + } + this.s = s; + + BufferOpenSubscriber bos = new BufferOpenSubscriber<>(this); + resources.add(bos); + + actual.onSubscribe(this); + + WINDOWS.lazySet(this, 1); + bufferOpen.subscribe(bos); + + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + synchronized (t) { + for (U b : buffers) { + b.add(t); + } + } + } + + @Override + public void onError(Throwable t) { + cancel(); + cancelled = true; + synchronized (this) { + buffers.clear(); + } + actual.onError(t); + } + + @Override + public void onComplete() { + cancel(); + List list; + synchronized (this) { + list = new ArrayList<>(buffers); + buffers.clear(); + } + + Queue q = queue; + for (U u : list) { + q.offer(u); + } + done = true; + if (enter()) { + drainMaxLoop(q, actual, false, this); + } + } + + @Override + public void request(long n) { + requested(n); + } + + @Override + public void dispose() { + resources.dispose(); + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + dispose(); + } + } + + @Override + public boolean accept(Subscriber a, U v) { + a.onNext(v); + return true; + } + + void open(Open window) { + if (cancelled) { + return; + } + + U b; + + try { + b = bufferSupplier.get(); + } catch (Throwable e) { + onError(e); + return; + } + + if (b == null) { + onError(new NullPointerException("The buffer supplied is null")); + return; + } + + Publisher p; + + try { + p = bufferClose.apply(window); + } catch (Throwable e) { + onError(e); + return; + } + + if (p == null) { + onError(new NullPointerException("The buffer closing publisher is null")); + return; + } + + if (cancelled) { + return; + } + + synchronized (this) { + if (cancelled) { + return; + } + buffers.add(b); + } + + BufferCloseSubscriber bcs = new BufferCloseSubscriber<>(b, this); + resources.add(bcs); + + WINDOWS.getAndIncrement(this); + + p.subscribe(bcs); + } + + void openFinished(Disposable d) { + if (resources.remove(d)) { + if (WIP.decrementAndGet(this) == 0) { + onComplete(); + } + } + } + + void close(U b, Disposable d) { + + boolean e; + synchronized (this) { + e = buffers.remove(b); + } + + if (e) { + fastpathOrderedEmitMax(b, false, this); + } + + if (resources.remove(d)) { + if (WIP.decrementAndGet(this) == 0) { + onComplete(); + } + } + } + } + + static final class BufferOpenSubscriber, Open, Close> + extends DisposableSubscriber { + final BufferBoundarySubscriber parent; + + boolean done; + + public BufferOpenSubscriber(BufferBoundarySubscriber parent) { + this.parent = parent; + } + @Override + public void onNext(Open t) { + if (done) { + return; + } + parent.open(t); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + parent.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + parent.openFinished(this); + } + } + + static final class BufferCloseSubscriber, Open, Close> + extends DisposableSubscriber { + final BufferBoundarySubscriber parent; + final U value; + boolean done; + public BufferCloseSubscriber(U value, BufferBoundarySubscriber parent) { + this.parent = parent; + this.value = value; + } + + @Override + public void onNext(Close t) { + onComplete(); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + parent.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + parent.close(value, this); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/OperatorBufferExactBoundary.java b/src/main/java/io/reactivex/internal/operators/OperatorBufferExactBoundary.java new file mode 100644 index 0000000000..026d86baa7 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/OperatorBufferExactBoundary.java @@ -0,0 +1,216 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators; + +import java.util.Collection; +import java.util.function.Supplier; + +import org.reactivestreams.*; + +import io.reactivex.Observable.Operator; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.queue.MpscLinkedQueue; +import io.reactivex.internal.subscribers.*; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.subscribers.SerializedSubscriber; + +public final class OperatorBufferExactBoundary, B> implements Operator { + final Publisher boundary; + final Supplier bufferSupplier; + + public OperatorBufferExactBoundary(Publisher boundary, Supplier bufferSupplier) { + this.boundary = boundary; + this.bufferSupplier = bufferSupplier; + } + + @Override + public Subscriber apply(Subscriber t) { + return new BufferExactBondarySubscriber<>(new SerializedSubscriber<>(t), bufferSupplier, boundary); + } + + static final class BufferExactBondarySubscriber, B> + extends QueueDrainSubscriber implements Subscriber, Subscription, Disposable { + /** */ + final Supplier bufferSupplier; + final Publisher boundary; + + Subscription s; + + Disposable other; + + U buffer; + + public BufferExactBondarySubscriber(Subscriber actual, Supplier bufferSupplier, + Publisher boundary) { + super(actual, new MpscLinkedQueue<>()); + this.bufferSupplier = bufferSupplier; + this.boundary = boundary; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validateSubscription(this.s, s)) { + return; + } + this.s = s; + + U b; + + try { + b = bufferSupplier.get(); + } catch (Throwable e) { + cancelled = true; + s.cancel(); + EmptySubscription.error(e, actual); + return; + } + + if (b == null) { + cancelled = true; + s.cancel(); + EmptySubscription.error(new NullPointerException("The buffer supplied is null"), actual); + return; + } + buffer = b; + + BufferBoundarySubscriber bs = new BufferBoundarySubscriber<>(this); + other = bs; + + actual.onSubscribe(this); + + if (!cancelled) { + s.request(Long.MAX_VALUE); + + boundary.subscribe(bs); + } + } + + @Override + public void onNext(T t) { + synchronized (this) { + U b = buffer; + if (b == null) { + return; + } + b.add(t); + } + } + + @Override + public void onError(Throwable t) { + cancel(); + actual.onError(t); + } + + @Override + public void onComplete() { + U b; + synchronized (this) { + b = buffer; + if (b == null) { + return; + } + buffer = null; + } + queue.offer(b); + done = true; + if (enter()) { + drainMaxLoop(queue, actual, false, this); + } + } + + @Override + public void request(long n) { + requested(n); + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + other.dispose(); + s.cancel(); + + if (enter()) { + queue.clear(); + } + } + } + + void next() { + + U next; + + try { + next = bufferSupplier.get(); + } catch (Throwable e) { + cancel(); + actual.onError(e); + return; + } + + if (next == null) { + cancel(); + actual.onError(new NullPointerException("The buffer supplied is null")); + return; + } + + U b; + synchronized (this) { + b = buffer; + if (b == null) { + return; + } + buffer = next; + } + + fastpathEmitMax(b, false, this); + } + + @Override + public void dispose() { + cancel(); + } + + @Override + public boolean accept(Subscriber a, U v) { + actual.onNext(v); + return true; + } + + } + + static final class BufferBoundarySubscriber, B> extends DisposableSubscriber { + final BufferExactBondarySubscriber parent; + + public BufferBoundarySubscriber(BufferExactBondarySubscriber parent) { + this.parent = parent; + } + + @Override + public void onNext(B t) { + parent.next(); + } + + @Override + public void onError(Throwable t) { + parent.onError(t); + } + + @Override + public void onComplete() { + parent.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/OperatorBufferTimed.java b/src/main/java/io/reactivex/internal/operators/OperatorBufferTimed.java index fe20a83574..2c999f3f87 100644 --- a/src/main/java/io/reactivex/internal/operators/OperatorBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/OperatorBufferTimed.java @@ -15,7 +15,7 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Supplier; import org.reactivestreams.*; @@ -24,8 +24,9 @@ import io.reactivex.Scheduler; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; +import io.reactivex.internal.queue.MpscLinkedQueue; +import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.*; -import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.subscribers.SerializedSubscriber; public final class OperatorBufferTimed> implements Operator { @@ -72,11 +73,8 @@ public Subscriber apply(Subscriber t) { bufferSupplier, timespan, timeskip, unit, w); } - static final class BufferExactUnboundedSubscriber> extends AtomicLong implements Subscriber, Subscription, Runnable { - /** */ - private static final long serialVersionUID = -2494880612098980129L; - - final Subscriber actual; + static final class BufferExactUnboundedSubscriber> + extends QueueDrainSubscriber implements Subscription, Runnable, Disposable { final Supplier bufferSupplier; final long timespan; final TimeUnit unit; @@ -94,11 +92,11 @@ static final class BufferExactUnboundedSubscriber { }; - + public BufferExactUnboundedSubscriber( Subscriber actual, Supplier bufferSupplier, long timespan, TimeUnit unit, Scheduler scheduler) { - this.actual = actual; + super(actual, new MpscLinkedQueue<>()); this.bufferSupplier = bufferSupplier; this.timespan = timespan; this.unit = unit; @@ -128,8 +126,13 @@ public void onSubscribe(Subscription s) { return; } - timer = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + + if (timer == null) { + timer = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); + } } @Override @@ -158,31 +161,21 @@ public void onComplete() { U b; synchronized (this) { b = buffer; - buffer = null; - } - if (b != null) { - long r = get(); - if (r != 0L) { - actual.onNext(b); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } - } else { - cancel(); - actual.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); + if (b == null) { return; } + buffer = null; + } + queue.offer(b); + done = true; + if (enter()) { + drainMaxLoop(queue, actual, false, this); } - - actual.onComplete(); } @Override public void request(long n) { - if (SubscriptionHelper.validateRequest(n)) { - return; - } - BackpressureHelper.add(this, n); + requested(n); } @Override @@ -195,7 +188,10 @@ public void cancel() { void disposeTimer() { Disposable d = timer; if (d != CANCELLED) { - + d = TIMER.getAndSet(this, CANCELLED); + if (d != CANCELLED && d != null) { + d.dispose(); + } } } @@ -244,25 +240,24 @@ public void run() { return; } - long r = get(); - if (r != 0L) { - actual.onNext(current); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } - } else { - selfCancel = true; - cancel(); - actual.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); - } + fastpathEmitMax(current, false, this); + } + + @Override + public boolean accept(Subscriber a, U v) { + actual.onNext(v); + return true; } + @Override + public void dispose() { + selfCancel = true; + cancel(); + } } - static final class BufferSkipBoundedSubscriber> extends AtomicLong implements Subscriber, Subscription, Runnable { - /** */ - private static final long serialVersionUID = -2714725589685327677L; - final Subscriber actual; + static final class BufferSkipBoundedSubscriber> + extends QueueDrainSubscriber implements Subscription, Runnable { final Supplier bufferSupplier; final long timespan; final long timeskip; @@ -273,12 +268,10 @@ static final class BufferSkipBoundedSubscriber buffers; - volatile boolean stop; - public BufferSkipBoundedSubscriber(Subscriber actual, Supplier bufferSupplier, long timespan, long timeskip, TimeUnit unit, Worker w) { - this.actual = actual; + super(actual, new MpscLinkedQueue<>()); this.bufferSupplier = bufferSupplier; this.timespan = timespan; this.timeskip = timeskip; @@ -313,10 +306,12 @@ public void onSubscribe(Subscription s) { } buffers.add(b); + + actual.onSubscribe(this); + s.request(Long.MAX_VALUE); + w.schedulePeriodically(this, timeskip, timeskip, unit); - - actual.onSubscribe(this); } @Override @@ -328,7 +323,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - stop = true; + done = true; w.dispose(); clear(); actual.onError(t); @@ -336,37 +331,22 @@ public void onError(Throwable t) { @Override public void onComplete() { - stop = true; - w.dispose(); List bs; synchronized (this) { bs = new ArrayList<>(buffers); buffers.clear(); } - long r = get(); - for (U u : bs) { - if (r != 0L) { - actual.onNext(u); - if (r != Long.MAX_VALUE) { - r = addAndGet(-1); - } - } else { - actual.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); - return; - } - + bs.forEach(b -> queue.add(b)); + done = true; + if (enter()) { + drainMaxLoop(queue, actual, false, w); } - - actual.onComplete(); } @Override public void request(long n) { - if (SubscriptionHelper.validateRequest(n)) { - return; - } - BackpressureHelper.add(this, n); + requested(n); } @Override @@ -384,7 +364,7 @@ void clear() { @Override public void run() { - if (stop) { + if (cancelled) { return; } U b; @@ -404,7 +384,7 @@ public void run() { } synchronized (this) { - if (stop) { + if (cancelled) { return; } buffers.add(b); @@ -415,26 +395,19 @@ public void run() { buffers.remove(b); } - long r = get(); - - if (r != 0L) { - actual.onNext(b); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } - } else { - cancel(); - actual.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); - } - + fastpathOrderedEmitMax(b, false, w); }, timespan, unit); } + + @Override + public boolean accept(Subscriber a, U v) { + a.onNext(v); + return true; + } } - static final class BufferExactBoundedSubscriber> extends AtomicLong implements Subscriber, Subscription, Runnable { - /** */ - private static final long serialVersionUID = -1778453504578862865L; - final Subscriber actual; + static final class BufferExactBoundedSubscriber> + extends QueueDrainSubscriber implements Subscription, Runnable, Disposable { final Supplier bufferSupplier; final long timespan; final TimeUnit unit; @@ -451,13 +424,13 @@ static final class BufferExactBoundedSubscriber actual, Supplier bufferSupplier, long timespan, TimeUnit unit, int maxSize, boolean restartOnMaxSize, Worker w) { - this.actual = actual; + super(actual, new MpscLinkedQueue<>()); this.bufferSupplier = bufferSupplier; this.timespan = timespan; this.unit = unit; @@ -493,9 +466,11 @@ public void onSubscribe(Subscription s) { buffer = b; - timer = w.schedulePeriodically(this, timespan, timespan, unit); - actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + + timer = w.schedulePeriodically(this, timespan, timespan, unit); } @Override @@ -520,6 +495,7 @@ public void onNext(T t) { timer.dispose(); actual.onNext(b); + fastpathOrderedEmitMax(b, false, this); try { b = bufferSupplier.get(); @@ -563,36 +539,43 @@ public void onComplete() { buffer = null; } - if (b != null) { - long r = get(); - if (r != 0L) { - actual.onNext(b); - } else { - actual.onError(new IllegalStateException("Could not deliver final buffer due to lack of requests")); - return; - } + queue.offer(b); + done = true; + if (enter()) { + drainMaxLoop(queue, actual, false, this); } - actual.onComplete(); } + @Override + public boolean accept(Subscriber a, U v) { + a.onNext(v); + return true; + } + + @Override public void request(long n) { - if (SubscriptionHelper.validateRequest(n)) { - return; - } - - BackpressureHelper.add(this, n); + requested(n); } @Override public void cancel() { + if (!cancelled) { + cancelled = true; + dispose(); + } + } + + @Override + public void dispose() { w.dispose(); synchronized (this) { buffer = null; } s.cancel(); } - + + @Override public void run() { U next; @@ -620,17 +603,8 @@ public void run() { } buffer = next; } - - long r = get(); - if (r != 0L) { - actual.onNext(current); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } - } else { - cancel(); - actual.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); - } + + fastpathOrderedEmitMax(current, false, this); } } } diff --git a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java new file mode 100644 index 0000000000..7741068ad3 --- /dev/null +++ b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java @@ -0,0 +1,246 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import java.util.Queue; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscriber; + +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.QueueDrain; + +/** + * Abstract base class for subscribers that hold another subscriber, a queue + * and requires queue-drain behavior. + * + * @param the source type to which this subscriber will be subscribed + * @param the value type in the queue + * @param the value type the child subscriber accepts + */ +public abstract class QueueDrainSubscriber extends QueueDrainSubscriberPad4 implements Subscriber, QueueDrain { + protected final Subscriber actual; + protected final Queue queue; + + protected volatile boolean cancelled; + + protected volatile boolean done; + protected Throwable error; + + public QueueDrainSubscriber(Subscriber actual, Queue queue) { + this.actual = actual; + this.queue = queue; + } + + @Override + public final boolean cancelled() { + return cancelled; + } + + @Override + public final boolean done() { + return done; + } + + @Override + public final boolean enter() { + return WIP.getAndIncrement(this) == 0; + } + + protected final void fastpathEmit(U value, boolean delayError) { + final Subscriber s = actual; + final Queue q = queue; + + if (wip == 0 && WIP.compareAndSet(this, 0, 1)) { + long r = requested; + if (r != 0L) { + if (accept(s, value)) { + if (r != Long.MAX_VALUE) { + produced(1); + } + } + if (leave(-1) == 0) { + return; + } + } + q.offer(value); + } else { + q.offer(value); + if (!enter()) { + return; + } + } + drainLoop(q, s, delayError); + } + + protected final void fastpathEmitMax(U value, boolean delayError, Disposable dispose) { + final Subscriber s = actual; + final Queue q = queue; + + if (wip == 0 && WIP.compareAndSet(this, 0, 1)) { + long r = requested; + if (r != 0L) { + if (accept(s, value)) { + if (r != Long.MAX_VALUE) { + produced(1); + } + } + if (leave(-1) == 0) { + return; + } + } else { + dispose.dispose(); + s.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); + return; + } + } else { + q.offer(value); + if (!enter()) { + return; + } + } + drainMaxLoop(q, s, delayError, dispose); + } + + protected final void fastpathOrderedEmitMax(U value, boolean delayError, Disposable dispose) { + final Subscriber s = actual; + final Queue q = queue; + + if (wip == 0 && WIP.compareAndSet(this, 0, 1)) { + long r = requested; + if (r != 0L) { + if (q.isEmpty()) { + if (accept(s, value)) { + if (r != Long.MAX_VALUE) { + produced(1); + } + } + if (leave(-1) == 0) { + return; + } + } else { + q.offer(value); + } + } else { + cancelled = true; + dispose.dispose(); + s.onError(new IllegalStateException("Could not emit buffer due to lack of requests")); + return; + } + } else { + q.offer(value); + if (!enter()) { + return; + } + } + drainMaxLoop(q, s, delayError, dispose); + } + + /** + * Makes sure the fast-path emits in order. + * @param value + * @param delayError + */ + protected final void fastpathOrderedEmit(U value, boolean delayError) { + final Subscriber s = actual; + final Queue q = queue; + + if (wip == 0 && WIP.compareAndSet(this, 0, 1)) { + if (q.isEmpty()) { + long r = requested; + if (r != 0L) { + if (accept(s, value)) { + if (r != Long.MAX_VALUE) { + produced(1); + } + } + if (leave(-1) == 0) { + return; + } + } + } + q.offer(value); + } else { + q.offer(value); + if (!enter()) { + return; + } + } + drainLoop(q, s, delayError); + } + + @Override + public final Throwable error() { + return error; + } + + @Override + public final int leave(int m) { + return WIP.addAndGet(this, m); + } + + @Override + public final long requested() { + return requested; + } + + @Override + public final long produced(long n) { + return REQUESTED.addAndGet(this, n); + } + + public final void requested(long n) { + if (SubscriptionHelper.validateRequest(n)) { + return; + } + REQUESTED.addAndGet(this, n); + } +} + +// ------------------------------------------------------------------- +// Padding superclasses +//------------------------------------------------------------------- + +/** Pads the header away from other fields. */ +class QueueDrainSubscriberPad0 { + volatile long p1, p2, p3, p4, p5, p6, p7; + volatile long p8, p9, p10, p11, p12, p13, p14, p15; +} + +/** The WIP counter. */ +class QueueDrainSubscriberWip extends QueueDrainSubscriberPad0 { + protected volatile int wip; + protected static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(QueueDrainSubscriberWip.class, "wip"); +} + +/** Pads away the wip from the other fields. */ +class QueueDrainSubscriberPad2 extends QueueDrainSubscriberWip { + volatile long p1a, p2a, p3a, p4a, p5a, p6a, p7a; + volatile long p8a, p9a, p10a, p11a, p12a, p13a, p14a, p15a; +} + +/** Contains the requested field. */ +class QueueDrainSubscriberPad3 extends QueueDrainSubscriberPad2 { + protected volatile long requested; + protected static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(QueueDrainSubscriberPad3.class, "requested"); +} + +/** Pads away the requested from the other fields. */ +class QueueDrainSubscriberPad4 extends QueueDrainSubscriberPad3 { + volatile long q1, q2, q3, q4, q5, q6, q7; + volatile long q8, q9, q10, q11, q12, q13, q14, q15; +} diff --git a/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java b/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java index ccfdc9ffa2..26c484c0cf 100644 --- a/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java +++ b/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java @@ -37,6 +37,10 @@ public void cancel() { /** * Sets the empty subscription instance on the subscriber and then * calls onError with the supplied error. + * + *

Make sure this is only called if the subscriber hasn't received a + * subscription already (there is no way of telling this). + * * @param e the error to deliver to the subscriber * @param s the target subscriber */ @@ -48,6 +52,10 @@ public static void error(Throwable e, Subscriber s) { /** * Sets the empty subscription instance on the subscriber and then * calls onComplete. + * + *

Make sure this is only called if the subscriber hasn't received a + * subscription already (there is no way of telling this). + * * @param s the target subscriber */ public static void complete(Subscriber s) { diff --git a/src/main/java/io/reactivex/internal/util/QueueDrain.java b/src/main/java/io/reactivex/internal/util/QueueDrain.java new file mode 100644 index 0000000000..ae70021474 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/QueueDrain.java @@ -0,0 +1,167 @@ +package io.reactivex.internal.util; + +import java.util.Queue; + +import org.reactivestreams.Subscriber; + +import io.reactivex.disposables.Disposable; + +public interface QueueDrain { + + boolean cancelled(); + + boolean done(); + + Throwable error(); + + boolean enter(); + + long requested(); + + long produced(long n); + + /** + * Adds m to the wip counter. + * @param m + * @return + */ + int leave(int m); + + /** + * Accept the value and return true if forwarded. + * @param a + * @param v + * @return + */ + boolean accept(Subscriber a, T v); + + default void drainLoop(Queue q, Subscriber a, boolean delayError) { + + int missed = 1; + + for (;;) { + if (checkTerminated(done(), q.isEmpty(), a, delayError, q)) { + return; + } + + long r = requested(); + boolean unbounded = r == Long.MAX_VALUE; + long e = 0L; + + while (e != r) { + boolean d = done(); + T v = q.poll(); + + boolean empty = v == null; + + if (checkTerminated(d, empty, a, delayError, q)) { + return; + } + + if (empty) { + break; + } + + if (accept(a, v)) { + e++; + } + } + + if (e != 0L && !unbounded) { + produced(e); + } + + missed = leave(-missed); + if (missed == 0) { + break; + } + } + } + + /** + * Drain the queue but give up with an error if there aren't enough requests. + * @param q + * @param a + * @param delayError + */ + default void drainMaxLoop(Queue q, Subscriber a, boolean delayError, + Disposable dispose) { + int missed = 1; + + for (;;) { + for (;;) { + boolean d = done(); + T v = q.poll(); + + boolean empty = v == null; + + if (checkTerminated(d, empty, a, delayError, q)) { + if (dispose != null) { + dispose.dispose(); + } + return; + } + + if (empty) { + break; + } + + long r = requested(); + if (r != 0L) { + if (accept(a, v)) { + if (r != Long.MAX_VALUE) { + r = produced(1); + } + } + } else { + q.clear(); + if (dispose != null) { + dispose.dispose(); + } + a.onError(new IllegalStateException("Could not emit value due to lack of requests.")); + return; + } + } + + missed = leave(-missed); + if (missed == 0) { + break; + } + } + } + + default boolean checkTerminated(boolean d, boolean empty, + Subscriber s, boolean delayError, Queue q) { + if (cancelled()) { + q.clear(); + return true; + } + + if (d) { + if (delayError) { + if (empty) { + Throwable err = error(); + if (err != null) { + s.onError(err); + } else { + s.onComplete(); + } + return true; + } + } else { + Throwable err = error(); + if (err != null) { + q.clear(); + s.onError(err); + return true; + } else + if (empty) { + s.onComplete(); + return true; + } + } + } + + return false; + } +}