Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
W-17624200: Add kill switch to the non-blocking stream writer (#703)
Browse files Browse the repository at this point in the history
(cherry picked from commit ef462ae)
eze210 committed Jan 17, 2025
1 parent e4e06c7 commit d9b36c7
Showing 5 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -179,8 +179,9 @@ public void start() {
.withName(name), DEFAULT_SELECTOR_THREAD_COUNT);
workerScheduler = getWorkerScheduler(schedulersConfig.withName(name + ".requester.workers"));

if (streamingEnabled) {
if (streamingEnabled && nonBlockingStreamWriter.isEnabled()) {
// Only use a dedicated thread to the stream writer if streaming is enabled.
// Also, only use it if the feature is enabled.
workerScheduler.submit(nonBlockingStreamWriter);
}

Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
*/
package org.mule.service.http.impl.service.client;

import static java.lang.Boolean.getBoolean;
import static java.lang.Math.min;
import static java.lang.Thread.currentThread;
import static java.lang.Thread.sleep;
@@ -38,19 +39,27 @@
*/
public class NonBlockingStreamWriter implements Runnable {

private static final boolean KILL_SWITCH = getBoolean("mule.http.client.responseStreaming.nonBlockingWriter");

private static final Logger LOGGER = getLogger(NonBlockingStreamWriter.class);
private static final int DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS = 100;

private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final BlockingQueue<InternalWriteTask> tasks = new LinkedBlockingQueue<>();
private final int timeToSleepWhenCouldNotWriteMillis;
private final boolean isEnabled;

public NonBlockingStreamWriter(int timeToSleepWhenCouldNotWriteMillis) {
public NonBlockingStreamWriter(int timeToSleepWhenCouldNotWriteMillis, boolean isEnabled) {
this.timeToSleepWhenCouldNotWriteMillis = timeToSleepWhenCouldNotWriteMillis;
this.isEnabled = isEnabled;
}

public NonBlockingStreamWriter() {
this(DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS);
this(DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS, KILL_SWITCH);
}

public boolean isEnabled() {
return isEnabled;
}

/**
Original file line number Diff line number Diff line change
@@ -260,7 +260,7 @@ public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception
private STATE writeBodyPartToPipe(HttpResponseBodyPart bodyPart) throws IOException {
int bodyLength = bodyPart.length();
int spaceInPipe = availableSpaceInPipe();
if (spaceInPipe >= 0 && spaceInPipe < bodyLength) {
if (nonBlockingStreamWriter.isEnabled() && spaceInPipe >= 0 && spaceInPipe < bodyLength) {
// There is no room to write everything, so we defer the content writing to the output stream. Also, to avoid
// receiving more bodyParts temporarily, we have to pause the READ events.
final PauseHandler pauseHandler = bodyPart.getPauseHandler();
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ public class NonBlockingStreamWriterTestCase extends AbstractMuleTestCase {
@Before
public void setUp() {
// not scheduling always to test the sync test cases
nonBlockingStreamWriter = new NonBlockingStreamWriter(TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS);
nonBlockingStreamWriter = new NonBlockingStreamWriter(TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS, true);
}

@After
@@ -206,7 +206,7 @@ public void interruptTheThreadDoesntInterruptTheWriterIfNotStopped() throws Inte
@Test
public void interruptTheThreadAfterStopWillInterruptTheSleep() throws InterruptedException {
int ridiculouslyBigSleepMillis = Integer.MAX_VALUE;
NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis);
NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis, true);
Thread threadOutsideTheStaticExecutor = new Thread(writer);
threadOutsideTheStaticExecutor.start();

Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ public class ResponseBodyDeferringAsyncHandlerTestCase extends AbstractMuleTestC
private final PollingProber prober = new PollingProber(PROBE_TIMEOUT, POLL_DELAY);

private final ExecutorService workersExecutor = newFixedThreadPool(5);
private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter();
private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter(100, true);

private static final String READ_TIMEOUT_PROPERTY_NAME = "mule.http.responseStreaming.pipeReadTimeoutMillis";

0 comments on commit d9b36c7

Please sign in to comment.