This started a couple of days ago. The error is similar to:
java.lang.AssertionError:
Failed to await result.msg: Table 'ID_18' is not materialized. KSQL currently only supports static queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields> FROM <sources> GROUP BY <key>' style statement.
at io.confluent.ksql.rest.integration.StaticQueryFunctionalTest.makeKsqlRequest(StaticQueryFunctionalTest.java:209)
at io.confluent.ksql.rest.integration.StaticQueryFunctionalTest.makeStaticQueryRequest(StaticQueryFunctionalTest.java:197)
at io.confluent.ksql.rest.integration.StaticQueryFunctionalTest.shouldGetSingleKeyFromBothNodes(StaticQueryFunctionalTest.java:155)
It affects both tests in StaticQueryFunctionalTest.
A (fairly) reliable way to reproduce the failure is to delete the Kafka Streams state dir before running the tests. With that, at least one of the two tests fails ~90% of the time for me locally. Jenkins builds have been seeing a ~60% failure rate.
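For anyone trying to reproduce this, here is a minimal sketch of the cleanup step. It assumes the state dir root is /tmp/kafka-streams, the path that appears in the error log below; if state.dir is configured differently, pass the real path as an argument. StateDirCleaner and deleteRecursively are hypothetical names, not part of the KSQL test code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public final class StateDirCleaner {

    // Assumption: the state dir root seen in the error log; adjust if state.dir is configured.
    static final Path DEFAULT_STATE_DIR = Paths.get("/tmp/kafka-streams");

    /** Recursively deletes root and everything under it; a missing root is a no-op. */
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parents, so each directory is empty when deleted.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Run before the tests so every query has to create its state dir from scratch.
        deleteRecursively(args.length > 0 ? Paths.get(args[0]) : DEFAULT_STATE_DIR);
    }
}
```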
These failures are caused by a race between the test's two REST apps, which both try to start Kafka Streams apps for the same CTAS query at the same time. Specifically, both try to create the same state dir, and the one that loses fails with:
[2019-09-27 18:21:14,194] ERROR Failed to handle: Command{statement='CREATE TABLE ID_18 WITH (KAFKA_TOPIC='ID_18', PARTITIONS=2, REPLICAS=1) AS SELECT COUNT(1) "COUNT"
FROM USERS USERS
GROUP BY USERS.USERID
EMIT CHANGES;',useOffsetAsQueryID=true, overwriteProperties={}} (io.confluent.ksql.rest.server.computation.StatementExecutor:212)
io.confluent.ksql.util.KsqlStatementException: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1] doesn't exist and couldn't be created
Statement: CREATE TABLE ID_18 WITH (KAFKA_TOPIC='ID_18', PARTITIONS=2, REPLICAS=1) AS SELECT COUNT(1) "COUNT"
FROM USERS USERS
GROUP BY USERS.USERID
EMIT CHANGES;
at io.confluent.ksql.engine.EngineExecutor.execute(EngineExecutor.java:115)
at io.confluent.ksql.engine.KsqlEngine.execute(KsqlEngine.java:176)
at io.confluent.ksql.engine.KsqlEngine.execute(KsqlEngine.java:166)
at io.confluent.ksql.rest.server.computation.StatementExecutor.startQuery(StatementExecutor.java:337)
at io.confluent.ksql.rest.server.computation.StatementExecutor.executeStatement(StatementExecutor.java:234)
at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatementWithTerminatedQueries(StatementExecutor.java:209)
at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatement(StatementExecutor.java:123)
at io.confluent.ksql.rest.server.computation.CommandRunner.lambda$executeStatement$3(CommandRunner.java:172)
at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:84)
at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:57)
at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:40)
at io.confluent.ksql.rest.server.computation.CommandRunner.executeStatement(CommandRunner.java:177)
at io.confluent.ksql.rest.server.computation.CommandRunner.fetchAndRunCommands(CommandRunner.java:161)
at io.confluent.ksql.rest.server.computation.CommandRunner$Runner.run(CommandRunner.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1] doesn't exist and couldn't be created
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:694)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:563)
at io.confluent.ksql.physical.KafkaStreamsBuilderImpl.buildKafkaStreams(KafkaStreamsBuilderImpl.java:41)
at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPlanForStructuredOutputNode(PhysicalPlanBuilder.java:283)
at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:170)
at io.confluent.ksql.engine.QueryEngine.buildPhysicalPlan(QueryEngine.java:127)
at io.confluent.ksql.engine.EngineExecutor.execute(EngineExecutor.java:100)
... 18 more
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1] doesn't exist and couldn't be created
at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:90)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:692)
... 24 more
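To make the failure mode concrete, here is a simplified, deterministic reproduction of a check-then-create race. It assumes the StateDirectory constructor in the trace above checks whether the directory exists before trying to create it; the code is a simplification, not the actual Kafka Streams source. A CyclicBarrier forces both callers to finish the existence check before either one creates the directory, so exactly one of them throws an error with the same message. StateDirRace and its methods are hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class StateDirRace {

    /** Simplified check-then-create; the barrier forces the losing interleaving. */
    static void createStateDir(File dir, CyclicBarrier barrier) throws Exception {
        boolean existed = dir.exists();  // both callers observe "does not exist"
        barrier.await();                 // neither proceeds until both have checked
        if (!existed && !dir.mkdir()) {  // only one mkdir() can return true
            throw new IllegalStateException(
                "state directory [" + dir.getPath() + "] doesn't exist and couldn't be created");
        }
    }

    /** Starts two concurrent creators for the same directory and counts how many fail. */
    static int countFailures(File dir) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Object>> results = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                results.add(pool.submit(() -> {
                    createStateDir(dir, barrier);
                    return null;
                }));
            }
            int failures = 0;
            for (Future<Object> f : results) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    failures++;
                }
            }
            return failures;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        File base = Files.createTempDirectory("kafka-streams").toFile();
        File dir = new File(base, "_confluent-ksql-default_query_CTAS_ID_18_1");
        System.out.println("failures: " + countFailures(dir)); // prints "failures: 1"
    }
}
```

In the real test nothing forces this interleaving, so whether one app loses would depend on timing, which fits the tests failing often but not always.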
This error occurs partway through building the physical plan in KSQL, after the sink has been added to the metastore but before the query has been added. That produces the assertion failure seen in the test: the test can't find the query in the metastore, so the check that the table is materialized fails.
This flakiness started after #3354 was merged. (These tests do not fail on the commit right before it.) That query ID PR makes the query ID used during request validation different from the query ID used during execution. Before this change, the query ID was the same in both phases, so the single REST app validating the request created the necessary state dir during validation. By the time the two REST apps needed that dir during execution it already existed, so there was no race to create it.
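A sketch of why validation no longer pre-creates the right directory: Kafka Streams keeps an application's state under <state.dir>/<application.id>, and, judging from the path in the log above, the KSQL application ID embeds the query ID. The prefix below is inferred from that log path, and the validation-time query ID CTAS_ID_18_0 is hypothetical; StateDirNaming is likewise an illustrative name.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public final class StateDirNaming {

    // Assumption: application ID prefix inferred from the log path
    // /tmp/kafka-streams/_confluent-ksql-default_query_CTAS_ID_18_1.
    static final String APP_ID_PREFIX = "_confluent-ksql-default_query_";

    /** Kafka Streams keeps an application's state under <state.dir>/<application.id>. */
    static Path stateDirFor(Path stateDirRoot, String queryId) {
        return stateDirRoot.resolve(APP_ID_PREFIX + queryId);
    }

    public static void main(String[] args) {
        Path root = Paths.get("/tmp/kafka-streams");
        // Hypothetical IDs: one assigned at validation time, one at execution time.
        Path createdAtValidation = stateDirFor(root, "CTAS_ID_18_0");
        Path neededAtExecution = stateDirFor(root, "CTAS_ID_18_1");
        // Different directories, so creating the first does not pre-create the second.
        System.out.println(createdAtValidation.equals(neededAtExecution)); // false
    }
}
```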
Is this a bug in Kafka Streams? Two instances starting the same app at the same time shouldn't hit a file system race like this. (Relevant issue, curiously marked as resolved although I don't see a resolution: https://issues.apache.org/jira/browse/KAFKA-5038) Then again, maybe not, since two instances of the same app sharing one state dir doesn't really fit the intended usage model for Kafka Streams...
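For comparison, here is a sketch of directory creation that tolerates this particular race. java.nio.file.Files.createDirectories does not throw when the directory already exists, so a caller that loses the race still succeeds. This only illustrates the idea. It is not how Kafka Streams creates its state dir, and it would not make two instances sharing one state dir a supported setup. IdempotentStateDir is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class IdempotentStateDir {

    /** Creates dir (and any missing parents); an already existing directory is not an error. */
    static Path ensureStateDir(Path dir) throws IOException {
        return Files.createDirectories(dir);
    }

    /** Releases all callers at once against the same directory and counts how many fail. */
    static int countFailures(Path dir, int callers) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(callers);
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        try {
            List<Future<Path>> results = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                results.add(pool.submit(() -> {
                    barrier.await();            // start every caller at the same moment
                    return ensureStateDir(dir);
                }));
            }
            int failures = 0;
            for (Future<Path> f : results) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    failures++;
                }
            }
            return failures;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Path dir = Files.createTempDirectory("kafka-streams")
                .resolve("_confluent-ksql-default_query_CTAS_ID_18_1");
        System.out.println("failures: " + countFailures(dir, 2)); // prints "failures: 0"
    }
}
```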
With the new query ID change, this flakiness is not easy to fix in KSQL: at validation time there is no way to predict which query ID will be used during execution, so the state dir can no longer be created in advance as it was before.
Thoughts/suggestions on how to resolve this flakiness?
(For reference, the line in Kafka Streams where the error is thrown: https://github.com/apache/kafka/blob/8818a7037dff8b099b2516460b34353ca0c38a2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java#L89)