diff --git a/src/main/java/rx/internal/operators/BufferUntilSubscriberV2.java b/src/main/java/rx/internal/operators/BufferUntilSubscriberV2.java new file mode 100644 index 0000000000..68c0568a1e --- /dev/null +++ b/src/main/java/rx/internal/operators/BufferUntilSubscriberV2.java @@ -0,0 +1,348 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.Queue; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.exceptions.*; +import rx.functions.*; +import rx.internal.util.atomic.*; +import rx.internal.util.unsafe.*; +import rx.subjects.Subject; +import rx.subscriptions.Subscriptions; + +/** + * A Subject variant which buffers events until a single Subscriber arrives and replays them to it + * and potentially switches to direct delivery once the Subscriber caught up and requested an unlimited + * amount. In this case, the buffered values are no longer retained. If the Subscriber + * requests a limited amount, queueing is involved and only those values are retained which + * weren't requested by the Subscriber at that time. + */ +public final class BufferUntilSubscriberV2 extends Subject { + + /** + * Constructs an empty BufferUntilSubscriber instance with the default capacity hint of 16 elements. + * + * @return the created BufferUntilSubscriber instance + */ + public static BufferUntilSubscriberV2 create() { + return create(16); + } + /** + * Constructs an empty BufferUntilSubscriber instance with a capacity hint. + *

The capacity hint determines the internal queue's island size: the larger + * it is the less frequent allocation will happen if there is no subscriber + * or the subscriber hasn't caught up. + * @param capacityHint the capacity hint for the internal queue + * @return the created BufferUntilSubscriber instance + */ + public static BufferUntilSubscriberV2 create(int capacityHint) { + State state = new State(capacityHint); + OnSubscribeBUS onSubscribe = new OnSubscribeBUS(state); + return new BufferUntilSubscriberV2(onSubscribe, state); + } + + final State state; + + private BufferUntilSubscriberV2(OnSubscribe onSubscribe, State state) { + super(onSubscribe); + this.state = state; + } + + @Override + public void onNext(T t) { + state.onNext(t); + } + + @Override + public void onError(Throwable e) { + state.onError(e); + } + + @Override + public void onCompleted() { + state.onCompleted(); + } + + @Override + public boolean hasObservers() { + return state.subscriber.get() != null; + } + + /** + * The single-consumption replaying state. + * + * @param the value type + */ + static final class State extends AtomicLong implements Producer, Observer, Action0 { + /** */ + private static final long serialVersionUID = -9044104859202255786L; + /** The single subscriber. */ + final AtomicReference> subscriber; + /** The queue holding values until the subscriber arrives and catches up. */ + final Queue queue; + /** JCTools queues don't accept nulls. */ + final NotificationLite nl; + /** In case the source emitted an error. */ + Throwable error; + /** Indicates the source has terminated. */ + volatile boolean done; + /** Emitter loop: emitting indicator. Guarded by this. */ + boolean emitting; + /** Emitter loop: missed emission indicator. Guarded by this. */ + boolean missed; + /** Indicates the queue can be bypassed because the child has caught up with the replay. */ + volatile boolean caughtUp; + /** + * Constructor. + * @param capacityHint indicates how large each island in the Spsc queue should be to + * reduce allocation frequency + */ + public State(int capacityHint) { + this.nl = NotificationLite.instance(); + this.subscriber = new AtomicReference>(); + Queue q; + if (capacityHint > 1) { + q = UnsafeAccess.isUnsafeAvailable() + ? new SpscUnboundedArrayQueue(capacityHint) + : new SpscUnboundedAtomicArrayQueue(capacityHint); + } else { + q = UnsafeAccess.isUnsafeAvailable() + ? new SpscLinkedQueue() + : new SpscLinkedAtomicQueue(); + } + this.queue = q; + } + + @Override + public void onNext(T t) { + if (!done) { + if (!caughtUp) { + boolean stillReplay = false; + /* + * We need to offer while holding the lock because + * we have to atomically switch caughtUp to true + * that can only happen if there isn't any concurrent + * offer() happening while the emission is in replayLoop(). + */ + synchronized (this) { + if (!caughtUp) { + queue.offer(nl.next(t)); + stillReplay = true; + } + } + if (stillReplay) { + replay(); + return; + } + } + Subscriber s = subscriber.get(); + try { + s.onNext(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.onError(OnErrorThrowable.addValueAsLastCause(ex, t)); + } + } + } + @Override + public void onError(Throwable e) { + if (!done) { + error = e; + done = true; + if (!caughtUp) { + boolean stillReplay = false; + synchronized (this) { + stillReplay = !caughtUp; + } + if (stillReplay) { + replay(); + return; + } + } + subscriber.get().onError(e); + } + } + @Override + public void onCompleted() { + if (!done) { + done = true; + if (!caughtUp) { + boolean stillReplay = false; + synchronized (this) { + stillReplay = !caughtUp; + } + if (stillReplay) { + replay(); + return; + } + } + subscriber.get().onCompleted(); + } + } + + @Override + public void request(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required"); + } else + if (n > 0L) { + BackpressureUtils.getAndAddRequest(this, n); + replay(); + } else + if (done) { // terminal events can be delivered for zero requests + replay(); + } + } + /** + * Tries to set the given subscriber if not already set, sending an + * IllegalStateException to the subscriber otherwise. + * @param subscriber + */ + public void setSubscriber(Subscriber subscriber) { + if (this.subscriber.compareAndSet(null, subscriber)) { + subscriber.add(Subscriptions.create(this)); + subscriber.setProducer(this); + } else { + subscriber.onError(new IllegalStateException("Only a single subscriber is allowed")); + } + } + /** + * Tries to replay the contents of the queue. + */ + void replay() { + synchronized (this) { + if (emitting) { + missed = true; + return; + } + emitting = true; + } + Queue q = queue; + for (;;) { + Subscriber s = subscriber.get(); + boolean unlimited = false; + if (s != null) { + boolean d = done; + boolean empty = q.isEmpty(); + + if (checkTerminated(d, empty, s)) { + return; + } + long r = get(); + unlimited = r == Long.MAX_VALUE; + long e = 0L; + + while (r != 0) { + d = done; + Object v = q.poll(); + empty = v == null; + if (checkTerminated(d, empty, s)) { + return; + } + if (empty) { + break; + } + T value = nl.getValue(v); + try { + s.onNext(value); + } catch (Throwable ex) { + q.clear(); + Exceptions.throwIfFatal(ex); + s.onError(OnErrorThrowable.addValueAsLastCause(ex, value)); + return; + } + r--; + e++; + } + if (!unlimited && e != 0L) { + addAndGet(-e); + } + } + + synchronized (this) { + if (!missed) { + if (unlimited && q.isEmpty()) { + caughtUp = true; + } + emitting = false; + return; + } + missed = false; + } + } + } + /** + * Terminates the state by setting the done flag and tries to clear the queue. + * Should be called only when the child unsubscribes + */ + @Override + public void call() { + done = true; + synchronized (this) { + if (emitting) { + return; + } + emitting = true; + } + queue.clear(); + } + /** + * Checks if one of the terminal conditions have been met: child unsubscribed, + * an error happened or the source terminated and the queue is empty + * @param done + * @param empty + * @param s + * @return + */ + boolean checkTerminated(boolean done, boolean empty, Subscriber s) { + if (s.isUnsubscribed()) { + queue.clear(); + return true; + } + if (done) { + Throwable e = error; + if (e != null) { + queue.clear(); + s.onError(e); + return true; + } else + if (empty) { + s.onCompleted(); + return true; + } + } + return false; + } + } + /** + * The OnSubscribe implementation of the BufferUntilSubscriber. + * + * @param the value type + */ + static final class OnSubscribeBUS implements OnSubscribe { + final State state; + public OnSubscribeBUS(State state) { + this.state = state; + } + @Override + public void call(Subscriber child) { + state.setSubscriber(child); + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java index 3b7e1c1cac..3b6f9c089c 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java @@ -154,7 +154,7 @@ void replaceSubject() { child.onNext(producer); } void createNewWindow() { - BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); consumer = bus; producer = bus; } diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java index a764850c79..f0a0a0c1c5 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java @@ -160,7 +160,7 @@ void replaceSubject() { child.onNext(producer); } void createNewWindow() { - BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); consumer = bus; producer = bus; Observable other; diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java index 62763f1948..26111fbfd0 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java @@ -60,7 +60,7 @@ public Subscriber call(Subscriber> child) { final class ExactSubscriber extends Subscriber { final Subscriber> child; int count; - BufferUntilSubscriber window; + BufferUntilSubscriberV2 window; volatile boolean noWindow = true; public ExactSubscriber(Subscriber> child) { /** @@ -107,7 +107,7 @@ void requestMore(long n) { public void onNext(T t) { if (window == null) { noWindow = false; - window = BufferUntilSubscriber.create(); + window = BufferUntilSubscriberV2.create(); child.onNext(window); } window.onNext(t); @@ -242,7 +242,7 @@ public void onCompleted() { } CountedSubject createCountedSubject() { - final BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + final BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); return new CountedSubject(bus, bus); } } diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java index 82d1474163..e2180bc814 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java @@ -233,7 +233,7 @@ void endWindow(SerializedSubject window) { } } SerializedSubject createSerializedSubject() { - BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); return new SerializedSubject(bus, bus); } } diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithTime.java b/src/main/java/rx/internal/operators/OperatorWindowWithTime.java index cac94c5ba0..bcfa2fdac2 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithTime.java @@ -214,7 +214,7 @@ boolean replaceSubject() { unsubscribe(); return false; } - BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); state = state.create(bus, bus); child.onNext(bus); return true; @@ -492,7 +492,7 @@ void terminateChunk(CountedSerializedSubject chunk) { } } CountedSerializedSubject createCountedSerializedSubject() { - BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); return new CountedSerializedSubject(bus, bus); } } diff --git a/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java new file mode 100644 index 0000000000..ee9cf180d2 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java @@ -0,0 +1,252 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscUnboundedAtomicArrayQueue.java + */ +package rx.internal.util.atomic; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import rx.internal.util.unsafe.*; + +public class SpscUnboundedAtomicArrayQueue extends AbstractQueue implements QueueProgressIndicators { + static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); + protected final AtomicLong producerIndex; + protected int producerLookAheadStep; + protected long producerLookAhead; + protected int producerMask; + protected AtomicReferenceArray producerBuffer; + protected int consumerMask; + protected AtomicReferenceArray consumerBuffer; + protected final AtomicLong consumerIndex; + private static final Object HAS_NEXT = new Object(); + + public SpscUnboundedAtomicArrayQueue(final int bufferSize) { + int p2capacity = Pow2.roundToPowerOfTwo(bufferSize); + int mask = p2capacity - 1; + AtomicReferenceArray buffer = new AtomicReferenceArray(p2capacity + 1); + producerBuffer = buffer; + producerMask = mask; + adjustLookAheadStep(p2capacity); + consumerBuffer = buffer; + consumerMask = mask; + producerLookAhead = mask - 1; // we know it's all empty to start with + producerIndex = new AtomicLong(); + consumerIndex = new AtomicLong(); + soProducerIndex(0L); + } + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single producer thread use only. + */ + @Override + public final boolean offer(final E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + // local load of field to avoid repeated loads after volatile reads + final AtomicReferenceArray buffer = producerBuffer; + final long index = lpProducerIndex(); + final int mask = producerMask; + final int offset = calcWrappedOffset(index, mask); + if (index < producerLookAhead) { + return writeToQueue(buffer, e, index, offset); + } else { + final int lookAheadStep = producerLookAheadStep; + // go around the buffer or resize if full (unless we hit max capacity) + int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask); + if (null == lvElement(buffer, lookAheadElementOffset)) {// LoadLoad + producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room + return writeToQueue(buffer, e, index, offset); + } else if (null != lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full + return writeToQueue(buffer, e, index, offset); + } else { + resize(buffer, index, offset, e, mask); // add a buffer and link old to new + return true; + } + } + } + + private boolean writeToQueue(final AtomicReferenceArray buffer, final E e, final long index, final int offset) { + soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms + soElement(buffer, offset, e);// StoreStore + return true; + } + + private void resize(final AtomicReferenceArray oldBuffer, final long currIndex, final int offset, final E e, + final long mask) { + final int capacity = oldBuffer.length(); + final AtomicReferenceArray newBuffer = new AtomicReferenceArray(capacity); + producerBuffer = newBuffer; + producerLookAhead = currIndex + mask - 1; + soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms + soElement(newBuffer, offset, e);// StoreStore + soNext(oldBuffer, newBuffer); + soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is + // inserted + } + + private void soNext(AtomicReferenceArray curr, AtomicReferenceArray next) { + soElement(curr, calcDirectOffset(curr.length() - 1), next); + } + @SuppressWarnings("unchecked") + private AtomicReferenceArray lvNext(AtomicReferenceArray curr) { + return (AtomicReferenceArray)lvElement(curr, calcDirectOffset(curr.length() - 1)); + } + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public final E poll() { + // local load of field to avoid repeated loads after volatile reads + final AtomicReferenceArray buffer = consumerBuffer; + final long index = lpConsumerIndex(); + final int mask = consumerMask; + final int offset = calcWrappedOffset(index, mask); + final Object e = lvElement(buffer, offset);// LoadLoad + boolean isNextBuffer = e == HAS_NEXT; + if (null != e && !isNextBuffer) { + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms + soElement(buffer, offset, null);// StoreStore + return (E) e; + } else if (isNextBuffer) { + return newBufferPoll(lvNext(buffer), index, mask); + } + + return null; + } + + @SuppressWarnings("unchecked") + private E newBufferPoll(AtomicReferenceArray nextBuffer, final long index, final int mask) { + consumerBuffer = nextBuffer; + final int offsetInNew = calcWrappedOffset(index, mask); + final E n = (E) lvElement(nextBuffer, offsetInNew);// LoadLoad + if (null == n) { + return null; + } else { + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms + soElement(nextBuffer, offsetInNew, null);// StoreStore + return n; + } + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public final E peek() { + final AtomicReferenceArray buffer = consumerBuffer; + final long index = lpConsumerIndex(); + final int mask = consumerMask; + final int offset = calcWrappedOffset(index, mask); + final Object e = lvElement(buffer, offset);// LoadLoad + if (e == HAS_NEXT) { + return newBufferPeek(lvNext(buffer), index, mask); + } + + return (E) e; + } + + @SuppressWarnings("unchecked") + private E newBufferPeek(AtomicReferenceArray nextBuffer, final long index, final int mask) { + consumerBuffer = nextBuffer; + final int offsetInNew = calcWrappedOffset(index, mask); + return (E) lvElement(nextBuffer, offsetInNew);// LoadLoad + } + + @Override + public final int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + private void adjustLookAheadStep(int capacity) { + producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); + } + + private long lvProducerIndex() { + return producerIndex.get(); + } + + private long lvConsumerIndex() { + return consumerIndex.get(); + } + + private long lpProducerIndex() { + return producerIndex.get(); + } + + private long lpConsumerIndex() { + return consumerIndex.get(); + } + + private void soProducerIndex(long v) { + producerIndex.lazySet(v); + } + + private void soConsumerIndex(long v) { + consumerIndex.lazySet(v); + } + + private static final int calcWrappedOffset(long index, int mask) { + return calcDirectOffset((int)index & mask); + } + private static final int calcDirectOffset(int index) { + return index; + } + private static final void soElement(AtomicReferenceArray buffer, int offset, Object e) { + buffer.lazySet(offset, e); + } + + private static final Object lvElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); + } + + @Override + public long currentProducerIndex() { + return lvProducerIndex(); + } + + @Override + public long currentConsumerIndex() { + return lvConsumerIndex(); + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/unsafe/QueueProgressIndicators.java b/src/main/java/rx/internal/util/unsafe/QueueProgressIndicators.java new file mode 100644 index 0000000000..185f0bd612 --- /dev/null +++ b/src/main/java/rx/internal/util/unsafe/QueueProgressIndicators.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/QueueProgressIndicators.java + */ +package rx.internal.util.unsafe; + +/** + * This interface is provided for monitoring purposes only and is only available on queues where it is easy to + * provide it. The producer/consumer progress indicators usually correspond with the number of elements + * offered/polled, but they are not guaranteed to maintain that semantic. + * + * @author nitsanw + * + */ +public interface QueueProgressIndicators { + + /** + * This method has no concurrent visibility semantics. The value returned may be negative. Under normal + * circumstances 2 consecutive calls to this method can offer an idea of progress made by producer threads + * by subtracting the 2 results though in extreme cases (if producers have progressed by more than 2^64) + * this may also fail.
+ * This value will normally indicate number of elements passed into the queue, but may under some + * circumstances be a derivative of that figure. This method should not be used to derive size or + * emptiness. + * + * @return the current value of the producer progress index + */ + public long currentProducerIndex(); + + /** + * This method has no concurrent visibility semantics. The value returned may be negative. Under normal + * circumstances 2 consecutive calls to this method can offer an idea of progress made by consumer threads + * by subtracting the 2 results though in extreme cases (if consumers have progressed by more than 2^64) + * this may also fail.
+ * This value will normally indicate number of elements taken out of the queue, but may under some + * circumstances be a derivative of that figure. This method should not be used to derive size or + * emptiness. + * + * @return the current value of the consumer progress index + */ + public long currentConsumerIndex(); +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java new file mode 100644 index 0000000000..c579864549 --- /dev/null +++ b/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java @@ -0,0 +1,290 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscUnboundedArrayQueue.java + */ +package rx.internal.util.unsafe; + +import static rx.internal.util.unsafe.UnsafeAccess.*; + +import java.lang.reflect.Field; +import java.util.AbstractQueue; +import java.util.Iterator; + +abstract class SpscUnboundedArrayQueueProducerFields extends AbstractQueue { + protected long producerIndex; +} + +abstract class SpscUnboundedArrayQueueProducerColdFields extends SpscUnboundedArrayQueueProducerFields { + protected int producerLookAheadStep; + protected long producerLookAhead; + protected long producerMask; + protected E[] producerBuffer; +} + +abstract class SpscUnboundedArrayQueueL2Pad extends SpscUnboundedArrayQueueProducerColdFields { + long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12; +} + +abstract class SpscUnboundedArrayQueueConsumerColdField extends SpscUnboundedArrayQueueL2Pad { + protected long consumerMask; + protected E[] consumerBuffer; +} + +abstract class SpscUnboundedArrayQueueConsumerField extends SpscUnboundedArrayQueueConsumerColdField { + protected long consumerIndex; +} + +public class SpscUnboundedArrayQueue extends SpscUnboundedArrayQueueConsumerField + implements QueueProgressIndicators{ + static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); + private final static long P_INDEX_OFFSET; + private final static long C_INDEX_OFFSET; + private static final long REF_ARRAY_BASE; + private static final int REF_ELEMENT_SHIFT; + private static final Object HAS_NEXT = new Object(); + static { + final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); + if (4 == scale) { + REF_ELEMENT_SHIFT = 2; + } else if (8 == scale) { + REF_ELEMENT_SHIFT = 3; + } else { + throw new IllegalStateException("Unknown pointer size"); + } + // Including the buffer pad in the array base offset + REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class); + try { + Field iField = SpscUnboundedArrayQueueProducerFields.class.getDeclaredField("producerIndex"); + P_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + try { + Field iField = SpscUnboundedArrayQueueConsumerField.class.getDeclaredField("consumerIndex"); + C_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + public SpscUnboundedArrayQueue(final int bufferSize) { + int p2capacity = Pow2.roundToPowerOfTwo(bufferSize); + long mask = p2capacity - 1; + E[] buffer = (E[]) new Object[p2capacity + 1]; + producerBuffer = buffer; + producerMask = mask; + adjustLookAheadStep(p2capacity); + consumerBuffer = buffer; + consumerMask = mask; + producerLookAhead = mask - 1; // we know it's all empty to start with + soProducerIndex(0l); + } + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single producer thread use only. + */ + @Override + public final boolean offer(final E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + // local load of field to avoid repeated loads after volatile reads + final E[] buffer = producerBuffer; + final long index = producerIndex; + final long mask = producerMask; + final long offset = calcWrappedOffset(index, mask); + if (index < producerLookAhead) { + return writeToQueue(buffer, e, index, offset); + } else { + final int lookAheadStep = producerLookAheadStep; + // go around the buffer or resize if full (unless we hit max capacity) + long lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask); + if (null == lvElement(buffer, lookAheadElementOffset)) {// LoadLoad + producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room + return writeToQueue(buffer, e, index, offset); + } else if (null != lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full + return writeToQueue(buffer, e, index, offset); + } else { + resize(buffer, index, offset, e, mask); // add a buffer and link old to new + return true; + } + } + } + + private boolean writeToQueue(final E[] buffer, final E e, final long index, final long offset) { + soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms + soElement(buffer, offset, e);// StoreStore + return true; + } + + @SuppressWarnings("unchecked") + private void resize(final E[] oldBuffer, final long currIndex, final long offset, final E e, + final long mask) { + final int capacity = oldBuffer.length; + final E[] newBuffer = (E[]) new Object[capacity]; + producerBuffer = newBuffer; + producerLookAhead = currIndex + mask - 1; + soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms + soElement(newBuffer, offset, e);// StoreStore + soNext(oldBuffer, newBuffer); + soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is + // inserted + } + + private void soNext(E[] curr, E[] next) { + soElement(curr, calcDirectOffset(curr.length -1), next); + } + @SuppressWarnings("unchecked") + private E[] lvNext(E[] curr) { + return (E[]) lvElement(curr, calcDirectOffset(curr.length -1)); + } + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public final E poll() { + // local load of field to avoid repeated loads after volatile reads + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + final long offset = calcWrappedOffset(index, mask); + final Object e = lvElement(buffer, offset);// LoadLoad + boolean isNextBuffer = e == HAS_NEXT; + if (null != e && !isNextBuffer) { + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms + soElement(buffer, offset, null);// StoreStore + return (E) e; + } else if (isNextBuffer) { + return newBufferPoll(lvNext(buffer), index, mask); + } + + return null; + } + + @SuppressWarnings("unchecked") + private E newBufferPoll(E[] nextBuffer, final long index, final long mask) { + consumerBuffer = nextBuffer; + final long offsetInNew = calcWrappedOffset(index, mask); + final E n = (E) lvElement(nextBuffer, offsetInNew);// LoadLoad + if (null == n) { + return null; + } else { + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms + soElement(nextBuffer, offsetInNew, null);// StoreStore + return n; + } + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public final E peek() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + final long offset = calcWrappedOffset(index, mask); + final Object e = lvElement(buffer, offset);// LoadLoad + if (e == HAS_NEXT) { + return newBufferPeek(lvNext(buffer), index, mask); + } + + return (E) e; + } + + @SuppressWarnings("unchecked") + private E newBufferPeek(E[] nextBuffer, final long index, final long mask) { + consumerBuffer = nextBuffer; + final long offsetInNew = calcWrappedOffset(index, mask); + return (E) lvElement(nextBuffer, offsetInNew);// LoadLoad + } + + @Override + public final int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + private void adjustLookAheadStep(int capacity) { + producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); + } + + private long lvProducerIndex() { + return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + } + + private long lvConsumerIndex() { + return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); + } + + private void soProducerIndex(long v) { + UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); + } + + private void soConsumerIndex(long v) { + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); + } + + private static final long calcWrappedOffset(long index, long mask) { + return calcDirectOffset(index & mask); + } + private static final long calcDirectOffset(long index) { + return REF_ARRAY_BASE + (index << REF_ELEMENT_SHIFT); + } + private static final void soElement(Object[] buffer, long offset, Object e) { + UNSAFE.putOrderedObject(buffer, offset, e); + } + + private static final Object lvElement(E[] buffer, long offset) { + return UNSAFE.getObjectVolatile(buffer, offset); + } + + @Override + public long currentProducerIndex() { + return lvProducerIndex(); + } + + @Override + public long currentConsumerIndex() { + return lvConsumerIndex(); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java b/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java index 50be759581..d36d353f73 100644 --- a/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java +++ b/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java @@ -15,20 +15,19 @@ */ package rx.internal.operators; -import org.junit.Assert; -import org.junit.Test; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; + import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.exceptions.TestException; +import rx.functions.*; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - public class BufferUntilSubscriberTest { @Test @@ -82,4 +81,98 @@ public void call(List integers) { else Assert.assertEquals(NITERS, counter.get()); } -} + + @Test + public void testBackpressure() { + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); + for (int i = 0; i < 32; i++) { + bus.onNext(i); + } + + TestSubscriber ts = TestSubscriber.create(0); + + bus.subscribe(ts); + + ts.assertValueCount(0); + ts.assertNoTerminalEvent(); + + ts.requestMore(10); + + ts.assertValueCount(10); + + ts.requestMore(22); + ts.assertValueCount(32); + + Assert.assertFalse(bus.state.caughtUp); + + ts.requestMore(Long.MAX_VALUE); + + Assert.assertTrue(bus.state.caughtUp); + + for (int i = 32; i < 64; i++) { + bus.onNext(i); + } + bus.onCompleted(); + + ts.assertValueCount(64); + ts.assertNoErrors(); + ts.assertCompleted(); + } + @Test + public void testErrorCutsAhead() { + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); + for (int i = 0; i < 32; i++) { + bus.onNext(i); + } + bus.onError(new TestException()); + + TestSubscriber ts = TestSubscriber.create(0); + + bus.subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } + @Test + public void testErrorCutsAheadAfterSubscribed() { + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); + for (int i = 0; i < 32; i++) { + bus.onNext(i); + } + + TestSubscriber ts = TestSubscriber.create(0); + + bus.subscribe(ts); + + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + + bus.onError(new TestException()); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } + @Test + public void testUnsubscribeClearsQueue() { + BufferUntilSubscriberV2 bus = BufferUntilSubscriberV2.create(); + for (int i = 0; i < 32; i++) { + bus.onNext(i); + } + + TestSubscriber ts = TestSubscriber.create(0); + ts.unsubscribe(); + + bus.subscribe(ts); + + ts.assertNoTerminalEvent(); + ts.assertNoValues(); + + Assert.assertTrue(bus.state.queue.isEmpty()); + + bus.onNext(32); + + Assert.assertTrue(bus.state.queue.isEmpty()); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index c04b6dd910..488cf16e06 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -728,7 +728,7 @@ public Observable call(Integer t) { Observable observable = Observable.just(t) .subscribeOn(sch) ; - Subject subject = BufferUntilSubscriber.create(); + Subject subject = BufferUntilSubscriberV2.create(); observable.subscribe(subject); return subject; } @@ -822,4 +822,4 @@ public Observable call(Integer t) { } } -} +} \ No newline at end of file