StaticQueryFunctionalTest on latest master is flaky #3441

Closed
vcrfxia opened this issue Sep 28, 2019 · 0 comments · Fixed by #3442

Comments

@vcrfxia
Contributor

vcrfxia commented Sep 28, 2019

This started a couple of days ago. The error is similar to

java.lang.AssertionError: 
Failed to await result.msg: Table 'ID_18' is not materialized. KSQL currently only supports static queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields> FROM <sources> GROUP BY <key>' style statement.

	at io.confluent.ksql.rest.integration.StaticQueryFunctionalTest.makeKsqlRequest(StaticQueryFunctionalTest.java:209)
	at io.confluent.ksql.rest.integration.StaticQueryFunctionalTest.makeStaticQueryRequest(StaticQueryFunctionalTest.java:197)
	at io.confluent.ksql.rest.integration.StaticQueryFunctionalTest.shouldGetSingleKeyFromBothNodes(StaticQueryFunctionalTest.java:155)

and affects both tests in the test file.

A (fairly) reliable way to reproduce the failure is to delete the Kafka Streams state dir before running the test. At least one of the two tests fails ~90% of the time for me locally, and Jenkins builds have been seeing a ~60% failure rate.
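For reference, the reproduction step (wiping the state dir before the test runs) could look roughly like the helper below. This is a sketch, not code from the KSQL repo: the class name `StateDirCleaner` is hypothetical, and the default path assumes the `/tmp/kafka-streams` location that appears in the error log in this issue (it would differ if `state.dir` is overridden).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper: recursively deletes the Kafka Streams state dir so the
// next test run has to re-create it (which is what exposes the race).
final class StateDirCleaner {

    // Assumed default location, taken from the path shown in the error log.
    static final Path DEFAULT_STATE_DIR = Paths.get("/tmp/kafka-streams");

    // Deletes `root` and everything under it; does nothing if it doesn't exist.
    static void deleteRecursively(final Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        final Path[] ordered;
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse lexicographic order puts every child before its parent,
            // so directories are empty by the time they are deleted.
            ordered = paths.sorted(Comparator.reverseOrder()).toArray(Path[]::new);
        }
        for (final Path p : ordered) {
            Files.delete(p);
        }
    }
}
```

In a JUnit 4 test this would typically be called from a `@BeforeClass` method, e.g. `StateDirCleaner.deleteRecursively(StateDirCleaner.DEFAULT_STATE_DIR)`.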

These failures are caused by a race between the test's two REST apps, which try to start Kafka Streams apps for the same CTAS query at the same time. Specifically, both try to create the same state dir, and one of them gets locked out and fails with:

[2019-09-27 18:21:14,194] ERROR Failed to handle: Command{statement='CREATE TABLE ID_18 WITH (KAFKA_TOPIC='ID_18', PARTITIONS=2, REPLICAS=1) AS SELECT COUNT(1) "COUNT"
FROM USERS USERS
GROUP BY USERS.USERID
EMIT CHANGES;',useOffsetAsQueryID=true, overwriteProperties={}} (io.confluent.ksql.rest.server.computation.StatementExecutor:212)
io.confluent.ksql.util.KsqlStatementException: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1] doesn't exist and couldn't be created
Statement: CREATE TABLE ID_18 WITH (KAFKA_TOPIC='ID_18', PARTITIONS=2, REPLICAS=1) AS SELECT COUNT(1) "COUNT"
FROM USERS USERS
GROUP BY USERS.USERID
EMIT CHANGES;
	at io.confluent.ksql.engine.EngineExecutor.execute(EngineExecutor.java:115)
	at io.confluent.ksql.engine.KsqlEngine.execute(KsqlEngine.java:176)
	at io.confluent.ksql.engine.KsqlEngine.execute(KsqlEngine.java:166)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.startQuery(StatementExecutor.java:337)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.executeStatement(StatementExecutor.java:234)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatementWithTerminatedQueries(StatementExecutor.java:209)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatement(StatementExecutor.java:123)
	at io.confluent.ksql.rest.server.computation.CommandRunner.lambda$executeStatement$3(CommandRunner.java:172)
	at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:84)
	at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:57)
	at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:40)
	at io.confluent.ksql.rest.server.computation.CommandRunner.executeStatement(CommandRunner.java:177)
	at io.confluent.ksql.rest.server.computation.CommandRunner.fetchAndRunCommands(CommandRunner.java:161)
	at io.confluent.ksql.rest.server.computation.CommandRunner$Runner.run(CommandRunner.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1] doesn't exist and couldn't be created
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:694)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:563)
	at io.confluent.ksql.physical.KafkaStreamsBuilderImpl.buildKafkaStreams(KafkaStreamsBuilderImpl.java:41)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPlanForStructuredOutputNode(PhysicalPlanBuilder.java:283)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:170)
	at io.confluent.ksql.engine.QueryEngine.buildPhysicalPlan(QueryEngine.java:127)
	at io.confluent.ksql.engine.EngineExecutor.execute(EngineExecutor.java:100)
	... 18 more
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1] doesn't exist and couldn't be created
	at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:90)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:692)
	... 24 more

(The line where the error is thrown: https://github.com/apache/kafka/blob/8818a7037dff8b099b2516460b34353ca0c38a2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java#L89)
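The message "doesn't exist and couldn't be created" reads like a non-atomic check-then-create. The sketch below is one plausible model of that failure mode, not Kafka's actual `StateDirectory` code: `CheckThenCreate` and its methods are hypothetical, and the `betweenCheckAndCreate` hook exists only to force the losing interleaving deterministically. `File.mkdir()` returns `false` when the directory already exists, so the app that loses the race throws even though the directory is now there. `Files.createDirectories`, by contrast, does not fail when the directory already exists.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical model of a check-then-create race on a state directory.
final class CheckThenCreate {

    // Non-atomic: another app can create `dir` between exists() and mkdir(),
    // in which case mkdir() returns false and we wrongly report a failure.
    static void ensureDirRacy(final File dir, final Runnable betweenCheckAndCreate) {
        if (!dir.exists()) {
            betweenCheckAndCreate.run(); // simulates the other app winning the race here
            if (!dir.mkdir()) {
                throw new IllegalStateException(
                    "state directory [" + dir + "] doesn't exist and couldn't be created");
            }
        }
    }

    // Race-tolerant alternative: succeeds whether or not someone else
    // created the directory first.
    static void ensureDirTolerant(final File dir) {
        try {
            Files.createDirectories(dir.toPath());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```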

This error occurs partway through building the physical plan in KSQL, after the sink has been added to the metastore but before the query has been added. That produces the observed assertion failure: the test can't find the query in the metastore, so the check that the table is materialized fails.
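To make the ordering problem concrete, here is a deliberately simplified model. Every name in it (`MetaStoreModel`, `executeCtas`, `isMaterialized`) is hypothetical and is not KSQL's real API; it only encodes the sequence described above: register the sink, build the plan, then register the query. If the plan step throws, the table is left registered without a materializing query, which is exactly the state the static-query check rejects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of the partial-failure ordering.
final class MetaStoreModel {
    final Map<String, String> sources = new HashMap<>(); // table name -> sink topic
    final Map<String, String> queries = new HashMap<>(); // table name -> materializing query ID

    // Sink first, then plan, then query. A failure in the middle step
    // leaves the sink registered but no query recorded.
    void executeCtas(final String table, final String queryId, final Runnable buildPhysicalPlan) {
        sources.put(table, table);
        buildPhysicalPlan.run(); // may throw, e.g. on the state dir race
        queries.put(table, queryId);
    }

    // Static queries are only allowed on tables backed by a materializing query.
    boolean isMaterialized(final String table) {
        return sources.containsKey(table) && queries.containsKey(table);
    }
}
```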

This flakiness started after #3354 was merged. (These tests do not fail on the commit right before it.) The query ID PR makes the query ID used during request validation differ from the query ID used during execution. Before this change, when the query ID was the same in validation and execution, the single REST app validating the request created the necessary state dir at validation time. The state dir therefore already existed by the time the two REST apps needed it during execution, so there was no race to create it.
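Why a different query ID matters can be seen from how the state dir path is built. The sketch below assumes, from the path in the log above, that a query's state dir is `<state.dir>/<application id>` and that the application ID is a fixed prefix (`_confluent-ksql-default_query_`) followed by the query ID. `StateDirPaths` is hypothetical, and the validation-time ID `CTAS_ID_18_0` used in the usage note is made up purely for illustration.

```java
import java.io.File;

// Hypothetical sketch: state dir path derived from the query ID.
final class StateDirPaths {
    // Both values are inferred from the error log in this issue.
    static final String STATE_DIR = "/tmp/kafka-streams";
    static final String APP_ID_PREFIX = "_confluent-ksql-default_query_";

    static File stateDirFor(final String queryId) {
        return new File(STATE_DIR, APP_ID_PREFIX + queryId);
    }
}
```

With matching IDs, `stateDirFor(validationId)` and `stateDirFor(executionId)` are the same directory, so validation pre-creates it. With different IDs (say `CTAS_ID_18_0` at validation and `CTAS_ID_18_1` at execution), the directory created during validation is never the one execution needs.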

Is this a bug in Kafka Streams? Two instances starting the same app at the same time shouldn't hit a file-system race like this. (A relevant issue is curiously marked as resolved, though I don't see a resolution: https://issues.apache.org/jira/browse/KAFKA-5038) Actually, maybe not, since this doesn't really fit the intended usage model for Kafka Streams...

With the new query ID change, this flakiness is not easy to fix in KSQL: at validation time it is not possible to predict the query ID that will be used during execution, so the state dir can't be created in advance as it was before.

Thoughts/suggestions on how to resolve this flakiness?

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Sep 28, 2019
Fixes: confluentinc#3441

The fix is to use a different tmp directory per app. Two instances should not be using the same dir anyway.
big-andy-coates added a commit that referenced this issue Sep 29, 2019
Fixes: #3441

The fix is to use a different tmp directory per app. Two instances should not be using the same dir anyway.
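A minimal sketch of the "different tmp directory per app" idea, assuming each test REST app can be handed its own Kafka Streams properties. `state.dir` is the real Kafka Streams config key (`StreamsConfig.STATE_DIR_CONFIG`); the helper `PerAppStateDir` is hypothetical, and how the property actually reaches each KSQL app (for example via KSQL's `ksql.streams.` prefix) is an assumption, not something taken from the fix commit.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: one fresh temp state dir per app, so two apps running
// the same query never compete to create the same path.
final class PerAppStateDir {

    static Map<String, Object> streamsPropsFor(final String appName) {
        try {
            final Path dir = Files.createTempDirectory("ksql-" + appName + "-state");
            final Map<String, Object> props = new HashMap<>();
            props.put("state.dir", dir.toAbsolutePath().toString()); // StreamsConfig.STATE_DIR_CONFIG
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a JUnit 4 test, a `TemporaryFolder` rule per app would achieve the same isolation and also clean up afterwards.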