Skip to content

Commit

Permalink
Publisher#flatMapConcatSingle null value concurrency fix (#2323)
Browse files Browse the repository at this point in the history
Motivation:
Publisher#flatMapConcatSingle maintains a queue of elements
on top of Publisher#flatMapSingle to maintain ordering. Elements
within this queue have non-volatile state due to outer concurrency
protections, however there are two independent variables indicating
"terminal type" and "terminal value". This may result in reading
the "terminal type" the "terminal value" may not be visible yet
and return `null`.

Modifications:
- Collapse the 2 variables into a single variable, so when reading
  if it is non-null we have the "type" and "value" visible.

Result:
Fixes #2321
  • Loading branch information
Scottmitch authored Aug 13, 2022
1 parent 691bd49 commit 49bb61f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import static io.servicetalk.concurrent.api.Publisher.defer;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.api.SubscriberApiUtils.unwrapNullUnchecked;
import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull;
import static io.servicetalk.concurrent.internal.ConcurrentUtils.releaseLock;
import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock;
import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedSpscQueue;
Expand Down Expand Up @@ -132,33 +134,36 @@ private void tryPollQueue() {
private static final class Item<R> {
@Nullable
SingleSource.Subscriber<? super R> subscriber;
/**
* Visibility is provided by {@link OrderedMapper#consumerLockUpdater}. There are multiple producer threads
* modifying independent {@link Item}s, but only a single thread consumes and calls {@link #tryTerminate()}.
* Since the state is a single reference either the consumer thread sees the state, or it doesn't. If it does
* then the item can be consumed and we are done. If it doesn't then the
* {@link OrderedMapper#consumerLockUpdater} requires another lock attempt and the state becomes visible.
*/
@Nullable
private Object result;
// 0 = not terminated, 1 = success, 2 = error
private byte terminalState;

void onError(Throwable cause) {
terminalState = 2;
result = cause;
result = new ThrowableWrapper(cause);
}

void onSuccess(@Nullable R r) {
terminalState = 1;
result = r;
result = wrapNull(r);
}

@SuppressWarnings("unchecked")
boolean tryTerminate() {
assert subscriber != null; // if terminated, must have a subscriber
if (terminalState == 1) {
subscriber.onSuccess((R) result);
return true;
} else if (terminalState == 2) {
assert result != null;
subscriber.onError((Throwable) result);
return true;
final Object localResult = result;
if (localResult == null) {
return false;
} else if (ThrowableWrapper.class.equals(localResult.getClass())) {
assert subscriber != null; // if terminated, must have a subscriber
subscriber.onError(((ThrowableWrapper) localResult).unwrap());
} else {
assert subscriber != null; // if terminated, must have a subscriber
subscriber.onSuccess(unwrapNullUnchecked(localResult));
}
return false;
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void onSuccess(@Nullable final T result) {
if (result == null) {
setValue(NULL);
} else if (result instanceof Throwable) {
setValue(new ThrowableWrapper(result));
setValue(new ThrowableWrapper((Throwable) result));
} else {
setValue(result);
}
Expand All @@ -168,20 +168,4 @@ public void onComplete() {
setValue(NULL);
}
}

/**
* Used to distinguish succeeded {@code Single<Throwable>} vs failed {@code Single<T>}.
*/
private static final class ThrowableWrapper {

private final Object throwable;

ThrowableWrapper(final Object throwable) {
this.throwable = throwable;
}

Object unwrap() {
return throwable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

final class SubscriberApiUtils {
private static final Object NULL_TOKEN = new Object();
static final int SUBSCRIBER_STATE_IDLE = 0;
static final int SUBSCRIBER_STATE_ON_NEXT = 1;
static final int SUBSCRIBER_STATE_TERMINATED = 2;

private SubscriberApiUtils() {
// no instances
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

/**
* Used to distinguish between a real object and a {@link Throwable} from terminal error.
*/
final class ThrowableWrapper {
private final Throwable throwable;

ThrowableWrapper(final Throwable throwable) {
this.throwable = throwable;
}

Throwable unwrap() {
return throwable;
}
}

0 comments on commit 49bb61f

Please sign in to comment.