From 143a2bba5389eb96088c5d6610e1d08fbd6fdd34 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:15:31 -0700 Subject: [PATCH 1/5] Stop polling from the request queue if there are still requests in flight in download directory to prevent potential thread starvation issue. --- .../s3/internal/AsyncBufferingSubscriber.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index f4394331563f..633bcb2049df 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -86,18 +86,24 @@ private void flushBufferIfNeeded() { break; } - switch (next.get().type()) { + StoringSubscriber.Event event = next.get(); + + if (shouldStopPolling(event)) { + break; + } + + switch (event.type()) { case ON_COMPLETE: handleCompleteEvent(); break; case ON_ERROR: - handleError(next.get().runtimeError()); + handleError(event.runtimeError()); break; case ON_NEXT: - handleOnNext(next.get().value()); + handleOnNext(event.value()); break; default: - handleError(new IllegalStateException("Unknown stored type: " + next.get().type())); + handleError(new IllegalStateException("Unknown stored type: " + event.type())); break; } @@ -109,6 +115,15 @@ private void flushBufferIfNeeded() { } } + /** + * If the last event in queue is ON_COMPLETE and there are still in-flight requests, + * we should stop polling. + */ + private boolean shouldStopPolling(StoringSubscriber.Event event) { + return numRequestsInFlight.get() != 0 && + event.type() == StoringSubscriber.EventType.ON_COMPLETE; + } + private void handleOnNext(T item) { storingSubscriber.poll(); From 90dcea64736c2f27ced5494a4ae190cbb16c6539 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 29 Mar 2023 13:04:18 -0700 Subject: [PATCH 2/5] Refactor async buffering subscriber --- .../s3/internal/AsyncBufferingSubscriber.java | 106 +++--------------- 1 file changed, 17 insertions(+), 89 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 633bcb2049df..2f810aa5532f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -15,9 +15,7 @@ package software.amazon.awssdk.transfer.s3.internal; -import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.reactivestreams.Subscriber; @@ -25,8 +23,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; -import software.amazon.awssdk.utils.async.DemandIgnoringSubscription; -import software.amazon.awssdk.utils.async.StoringSubscriber; /** * An implementation of {@link Subscriber} that execute the provided function for every event and limits the number of concurrent @@ -41,12 +37,9 @@ public class AsyncBufferingSubscriber implements Subscriber { private final Function> consumer; private final int maxConcurrentExecutions; private final AtomicInteger numRequestsInFlight; - private final AtomicBoolean isDelivering = new AtomicBoolean(false); - private volatile boolean isStreamingDone; + private volatile boolean upstreamDone; private Subscription subscription; - private final StoringSubscriber storingSubscriber; - public AsyncBufferingSubscriber(Function> consumer, CompletableFuture returnFuture, int maxConcurrentExecutions) { @@ -54,7 +47,6 @@ public AsyncBufferingSubscriber(Function> consumer, this.consumer = consumer; this.maxConcurrentExecutions = maxConcurrentExecutions; this.numRequestsInFlight = new AtomicInteger(0); - this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE); } @Override @@ -65,104 +57,40 @@ public void onSubscribe(Subscription subscription) { subscription.cancel(); return; } - storingSubscriber.onSubscribe(new DemandIgnoringSubscription(subscription)); this.subscription = subscription; subscription.request(maxConcurrentExecutions); } @Override public void onNext(T item) { - storingSubscriber.onNext(item); - flushBufferIfNeeded(); - } - - private void flushBufferIfNeeded() { - if (isDelivering.compareAndSet(false, true)) { - try { - Optional> next = storingSubscriber.peek(); - while (numRequestsInFlight.get() < maxConcurrentExecutions) { - if (!next.isPresent()) { - subscription.request(1); - break; - } - - StoringSubscriber.Event event = next.get(); - - if (shouldStopPolling(event)) { - break; - } - - switch (event.type()) { - case ON_COMPLETE: - handleCompleteEvent(); - break; - case ON_ERROR: - handleError(event.runtimeError()); - break; - case ON_NEXT: - handleOnNext(event.value()); - break; - default: - handleError(new IllegalStateException("Unknown stored type: " + event.type())); - break; - } - - next = storingSubscriber.peek(); - } - } finally { - isDelivering.set(false); - } - } - } - - /** - * If the last event in queue is ON_COMPLETE and there are still in-flight requests, - * we should stop polling. - */ - private boolean shouldStopPolling(StoringSubscriber.Event event) { - return numRequestsInFlight.get() != 0 && - event.type() == StoringSubscriber.EventType.ON_COMPLETE; - } - - private void handleOnNext(T item) { - storingSubscriber.poll(); - - int numberOfRequestInFlight = numRequestsInFlight.incrementAndGet(); - log.debug(() -> "Delivering next item, numRequestInFlight=" + numberOfRequestInFlight); - + numRequestsInFlight.incrementAndGet(); consumer.apply(item).whenComplete((r, t) -> { numRequestsInFlight.decrementAndGet(); - if (!isStreamingDone) { - subscription.request(1); - } else { - flushBufferIfNeeded(); - } + checkForCompletion(); }); } - private void handleCompleteEvent() { - if (numRequestsInFlight.get() == 0) { - returnFuture.complete(null); - storingSubscriber.poll(); - } - } - @Override public void onError(Throwable t) { - handleError(t); - storingSubscriber.onError(t); - } - - private void handleError(Throwable t) { + upstreamDone = true; returnFuture.completeExceptionally(t); - storingSubscriber.poll(); } @Override public void onComplete() { - isStreamingDone = true; - storingSubscriber.onComplete(); - flushBufferIfNeeded(); + upstreamDone = true; + checkForCompletion(); + } + + private void checkForCompletion() { + if (!upstreamDone) { + subscription.request(1); + return; + } + + if (upstreamDone && numRequestsInFlight.get() == 0) { + returnFuture.complete(null); + } } /** From 5ec364c25d9e5c58d9f155e6b5e968682a540939 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 29 Mar 2023 14:42:34 -0700 Subject: [PATCH 3/5] Address feedback --- .../s3/internal/AsyncBufferingSubscriber.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 2f810aa5532f..388109d78cdc 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -64,32 +64,35 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T item) { numRequestsInFlight.incrementAndGet(); - consumer.apply(item).whenComplete((r, t) -> { - numRequestsInFlight.decrementAndGet(); - checkForCompletion(); - }); + consumer.apply(item) + .whenComplete((r, t) -> + checkForCompletion(numRequestsInFlight.decrementAndGet())); } @Override public void onError(Throwable t) { - upstreamDone = true; + // Need to complete future exceptionally first to prevent + // accidental successful completion by a concurrent checkForCompletion. returnFuture.completeExceptionally(t); + upstreamDone = true; } @Override public void onComplete() { upstreamDone = true; - checkForCompletion(); + checkForCompletion(numRequestsInFlight.get()); } - private void checkForCompletion() { - if (!upstreamDone) { - subscription.request(1); + private void checkForCompletion(int requestsInFlight) { + if (upstreamDone && requestsInFlight == 0) { + // This could get invoked multiple times, but it doesn't matter + // because future.complete is idempotent. + returnFuture.complete(null); return; } - if (upstreamDone && numRequestsInFlight.get() == 0) { - returnFuture.complete(null); + synchronized (this) { + subscription.request(1); } } From fbe70b9e79a88743b54f75b5b164fc3179253406 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 29 Mar 2023 15:00:55 -0700 Subject: [PATCH 4/5] Improve existing test --- .../internal/TransferManagerLoggingTest.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerLoggingTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerLoggingTest.java index a1998973c7ea..4f2be4d00063 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerLoggingTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerLoggingTest.java @@ -17,42 +17,52 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.LogEvent; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.testutils.LogCaptor; import software.amazon.awssdk.transfer.s3.S3TransferManager; -public class TransferManagerLoggingTest { +class TransferManagerLoggingTest { @Test - public void transferManager_withCrtClient_shouldNotLogWarnMessages(){ - LogCaptor logCaptor = LogCaptor.create(Level.WARN); - S3AsyncClient s3Crt = S3AsyncClient.crtCreate(); - S3TransferManager tm = S3TransferManager.builder().s3Client(s3Crt).build(); + void transferManager_withCrtClient_shouldNotLogWarnMessages() { - List events = logCaptor.loggedEvents(); - assertThat(events).isEmpty(); - logCaptor.clear(); - logCaptor.close(); + try (S3AsyncClient s3Crt = S3AsyncClient.crtBuilder() + .region(Region.US_WEST_2) + .credentialsProvider(() -> AwsBasicCredentials.create("foo", "bar")) + .build(); + LogCaptor logCaptor = LogCaptor.create(Level.WARN); + S3TransferManager tm = S3TransferManager.builder().s3Client(s3Crt).build()) { + List events = logCaptor.loggedEvents(); + assertThat(events).isEmpty(); + } } @Test - public void transferManager_withJavaClient_shouldLogWarnMessage(){ - LogCaptor logCaptor = LogCaptor.create(Level.WARN); - S3AsyncClient s3Java = S3AsyncClient.create(); - S3TransferManager tm = S3TransferManager.builder().s3Client(s3Java).build(); + void transferManager_withJavaClient_shouldLogWarnMessage() { - List events = logCaptor.loggedEvents(); - assertLogged(events, Level.WARN, "The provided DefaultS3AsyncClient is not an instance of S3CrtAsyncClient, and " - + "thus multipart upload/download feature is not enabled and resumable file upload is " - + "not supported. To benefit from maximum throughput, consider using " - + "S3AsyncClient.crtBuilder().build() instead."); - logCaptor.clear(); - logCaptor.close(); + + try (S3AsyncClient s3Crt = S3AsyncClient.builder() + .region(Region.US_WEST_2) + .credentialsProvider(() -> AwsBasicCredentials.create("foo", "bar")) + .build(); + LogCaptor logCaptor = LogCaptor.create(Level.WARN); + S3TransferManager tm = S3TransferManager.builder().s3Client(s3Crt).build()) { + List events = logCaptor.loggedEvents(); + assertLogged(events, Level.WARN, "The provided DefaultS3AsyncClient is not an instance of S3CrtAsyncClient, and " + + "thus multipart upload/download feature is not enabled and resumable file upload" + + " is " + + "not supported. To benefit from maximum throughput, consider using " + + "S3AsyncClient.crtBuilder().build() instead."); + } } private static void assertLogged(List events, org.apache.logging.log4j.Level level, String message) { From ce32b2a4b72a72681244fcf75948b1be51b423f1 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 29 Mar 2023 20:08:36 -0700 Subject: [PATCH 5/5] move request to onNext --- .../s3/internal/AsyncBufferingSubscriber.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 388109d78cdc..4245ff5591f3 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -64,9 +64,12 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T item) { numRequestsInFlight.incrementAndGet(); - consumer.apply(item) - .whenComplete((r, t) -> - checkForCompletion(numRequestsInFlight.decrementAndGet())); + consumer.apply(item).whenComplete((r, t) -> { + checkForCompletion(numRequestsInFlight.decrementAndGet()); + synchronized (this) { + subscription.request(1); + } + }); } @Override @@ -88,11 +91,6 @@ private void checkForCompletion(int requestsInFlight) { // This could get invoked multiple times, but it doesn't matter // because future.complete is idempotent. returnFuture.complete(null); - return; - } - - synchronized (this) { - subscription.request(1); } }