Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: eager concatMap to choose safe or unsafe queue based on platform. #3510

Merged
merged 1 commit into from
Nov 10, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/main/java/rx/internal/operators/OperatorEagerConcatMap.java
Original file line number Diff line number Diff line change
@@ -24,7 +24,8 @@
import rx.Observable.Operator;
import rx.exceptions.Exceptions;
import rx.functions.*;
import rx.internal.util.unsafe.SpscArrayQueue;
import rx.internal.util.atomic.SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
import rx.subscriptions.Subscriptions;

public final class OperatorEagerConcatMap<T, R> implements Operator<R, T> {
@@ -278,7 +279,13 @@ static final class EagerInnerSubscriber<T> extends Subscriber<T> {
public EagerInnerSubscriber(EagerOuterSubscriber<?, T> parent, int bufferSize) {
super();
this.parent = parent;
this.queue = new SpscArrayQueue<T>(bufferSize);
Queue<T> q;
if (UnsafeAccess.isUnsafeAvailable()) {
q = new SpscArrayQueue<T>(bufferSize);
} else {
q = new SpscAtomicArrayQueue<T>(bufferSize);
}
this.queue = q;
request(bufferSize);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/AtomicReferenceArrayQueue.java
*/
package rx.internal.util.atomic;

import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceArray;

import rx.internal.util.unsafe.Pow2;

abstract class AtomicReferenceArrayQueue<E> extends AbstractQueue<E> {
protected final AtomicReferenceArray<E> buffer;
protected final int mask;
public AtomicReferenceArrayQueue(int capacity) {
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = actualCapacity - 1;
this.buffer = new AtomicReferenceArray<E>(actualCapacity);
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
// we have to test isEmpty because of the weaker poll() guarantee
while (poll() != null || !isEmpty())
;
}
protected final int calcElementOffset(long index, int mask) {
return (int)index & mask;
}
protected final int calcElementOffset(long index) {
return (int)index & mask;
}
protected final E lvElement(AtomicReferenceArray<E> buffer, int offset) {
return buffer.get(offset);
}
protected final E lpElement(AtomicReferenceArray<E> buffer, int offset) {
return buffer.get(offset); // no weaker form available
}
protected final E lpElement(int offset) {
return buffer.get(offset); // no weaker form available
}
protected final void spElement(AtomicReferenceArray<E> buffer, int offset, E value) {
buffer.lazySet(offset, value); // no weaker form available
}
protected final void spElement(int offset, E value) {
buffer.lazySet(offset, value); // no weaker form available
}
protected final void soElement(AtomicReferenceArray<E> buffer, int offset, E value) {
buffer.lazySet(offset, value);
}
protected final void soElement(int offset, E value) {
buffer.lazySet(offset, value);
}
protected final void svElement(AtomicReferenceArray<E> buffer, int offset, E value) {
buffer.set(offset, value);
}
protected final E lvElement(int offset) {
return lvElement(buffer, offset);
}
}
124 changes: 124 additions & 0 deletions src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java
*/
package rx.internal.util.atomic;

import java.util.concurrent.atomic.*;

/**
* A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
* <p>
* This implementation is a mashup of the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
* algorithm with an optimization of the offer method taken from the <a
* href="http://staff.ustc.edu.cn/~bhua/publications/IJPP_draft.pdf">BQueue</a> algorithm (a variation on Fast
* Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.<br>
* For convenience the relevant papers are available in the resources folder:<br>
* <i>2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf<br>
* 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf <br>
* </i> This implementation is wait free.
*
* @param <E>
*/
public final class SpscAtomicArrayQueue<E> extends AtomicReferenceArrayQueue<E> {
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
final AtomicLong producerIndex;
protected long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscAtomicArrayQueue(int capacity) {
super(capacity);
this.producerIndex = new AtomicLong();
this.consumerIndex = new AtomicLong();
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
}

@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<E> buffer = this.buffer;
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
if (index >= producerLookAhead) {
int step = lookAheadStep;
if (null == lvElement(buffer, calcElementOffset(index + step, mask))) {// LoadLoad
producerLookAhead = index + step;
}
else if (null != lvElement(buffer, offset)){
return false;
}
}
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(buffer, offset, e); // StoreStore
return true;
}

@Override
public E poll() {
final long index = consumerIndex.get();
final int offset = calcElementOffset(index);
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<E> lElementBuffer = buffer;
final E e = lvElement(lElementBuffer, offset);// LoadLoad
if (null == e) {
return null;
}
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(lElementBuffer, offset, null);// StoreStore
return e;
}

@Override
public E peek() {
return lvElement(calcElementOffset(consumerIndex.get()));
}

@Override
public int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
* indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
* polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}

private void soProducerIndex(long newIndex) {
producerIndex.lazySet(newIndex);
}

private void soConsumerIndex(long newIndex) {
consumerIndex.lazySet(newIndex);
}

private long lvConsumerIndex() {
return consumerIndex.get();
}
private long lvProducerIndex() {
return producerIndex.get();
}
}