Skip to content

Commit

Permalink
KAFKA-17100: GlobalStreamThread#start should not busy-wait (apache#16914
Browse files Browse the repository at this point in the history
)

This PR replaces a busy-wait sleep with a CountDownLatch.

Reviewers: Greg Harris <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
raminqaf authored and bboyleonp666 committed Sep 4, 2024
1 parent da77f91 commit 05363e1
Showing 1 changed file with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand All @@ -45,9 +44,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
Expand All @@ -72,6 +71,7 @@ public class GlobalStreamThread extends Thread {
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
private final CountDownLatch initializationLatch = new CountDownLatch(1);

/**
* The states that the global stream thread can be in
Expand Down Expand Up @@ -194,12 +194,6 @@ public boolean inErrorState() {
}
}

public boolean stillInitializing() {
synchronized (stateLock) {
return state.equals(CREATED);
}
}

public GlobalStreamThread(final ProcessorTopology topology,
final StreamsConfig config,
final Consumer<byte[], byte[]> globalConsumer,
Expand Down Expand Up @@ -436,6 +430,8 @@ private StateConsumer initialize() {
} catch (final Exception fatalException) {
closeStateConsumer(stateConsumer, false);
startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException);
} finally {
initializationLatch.countDown();
}
return null;
}
Expand All @@ -453,11 +449,15 @@ private void closeStateConsumer(final StateConsumer stateConsumer, final boolean
@Override
public synchronized void start() {
super.start();
while (stillInitializing()) {
Utils.sleep(1);
if (startupException != null) {
throw startupException;
}
try {
initializationLatch.await();
} catch (final InterruptedException e) {
currentThread().interrupt();
throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", e);
}

if (startupException != null) {
throw startupException;
}

if (inErrorState()) {
Expand All @@ -469,6 +469,7 @@ public void shutdown() {
// one could call shutdown() multiple times, so ignore subsequent calls
// if already shutting down or dead
setState(PENDING_SHUTDOWN);
initializationLatch.countDown();
}

public Map<MetricName, Metric> consumerMetrics() {
Expand Down

0 comments on commit 05363e1

Please sign in to comment.