From b9c8d631013a2a3b46edeff6daa7d06dc856df72 Mon Sep 17 00:00:00 2001 From: Rohan Date: Sun, 16 May 2021 23:27:00 -0700 Subject: [PATCH] fix: npe on transient query close (#7530) Sometimes when queries close the closing thread times out and leaves behind the cleanup thread in streams. Then, this thread calls the state change callback which causes our metrics listener to throw an NPE. This patch changes the listener to deal with this case by checking for null values --- .../QueryStateMetricsReportingListener.java | 14 +++++++++++-- ...ueryStateMetricsReportingListenerTest.java | 20 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java index be965dfd14d5..1b7fa7f66b31 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java @@ -66,12 +66,22 @@ public void onCreate( @Override public void onStateChange(final QueryMetadata query, final State before, final State after) { - perQuery.get(query.getQueryId()).onChange(before, after); + // this may be called after the query is deregistered, because shutdown is ansynchronous and + // may time out. when ths happens, the shutdown thread in streams may call this method. + final PerQueryListener listener = perQuery.get(query.getQueryId()); + if (listener != null) { + listener.onChange(before, after); + } } @Override public void onError(final QueryMetadata query, final QueryError error) { - perQuery.get(query.getQueryId()).onError(error); + // this may be called after the query is deregistered, because shutdown is ansynchronous and + // may time out. when ths happens, the shutdown thread in streams may call this method. + final PerQueryListener listener = perQuery.get(query.getQueryId()); + if (listener != null) { + listener.onError(error); + } } @Override diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java index a63df836c80b..340e2b652cc5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java @@ -99,6 +99,26 @@ public void shouldAddMetricOnCreation() { verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class)); } + @Test + public void shouldGracefullyHandleStateCallbackAfterDeregister() { + // Given: + listener.onCreate(serviceContext, metaStore, query); + listener.onDeregister(query); + + // When/Then(don't throw) + listener.onStateChange(query, State.RUNNING, State.NOT_RUNNING); + } + + @Test + public void shouldGracefullyHandleErrorCallbackAfterDeregister() { + // Given: + listener.onCreate(serviceContext, metaStore, query); + listener.onDeregister(query); + + // When/Then(don't throw) + listener.onError(query, new QueryError(123, "foo", Type.USER)); + } + @Test public void shouldAddMetricWithSuppliedPrefix() { // Given: