From 897b45f6d656156e7a4a23b96a07d3b888d8f001 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 17 Mar 2022 07:15:15 -0700 Subject: [PATCH 1/3] Add Publisher.flatMapConcatSingle Motivation: Publisher.flatMapConcatSingle provides similar semantics to Publisher.flatMapMergeSingle except the results are returned in the same order as the original Publisher. --- .../servicetalk/concurrent/api/Publisher.java | 401 ++++++++++++------ .../api/PublisherFlatMapConcatUtils.java | 148 +++++++ .../api/PublisherFlatMapConcatSingleTest.java | 149 +++++++ .../concurrent/internal/ConcurrentUtils.java | 31 ++ ...rFlatMapConcatSingleDelayErrorTckTest.java | 29 ++ ... PublisherFlatMapConcatSingleTckTest.java} | 6 +- ...rFlatMapMergeSingleDelayErrorTckTest.java} | 6 +- .../PublisherFlatMapMergeSingleTckTest.java | 29 ++ 8 files changed, 661 insertions(+), 138 deletions(-) create mode 100644 servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleDelayErrorTckTest.java rename servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/{PublisherFlatMapSingleTckTest.java => PublisherFlatMapConcatSingleTckTest.java} (79%) rename servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/{PublisherFlatMapSingleDelayErrorTckTest.java => PublisherFlatMapMergeSingleDelayErrorTckTest.java} (82%) create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 54a1d74cb5..df8316cb8e 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -886,13 +886,12 @@ public final Publisher flatMapMergeDelayError(Function * To control the amount of concurrent processing done by this operator see * {@link #flatMapMergeSingle(Function, int)}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data * transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
@@ -925,10 +924,9 @@ public final  Publisher flatMapMergeSingle(Function
-     * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data
+     * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data
      * transformation in sequential programming similar to:
      * 
{@code
      *     ExecutorService e = ...;
@@ -964,51 +962,45 @@ public final  Publisher flatMapMergeSingle(Function
      * The behavior is the same as {@link #flatMapMergeSingle(Function, int)} with the exception that if any
      * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not
      * immediately terminate. Instead, it will wait for this {@link Publisher} and all {@link Single}s to terminate and
-     * then terminate the returned {@link Publisher} with all errors emitted by the {@link Single}s produced by the
+     * then terminate the returned {@link Publisher} with errors emitted by the {@link Single}s produced by the
      * {@code mapper}.
      * 

* To control the amount of concurrent processing done by this operator see * {@link #flatMapMergeSingleDelayError(Function, int)}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data * transformation in sequential programming similar to: *

{@code
-     *     Executor e = ...;
-     *     List tResults = resultOfThisPublisher();
-     *     List rResults = ...; // assume this is thread safe
-     *     List errors = ...;  // assume this is thread safe
-     *     CountDownLatch latch =  new CountDownLatch(tResults.size());
-     *     for (T t : tResults) {
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
      *         // Note that flatMap process results in parallel.
-     *         e.execute(() -> {
-     *             try {
-     *                 R r = mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
-     *                 rResults.add(r);
-     *             } catch (Throwable cause) {
-     *                 errors.add(cause);  // Asynchronous error is flatten into an error by this operator.
-     *             } finally {
-     *                 latch.countdown();
-     *             }
-     *         });
+     *         futures.add(e.submit(() -> {
+     *             processResult(mapper.apply(t));
+     *             return null;
+     *         }));
      *     }
-     *     latch.await();
-     *     if (errors.isEmpty()) {
-     *         return rResults;
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
      *     }
-     *     createAndThrowACompositeException(errors);
+     *     throwExceptionIfNotEmpty(errors);
      * }
* * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. * @param Type of items emitted by the returned {@link Publisher}. * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. * - * @see ReactiveX merge operator. + * @see ReactiveX flatMap operator. * @see #flatMapMergeSingleDelayError(Function, int) */ public final Publisher flatMapMergeSingleDelayError( @@ -1027,32 +1019,27 @@ public final Publisher flatMapMergeSingleDelayError( * then terminate the returned {@link Publisher} with all errors emitted by the {@link Single}s produced by the * {@code mapper}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data * transformation in sequential programming similar to: *

{@code
-     *     Executor e = ...;
-     *     List tResults = resultOfThisPublisher();
-     *     List rResults = ...; // assume this is thread safe
-     *     List errors = ...;  // assume this is thread safe
-     *     CountDownLatch latch =  new CountDownLatch(tResults.size());
-     *     for (T t : tResults) {
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
      *         // Note that flatMap process results in parallel.
-     *         e.execute(() -> {
-     *             try {
-     *                 R r = mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
-     *                 rResults.add(r);
-     *             } catch (Throwable cause) {
-     *                 errors.add(cause);  // Asynchronous error is flatten into an error by this operator.
-     *             } finally {
-     *                 latch.countdown();
-     *             }
-     *         });
+     *         futures.add(e.submit(() -> {
+     *             processResult(mapper.apply(t));
+     *             return null;
+     *         }));
      *     }
-     *     latch.await();
-     *     if (errors.isEmpty()) {
-     *         return rResults;
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
      *     }
-     *     createAndThrowACompositeException(errors);
+     *     throwExceptionIfNotEmpty(errors);
      * }
* * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. @@ -1062,7 +1049,7 @@ public final Publisher flatMapMergeSingleDelayError( * @param Type of items emitted by the returned {@link Publisher}. * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. * - * @see ReactiveX merge operator. + * @see ReactiveX flatMap operator. */ public final Publisher flatMapMergeSingleDelayError( Function> mapper, int maxConcurrency) { @@ -1071,53 +1058,46 @@ public final Publisher flatMapMergeSingleDelayError( /** * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals - * emitted from each mapped {@link Single}<{@link R}> into the returned - * {@link Publisher}<{@link R}>. + * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>. *

* The behavior is the same as {@link #flatMapMergeSingle(Function, int)} with the exception that if any * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not * immediately terminate. Instead, it will wait for this {@link Publisher} and all {@link Single}s to terminate and - * then terminate the returned {@link Publisher} with all errors emitted by the {@link Single}s produced by the + * then terminate the returned {@link Publisher} with errors emitted by the {@link Single}s produced by the * {@code mapper}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data * transformation in sequential programming similar to: *

{@code
-     *     Executor e = ...;
-     *     List tResults = resultOfThisPublisher();
-     *     List rResults = ...; // assume this is thread safe
-     *     List errors = ...;  // assume this is thread safe
-     *     CountDownLatch latch =  new CountDownLatch(tResults.size());
-     *     for (T t : tResults) {
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
      *         // Note that flatMap process results in parallel.
-     *         e.execute(() -> {
-     *             try {
-     *                 R r = mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
-     *                 rResults.add(r);
-     *             } catch (Throwable cause) {
-     *                 errors.add(cause);  // Asynchronous error is flatten into an error by this operator.
-     *             } finally {
-     *                 latch.countdown();
-     *             }
-     *         });
+     *         futures.add(e.submit(() -> {
+     *             processResult(mapper.apply(t));
+     *             return null;
+     *         }));
      *     }
-     *     latch.await();
-     *     if (errors.isEmpty()) {
-     *         return rResults;
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
      *     }
-     *     createAndThrowACompositeException(errors);
+     *     throwExceptionIfNotEmpty(errors);
      * }
* * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. - * @param maxConcurrency Maximum active {@link Single}s at any time. - * Even if the number of items requested by a {@link Subscriber} is more than this number, - * this will never request more than this number at any point. + * @param maxConcurrency Maximum active {@link Single}s at any time. Even if the number of items requested by a + * {@link Subscriber} is more than this number, this will never request more than this number at any point. * @param maxDelayedErrorsHint The maximum amount of errors that will be queued. After this point exceptions maybe * discarded to reduce memory consumption. * @param Type of items emitted by the returned {@link Publisher}. * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. * - * @see ReactiveX merge operator. + * @see ReactiveX flatMap operator. */ public final Publisher flatMapMergeSingleDelayError( Function> mapper, int maxConcurrency, int maxDelayedErrorsHint) { @@ -1219,32 +1199,31 @@ public final Completable flatMapCompletable(Function{@code - * Executor e = ...; - * List errors = ...; // assume this is thread safe - * CountDownLatch latch = new CountDownLatch(tResults.size()); - * for (T t : tResults) { + * ExecutorService e = ...; + * List> futures = ...; // assume this is thread safe + * for (T t : resultOfThisPublisher()) { * // Note that flatMap process results in parallel. - * e.execute(() -> { - * try { - * mapper.apply(t); // Asynchronous result is flattened by this operator. - * } catch (Throwable cause) { - * errors.add(cause); // Asynchronous error is flatten into an error by this operator. - * } finally { - * latch.countdown(); - * } - * }); + * futures.add(e.submit(() -> { + * mapper.apply(t); + * return null; + * })); * } - * latch.await(); - * if (!errors.isEmpty()) { - * createAndThrowACompositeException(errors); + * List errors = ...; + * for (Future future : futures) { + * try { + * future.get(); // Throws if the processing for this item failed. + * } catch (Throwable cause) { + * errors.add(cause); + * } * } + * throwExceptionIfNotEmpty(errors); * }
* * @param mapper Function to convert each item emitted by this {@link Publisher} into a {@link Completable}. * @return A new {@link Completable} that terminates successfully if all the intermediate {@link Completable}s have * terminated successfully or any one of them has terminated with a failure. * - * @see ReactiveX merge operator. + * @see ReactiveX flatMap operator. * @see #flatMapMergeSingleDelayError(Function, int) */ public final Completable flatMapCompletableDelayError(Function mapper) { @@ -1262,25 +1241,24 @@ public final Completable flatMapCompletableDelayError(Function{@code - * Executor e = ...; - * List errors = ...; // assume this is thread safe - * CountDownLatch latch = new CountDownLatch(tResults.size()); - * for (T t : tResults) { + * ExecutorService e = ...; + * List> futures = ...; // assume this is thread safe + * for (T t : resultOfThisPublisher()) { * // Note that flatMap process results in parallel. - * e.execute(() -> { - * try { - * mapper.apply(t); // Asynchronous result is flattened by this operator. - * } catch (Throwable cause) { - * errors.add(cause); // Asynchronous error is flatten into an error by this operator. - * } finally { - * latch.countdown(); - * } - * }); + * futures.add(e.submit(() -> { + * mapper.apply(t); + * return null; + * })); * } - * latch.await(); - * if (!errors.isEmpty()) { - * createAndThrowACompositeException(errors); + * List errors = ...; + * for (Future future : futures) { + * try { + * future.get(); // Throws if the processing for this item failed. + * } catch (Throwable cause) { + * errors.add(cause); + * } * } + * throwExceptionIfNotEmpty(errors); * }
* * @param mapper Function to convert each item emitted by this {@link Publisher} into a {@link Completable}. @@ -1288,7 +1266,7 @@ public final Completable flatMapCompletableDelayError(FunctionReactiveX merge operator. + * @see ReactiveX flatMap operator. * @see #flatMapMergeSingleDelayError(Function, int) */ public final Completable flatMapCompletableDelayError(Function mapper, @@ -1307,25 +1285,24 @@ public final Completable flatMapCompletableDelayError(Function{@code - * Executor e = ...; - * List errors = ...; // assume this is thread safe - * CountDownLatch latch = new CountDownLatch(tResults.size()); - * for (T t : tResults) { + * ExecutorService e = ...; + * List> futures = ...; // assume this is thread safe + * for (T t : resultOfThisPublisher()) { * // Note that flatMap process results in parallel. - * e.execute(() -> { - * try { - * mapper.apply(t); // Asynchronous result is flattened by this operator. - * } catch (Throwable cause) { - * errors.add(cause); // Asynchronous error is flatten into an error by this operator. - * } finally { - * latch.countdown(); - * } - * }); + * futures.add(e.submit(() -> { + * mapper.apply(t); + * return null; + * })); * } - * latch.await(); - * if (!errors.isEmpty()) { - * createAndThrowACompositeException(errors); + * List errors = ...; + * for (Future future : futures) { + * try { + * future.get(); // Throws if the processing for this item failed. + * } catch (Throwable cause) { + * errors.add(cause); + * } * } + * throwExceptionIfNotEmpty(errors); * } * * @param mapper Function to convert each item emitted by this {@link Publisher} into a {@link Completable}. @@ -1335,7 +1312,7 @@ public final Completable flatMapCompletableDelayError(FunctionReactiveX merge operator. + * @see ReactiveX flatMap operator. * @see #flatMapMergeSingleDelayError(Function, int) */ public final Completable flatMapCompletableDelayError(Function mapper, @@ -1344,6 +1321,166 @@ public final Completable flatMapCompletableDelayError(Function + * To control the amount of concurrent processing done by this operator see + * {@link #flatMapConcatSingle(Function, int)}. + *

+ * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * transformation in sequential programming similar to: + *

{@code
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
+     *         // Note that flatMap process results in parallel.
+     *         futures.add(e.submit(() -> {
+     *             return mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *         processResult(future.get()); // Throws if the processing for this item failed.
+     *     }
+     * }
+ * + * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. + * + * @see ReactiveX flatMap operator. + * @see #flatMapConcatSingle(Function, int) + */ + public final Publisher flatMapConcatSingle(Function> mapper) { + return PublisherFlatMapConcatUtils.flatMapConcatSingle(this, mapper); + } + + /** + * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals + * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>. + * Each mapped {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in + * the same order as this {@link Publisher}. + *

+ * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * transformation in sequential programming similar to: + *

{@code
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
+     *         // Note that flatMap process results in parallel.
+     *         futures.add(e.submit(() -> {
+     *             return mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *         processResult(future.get()); // Throws if the processing for this item failed.
+     *     }
+     * }
+ * + * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. + * @param maxConcurrency Maximum active {@link Single}s at any time. Even if the number of items requested by a + * {@link Subscriber} is more than this number, this will never request more than this number at any point. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. + * + * @see ReactiveX flatMap operator. + */ + public final Publisher flatMapConcatSingle(Function> mapper, + int maxConcurrency) { + return PublisherFlatMapConcatUtils.flatMapConcatSingle(this, mapper, maxConcurrency); + } + + /** + * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals + * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>. + * Each mapped {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in + * the same order as this {@link Publisher}. + *

+ * The behavior is the same as {@link #flatMapConcatSingle(Function)} with the exception that if any + * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not + * terminate until this {@link Publisher} and all {@link Single}s to terminate. + *

+ * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * transformation in sequential programming similar to: + *

{@code
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
+     *         // Note that flatMap process results in parallel.
+     *         futures.add(e.submit(() -> {
+     *             return mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           processResult(future.get()); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     * }
+ * + * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. + * + * @see ReactiveX flatMap operator. + * @see #flatMapConcatSingleDelayError(Function, int) + */ + public final Publisher flatMapConcatSingleDelayError( + Function> mapper) { + return PublisherFlatMapConcatUtils.flatMapConcatSingleDelayError(this, mapper); + } + + /** + * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals + * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>. + * Each mapped {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in + * the same order as this {@link Publisher}. + *

+ * The behavior is the same as {@link #flatMapConcatSingle(Function)} with the exception that if any + * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not + * terminate until this {@link Publisher} and all {@link Single}s to terminate. + *

+ * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data + * transformation in sequential programming similar to: + *

{@code
+     *     ExecutorService e = ...;
+     *     List> futures = ...; // assume this is thread safe
+     *     for (T t : resultOfThisPublisher()) {
+     *         // Note that flatMap process results in parallel.
+     *         futures.add(e.submit(() -> {
+     *             return mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           processResult(future.get()); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     * }
+ * + * @param mapper {@link Function} to convert each item emitted by this {@link Publisher} into a {@link Single}. + * @param maxConcurrency Maximum active {@link Single}s at any time. Even if the number of items requested by a + * {@link Subscriber} is more than this number, this will never request more than this number at any point. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}. + * + * @see ReactiveX flatMap operator. + */ + public final Publisher flatMapConcatSingleDelayError( + Function> mapper, int maxConcurrency) { + return PublisherFlatMapConcatUtils.flatMapConcatSingleDelayError(this, mapper, maxConcurrency); + } + /** * Create a {@link Publisher} that flattens each element returned by the {@link Iterable#iterator()} from * {@code mapper}. diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java new file mode 100644 index 0000000000..19f291028b --- /dev/null +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java @@ -0,0 +1,148 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.SingleSource; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Publisher.defer; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.ConcurrentUtils.releaseLock; +import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; +import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedSpscQueue; +import static java.lang.Math.min; + +final class PublisherFlatMapConcatUtils { + private PublisherFlatMapConcatUtils() { + } + + static Publisher flatMapConcatSingle(final Publisher publisher, + final Function> mapper) { + return defer(() -> { + final Queue> results = newUnboundedSpscQueue(4); + final AtomicInteger consumerLock = new AtomicInteger(); + return publisher.flatMapMergeSingle(orderedMapper(mapper, results, consumerLock)) + .shareContextOnSubscribe(); + }); + } + + static Publisher flatMapConcatSingleDelayError( + final Publisher publisher, final Function> mapper) { + return defer(() -> { + final Queue> results = newUnboundedSpscQueue(4); + final AtomicInteger consumerLock = new AtomicInteger(); + return publisher.flatMapMergeSingleDelayError(orderedMapper(mapper, results, consumerLock)) + .shareContextOnSubscribe(); + }); + } + + static Publisher flatMapConcatSingle(final Publisher publisher, + final Function> mapper, + final int maxConcurrency) { + return defer(() -> { + final Queue> results = newUnboundedSpscQueue(min(8, maxConcurrency)); + final AtomicInteger consumerLock = new AtomicInteger(); + return publisher.flatMapMergeSingle(orderedMapper(mapper, results, consumerLock), maxConcurrency) + .shareContextOnSubscribe(); + }); + } + + static Publisher flatMapConcatSingleDelayError( + final Publisher publisher, final Function> mapper, + final int maxConcurrency) { + return defer(() -> { + final Queue> results = newUnboundedSpscQueue(min(8, maxConcurrency)); + final AtomicInteger consumerLock = new AtomicInteger(); + return publisher.flatMapMergeSingleDelayError(orderedMapper(mapper, results, consumerLock), maxConcurrency) + .shareContextOnSubscribe(); + }); + } + + private static Function> orderedMapper( + final Function> mapper, + final Queue> results, final AtomicInteger consumerLock) { + return t -> { + final Single single = mapper.apply(t); + final Item item = new Item<>(); + results.add(item); + return new Single() { + @Override + protected void handleSubscribe(final SingleSource.Subscriber subscriber) { + assert item.subscriber == null; // flatMapMergeSingle only does a single subscribe. + item.subscriber = subscriber; + toSource(single).subscribe(new SingleSource.Subscriber() { + @Override + public void onSubscribe(final Cancellable cancellable) { + subscriber.onSubscribe(cancellable); + } + + @Override + public void onSuccess(@Nullable final R result) { + item.result = result; + item.terminated = true; + tryPollQueue(); + } + + @Override + public void onError(final Throwable t) { + item.cause = t; + item.terminated = true; + tryPollQueue(); + } + + private void tryPollQueue() { + boolean tryAcquire = true; + while (tryAcquire && tryAcquireLock(consumerLock)) { + try { + Item i; + while ((i = results.peek()) != null && i.terminated) { + results.poll(); + assert i.subscriber != null; // if terminated, must have a subscriber + if (i.cause != null) { + i.subscriber.onError(i.cause); + } else { + i.subscriber.onSuccess(i.result); + } + } + // flatMapMergeSingle takes care of exception propagation / cleanup + } finally { + tryAcquire = !releaseLock(consumerLock); + } + } + } + }); + } + } + // The inner Single will determine if a copy is justified when we subscribe to it. + .shareContextOnSubscribe(); + }; + } + + private static final class Item { + @Nullable + SingleSource.Subscriber subscriber; + @Nullable + R result; + @Nullable + Throwable cause; + boolean terminated; + } +} diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java new file mode 100644 index 0000000000..0b928dcb51 --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java @@ -0,0 +1,149 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.function.Function; + +import static io.servicetalk.concurrent.api.Executors.newCachedThreadExecutor; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.time.Duration.ofMillis; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.sameInstance; + +class PublisherFlatMapConcatSingleTest { + private final TestPublisherSubscriber subscriber = new TestPublisherSubscriber<>(); + private final TestSubscription subscription = new TestSubscription(); + private static Executor executor; + + @BeforeAll + static void beforeClass() { + executor = newCachedThreadExecutor(); + } + + @AfterAll + static void afterClass() throws Exception { + executor.closeAsync().toFuture().get(); + } + + @ParameterizedTest + @CsvSource(value = {"0,5,10,false", "0,25,10,false", "0,5,10,true", "0,25,10,true"}) + void orderPreservedInRangeWithConcurrency(int begin, int end, int maxConcurrency, boolean delayError) { + toSource(concatSingle(Publisher.range(begin, end), + i -> executor.timer(ofMillis(1)).toSingle().map(__ -> i + "x"), maxConcurrency, delayError) + ).subscribe(subscriber); + String[] expected = expected(begin, end); + subscriber.awaitSubscription().request(expected.length); + assertThat(subscriber.takeOnNext(expected.length), contains(expected)); + subscriber.awaitOnComplete(); + } + + @ParameterizedTest + @CsvSource(value = {"0,5,10,false", "0,25,10,false", "0,5,10,true", "0,25,10,true"}) + void errorPropagatedInOrderLast(int begin, int end, int maxConcurrency, boolean delayError) { + final int endLessOne = end - 1; + String[] expected = expected(begin, endLessOne); + toSource(concatSingle(Publisher.range(begin, end), i -> executor.timer(ofMillis(1)).toSingle().map(__ -> { + if (i == endLessOne) { + throw DELIBERATE_EXCEPTION; + } + return i + "x"; + }), maxConcurrency, delayError) + ).subscribe(subscriber); + subscriber.awaitSubscription().request(end - begin); + assertThat(subscriber.takeOnNext(expected.length), contains(expected)); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + + @ParameterizedTest + @CsvSource(value = {"0,5,10,false", "0,25,10,false", "0,5,10,true", "0,25,10,true"}) + void errorPropagatedInOrderFirst(int begin, int end, int maxConcurrency, boolean delayError) { + toSource(concatSingle(Publisher.range(begin, end), i -> executor.timer(ofMillis(1)).toSingle().map(__ -> { + if (i == begin) { + throw DELIBERATE_EXCEPTION; + } + return i + "x"; + }), maxConcurrency, delayError) + ).subscribe(subscriber); + subscriber.awaitSubscription().request(end - begin); + if (delayError) { + String[] expected = expected(begin + 1, end); + assertThat(subscriber.takeOnNext(expected.length), contains(expected)); + } + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void cancellationPropagated(boolean delayError) throws InterruptedException { + TestPublisher source = new TestPublisher.Builder().disableAutoOnSubscribe().build( + subscriber1 -> { + subscriber1.onSubscribe(subscription); + return subscriber1; + }); + final TestCancellable cancellable = new TestCancellable(); + final TestSingle singleSource = new TestSingle.Builder().disableAutoOnSubscribe() + .build(subscriber1 -> { + subscriber1.onSubscribe(cancellable); + return subscriber1; + }); + toSource(concatSingle(source, i -> singleSource, 2, delayError)) + .subscribe(subscriber); + subscriber.awaitSubscription().request(1); + subscription.awaitRequestN(1); + source.onNext(0); + singleSource.awaitSubscribed(); + subscriber.awaitSubscription().cancel(); + cancellable.awaitCancelled(); + subscription.awaitCancelled(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void singleTerminalThrows(boolean delayError) { + toSource(concatSingle(Publisher.range(0, 2), i -> Single.succeeded(i + "x"), 2, delayError) + .map(x -> { + throw DELIBERATE_EXCEPTION; + }) + ).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + + private static Publisher concatSingle(Publisher publisher, Function> mapper, + int maxConcurrency, boolean delayError) { + return delayError ? + publisher.flatMapConcatSingleDelayError(mapper, maxConcurrency) : + publisher.flatMapConcatSingle(mapper, maxConcurrency); + } + + private static String[] expected(int begin, int end) { + String[] expected = new String[end - begin]; + for (int i = begin; i < end; ++i) { + expected[i - begin] = i + "x"; + } + return expected; + } +} diff --git a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java index 26e997afc9..70c66d4b5b 100644 --- a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java +++ b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java @@ -17,6 +17,7 @@ import io.servicetalk.concurrent.PublisherSource.Subscription; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -78,6 +79,36 @@ public static boolean releaseLock(AtomicIntegerFieldUpdater lockUpdater, return lockUpdater.getAndSet(owner, CONCURRENT_IDLE) == CONCURRENT_EMITTING; } + /** + * Acquire a lock that is exclusively held with no re-entry, but attempts to acquire the lock while it is + * held can be detected by {@link #releaseLock(AtomicInteger)}. + * @param lock The {@link AtomicInteger} used to control the lock state. + * @return {@code true} if the lock was acquired, {@code false} otherwise. + */ + public static boolean tryAcquireLock(AtomicInteger lock) { + for (;;) { + final int prevEmitting = lock.get(); + if (prevEmitting == CONCURRENT_IDLE) { + if (lock.compareAndSet(CONCURRENT_IDLE, CONCURRENT_EMITTING)) { + return true; + } + } else if (lock.compareAndSet(prevEmitting, CONCURRENT_PENDING)) { + return false; + } + } + } + + /** + * Release a lock that was previously acquired via {@link #tryAcquireLock(AtomicInteger)}. + * @param lock The {@link AtomicInteger} used to control the lock state. + * @return {@code true} if the lock was released, and no other attempts were made to acquire the lock while it + * was held. {@code false} if the lock was released but another attempt was made to acquire the lock before it was + * released. + */ + public static boolean releaseLock(AtomicInteger lock) { + return lock.getAndSet(CONCURRENT_IDLE) == CONCURRENT_EMITTING; + } + /** * Acquire a lock that allows reentry and attempts to acquire the lock while it is * held can be detected by {@link #releaseReentrantLock(AtomicLongFieldUpdater, long, Object)}. diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleDelayErrorTckTest.java new file mode 100644 index 0000000000..3d9b2cff29 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleDelayErrorTckTest.java @@ -0,0 +1,29 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; + +import org.testng.annotations.Test; + +@Test +public class PublisherFlatMapConcatSingleDelayErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher composePublisher(Publisher publisher, int elements) { + return publisher.flatMapConcatSingleDelayError(Single::succeeded); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapSingleTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleTckTest.java similarity index 79% rename from servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapSingleTckTest.java rename to servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleTckTest.java index aa0785750d..fc20910346 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapSingleTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapConcatSingleTckTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,9 @@ import org.testng.annotations.Test; @Test -public class PublisherFlatMapSingleTckTest extends AbstractPublisherOperatorTckTest { +public class PublisherFlatMapConcatSingleTckTest extends AbstractPublisherOperatorTckTest { @Override protected Publisher composePublisher(Publisher publisher, int elements) { - return publisher.flatMapMergeSingle(Single::succeeded, 10); + return publisher.flatMapConcatSingle(Single::succeeded); } } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapSingleDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleDelayErrorTckTest.java similarity index 82% rename from servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapSingleDelayErrorTckTest.java rename to servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleDelayErrorTckTest.java index b7cb3f5f1b..92b260983f 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapSingleDelayErrorTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleDelayErrorTckTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,9 @@ import org.testng.annotations.Test; @Test -public class PublisherFlatMapSingleDelayErrorTckTest extends AbstractPublisherOperatorTckTest { +public class PublisherFlatMapMergeSingleDelayErrorTckTest extends AbstractPublisherOperatorTckTest { @Override protected Publisher composePublisher(Publisher publisher, int elements) { - return publisher.flatMapMergeSingleDelayError(Single::succeeded, 10); + return publisher.flatMapMergeSingleDelayError(Single::succeeded); } } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleTckTest.java new file mode 100644 index 0000000000..2133e446cc --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFlatMapMergeSingleTckTest.java @@ -0,0 +1,29 @@ +/* + * Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; + +import org.testng.annotations.Test; + +@Test +public class PublisherFlatMapMergeSingleTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher composePublisher(Publisher publisher, int elements) { + return publisher.flatMapMergeSingle(Single::succeeded); + } +} From 87391ad808dd5bff7e4d788dcf63e469aea7de39 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 21 Mar 2022 19:10:21 -0700 Subject: [PATCH 2/3] review comments --- .../servicetalk/concurrent/api/Publisher.java | 92 ++++++++------- .../api/PublisherFlatMapConcatUtils.java | 111 ++++++++++-------- .../api/PublisherFlatMapConcatSingleTest.java | 54 +++++++-- .../concurrent/internal/ConcurrentUtils.java | 31 ----- 4 files changed, 161 insertions(+), 127 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index df8316cb8e..ae54fa4605 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -885,14 +885,15 @@ public final Publisher flatMapMergeDelayError(Function * To control the amount of concurrent processing done by this operator see * {@link #flatMapMergeSingle(Function, int)}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -917,17 +918,19 @@ public final  Publisher flatMapMergeDelayError(FunctionReactiveX flatMap operator.
      * @see #flatMapMergeSingle(Function, int)
+     * @see #flatMapConcatSingle(Function)
      */
     public final  Publisher flatMapMergeSingle(Function> mapper) {
         return new PublisherFlatMapSingle<>(this, mapper, false);
     }
 
     /**
-     * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals
-     * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>.
+     * This method is similar to {@link #map(Function)} but the result is asynchronous and results are unordered. More
+     * specifically, map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all
+     * signals emitted from each mapped {@link Single}<{@link R}> into the returned
+     * {@link Publisher}<{@link R}>.
      * 

- * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -954,6 +957,7 @@ public final  Publisher flatMapMergeSingle(FunctionReactiveX flatMap operator.
+     * @see #flatMapConcatSingle(Function, int)
      */
     public final  Publisher flatMapMergeSingle(Function> mapper,
                                                      int maxConcurrency) {
@@ -1002,6 +1006,7 @@ public final  Publisher flatMapMergeSingle(FunctionReactiveX flatMap operator.
      * @see #flatMapMergeSingleDelayError(Function, int)
+     * @see #flatMapConcatSingleDelayError(Function)
      */
     public final  Publisher flatMapMergeSingleDelayError(
             Function> mapper) {
@@ -1009,8 +1014,9 @@ public final  Publisher flatMapMergeSingleDelayError(
     }
 
     /**
-     * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals
-     * emitted from each mapped {@link Single}<{@link R}> into the returned
+     * This method is similar to {@link #map(Function)} but the result is asynchronous and results are unordered. More
+     * specifically, map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all
+     * signals emitted from each mapped {@link Single}<{@link R}> into the returned
      * {@link Publisher}<{@link R}>.
      * 

* The behavior is the same as {@link #flatMapMergeSingle(Function, int)} with the exception that if any @@ -1019,8 +1025,7 @@ public final Publisher flatMapMergeSingleDelayError( * then terminate the returned {@link Publisher} with all errors emitted by the {@link Single}s produced by the * {@code mapper}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -1050,6 +1055,7 @@ public final  Publisher flatMapMergeSingleDelayError(
      * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}.
      *
      * @see ReactiveX flatMap operator.
+     * @see #flatMapConcatSingleDelayError(Function, int)
      */
     public final  Publisher flatMapMergeSingleDelayError(
             Function> mapper, int maxConcurrency) {
@@ -1057,8 +1063,10 @@ public final  Publisher flatMapMergeSingleDelayError(
     }
 
     /**
-     * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals
-     * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>.
+     * This method is similar to {@link #map(Function)} but the result is asynchronous and results are unordered. More
+     * specifically, map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all
+     * signals emitted from each mapped {@link Single}<{@link R}> into the returned
+     * {@link Publisher}<{@link R}>.
      * 

* The behavior is the same as {@link #flatMapMergeSingle(Function, int)} with the exception that if any * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not @@ -1066,8 +1074,7 @@ public final Publisher flatMapMergeSingleDelayError( * then terminate the returned {@link Publisher} with errors emitted by the {@link Single}s produced by the * {@code mapper}. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, unordered, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -1098,6 +1105,7 @@ public final  Publisher flatMapMergeSingleDelayError(
      * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}.
      *
      * @see ReactiveX flatMap operator.
+     * @see #flatMapConcatSingleDelayError(Function, int)
      */
     public final  Publisher flatMapMergeSingleDelayError(
             Function> mapper, int maxConcurrency, int maxDelayedErrorsHint) {
@@ -1322,16 +1330,16 @@ public final Completable flatMapCompletableDelayError(Function
      * To control the amount of concurrent processing done by this operator see
      * {@link #flatMapConcatSingle(Function, int)}.
      * 

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -1352,19 +1360,20 @@ public final Completable flatMapCompletableDelayError(FunctionReactiveX flatMap operator.
      * @see #flatMapConcatSingle(Function, int)
+     * @see #flatMapMergeSingle(Function)
      */
     public final  Publisher flatMapConcatSingle(Function> mapper) {
         return PublisherFlatMapConcatUtils.flatMapConcatSingle(this, mapper);
     }
 
     /**
-     * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals
-     * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>.
-     * Each mapped {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in
-     * the same order as this {@link Publisher}.
+     * This method is similar to {@link #map(Function)} but the result is asynchronous. More specifically, map each
+     * element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals emitted from
+     * each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>. Each mapped
+     * {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in the same order as
+     * this {@link Publisher}.
      * 

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -1386,6 +1395,7 @@ public final  Publisher flatMapConcatSingle(FunctionReactiveX flatMap operator.
+     * @see #flatMapMergeSingle(Function, int) 
      */
     public final  Publisher flatMapConcatSingle(Function> mapper,
                                                       int maxConcurrency) {
@@ -1393,17 +1403,17 @@ public final  Publisher flatMapConcatSingle(Function
      * The behavior is the same as {@link #flatMapConcatSingle(Function)} with the exception that if any
      * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not
      * terminate until this {@link Publisher} and all {@link Single}s to terminate.
      * 

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -1430,6 +1440,7 @@ public final  Publisher flatMapConcatSingle(FunctionReactiveX flatMap operator.
      * @see #flatMapConcatSingleDelayError(Function, int)
+     * @see #flatMapMergeSingleDelayError(Function)
      */
     public final  Publisher flatMapConcatSingleDelayError(
             Function> mapper) {
@@ -1437,17 +1448,17 @@ public final  Publisher flatMapConcatSingleDelayError(
     }
 
     /**
-     * Map each element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals
-     * emitted from each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>.
-     * Each mapped {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in
-     * the same order as this {@link Publisher}.
+     * This method is similar to {@link #map(Function)} but the result is asynchronous. More specifically, map each
+     * element of this {@link Publisher} into a {@link Single}<{@link R}> and flatten all signals emitted from
+     * each mapped {@link Single}<{@link R}> into the returned {@link Publisher}<{@link R}>. Each mapped
+     * {@link Single}<{@link R}> maybe subscribed to concurrently but the results are emitted in the same order as
+     * this {@link Publisher}.
      * 

* The behavior is the same as {@link #flatMapConcatSingle(Function)} with the exception that if any * {@link Single} returned by {@code mapper}, terminates with an error, the returned {@link Publisher} will not * terminate until this {@link Publisher} and all {@link Single}s to terminate. *

- * This method is similar to {@link #map(Function)} but the result is asynchronous, and provides a data - * transformation in sequential programming similar to: + * Provides a data transformation in sequential programming similar to: *

{@code
      *     ExecutorService e = ...;
      *     List> futures = ...; // assume this is thread safe
@@ -1475,6 +1486,7 @@ public final  Publisher flatMapConcatSingleDelayError(
      * @return A new {@link Publisher} that emits all items emitted by each single produced by {@code mapper}.
      *
      * @see ReactiveX flatMap operator.
+     * @see #flatMapMergeSingleDelayError(Function, int)
      */
     public final  Publisher flatMapConcatSingleDelayError(
             Function> mapper, int maxConcurrency) {
diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java
index 19f291028b..b208a594ee 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java
@@ -19,7 +19,7 @@
 import io.servicetalk.concurrent.SingleSource;
 
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Function;
 import javax.annotation.Nullable;
 
@@ -36,50 +36,51 @@ private PublisherFlatMapConcatUtils() {
 
     static  Publisher flatMapConcatSingle(final Publisher publisher,
                                                    final Function> mapper) {
-        return defer(() -> {
-            final Queue> results = newUnboundedSpscQueue(4);
-            final AtomicInteger consumerLock = new AtomicInteger();
-            return publisher.flatMapMergeSingle(orderedMapper(mapper, results, consumerLock))
-                    .shareContextOnSubscribe();
-        });
+        return defer(() -> publisher.flatMapMergeSingle(new OrderedMapper<>(mapper, newUnboundedSpscQueue(4)))
+                .shareContextOnSubscribe());
     }
 
     static  Publisher flatMapConcatSingleDelayError(
             final Publisher publisher, final Function> mapper) {
-        return defer(() -> {
-            final Queue> results = newUnboundedSpscQueue(4);
-            final AtomicInteger consumerLock = new AtomicInteger();
-            return publisher.flatMapMergeSingleDelayError(orderedMapper(mapper, results, consumerLock))
-                    .shareContextOnSubscribe();
-        });
+        return defer(() -> publisher.flatMapMergeSingleDelayError(new OrderedMapper<>(mapper, newUnboundedSpscQueue(4)))
+                .shareContextOnSubscribe());
     }
 
     static  Publisher flatMapConcatSingle(final Publisher publisher,
                                                    final Function> mapper,
                                                    final int maxConcurrency) {
-        return defer(() -> {
-            final Queue> results = newUnboundedSpscQueue(min(8, maxConcurrency));
-            final AtomicInteger consumerLock = new AtomicInteger();
-            return publisher.flatMapMergeSingle(orderedMapper(mapper, results, consumerLock), maxConcurrency)
-                    .shareContextOnSubscribe();
-        });
+        return defer(() ->
+                publisher.flatMapMergeSingle(new OrderedMapper<>(mapper,
+                                newUnboundedSpscQueue(min(8, maxConcurrency))), maxConcurrency)
+                        .shareContextOnSubscribe());
     }
 
     static  Publisher flatMapConcatSingleDelayError(
             final Publisher publisher, final Function> mapper,
             final int maxConcurrency) {
-        return defer(() -> {
-            final Queue> results = newUnboundedSpscQueue(min(8, maxConcurrency));
-            final AtomicInteger consumerLock = new AtomicInteger();
-            return publisher.flatMapMergeSingleDelayError(orderedMapper(mapper, results, consumerLock), maxConcurrency)
-                    .shareContextOnSubscribe();
-        });
+        return defer(() ->
+                publisher.flatMapMergeSingleDelayError(new OrderedMapper<>(mapper,
+                                newUnboundedSpscQueue(min(8, maxConcurrency))), maxConcurrency)
+                        .shareContextOnSubscribe());
     }
 
-    private static  Function> orderedMapper(
-            final Function> mapper,
-            final Queue> results, final AtomicInteger consumerLock) {
-        return t -> {
+    private static final class OrderedMapper implements Function> {
+        @SuppressWarnings("rawtypes")
+        private static final AtomicIntegerFieldUpdater consumerLockUpdater =
+                AtomicIntegerFieldUpdater.newUpdater(OrderedMapper.class, "consumerLock");
+        private final Function> mapper;
+        private final Queue> results;
+        @SuppressWarnings("unused")
+        private volatile int consumerLock;
+
+        private OrderedMapper(final Function> mapper,
+                              final Queue> results) {
+            this.mapper = mapper;
+            this.results = results;
+        }
+
+        @Override
+        public Single apply(final T t) {
             final Single single = mapper.apply(t);
             final Item item = new Item<>();
             results.add(item);
@@ -96,35 +97,27 @@ public void onSubscribe(final Cancellable cancellable) {
 
                         @Override
                         public void onSuccess(@Nullable final R result) {
-                            item.result = result;
-                            item.terminated = true;
+                            item.onSuccess(result);
                             tryPollQueue();
                         }
 
                         @Override
                         public void onError(final Throwable t) {
-                            item.cause = t;
-                            item.terminated = true;
+                            item.onError(t);
                             tryPollQueue();
                         }
 
                         private void tryPollQueue() {
                             boolean tryAcquire = true;
-                            while (tryAcquire && tryAcquireLock(consumerLock)) {
+                            while (tryAcquire && tryAcquireLock(consumerLockUpdater, OrderedMapper.this)) {
                                 try {
                                     Item i;
-                                    while ((i = results.peek()) != null && i.terminated) {
+                                    while ((i = results.peek()) != null && i.tryTerminate()) {
                                         results.poll();
-                                        assert i.subscriber != null; // if terminated, must have a subscriber
-                                        if (i.cause != null) {
-                                            i.subscriber.onError(i.cause);
-                                        } else {
-                                            i.subscriber.onSuccess(i.result);
-                                        }
                                     }
                                     // flatMapMergeSingle takes care of exception propagation / cleanup
                                 } finally {
-                                    tryAcquire = !releaseLock(consumerLock);
+                                    tryAcquire = !releaseLock(consumerLockUpdater, OrderedMapper.this);
                                 }
                             }
                         }
@@ -133,16 +126,40 @@ private void tryPollQueue() {
             }
             // The inner Single will determine if a copy is justified when we subscribe to it.
             .shareContextOnSubscribe();
-        };
+        }
     }
 
+
     private static final class Item {
         @Nullable
         SingleSource.Subscriber subscriber;
         @Nullable
-        R result;
-        @Nullable
-        Throwable cause;
-        boolean terminated;
+        private Object result;
+        // 0 = not terminated, 1 = success, 2 = error
+        private byte terminalState;
+
+        void onError(Throwable cause) {
+            terminalState = 2;
+            result = cause;
+        }
+
+        void onSuccess(@Nullable R r) {
+            terminalState = 1;
+            result = r;
+        }
+
+        @SuppressWarnings("unchecked")
+        boolean tryTerminate() {
+            assert subscriber != null; // if terminated, must have a subscriber
+            if (terminalState == 1) {
+                subscriber.onSuccess((R) result);
+                return true;
+            } else if (terminalState == 2) {
+                assert result != null;
+                subscriber.onError((Throwable) result);
+                return true;
+            }
+            return false;
+        }
     }
 }
diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java
index 0b928dcb51..3e86bbef2f 100644
--- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java
+++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatSingleTest.java
@@ -16,6 +16,7 @@
 package io.servicetalk.concurrent.api;
 
 import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;
+import io.servicetalk.context.api.ContextMap;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -23,17 +24,24 @@
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 
 import static io.servicetalk.concurrent.api.Executors.newCachedThreadExecutor;
+import static io.servicetalk.concurrent.api.Single.succeeded;
 import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
 import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
 
 class PublisherFlatMapConcatSingleTest {
+    private static final ContextMap.Key K1 = ContextMap.Key.newKey("k1", String.class);
+    private static final ContextMap.Key K2 = ContextMap.Key.newKey("k2", String.class);
     private final TestPublisherSubscriber subscriber = new TestPublisherSubscriber<>();
     private final TestSubscription subscription = new TestSubscription();
     private static Executor executor;
@@ -48,11 +56,11 @@ static void afterClass() throws Exception {
         executor.closeAsync().toFuture().get();
     }
 
-    @ParameterizedTest
+    @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1} maxConcurrency={2} delayError={3}")
     @CsvSource(value = {"0,5,10,false", "0,25,10,false", "0,5,10,true", "0,25,10,true"})
     void orderPreservedInRangeWithConcurrency(int begin, int end, int maxConcurrency, boolean delayError) {
         toSource(concatSingle(Publisher.range(begin, end),
-                i -> executor.timer(ofMillis(1)).toSingle().map(__ -> i + "x"), maxConcurrency, delayError)
+                i -> executor.timer(getDuration()).toSingle().map(__ -> i + "x"), maxConcurrency, delayError)
         ).subscribe(subscriber);
         String[] expected = expected(begin, end);
         subscriber.awaitSubscription().request(expected.length);
@@ -60,12 +68,12 @@ void orderPreservedInRangeWithConcurrency(int begin, int end, int maxConcurrency
         subscriber.awaitOnComplete();
     }
 
-    @ParameterizedTest
+    @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1} maxConcurrency={2} delayError={3}")
     @CsvSource(value = {"0,5,10,false", "0,25,10,false", "0,5,10,true", "0,25,10,true"})
     void errorPropagatedInOrderLast(int begin, int end, int maxConcurrency, boolean delayError) {
         final int endLessOne = end - 1;
         String[] expected = expected(begin, endLessOne);
-        toSource(concatSingle(Publisher.range(begin, end), i -> executor.timer(ofMillis(1)).toSingle().map(__ -> {
+        toSource(concatSingle(Publisher.range(begin, end), i -> executor.timer(getDuration()).toSingle().map(__ -> {
                     if (i == endLessOne) {
                         throw DELIBERATE_EXCEPTION;
                     }
@@ -77,10 +85,10 @@ void errorPropagatedInOrderLast(int begin, int end, int maxConcurrency, boolean
         assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
     }
 
-    @ParameterizedTest
+    @ParameterizedTest(name = "{displayName} [{index}] begin={0} end={1} maxConcurrency={2} delayError={3}")
     @CsvSource(value = {"0,5,10,false", "0,25,10,false", "0,5,10,true", "0,25,10,true"})
     void errorPropagatedInOrderFirst(int begin, int end, int maxConcurrency, boolean delayError) {
-        toSource(concatSingle(Publisher.range(begin, end), i -> executor.timer(ofMillis(1)).toSingle().map(__ -> {
+        toSource(concatSingle(Publisher.range(begin, end), i -> executor.timer(getDuration()).toSingle().map(__ -> {
                     if (i == begin) {
                         throw DELIBERATE_EXCEPTION;
                     }
@@ -95,7 +103,7 @@ void errorPropagatedInOrderFirst(int begin, int end, int maxConcurrency, boolean
         assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
     }
 
-    @ParameterizedTest
+    @ParameterizedTest(name = "{displayName} [{index}] delayError={0}")
     @ValueSource(booleans = {true, false})
     void cancellationPropagated(boolean delayError) throws InterruptedException {
         TestPublisher source = new TestPublisher.Builder().disableAutoOnSubscribe().build(
@@ -120,10 +128,10 @@ void cancellationPropagated(boolean delayError) throws InterruptedException {
         subscription.awaitCancelled();
     }
 
-    @ParameterizedTest
+    @ParameterizedTest(name = "{displayName} [{index}] delayError={0}")
     @ValueSource(booleans = {true, false})
     void singleTerminalThrows(boolean delayError) {
-        toSource(concatSingle(Publisher.range(0, 2), i -> Single.succeeded(i + "x"), 2, delayError)
+        toSource(concatSingle(Publisher.range(0, 2), i -> succeeded(i + "x"), 2, delayError)
                 .map(x -> {
                     throw DELIBERATE_EXCEPTION;
                 })
@@ -132,6 +140,34 @@ void singleTerminalThrows(boolean delayError) {
         assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
     }
 
+    @ParameterizedTest(name = "{displayName} [{index}] delayError={0}")
+    @ValueSource(booleans = {true, false})
+    void asyncContext(boolean delayError) {
+        assumeFalse(AsyncContext.isDisabled());
+        final int begin = 0;
+        final int end = 2;
+        final int elements = end - begin;
+        AsyncContext.put(K1, "v1");
+        toSource(concatSingle(Publisher.range(begin, end), i -> {
+                    AsyncContext.put(K2, "v2");
+                    return succeeded(i + "x");
+                }, elements, delayError)
+                .map(x -> {
+                    assertThat(AsyncContext.get(K1), equalTo("v1"));
+                    assertThat(AsyncContext.get(K2), equalTo("v2"));
+                    return x;
+                })
+        ).subscribe(subscriber);
+        subscriber.awaitSubscription().request(elements);
+        assertThat(subscriber.takeOnNext(elements), contains(expected(begin, end)));
+        subscriber.awaitOnComplete();
+    }
+
+    private static Duration getDuration() {
+        // Introduce randomness to increase the likelihood of out of order task completion.
+        return ofMillis(ThreadLocalRandom.current().nextInt(5));
+    }
+
     private static  Publisher concatSingle(Publisher publisher, Function> mapper,
                                                     int maxConcurrency, boolean delayError) {
         return delayError ?
diff --git a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java
index 70c66d4b5b..26e997afc9 100644
--- a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java
+++ b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java
@@ -17,7 +17,6 @@
 
 import io.servicetalk.concurrent.PublisherSource.Subscription;
 
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
@@ -79,36 +78,6 @@ public static  boolean releaseLock(AtomicIntegerFieldUpdater lockUpdater,
         return lockUpdater.getAndSet(owner, CONCURRENT_IDLE) == CONCURRENT_EMITTING;
     }
 
-    /**
-     * Acquire a lock that is exclusively held with no re-entry, but attempts to acquire the lock while it is
-     * held can be detected by {@link #releaseLock(AtomicInteger)}.
-     * @param lock The {@link AtomicInteger} used to control the lock state.
-     * @return {@code true} if the lock was acquired, {@code false} otherwise.
-     */
-    public static boolean tryAcquireLock(AtomicInteger lock) {
-        for (;;) {
-            final int prevEmitting = lock.get();
-            if (prevEmitting == CONCURRENT_IDLE) {
-                if (lock.compareAndSet(CONCURRENT_IDLE, CONCURRENT_EMITTING)) {
-                    return true;
-                }
-            } else if (lock.compareAndSet(prevEmitting, CONCURRENT_PENDING)) {
-                return false;
-            }
-        }
-    }
-
-    /**
-     * Release a lock that was previously acquired via {@link #tryAcquireLock(AtomicInteger)}.
-     * @param lock The {@link AtomicInteger} used to control the lock state.
-     * @return {@code true} if the lock was released, and no other attempts were made to acquire the lock while it
-     * was held. {@code false} if the lock was released but another attempt was made to acquire the lock before it was
-     * released.
-     */
-    public static boolean releaseLock(AtomicInteger lock) {
-        return lock.getAndSet(CONCURRENT_IDLE) == CONCURRENT_EMITTING;
-    }
-
     /**
      * Acquire a lock that allows reentry and attempts to acquire the lock while it is
      * held can be detected by {@link #releaseReentrantLock(AtomicLongFieldUpdater, long, Object)}.

From 391d69e8b577cdfcf125ab65da13a05f4bae0a0a Mon Sep 17 00:00:00 2001
From: Scott Mitchell 
Date: Mon, 21 Mar 2022 19:19:47 -0700
Subject: [PATCH 3/3] checkstyle

---
 .../src/main/java/io/servicetalk/concurrent/api/Publisher.java  | 2 +-
 .../servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
index ae54fa4605..b96c402a5a 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
@@ -1395,7 +1395,7 @@ public final  Publisher flatMapConcatSingle(FunctionReactiveX flatMap operator.
-     * @see #flatMapMergeSingle(Function, int) 
+     * @see #flatMapMergeSingle(Function, int)
      */
     public final  Publisher flatMapConcatSingle(Function> mapper,
                                                       int maxConcurrency) {
diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java
index b208a594ee..e81843b033 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapConcatUtils.java
@@ -129,7 +129,6 @@ private void tryPollQueue() {
         }
     }
 
-
     private static final class Item {
         @Nullable
         SingleSource.Subscriber subscriber;