Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

W-17624200: Add kill switch to the non-blocking stream writer (#703) #706

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ public void start() {
.withName(name), DEFAULT_SELECTOR_THREAD_COUNT);
workerScheduler = getWorkerScheduler(schedulersConfig.withName(name + ".requester.workers"));

if (streamingEnabled) {
if (streamingEnabled && nonBlockingStreamWriter.isEnabled()) {
// Only use a dedicated thread to the stream writer if streaming is enabled.
// Also, only use it if the feature is enabled.
workerScheduler.submit(nonBlockingStreamWriter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.mule.service.http.impl.service.client;

import static java.lang.Boolean.getBoolean;
import static java.lang.Math.min;
import static java.lang.Thread.currentThread;
import static java.lang.Thread.sleep;
Expand Down Expand Up @@ -38,19 +39,27 @@
*/
public class NonBlockingStreamWriter implements Runnable {

private static final boolean KILL_SWITCH = getBoolean("mule.http.client.responseStreaming.nonBlockingWriter");

private static final Logger LOGGER = getLogger(NonBlockingStreamWriter.class);
private static final int DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS = 100;

private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final BlockingQueue<InternalWriteTask> tasks = new LinkedBlockingQueue<>();
private final int timeToSleepWhenCouldNotWriteMillis;
private final boolean isEnabled;

public NonBlockingStreamWriter(int timeToSleepWhenCouldNotWriteMillis) {
public NonBlockingStreamWriter(int timeToSleepWhenCouldNotWriteMillis, boolean isEnabled) {
this.timeToSleepWhenCouldNotWriteMillis = timeToSleepWhenCouldNotWriteMillis;
this.isEnabled = isEnabled;
}

public NonBlockingStreamWriter() {
this(DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS);
this(DEFAULT_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS, KILL_SWITCH);
}

public boolean isEnabled() {
return isEnabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception
private STATE writeBodyPartToPipe(HttpResponseBodyPart bodyPart) throws IOException {
int bodyLength = bodyPart.length();
int spaceInPipe = availableSpaceInPipe();
if (spaceInPipe >= 0 && spaceInPipe < bodyLength) {
if (nonBlockingStreamWriter.isEnabled() && spaceInPipe >= 0 && spaceInPipe < bodyLength) {
// There is no room to write everything, so we defer the content writing to the output stream. Also, to avoid
// receiving more bodyParts temporarily, we have to pause the READ events.
final PauseHandler pauseHandler = bodyPart.getPauseHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class NonBlockingStreamWriterTestCase extends AbstractMuleTestCase {
@Before
public void setUp() {
// not scheduling always to test the sync test cases
nonBlockingStreamWriter = new NonBlockingStreamWriter(TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS);
nonBlockingStreamWriter = new NonBlockingStreamWriter(TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS, true);
}

@After
Expand Down Expand Up @@ -206,7 +206,7 @@ public void interruptTheThreadDoesntInterruptTheWriterIfNotStopped() throws Inte
@Test
public void interruptTheThreadAfterStopWillInterruptTheSleep() throws InterruptedException {
int ridiculouslyBigSleepMillis = Integer.MAX_VALUE;
NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis);
NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis, true);
Thread threadOutsideTheStaticExecutor = new Thread(writer);
threadOutsideTheStaticExecutor.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class ResponseBodyDeferringAsyncHandlerTestCase extends AbstractMuleTestC
private final PollingProber prober = new PollingProber(PROBE_TIMEOUT, POLL_DELAY);

private final ExecutorService workersExecutor = newFixedThreadPool(5);
private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter();
private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter(100, true);

private static final String READ_TIMEOUT_PROPERTY_NAME = "mule.http.responseStreaming.pipeReadTimeoutMillis";

Expand Down