diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java index b86222480a0f..609f2c2935ce 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java @@ -273,6 +273,8 @@ private void registerQuery( unregisterQuery(oldQuery); } + // Initialize the query before it's exposed to other threads via the map/sets. + persistentQuery.initialize(); persistentQueries.put(queryId, persistentQuery); if (createAsQuery) { createAsQueries.put(persistentQuery.getSinkName(), queryId); @@ -282,10 +284,12 @@ private void registerQuery( insertQueries.computeIfAbsent(sourceName, x -> Collections.synchronizedSet(new HashSet<>())).add(queryId)); } + } else { + // Initialize the query before it's exposed to other threads via {@link allLiveQueries}. + query.initialize(); } allLiveQueries.add(query); notifyCreate(serviceContext, metaStore, query); - query.initialize(); } private void unregisterQuery(final QueryMetadata query) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index 7a53413c2daa..ad50bbb83f66 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -55,8 +55,6 @@ Optional getMaterialization( QueryContext.Stacker contextStacker ); - void restart(); - void stop(); StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHandler( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java index 3ea6e7e705df..f3a3dcd99f57 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -54,9 +53,9 @@ public class PersistentQueryMetadataImpl private final Optional materializationProviderBuilder; private final Optional scalablePushRegistry; + private final ProcessingLogger processingLogger; private Optional materializationProvider; - private ProcessingLogger processingLogger; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck public PersistentQueryMetadataImpl( @@ -203,23 +202,6 @@ public Optional getMaterialization( return materializationProvider.map(builder -> builder.build(queryId, contextStacker)); } - public synchronized void restart() { - if (isClosed()) { - throw new IllegalStateException(String.format( - "Query with application id %s is already closed, cannot restart.", - getQueryApplicationId())); - } - - closeKafkaStreams(); - - final KafkaStreams newKafkaStreams = buildKafkaStreams(); - materializationProvider = materializationProviderBuilder.flatMap( - builder -> builder.apply(newKafkaStreams, getTopology())); - - resetKafkaStreams(newKafkaStreams); - start(); - } - /** * Stops the query without cleaning up the external resources * so that it can be resumed when we call {@link #start()}. diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java index 733bcd1b633a..4c33781a2b33 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java @@ -39,7 +39,6 @@ import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; - import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -70,9 +69,10 @@ public class QueryMetadataImpl implements QueryMetadata { private final RetryEvent retryEvent; private final Listener listener; - private boolean everStarted = false; - protected boolean closed = false; - private StreamsUncaughtExceptionHandler uncaughtExceptionHandler = this::uncaughtHandler; + private volatile boolean everStarted = false; + protected volatile boolean closed = false; + // These fields don't need synchronization because they are initialized in initialize() before + // the object is made available to other threads. private KafkaStreams kafkaStreams; private boolean initialized = false; @@ -146,7 +146,6 @@ public long read() { this.closeTimeout = other.closeTimeout; this.queryId = other.getQueryId(); this.errorClassifier = other.errorClassifier; - this.uncaughtExceptionHandler = other.uncaughtExceptionHandler; this.everStarted = other.everStarted; this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0); this.retryEvent = new RetryEvent( @@ -222,7 +221,6 @@ public String getStatementString() { } public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) { - this.uncaughtExceptionHandler = handler; kafkaStreams.setUncaughtExceptionHandler(handler); } @@ -304,9 +302,9 @@ Listener getListener() { return listener; } - protected void resetKafkaStreams(final KafkaStreams kafkaStreams) { + private void resetKafkaStreams(final KafkaStreams kafkaStreams) { this.kafkaStreams = kafkaStreams; - setUncaughtExceptionHandler(uncaughtExceptionHandler); + setUncaughtExceptionHandler(this::uncaughtHandler); kafkaStreams.setStateListener((b, a) -> listener.onStateChange(this, b, a)); } @@ -322,10 +320,6 @@ protected void closeKafkaStreams() { } } - protected KafkaStreams buildKafkaStreams() { - return kafkaStreamsBuilder.build(topology, streamsProperties); - } - /** * Closes the {@code QueryMetadata} and cleans up any of * the resources associated with it (e.g. internal topics, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index 963b222e5078..caa8d3225b06 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -16,9 +16,7 @@ package io.confluent.ksql.util; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -48,7 +46,6 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -240,42 +237,6 @@ public void shouldNotCleanUpKStreamsAppOnStop() { verify(kafkaStreams, never()).cleanUp(); } - @Test - public void shouldRestartKafkaStreams() { - final KafkaStreams newKafkaStreams = mock(KafkaStreams.class); - final MaterializationProvider newMaterializationProvider = mock(MaterializationProvider.class); - - // Given: - when(kafkaStreamsBuilder.build(any(), any())).thenReturn(newKafkaStreams); - when(materializationProviderBuilder.apply(newKafkaStreams, topology)) - .thenReturn(Optional.of(newMaterializationProvider)); - - // When: - query.restart(); - - // Then: - final InOrder inOrder = inOrder(kafkaStreams, newKafkaStreams); - inOrder.verify(kafkaStreams).close(any()); - inOrder.verify(newKafkaStreams).setUncaughtExceptionHandler( - any(StreamsUncaughtExceptionHandler.class)); - inOrder.verify(newKafkaStreams).start(); - - assertThat(query.getKafkaStreams(), is(newKafkaStreams)); - assertThat(query.getMaterializationProvider(), is(Optional.of(newMaterializationProvider))); - } - - @Test - public void shouldNotRestartIfQueryIsClosed() { - // Given: - query.close(); - - // When: - final Exception e = assertThrows(Exception.class, () -> query.restart()); - - // Then: - assertThat(e.getMessage(), containsString("is already closed, cannot restart.")); - } - @Test public void shouldCallProcessingLoggerOnError() { // Given: