Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating queue code from JCTools #2561

Merged
merged 1 commit into from
Jan 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,33 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
+ (BUFFER_PAD << (REF_ELEMENT_SHIFT - SPARSE_SHIFT));
}
protected final int capacity;
protected final long mask;
// @Stable :(
protected final E[] buffer;

@SuppressWarnings("unchecked")
public ConcurrentCircularArrayQueue(int capacity) {
this.capacity = Pow2.roundToPowerOfTwo(capacity);
mask = this.capacity - 1;
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
mask = actualCapacity - 1;
// pad data on either end with some empty slots.
buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
}

/**
* @param index desirable element index
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index) {
return calcElementOffset(index, mask);
}
/**
* @param index desirable element index
* @param mask
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index, long mask) {
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}

/**
* A plain store (no ordering/fences) of an element to a given offset
*
Expand Down Expand Up @@ -171,4 +177,10 @@ protected final E lvElement(E[] buffer, long offset) {
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
}
@Override
public void clear() {
// we have to test isEmpty because of the weaker poll() guarantee
while (poll() != null || !isEmpty())
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ public abstract class ConcurrentSequencedCircularArrayQueue<E> extends Concurren

public ConcurrentSequencedCircularArrayQueue(int capacity) {
super(capacity);
int actualCapacity = (int) (this.mask + 1);
// pad data on either end with some empty slots.
sequenceBuffer = new long[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
for (long i = 0; i < this.capacity; i++) {
sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
for (long i = 0; i < actualCapacity; i++) {
soSequence(sequenceBuffer, calcSequenceOffset(i), i);
}
}
Expand All @@ -54,4 +55,4 @@ protected final long lvSequence(long[] buffer, long offset) {
return UNSAFE.getLongVolatile(buffer, offset);
}

}
}
14 changes: 7 additions & 7 deletions src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,26 @@ interface MessagePassingQueue<M> {

/**
* Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
* {@link Queue#offer(Object)} interface (but failure to offer doesn't necessitate queue is full).
* {@link Queue#offer(Object)} interface.
*
* @param message
* @return true if element was inserted into the queue, false if cannot enqueue
* @return true if element was inserted into the queue, false iff full
*/
boolean offer(M message);

/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#poll()} interface (barring the hard requirement on null returns).
* the {@link Queue#poll()} interface.
*
* @return a message from the queue if one is available, null otherwise(not necessarily empty)
* @return a message from the queue if one is available, null iff empty
*/
M poll();

/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#peek()} interface (barring the hard requirement on null returns).
* the {@link Queue#peek()} interface.
*
* @return a message from the queue if one is available, null otherwise(not necessarily empty)
* @return a message from the queue if one is available, null iff empty
*/
M peek();

Expand All @@ -71,4 +71,4 @@ interface MessagePassingQueue<M> {
*/
boolean isEmpty();

}
}
89 changes: 51 additions & 38 deletions src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ abstract class MpmcArrayQueueProducerField<E> extends MpmcArrayQueueL1Pad<E> {
private final static long P_INDEX_OFFSET;
static {
try {
P_INDEX_OFFSET =
UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class.getDeclaredField("producerIndex"));
P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class
.getDeclaredField("producerIndex"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -65,8 +65,8 @@ abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad<E> {
private final static long C_INDEX_OFFSET;
static {
try {
C_INDEX_OFFSET =
UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class
.getDeclaredField("consumerIndex"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
Expand All @@ -87,26 +87,28 @@ protected final boolean casConsumerIndex(long expect, long newValue) {
}

/**
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that any and all
* threads may call the offer/poll/peek methods and correctness is maintained. <br>
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
* any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
* This implementation follows patterns documented on the package level for False Sharing protection.<br>
* The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See <a
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original algorithm
* uses an array of structs which should offer nice locality properties but is sadly not possible in Java (waiting on
* Value Types or similar). The alternative explored here utilizes 2 arrays, one for each field of the struct. There is
* a further alternative in the experimental project which uses iteration phase markers to achieve the same algo and is
* closer structurally to the original, but sadly does not perform as well as this implementation.<br>
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original
* algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
* Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
* field of the struct. There is a further alternative in the experimental project which uses iteration phase
* markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
* well as this implementation.<br>
* Tradeoffs to keep in mind:
* <ol>
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of both arrays.
* We are trading memory to avoid false sharing(active and passive).
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the elements array.
* This is doubling/tripling the memory allocated for the buffer.
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
* both arrays. We are trading memory to avoid false sharing(active and passive).
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
* elements array. This is doubling/tripling the memory allocated for the buffer.
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
* equal to the requested capacity.
* </ol>
*
* @param <E> type of the element stored in the {@link java.util.Queue}
* @param <E>
* type of the element stored in the {@link java.util.Queue}
*/
public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
long p40, p41, p42, p43, p44, p45, p46;
Expand All @@ -123,10 +125,11 @@ public boolean offer(final E e) {
}

// local load of field to avoid repeated loads after volatile reads
final long capacity = mask + 1;
final long[] lSequenceBuffer = sequenceBuffer;
long currentProducerIndex;
long seqOffset;

long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
currentProducerIndex = lvProducerIndex(); // LoadLoad
seqOffset = calcSequenceOffset(currentProducerIndex);
Expand All @@ -140,8 +143,10 @@ public boolean offer(final E e) {
break;
}
// failed cas, retry 1
} else if (delta < 0) {
// poll has not moved this value forward
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
return false;
}

Expand All @@ -161,16 +166,17 @@ public boolean offer(final E e) {

/**
* {@inheritDoc}
* Because return null indicates queue is empty we cannot simply rely on next element visibility for poll and must
* test producer index when next element is not visible.
* <p>
* Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
* and must test producer index when next element is not visible.
*/
@Override
public E poll() {
// local load of field to avoid repeated loads after volatile reads
final long[] lSequenceBuffer = sequenceBuffer;
long currentConsumerIndex;
long seqOffset;

long pIndex = -1; // start with bogus value, hope we don't need it
while (true) {
currentConsumerIndex = lvConsumerIndex();// LoadLoad
seqOffset = calcSequenceOffset(currentConsumerIndex);
Expand All @@ -183,12 +189,10 @@ public E poll() {
break;
}
// failed cas, retry 1
} else if (delta < 0) {
// COMMENTED OUT: strict empty check.
// if (currentConsumerIndex == lvProducerIndex()) {
// return null;
// }
// next element is not visible, probably empty
} else if (delta < 0 && // slot has not been moved by producer
currentConsumerIndex >= pIndex && // test against cached pIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}

Expand All @@ -202,22 +206,31 @@ public E poll() {

// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + capacity);// StoreStore
soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore

return e;
}

@Override
public E peek() {
return lpElement(calcElementOffset(lvConsumerIndex()));
long currConsumerIndex;
E e;
do {
currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty
e = lpElement(calcElementOffset(currConsumerIndex));
// only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex());
return e;
}

@Override
public int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
* indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
* polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
Expand All @@ -229,13 +242,13 @@ public int size() {
}
}
}

@Override
public boolean isEmpty() {
// Order matters!
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is nothing we
// can do to make this an exact method.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return (lvConsumerIndex() == lvProducerIndex());
}
}
}
Loading