-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: DROP stream for persistent query doesn't always drop underlying query #7601
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ | |
import io.confluent.ksql.services.KafkaTopicClient; | ||
import io.confluent.ksql.services.ServiceContext; | ||
import io.confluent.ksql.util.KsqlConfig; | ||
import io.confluent.ksql.util.KsqlConstants; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have an test for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
import io.confluent.ksql.util.PersistentQueryMetadata; | ||
import io.confluent.ksql.util.QueryMetadata; | ||
import io.confluent.ksql.util.QueryMetadataImpl; | ||
|
@@ -261,7 +262,7 @@ public void shouldBuildTransientQueryCorrectly() { | |
} | ||
|
||
@Test | ||
public void shouldBuildPersistentQueryCorrectly() { | ||
public void shouldBuildCreateAsPersistentQueryCorrectly() { | ||
// Given: | ||
final ProcessingLogger uncaughtProcessingLogger = mock(ProcessingLogger.class); | ||
when(processingLoggerFactory.getLogger( | ||
|
@@ -271,6 +272,7 @@ public void shouldBuildPersistentQueryCorrectly() { | |
|
||
// When: | ||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -295,12 +297,55 @@ public void shouldBuildPersistentQueryCorrectly() { | |
assertThat(queryMetadata.getOverriddenProperties(), equalTo(OVERRIDES)); | ||
assertThat(queryMetadata.getStreamsProperties(), equalTo(capturedStreamsProperties())); | ||
assertThat(queryMetadata.getProcessingLogger(), equalTo(uncaughtProcessingLogger)); | ||
assertThat(queryMetadata.getPersistentQueryType(), | ||
equalTo(KsqlConstants.PersistentQueryType.CREATE_AS)); | ||
} | ||
|
||
@Test | ||
public void shouldBuildInsertPersistentQueryCorrectly() { | ||
// Given: | ||
final ProcessingLogger uncaughtProcessingLogger = mock(ProcessingLogger.class); | ||
when(processingLoggerFactory.getLogger( | ||
QueryLoggerUtil.queryLoggerName(QUERY_ID, new QueryContext.Stacker() | ||
.push("ksql.logger.thread.exception.uncaught").getQueryContext()) | ||
)).thenReturn(uncaughtProcessingLogger); | ||
|
||
// When: | ||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.INSERT, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
SOURCES, | ||
physicalPlan, | ||
SUMMARY, | ||
queryListener, | ||
Collections::emptyList | ||
); | ||
queryMetadata.initialize(); | ||
|
||
// Then: | ||
assertThat(queryMetadata.getStatementString(), equalTo(STATEMENT_TEXT)); | ||
assertThat(queryMetadata.getQueryId(), equalTo(QUERY_ID)); | ||
assertThat(queryMetadata.getSinkName(), equalTo(SINK_NAME)); | ||
assertThat(queryMetadata.getPhysicalSchema(), equalTo(SINK_PHYSICAL_SCHEMA)); | ||
assertThat(queryMetadata.getResultTopic(), is(ksqlTopic)); | ||
assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES)); | ||
assertThat(queryMetadata.getDataSourceType(), equalTo(DataSourceType.KSTREAM)); | ||
assertThat(queryMetadata.getExecutionPlan(), equalTo(SUMMARY)); | ||
assertThat(queryMetadata.getTopology(), is(topology)); | ||
assertThat(queryMetadata.getOverriddenProperties(), equalTo(OVERRIDES)); | ||
assertThat(queryMetadata.getStreamsProperties(), equalTo(capturedStreamsProperties())); | ||
assertThat(queryMetadata.getProcessingLogger(), equalTo(uncaughtProcessingLogger)); | ||
assertThat(queryMetadata.getPersistentQueryType(), | ||
equalTo(KsqlConstants.PersistentQueryType.INSERT)); | ||
} | ||
|
||
@Test | ||
public void shouldBuildPersistentQueryWithCorrectStreamsApp() { | ||
// When: | ||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -321,6 +366,7 @@ public void shouldBuildPersistentQueryWithCorrectStreamsApp() { | |
public void shouldStartPersistentQueryWithCorrectMaterializationProvider() { | ||
// Given: | ||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -344,6 +390,7 @@ public void shouldStartPersistentQueryWithCorrectMaterializationProvider() { | |
public void shouldCreateKSMaterializationCorrectly() { | ||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -373,6 +420,7 @@ public void shouldCreateKSMaterializationCorrectly() { | |
public void shouldMaterializeCorrectlyOnStart() { | ||
// Given: | ||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -403,6 +451,7 @@ public void shouldNotIncludeMaterializationProviderIfNoMaterialization() { | |
when(tableHolder.getMaterializationBuilder()).thenReturn(Optional.empty()); | ||
|
||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -425,6 +474,7 @@ public void shouldNotIncludeMaterializationProviderIfNoMaterialization() { | |
public void shouldCreateExpectedServiceId() { | ||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -447,6 +497,7 @@ public void shouldCreateExpectedServiceId() { | |
public void shouldAddMetricsInterceptorsToStreamsConfig() { | ||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -478,6 +529,7 @@ private void shouldUseProvidedOptimizationConfig(final Object value) { | |
|
||
// When: | ||
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -540,6 +592,7 @@ public void shouldAddMetricsInterceptorsToExistingList() { | |
|
||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -566,6 +619,7 @@ public void shouldAddMetricsInterceptorsToExistingString() { | |
|
||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -593,6 +647,7 @@ public void shouldAddMetricsInterceptorsToExistingStringList() { | |
|
||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
@@ -624,6 +679,7 @@ public void shouldConfigureProducerErrorHandler() { | |
|
||
// When: | ||
queryBuilder.buildPersistentQuery( | ||
KsqlConstants.PersistentQueryType.CREATE_AS, | ||
STATEMENT_TEXT, | ||
QUERY_ID, | ||
sink, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.contains; | ||
import static org.hamcrest.Matchers.empty; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.mockito.ArgumentMatchers.any; | ||
|
@@ -24,6 +25,7 @@ | |
import io.confluent.ksql.query.QueryRegistryImpl.QueryExecutorFactory; | ||
import io.confluent.ksql.schema.ksql.LogicalSchema; | ||
import io.confluent.ksql.services.ServiceContext; | ||
import io.confluent.ksql.util.KsqlConstants; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we test the query gets removed from only the correct list here? It seems that would cover the case that was causing the the problem There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
import io.confluent.ksql.util.PersistentQueryMetadata; | ||
import io.confluent.ksql.util.PersistentQueryMetadataImpl; | ||
import io.confluent.ksql.util.QueryMetadata; | ||
|
@@ -196,6 +198,47 @@ public void shouldGetQueriesInsertingIntoOrReadingFromSource() { | |
assertThat(queries, contains(new QueryId("q2"), new QueryId("q3"), new QueryId("q4"))); | ||
} | ||
|
||
@Test | ||
public void shouldRemoveQueryFromCreateAsQueriesWhenTerminatingCreateAsQuery() { | ||
// Given: | ||
givenCreate(registry, "q1", "source", "sink1", true); | ||
givenCreate(registry, "i1", "source", "sink1", false); | ||
final QueryRegistry sandbox = registry.createSandbox(); | ||
final QueryMetadata q1 = sandbox.getPersistentQuery(new QueryId("q1")).get(); | ||
final QueryMetadata i1 = sandbox.getPersistentQuery(new QueryId("i1")).get(); | ||
|
||
// When: | ||
q1.close(); | ||
final Optional<QueryMetadata> createAsQueries = | ||
sandbox.getCreateAsQuery(SourceName.of("sink1")); | ||
final Set<QueryId> insertQueries = | ||
sandbox.getInsertQueries(SourceName.of("sink1"), (n, q) -> true); | ||
|
||
// Then: | ||
assertThat(createAsQueries, equalTo(Optional.empty())); | ||
assertThat(insertQueries, contains(i1.getQueryId())); | ||
} | ||
|
||
@Test | ||
public void shouldRemoveQueryFromInsertQueriesWhenTerminatingInsertQuery() { | ||
// Given: | ||
givenCreate(registry, "q1", "source", "sink1", true); | ||
givenCreate(registry, "i1", "source", "sink1", false); | ||
final QueryRegistry sandbox = registry.createSandbox(); | ||
final QueryMetadata q1 = sandbox.getPersistentQuery(new QueryId("q1")).get(); | ||
final QueryMetadata i1 = sandbox.getPersistentQuery(new QueryId("i1")).get(); | ||
|
||
// When: | ||
i1.close(); | ||
final QueryMetadata createAsQueries = sandbox.getCreateAsQuery(SourceName.of("sink1")).get(); | ||
final Set<QueryId> insertQueries = | ||
sandbox.getInsertQueries(SourceName.of("sink1"), (n, q) -> true); | ||
|
||
// Then: | ||
assertThat(createAsQueries, equalTo(q1)); | ||
assertThat(insertQueries, empty()); | ||
} | ||
|
||
@Test | ||
public void shouldCopyInsertsOnSandbox() { | ||
// Given: | ||
|
@@ -319,7 +362,7 @@ private QueryMetadata.Listener givenCreateGetListener( | |
) { | ||
givenCreate(registry, id, "source", "sink1", true); | ||
verify(executor).buildPersistentQuery( | ||
any(), any(), any(), any(), any(), any(), queryListenerCaptor.capture(), any()); | ||
any(), any(), any(), any(), any(), any(), any(), queryListenerCaptor.capture(), any()); | ||
return queryListenerCaptor.getValue(); | ||
} | ||
|
||
|
@@ -338,8 +381,11 @@ private PersistentQueryMetadata givenCreate( | |
when(query.getSinkName()).thenReturn(SourceName.of(sink)); | ||
when(query.getSink()).thenReturn(sinkSource); | ||
when(query.getSourceNames()).thenReturn(ImmutableSet.of(SourceName.of(source))); | ||
when(query.getPersistentQueryType()).thenReturn(createAs | ||
? KsqlConstants.PersistentQueryType.CREATE_AS | ||
: KsqlConstants.PersistentQueryType.INSERT); | ||
when(executor.buildPersistentQuery( | ||
any(), any(), any(), any(), any(), any(), any(), any()) | ||
any(), any(), any(), any(), any(), any(), any(), any(), any()) | ||
).thenReturn(query); | ||
registry.createOrReplacePersistentQuery( | ||
config, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this comment was not true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. I thought it was true when I implemented this feature. I don't know why it wasn't it.