From f78903f4e5ddbe5a18820efec03b4382568a8565 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 29 Jan 2015 10:42:47 +0100 Subject: [PATCH] Updating queue code from JCTools --- .../unsafe/ConcurrentCircularArrayQueue.java | 24 +++-- ...ConcurrentSequencedCircularArrayQueue.java | 7 +- .../util/unsafe/MessagePassingQueue.java | 14 +-- .../internal/util/unsafe/MpmcArrayQueue.java | 89 +++++++++++-------- .../internal/util/unsafe/SpmcArrayQueue.java | 45 ++++++---- .../internal/util/unsafe/SpscArrayQueue.java | 68 ++++++++------ .../rx/internal/util/JCToolsQueueTests.java | 52 +++++++++++ 7 files changed, 202 insertions(+), 97 deletions(-) create mode 100644 src/test/java/rx/internal/util/JCToolsQueueTests.java diff --git a/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java b/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java index 59a5800859..86a8db0b19 100644 --- a/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java @@ -60,17 +60,16 @@ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircular REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << (REF_ELEMENT_SHIFT - SPARSE_SHIFT)); } - protected final int capacity; protected final long mask; // @Stable :( protected final E[] buffer; @SuppressWarnings("unchecked") public ConcurrentCircularArrayQueue(int capacity) { - this.capacity = Pow2.roundToPowerOfTwo(capacity); - mask = this.capacity - 1; + int actualCapacity = Pow2.roundToPowerOfTwo(capacity); + mask = actualCapacity - 1; // pad data on either end with some empty slots. - buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; + buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; } /** @@ -78,9 +77,16 @@ public ConcurrentCircularArrayQueue(int capacity) { * @return the offset in bytes within the array for a given index. */ protected final long calcElementOffset(long index) { + return calcElementOffset(index, mask); + } + /** + * @param index desirable element index + * @param mask + * @return the offset in bytes within the array for a given index. + */ + protected final long calcElementOffset(long index, long mask) { return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); } - /** * A plain store (no ordering/fences) of an element to a given offset * @@ -171,4 +177,10 @@ protected final E lvElement(E[] buffer, long offset) { public Iterator iterator() { throw new UnsupportedOperationException(); } -} \ No newline at end of file + @Override + public void clear() { + // we have to test isEmpty because of the weaker poll() guarantee + while (poll() != null || !isEmpty()) + ; + } +} diff --git a/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java b/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java index eb559297ad..eaf824e95e 100644 --- a/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java @@ -35,9 +35,10 @@ public abstract class ConcurrentSequencedCircularArrayQueue extends Concurren public ConcurrentSequencedCircularArrayQueue(int capacity) { super(capacity); + int actualCapacity = (int) (this.mask + 1); // pad data on either end with some empty slots. - sequenceBuffer = new long[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - for (long i = 0; i < this.capacity; i++) { + sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; + for (long i = 0; i < actualCapacity; i++) { soSequence(sequenceBuffer, calcSequenceOffset(i), i); } } @@ -54,4 +55,4 @@ protected final long lvSequence(long[] buffer, long offset) { return UNSAFE.getLongVolatile(buffer, offset); } -} \ No newline at end of file +} diff --git a/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java b/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java index b9f9b60ee3..a6fad8b455 100644 --- a/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java +++ b/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java @@ -33,26 +33,26 @@ interface MessagePassingQueue { /** * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the - * {@link Queue#offer(Object)} interface (but failure to offer doesn't necessitate queue is full). + * {@link Queue#offer(Object)} interface. * * @param message - * @return true if element was inserted into the queue, false if cannot enqueue + * @return true if element was inserted into the queue, false iff full */ boolean offer(M message); /** * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#poll()} interface (barring the hard requirement on null returns). + * the {@link Queue#poll()} interface. * - * @return a message from the queue if one is available, null otherwise(not necessarily empty) + * @return a message from the queue if one is available, null iff empty */ M poll(); /** * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#peek()} interface (barring the hard requirement on null returns). + * the {@link Queue#peek()} interface. * - * @return a message from the queue if one is available, null otherwise(not necessarily empty) + * @return a message from the queue if one is available, null iff empty */ M peek(); @@ -71,4 +71,4 @@ interface MessagePassingQueue { */ boolean isEmpty(); -} \ No newline at end of file +} diff --git a/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java b/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java index 1ce7796104..8333723036 100644 --- a/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java @@ -31,8 +31,8 @@ abstract class MpmcArrayQueueProducerField extends MpmcArrayQueueL1Pad { private final static long P_INDEX_OFFSET; static { try { - P_INDEX_OFFSET = - UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class.getDeclaredField("producerIndex")); + P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class + .getDeclaredField("producerIndex")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } @@ -65,8 +65,8 @@ abstract class MpmcArrayQueueConsumerField extends MpmcArrayQueueL2Pad { private final static long C_INDEX_OFFSET; static { try { - C_INDEX_OFFSET = - UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); + C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class + .getDeclaredField("consumerIndex")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } @@ -87,26 +87,28 @@ protected final boolean casConsumerIndex(long expect, long newValue) { } /** - * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that any and all - * threads may call the offer/poll/peek methods and correctness is maintained.
+ * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that + * any and all threads may call the offer/poll/peek methods and correctness is maintained.
* This implementation follows patterns documented on the package level for False Sharing protection.
* The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original algorithm - * uses an array of structs which should offer nice locality properties but is sadly not possible in Java (waiting on - * Value Types or similar). The alternative explored here utilizes 2 arrays, one for each field of the struct. There is - * a further alternative in the experimental project which uses iteration phase markers to achieve the same algo and is - * closer structurally to the original, but sadly does not perform as well as this implementation.
+ * href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here). The original + * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in + * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each + * field of the struct. There is a further alternative in the experimental project which uses iteration phase + * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as + * well as this implementation.
* Tradeoffs to keep in mind: *
    - *
  1. Padding for false sharing: counter fields and queue fields are all padded as well as either side of both arrays. - * We are trading memory to avoid false sharing(active and passive). - *
  2. 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the elements array. - * This is doubling/tripling the memory allocated for the buffer. + *
  3. Padding for false sharing: counter fields and queue fields are all padded as well as either side of + * both arrays. We are trading memory to avoid false sharing(active and passive). + *
  4. 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the + * elements array. This is doubling/tripling the memory allocated for the buffer. *
  5. Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or * equal to the requested capacity. *
* - * @param type of the element stored in the {@link java.util.Queue} + * @param + * type of the element stored in the {@link java.util.Queue} */ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { long p40, p41, p42, p43, p44, p45, p46; @@ -123,10 +125,11 @@ public boolean offer(final E e) { } // local load of field to avoid repeated loads after volatile reads + final long capacity = mask + 1; final long[] lSequenceBuffer = sequenceBuffer; long currentProducerIndex; long seqOffset; - + long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it while (true) { currentProducerIndex = lvProducerIndex(); // LoadLoad seqOffset = calcSequenceOffset(currentProducerIndex); @@ -140,8 +143,10 @@ public boolean offer(final E e) { break; } // failed cas, retry 1 - } else if (delta < 0) { - // poll has not moved this value forward + } else if (delta < 0 && // poll has not moved this value forward + currentProducerIndex - capacity <= cIndex && // test against cached cIndex + currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] return false; } @@ -161,8 +166,9 @@ public boolean offer(final E e) { /** * {@inheritDoc} - * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll and must - * test producer index when next element is not visible. + *

+ * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll + * and must test producer index when next element is not visible. */ @Override public E poll() { @@ -170,7 +176,7 @@ public E poll() { final long[] lSequenceBuffer = sequenceBuffer; long currentConsumerIndex; long seqOffset; - + long pIndex = -1; // start with bogus value, hope we don't need it while (true) { currentConsumerIndex = lvConsumerIndex();// LoadLoad seqOffset = calcSequenceOffset(currentConsumerIndex); @@ -183,12 +189,10 @@ public E poll() { break; } // failed cas, retry 1 - } else if (delta < 0) { - // COMMENTED OUT: strict empty check. -// if (currentConsumerIndex == lvProducerIndex()) { -// return null; -// } - // next element is not visible, probably empty + } else if (delta < 0 && // slot has not been moved by producer + currentConsumerIndex >= pIndex && // test against cached pIndex + currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] return null; } @@ -202,22 +206,31 @@ public E poll() { // Move sequence ahead by capacity, preparing it for next offer // (seeing this value from a consumer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + capacity);// StoreStore + soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore return e; } @Override public E peek() { - return lpElement(calcElementOffset(lvConsumerIndex())); + long currConsumerIndex; + E e; + do { + currConsumerIndex = lvConsumerIndex(); + // other consumers may have grabbed the element, or queue might be empty + e = lpElement(calcElementOffset(currConsumerIndex)); + // only return null if queue is empty + } while (e == null && currConsumerIndex != lvProducerIndex()); + return e; } @Override public int size() { /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer - * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent - * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index. + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. */ long after = lvConsumerIndex(); while (true) { @@ -229,13 +242,13 @@ public int size() { } } } - + @Override public boolean isEmpty() { - // Order matters! + // Order matters! // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is nothing we - // can do to make this an exact method. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. return (lvConsumerIndex() == lvProducerIndex()); } -} \ No newline at end of file +} diff --git a/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java index 932d0e6460..ebf79f0708 100644 --- a/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java @@ -16,6 +16,8 @@ */ package rx.internal.util.unsafe; +import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; + abstract class SpmcArrayQueueL1Pad extends ConcurrentCircularArrayQueue { long p10, p11, p12, p13, p14, p15, p16; long p30, p31, p32, p33, p34, p35, p36, p37; @@ -30,7 +32,7 @@ abstract class SpmcArrayQueueProducerField extends SpmcArrayQueueL1Pad { static { try { P_INDEX_OFFSET = - UnsafeAccess.UNSAFE.objectFieldOffset(SpmcArrayQueueProducerField.class.getDeclaredField("producerIndex")); + UNSAFE.objectFieldOffset(SpmcArrayQueueProducerField.class.getDeclaredField("producerIndex")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } @@ -42,7 +44,7 @@ protected final long lvProducerIndex() { } protected final void soTail(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); + UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); } public SpmcArrayQueueProducerField(int capacity) { @@ -64,7 +66,7 @@ abstract class SpmcArrayQueueConsumerField extends SpmcArrayQueueL2Pad { static { try { C_INDEX_OFFSET = - UnsafeAccess.UNSAFE.objectFieldOffset(SpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); + UNSAFE.objectFieldOffset(SpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } @@ -80,7 +82,7 @@ protected final long lvConsumerIndex() { } protected final boolean casHead(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); + return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); } } @@ -132,17 +134,18 @@ public boolean offer(final E e) { throw new NullPointerException("Null is not a valid element"); } final E[] lb = buffer; + final long lMask = mask; final long currProducerIndex = lvProducerIndex(); final long offset = calcElementOffset(currProducerIndex); if (null != lvElement(lb, offset)) { - // strict check as per https://github.com/JCTools/JCTools/issues/21#issuecomment-50204120 - int size = (int) (currProducerIndex - lvConsumerIndex()); - if (size == capacity) { + long size = currProducerIndex - lvConsumerIndex(); + + if(size > lMask) { return false; } else { // spin wait for slot to clear, buggers wait freedom - while (null != lvElement(lb, offset)); + while(null != lvElement(lb, offset)); } } spElement(lb, offset, e); @@ -152,14 +155,6 @@ public boolean offer(final E e) { return true; } - /** - * {@inheritDoc} - *

- * Note that we are not doing the the whole poll/tryPoll thing here like we do in MPMC/MPSC, that is because the - * problem we try to solve there is caused by having multiple producers making progress concurrently which can - * create 'bubbles' of claimed but not fully visible elements in the queue. For a single producer the problem - * doesn't exist. - */ @Override public E poll() { long currentConsumerIndex; @@ -188,7 +183,21 @@ public E poll() { @Override public E peek() { - return lvElement(calcElementOffset(lvConsumerIndex())); + long currentConsumerIndex; + final long currProducerIndexCache = lvProducerIndexCache(); + E e; + do { + currentConsumerIndex = lvConsumerIndex(); + if (currentConsumerIndex >= currProducerIndexCache) { + long currProducerIndex = lvProducerIndex(); + if (currentConsumerIndex >= currProducerIndex) { + return null; + } else { + svProducerIndexCache(currProducerIndex); + } + } + } while (null == (e = lvElement(calcElementOffset(currentConsumerIndex)))); + return e; } @Override @@ -217,4 +226,4 @@ public boolean isEmpty() { // something we can fix here. return (lvConsumerIndex() == lvProducerIndex()); } -} \ No newline at end of file +} diff --git a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java index 3410a625f6..6064106503 100644 --- a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java @@ -16,8 +16,10 @@ */ package rx.internal.util.unsafe; +import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; + abstract class SpscArrayQueueColdField extends ConcurrentCircularArrayQueue { - private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctoolts.spsc.max.lookahead.step", 4096); + private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); protected final int lookAheadStep; public SpscArrayQueueColdField(int capacity) { super(capacity); @@ -34,11 +36,11 @@ public SpscArrayQueueL1Pad(int capacity) { } abstract class SpscArrayQueueProducerFields extends SpscArrayQueueL1Pad { - private final static long P_INDEX_OFFSET; + protected final static long P_INDEX_OFFSET; static { try { P_INDEX_OFFSET = - UnsafeAccess.UNSAFE.objectFieldOffset(SpscArrayQueueProducerFields.class.getDeclaredField("producerIndex")); + UNSAFE.objectFieldOffset(SpscArrayQueueProducerFields.class.getDeclaredField("producerIndex")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } @@ -49,9 +51,6 @@ abstract class SpscArrayQueueProducerFields extends SpscArrayQueueL1Pad { public SpscArrayQueueProducerFields(int capacity) { super(capacity); } - protected final long lvProducerIndex() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); - } } abstract class SpscArrayQueueL2Pad extends SpscArrayQueueProducerFields { @@ -65,11 +64,11 @@ public SpscArrayQueueL2Pad(int capacity) { abstract class SpscArrayQueueConsumerField extends SpscArrayQueueL2Pad { protected long consumerIndex; - private final static long C_INDEX_OFFSET; + protected final static long C_INDEX_OFFSET; static { try { C_INDEX_OFFSET = - UnsafeAccess.UNSAFE.objectFieldOffset(SpscArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); + UNSAFE.objectFieldOffset(SpscArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } @@ -77,9 +76,6 @@ abstract class SpscArrayQueueConsumerField extends SpscArrayQueueL2Pad { public SpscArrayQueueConsumerField(int capacity) { super(capacity); } - protected final long lvConsumerIndex() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); - } } abstract class SpscArrayQueueL3Pad extends SpscArrayQueueConsumerField { @@ -92,14 +88,16 @@ public SpscArrayQueueL3Pad(int capacity) { } /** - * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
This implementation is a mashup of the - * Fast Flow algorithm with an optimization of the offer - * method taken from the BQueue algorithm (a - * variation on Fast Flow).
- * For convenience the relevant papers are available in the resources folder:
- * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
- * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
- * This implementation is wait free. + * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. + *

+ * This implementation is a mashup of the Fast Flow + * algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast + * Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.
+ * For convenience the relevant papers are available in the resources folder:
+ * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
+ * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
+ *
This implementation is wait free. * * @author nitsanw * @@ -120,14 +118,16 @@ public SpscArrayQueue(final int capacity) { public boolean offer(final E e) { // local load of field to avoid repeated loads after volatile reads final E[] lElementBuffer = buffer; - final long offset = calcElementOffset(producerIndex); + final long index = producerIndex; + final long offset = calcElementOffset(index); if (null != lvElement(lElementBuffer, offset)){ return false; } - producerIndex++; // do increment here so the ordered store give both a barrier - soElement(lElementBuffer, offset, e);// StoreStore + soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(lElementBuffer, offset, e); // StoreStore return true; - } + } + /** * {@inheritDoc} *

@@ -135,14 +135,15 @@ public boolean offer(final E e) { */ @Override public E poll() { - final long offset = calcElementOffset(consumerIndex); + final long index = consumerIndex; + final long offset = calcElementOffset(index); // local load of field to avoid repeated loads after volatile reads final E[] lElementBuffer = buffer; final E e = lvElement(lElementBuffer, offset);// LoadLoad if (null == e) { return null; } - consumerIndex++; // do increment here so the ordered store give both a barrier + soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() soElement(lElementBuffer, offset, null);// StoreStore return e; } @@ -174,4 +175,21 @@ public int size() { } } } + + private void soProducerIndex(long v) { + UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); + } + + private void soConsumerIndex(long v) { + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); + } + + private long lvProducerIndex() { + return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + } + + private long lvConsumerIndex() { + return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); + } } + diff --git a/src/test/java/rx/internal/util/JCToolsQueueTests.java b/src/test/java/rx/internal/util/JCToolsQueueTests.java new file mode 100644 index 0000000000..2645dcd1c1 --- /dev/null +++ b/src/test/java/rx/internal/util/JCToolsQueueTests.java @@ -0,0 +1,52 @@ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import rx.internal.util.unsafe.*; + +public class JCToolsQueueTests { + @Test + public void testMpmcOfferUpToCapacity() { + int n = 128; + MpmcArrayQueue queue = new MpmcArrayQueue(n); + for (int i = 0; i < n; i++) { + assertTrue(queue.offer(i)); + } + assertFalse(queue.offer(n)); + } + @Test + public void testSpscOfferUpToCapacity() { + int n = 128; + SpscArrayQueue queue = new SpscArrayQueue(n); + for (int i = 0; i < n; i++) { + assertTrue(queue.offer(i)); + } + assertFalse(queue.offer(n)); + } + @Test + public void testSpmcOfferUpToCapacity() { + int n = 128; + SpmcArrayQueue queue = new SpmcArrayQueue(n); + for (int i = 0; i < n; i++) { + assertTrue(queue.offer(i)); + } + assertFalse(queue.offer(n)); + } +}