diff --git a/src/main/java/org/mule/service/http/impl/service/client/GrizzlyHttpClient.java b/src/main/java/org/mule/service/http/impl/service/client/GrizzlyHttpClient.java index 53fc8183..780d2022 100644 --- a/src/main/java/org/mule/service/http/impl/service/client/GrizzlyHttpClient.java +++ b/src/main/java/org/mule/service/http/impl/service/client/GrizzlyHttpClient.java @@ -179,8 +179,9 @@ public void start() { .withName(name), DEFAULT_SELECTOR_THREAD_COUNT); workerScheduler = getWorkerScheduler(schedulersConfig.withName(name + ".requester.workers")); - if (streamingEnabled) { + if (streamingEnabled && nonBlockingStreamWriter.isEnabled()) { // Only use a dedicated thread to the stream writer if streaming is enabled. + // Also, only use it if the feature is enabled. workerScheduler.submit(nonBlockingStreamWriter); } diff --git a/src/main/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriter.java b/src/main/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriter.java index b9e9d33e..f471ec44 100644 --- a/src/main/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriter.java +++ b/src/main/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriter.java @@ -6,6 +6,7 @@ */ package org.mule.service.http.impl.service.client; +import static java.lang.Boolean.getBoolean; import static java.lang.Math.min; import static java.lang.Thread.currentThread; import static java.lang.Thread.sleep; @@ -38,19 +39,27 @@ */ public class NonBlockingStreamWriter implements Runnable { + private static final boolean KILL_SWITCH = getBoolean("mule.http.client.responseStreaming.nonBlockingWriter"); + private static final Logger LOGGER = getLogger(NonBlockingStreamWriter.class); private static final int DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS = 100; private final AtomicBoolean isStopped = new AtomicBoolean(false); private final BlockingQueue tasks = new LinkedBlockingQueue<>(); private final int timeToSleepWhenCouldNotWriteMillis; + private final boolean isEnabled; - public NonBlockingStreamWriter(int timeToSleepWhenCouldNotWriteMillis) { + public NonBlockingStreamWriter(int timeToSleepWhenCouldNotWriteMillis, boolean isEnabled) { this.timeToSleepWhenCouldNotWriteMillis = timeToSleepWhenCouldNotWriteMillis; + this.isEnabled = isEnabled; } public NonBlockingStreamWriter() { - this(DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS); + this(DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS, KILL_SWITCH); + } + + public boolean isEnabled() { + return isEnabled; } /** diff --git a/src/main/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandler.java b/src/main/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandler.java index d7944132..5ed221c9 100644 --- a/src/main/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandler.java +++ b/src/main/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandler.java @@ -260,7 +260,7 @@ public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception private STATE writeBodyPartToPipe(HttpResponseBodyPart bodyPart) throws IOException { int bodyLength = bodyPart.length(); int spaceInPipe = availableSpaceInPipe(); - if (spaceInPipe >= 0 && spaceInPipe < bodyLength) { + if (nonBlockingStreamWriter.isEnabled() && spaceInPipe >= 0 && spaceInPipe < bodyLength) { // There is no room to write everything, so we defer the content writing to the output stream. Also, to avoid // receiving more bodyParts temporarily, we have to pause the READ events. final PauseHandler pauseHandler = bodyPart.getPauseHandler(); diff --git a/src/test/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriterTestCase.java b/src/test/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriterTestCase.java index 748bcb98..ff690131 100644 --- a/src/test/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriterTestCase.java +++ b/src/test/java/org/mule/service/http/impl/service/client/NonBlockingStreamWriterTestCase.java @@ -59,7 +59,7 @@ public class NonBlockingStreamWriterTestCase extends AbstractMuleTestCase { @Before public void setUp() { // not scheduling always to test the sync test cases - nonBlockingStreamWriter = new NonBlockingStreamWriter(TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS); + nonBlockingStreamWriter = new NonBlockingStreamWriter(TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS, true); } @After @@ -206,7 +206,7 @@ public void interruptTheThreadDoesntInterruptTheWriterIfNotStopped() throws Inte @Test public void interruptTheThreadAfterStopWillInterruptTheSleep() throws InterruptedException { int ridiculouslyBigSleepMillis = Integer.MAX_VALUE; - NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis); + NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis, true); Thread threadOutsideTheStaticExecutor = new Thread(writer); threadOutsideTheStaticExecutor.start(); diff --git a/src/test/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandlerTestCase.java b/src/test/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandlerTestCase.java index cf1c03de..a3eccb84 100644 --- a/src/test/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandlerTestCase.java +++ b/src/test/java/org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandlerTestCase.java @@ -86,7 +86,7 @@ public class ResponseBodyDeferringAsyncHandlerTestCase extends AbstractMuleTestC private final PollingProber prober = new PollingProber(PROBE_TIMEOUT, POLL_DELAY); private final ExecutorService workersExecutor = newFixedThreadPool(5); - private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter(); + private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter(100, true); private static final String READ_TIMEOUT_PROPERTY_NAME = "mule.http.responseStreaming.pipeReadTimeoutMillis";