Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Implementation of SerializedObserver #999

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 113 additions & 87 deletions rxjava-core/src/main/java/rx/observers/SerializedObserver.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package rx.observers;

import java.util.ArrayList;

import rx.Observer;
import rx.operators.NotificationLite;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
Expand All @@ -22,137 +19,166 @@ public class SerializedObserver<T> implements Observer<T> {

private boolean emitting = false;
private boolean terminated = false;
private ArrayList<Object> queue = new ArrayList<Object>();
private NotificationLite<T> on = NotificationLite.instance();
private FastList queue;

private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE;
private static final Object NULL_SENTINEL = new Object();
private static final Object COMPLETE_SENTINEL = new Object();

static final class FastList {
Object[] array;
int size;

public void add(Object o) {
int s = size;
Object[] a = array;
if (a == null) {
a = new Object[16];
array = a;
} else if (s == a.length) {
Object[] array2 = new Object[s + (s >> 2)];
System.arraycopy(a, 0, array2, 0, s);
a = array2;
array = a;
}
a[s] = o;
size = s + 1;
}
}

private static final class ErrorSentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}

public SerializedObserver(Observer<? super T> s) {
this.actual = s;
}

@Override
public void onCompleted() {
boolean canEmit = false;
ArrayList<Object> list = null;
FastList list;
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it
queue.add(on.completed());
}
}

if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
actual.onCompleted();
} finally {
synchronized (this) {
emitting = false;
if (emitting) {
if (queue == null) {
queue = new FastList();
}
queue.add(COMPLETE_SENTINEL);
return;
}
emitting = true;
list = queue;
queue = null;
}
drainQueue(list);
actual.onCompleted();
}

@Override
public void onError(final Throwable e) {
boolean canEmit = false;
ArrayList<Object> list = null;
FastList list;
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
queue.clear();
queue.add(on.error(e));
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
actual.onError(e);
} finally {
synchronized (this) {
emitting = false;
if (emitting) {
if (queue == null) {
queue = new FastList();
}
queue.add(new ErrorSentinel(e));
return;
}
emitting = true;
list = queue;
queue = null;
}
drainQueue(list);
actual.onError(e);
}

@Override
public void onNext(T t) {
boolean canEmit = false;
ArrayList<Object> list = null;
FastList list;

synchronized (this) {
if (terminated) {
return;
}
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
if (emitting) {
if (queue == null) {
queue = new FastList();
}
} else {
// someone else is already emitting so just queue it
queue.add(on.next(t));
queue.add(t != null ? t : NULL_SENTINEL);
// another thread is emitting so we add to the queue and return
return;
}
// we can emit
emitting = true;
// reference to the list to drain before emitting our value
list = queue;
queue = null;
}
if (canEmit) {
// we won the right to emit
try {

// we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
try {
int iter = MAX_DRAIN_ITERATION;
do {
drainQueue(list);
actual.onNext(t);
} finally {
synchronized (this) {
if (terminated) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
} else {
// release this thread
emitting = false;
canEmit = false;
if (iter == MAX_DRAIN_ITERATION) {
// after the first draining we emit our own value
actual.onNext(t);
}
--iter;
if (iter > 0) {
synchronized (this) {
list = queue;
queue = null;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If list is null here, there is a window between this sync block and the finally sync block where emitting is still true and events are queued and not replayed until a subsequent event appears. A better way would be:

synchronized (this) {
list = queue;
queue = null;
if (list = null) {
emitting = false;
break;
}
}

But then the finally block should be changed to avoid setting emitting to false.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean, but the intricacies with onComplete/onError and the race for terminated then becomes quite complicated.

If I understand correctly, the finally block would not only need to not touch emitting in this case when not terminated, but it would also have to check if terminated and !emitting to reclaim the right to drain the queue, correct? Otherwise the terminal state could result in duplicate emission.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, something like that. I'll post the proposed correction within a day.

if (list == null) {
break;
}
}
} while (iter > 0);
} finally {
synchronized (this) {
if (terminated) {
list = queue;
queue = null;
} else {
emitting = false;
list = null;
}
}
}

// if terminated this will still be true so let's drain the rest of the queue
if (canEmit) {
// this will only drain if terminated (done here outside of synchronized block)
drainQueue(list);
}
}

public void drainQueue(ArrayList<Object> list) {
if (list == null || list.size() == 0) {
void drainQueue(FastList list) {
if (list == null || list.size == 0) {
return;
}
for (Object v : list) {
on.accept(actual, v);
for (Object v : list.array) {
if (v == null) {
break;
}
if (v == NULL_SENTINEL) {
actual.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
actual.onCompleted();
} else if (v.getClass() == ErrorSentinel.class) {
actual.onError(((ErrorSentinel) v).e);
} else {
actual.onNext((T) v);
}
}
}
}
}
Loading