Skip to content

Commit

Permalink
fix: Fixes race condition exposing uninitialized query (#7627)
Browse files Browse the repository at this point in the history
* fix: Fixes race condition exposing uninitialized query
  • Loading branch information
AlanConfluent authored Jun 9, 2021
1 parent 10a60f9 commit 98b1e3c
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ private void registerQuery(
unregisterQuery(oldQuery);
}

// Initialize the query before it's exposed to other threads via the map/sets.
persistentQuery.initialize();
persistentQueries.put(queryId, persistentQuery);
if (createAsQuery) {
createAsQueries.put(persistentQuery.getSinkName(), queryId);
Expand All @@ -282,10 +284,12 @@ private void registerQuery(
insertQueries.computeIfAbsent(sourceName,
x -> Collections.synchronizedSet(new HashSet<>())).add(queryId));
}
} else {
// Initialize the query before it's exposed to other threads via {@link allLiveQueries}.
query.initialize();
}
allLiveQueries.add(query);
notifyCreate(serviceContext, metaStore, query);
query.initialize();
}

private void unregisterQuery(final QueryMetadata query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ Optional<Materialization> getMaterialization(
QueryContext.Stacker contextStacker
);

void restart();

void stop();

StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

Expand All @@ -54,9 +53,9 @@ public class PersistentQueryMetadataImpl
private final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder>
materializationProviderBuilder;
private final Optional<ScalablePushRegistry> scalablePushRegistry;
private final ProcessingLogger processingLogger;

private Optional<MaterializationProvider> materializationProvider;
private ProcessingLogger processingLogger;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public PersistentQueryMetadataImpl(
Expand Down Expand Up @@ -203,23 +202,6 @@ public Optional<Materialization> getMaterialization(
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
}

public synchronized void restart() {
if (isClosed()) {
throw new IllegalStateException(String.format(
"Query with application id %s is already closed, cannot restart.",
getQueryApplicationId()));
}

closeKafkaStreams();

final KafkaStreams newKafkaStreams = buildKafkaStreams();
materializationProvider = materializationProviderBuilder.flatMap(
builder -> builder.apply(newKafkaStreams, getTopology()));

resetKafkaStreams(newKafkaStreams);
start();
}

/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.LagInfo;
Expand Down Expand Up @@ -70,9 +69,10 @@ public class QueryMetadataImpl implements QueryMetadata {
private final RetryEvent retryEvent;
private final Listener listener;

private boolean everStarted = false;
protected boolean closed = false;
private StreamsUncaughtExceptionHandler uncaughtExceptionHandler = this::uncaughtHandler;
private volatile boolean everStarted = false;
protected volatile boolean closed = false;
// These fields don't need synchronization because they are initialized in initialize() before
// the object is made available to other threads.
private KafkaStreams kafkaStreams;
private boolean initialized = false;

Expand Down Expand Up @@ -146,7 +146,6 @@ public long read() {
this.closeTimeout = other.closeTimeout;
this.queryId = other.getQueryId();
this.errorClassifier = other.errorClassifier;
this.uncaughtExceptionHandler = other.uncaughtExceptionHandler;
this.everStarted = other.everStarted;
this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0);
this.retryEvent = new RetryEvent(
Expand Down Expand Up @@ -222,7 +221,6 @@ public String getStatementString() {
}

public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) {
this.uncaughtExceptionHandler = handler;
kafkaStreams.setUncaughtExceptionHandler(handler);
}

Expand Down Expand Up @@ -304,9 +302,9 @@ Listener getListener() {
return listener;
}

protected void resetKafkaStreams(final KafkaStreams kafkaStreams) {
private void resetKafkaStreams(final KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setUncaughtExceptionHandler(this::uncaughtHandler);
kafkaStreams.setStateListener((b, a) -> listener.onStateChange(this, b, a));
}

Expand All @@ -322,10 +320,6 @@ protected void closeKafkaStreams() {
}
}

protected KafkaStreams buildKafkaStreams() {
return kafkaStreamsBuilder.build(topology, streamsProperties);
}

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
package io.confluent.ksql.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -48,7 +46,6 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -240,42 +237,6 @@ public void shouldNotCleanUpKStreamsAppOnStop() {
verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldRestartKafkaStreams() {
final KafkaStreams newKafkaStreams = mock(KafkaStreams.class);
final MaterializationProvider newMaterializationProvider = mock(MaterializationProvider.class);

// Given:
when(kafkaStreamsBuilder.build(any(), any())).thenReturn(newKafkaStreams);
when(materializationProviderBuilder.apply(newKafkaStreams, topology))
.thenReturn(Optional.of(newMaterializationProvider));

// When:
query.restart();

// Then:
final InOrder inOrder = inOrder(kafkaStreams, newKafkaStreams);
inOrder.verify(kafkaStreams).close(any());
inOrder.verify(newKafkaStreams).setUncaughtExceptionHandler(
any(StreamsUncaughtExceptionHandler.class));
inOrder.verify(newKafkaStreams).start();

assertThat(query.getKafkaStreams(), is(newKafkaStreams));
assertThat(query.getMaterializationProvider(), is(Optional.of(newMaterializationProvider)));
}

@Test
public void shouldNotRestartIfQueryIsClosed() {
// Given:
query.close();

// When:
final Exception e = assertThrows(Exception.class, () -> query.restart());

// Then:
assertThat(e.getMessage(), containsString("is already closed, cannot restart."));
}

@Test
public void shouldCallProcessingLoggerOnError() {
// Given:
Expand Down

0 comments on commit 98b1e3c

Please sign in to comment.