From 1c463da9974575711b032d57988c62724facb395 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 4 Oct 2022 12:11:21 -0700 Subject: [PATCH] PublisherAsBlockingIterable LinkedBlockingQueue -> LinkedTransferQueue (#2386) Motivation: LinkedBlockingQueue goes through LockSupport park and unpark methods which can incur relatively expensive context switching if the EventLoop thread has to unpark an application thread. This has been shown to be a bottleneck as throughput increases. Modifications: - Use LinkedTransferQueue which does a `Thread.yield()` before parking on the consumer thread. This may use more CPU on the consumer thread but the assumption is there will be many more application threads than EventLoop threads and we want to minimize producer costs. Results: LinkedTransferQueue ``` Running 30s test @ http://localhost:8080/medium, using 'ServiceTalkGrpcBlockingClientStrAgg' client 1024 threads and 1024 connections Thread Stats Avg Stdev Max +/- Stdev Latency - - - - Req/Sec 0.01k - 0.01k - 290977 requests in 30s Requests/sec: 9699.23 Transfer/sec: - OK: 290977 KO: 0 ``` LinkedBlockingQueue ``` Running 30s test @ http://localhost:8080/medium, using 'ServiceTalkGrpcBlockingClientStrAgg' client 1024 threads and 1024 connections Thread Stats Avg Stdev Max +/- Stdev Latency - - - - Req/Sec 0.01k - 0.01k - 256778 requests in 30s Requests/sec: 8559.27 Transfer/sec: - OK: 256778 KO: 0 ``` --- .../concurrent/api/PublisherAsBlockingIterable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java index 229470f018..d35b53cead 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java @@ -30,7 +30,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; @@ -39,7 +39,7 @@ import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; -import static io.servicetalk.utils.internal.PlatformDependent.throwException; +import static io.servicetalk.utils.internal.ThrowableUtils.throwException; import static java.lang.Math.min; import static java.lang.Thread.currentThread; import static java.util.Objects.requireNonNull; @@ -101,7 +101,7 @@ private static final class SubscriberAndIterator implements Subscriber, Bl SubscriberAndIterator(int queueCapacity) { requestN = queueCapacity; - data = new LinkedBlockingQueue<>(); + data = new LinkedTransferQueue<>(); } @Override