feat: add KSQL processing log message on uncaught streams exceptions #6253

Merged (3 commits) on Sep 23, 2020
docs/developer-guide/ksqldb-reference/show-queries.md (2 changes: 1 addition & 1 deletion)

@@ -65,7 +65,7 @@ Host Query Status : {192.168.1.6:8088=RUNNING, 192.168.1.6:8089=RUNNING}
LOGGER | VARCHAR(STRING)
LEVEL | VARCHAR(STRING)
TIME | BIGINT
MESSAGE | STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORDB64 VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORD VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING)>>
MESSAGE | STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORDB64 VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORD VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING)>, KAFKASTREAMSTHREADERROR STRUCT<THREADNAME VARCHAR(STRING), ERRORMESSAGE VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>>
-------------------------------------------------------------------------------------

Sources that this query reads from:
docs/developer-guide/test-and-debug/processing-log.md (20 changes: 19 additions & 1 deletion)

@@ -204,6 +204,20 @@ message.serializationError.topic (STRING)
: The {{ site.ak }} topic to which the ksqlDB row that failed to serialize
would have been produced.

message.kafkaStreamsThreadError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
error encountered.

message.kafkaStreamsThreadError.threadName (STRING)

: A string containing the thread name.

message.kafkaStreamsThreadError.cause (LIST<STRING>)

: A list of strings containing human-readable error messages
for the chain of exceptions that caused the main error.
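For illustration only, here is a minimal Java sketch of how a consumer that has already deserialized a processing log record's message struct (via Kafka Connect) might read these fields. The class and method names are hypothetical; the field names are the ones documented above.

```java
import java.util.List;
import org.apache.kafka.connect.data.Struct;

final class ThreadErrorFields {

  // Sketch: summarize a kafkaStreamsThreadError entry from the "message"
  // struct of a processing log record; returns null for other message types.
  static String summarize(final Struct message) {
    final Struct threadError = message.getStruct("kafkaStreamsThreadError");
    if (threadError == null) {
      return null;
    }
    final List<String> causes = threadError.getArray("cause");
    return "thread " + threadError.getString("threadName")
        + " failed: " + threadError.getString("errorMessage")
        + "; causes: " + causes;
  }
}
```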

Log Stream
----------

@@ -307,7 +321,11 @@ ksql> CREATE STREAM PROCESSING_LOG_STREAM (
      errorMessage STRING,
      record STRING,
      cause ARRAY<STRING>,
      `topic` STRING>>)
      `topic` STRING>>,
    kafkaStreamsThreadError STRUCT<
      threadName STRING,
      errorMessage STRING,
      cause ARRAY<STRING>>)
WITH (KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');
```

ProcessingLogMessageSchema.java

@@ -76,11 +76,24 @@ public final class ProcessingLogMessageSchema {
.optional()
.build();

public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "threadName";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE = "cause";

private static final Schema KAFKA_STREAMS_THREAD_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "KafkaStreamsThreadError")
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
.optional()
.build();

public enum MessageType {
DESERIALIZATION_ERROR(0, DESERIALIZATION_ERROR_SCHEMA),
RECORD_PROCESSING_ERROR(1, RECORD_PROCESSING_ERROR_SCHEMA),
PRODUCTION_ERROR(2, PRODUCTION_ERROR_SCHEMA),
SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA);
SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA),
KAFKA_STREAMS_THREAD_ERROR(4, KAFKA_STREAMS_THREAD_ERROR_SCHEMA);

private final int typeId;
private final Schema schema;
@@ -104,6 +117,7 @@ public Schema getSchema() {
public static final String RECORD_PROCESSING_ERROR = "recordProcessingError";
public static final String PRODUCTION_ERROR = "productionError";
public static final String SERIALIZATION_ERROR = "serializationError";
public static final String KAFKA_STREAMS_THREAD_ERROR = "kafkaStreamsThreadError";

public static final Schema PROCESSING_LOG_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "ProcessingLogRecord")
@@ -112,6 +126,7 @@ public Schema getSchema() {
.field(RECORD_PROCESSING_ERROR, RECORD_PROCESSING_ERROR_SCHEMA)
.field(PRODUCTION_ERROR, PRODUCTION_ERROR_SCHEMA)
.field(SERIALIZATION_ERROR, SERIALIZATION_ERROR_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR, KAFKA_STREAMS_THREAD_ERROR_SCHEMA)
.optional()
.build();
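As an illustration of how the integer type field maps back to these constants, a hypothetical lookup helper (not part of this class, relying only on the public getTypeId() accessor used elsewhere in this PR) might look like this:

```java
// Hypothetical helper, not part of this class: resolve the integer "type"
// field of an emitted record back to its MessageType constant.
static MessageType messageTypeFromId(final int typeId) {
  for (final MessageType type : MessageType.values()) {
    if (type.getTypeId() == typeId) {
      return type; // e.g. 4 resolves to KAFKA_STREAMS_THREAD_ERROR
    }
  }
  throw new IllegalArgumentException("Unknown processing log type id: " + typeId);
}
```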

QueryExecutor.java

@@ -24,6 +24,8 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.materialization.MaterializationInfo.Builder;
import io.confluent.ksql.execution.plan.ExecutionStep;
@@ -70,6 +72,9 @@

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public final class QueryExecutor {
private static final String KSQL_THREAD_EXCEPTION_UNCAUGHT_LOGGER
= "ksql.logger.thread.exception.uncaught";

// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
private final SessionConfig config;
private final ProcessingLogContext processingLogContext;
@@ -231,10 +236,19 @@ public PersistentQueryMetadata buildPersistentQuery(
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG),
classifier,
physicalPlan,
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE)
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE),
getUncaughtExceptionProcessingLogger(queryId)
);
}

private ProcessingLogger getUncaughtExceptionProcessingLogger(final QueryId queryId) {
final QueryContext.Stacker stacker = new QueryContext.Stacker()
.push(KSQL_THREAD_EXCEPTION_UNCAUGHT_LOGGER);

return processingLogContext.getLoggerFactory().getLogger(
QueryLoggerUtil.queryLoggerName(queryId, stacker.getQueryContext()));
}
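For context, a sketch of the logger name this produces, using a hypothetical query id and assuming queryLoggerName dot-joins the query id with the pushed context parts (import paths assumed):

```java
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.query.QueryId;

final class LoggerNameSketch {

  // Hypothetical query id, for illustration only.
  static String uncaughtExceptionLoggerName() {
    final QueryId queryId = new QueryId("CSAS_PAGEVIEWS_5");
    // Assuming queryLoggerName dot-joins the query id with the context parts,
    // this yields "CSAS_PAGEVIEWS_5.ksql.logger.thread.exception.uncaught".
    return QueryLoggerUtil.queryLoggerName(
        queryId,
        new QueryContext.Stacker()
            .push("ksql.logger.thread.exception.uncaught")
            .getQueryContext());
  }
}
```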

private TransientQueryQueue buildTransientQueryQueue(
final QueryId queryId,
final ExecutionStep<?> physicalPlan,
KafkaStreamsThreadError.java (new file)

@@ -0,0 +1,96 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import java.util.Objects;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;

public final class KafkaStreamsThreadError implements ProcessingLogger.ErrorMessage {
public static KafkaStreamsThreadError of(
final String errorMsg,
final Thread thread,
final Throwable exception
) {
return new KafkaStreamsThreadError(errorMsg, thread, exception);
}

private final String errorMsg;
private final Thread thread;
private final Throwable exception;

private KafkaStreamsThreadError(
final String errorMsg,
final Thread thread,
final Throwable exception
) {
this.errorMsg = requireNonNull(errorMsg, "errorMsg");
this.thread = requireNonNull(thread, "thread");
this.exception = requireNonNull(exception, "exception");
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaStreamsThreadError that = (KafkaStreamsThreadError) o;
return Objects.equals(errorMsg, that.errorMsg)
&& Objects.equals(thread.getName(), that.thread.getName())
&& Objects.equals(exception.getClass(), that.exception.getClass())
&& Objects.equals(exception.toString(), that.exception.toString());
}

@Override
public int hashCode() {
return Objects.hash(errorMsg, thread.getName(), exception.toString());
}

@Override
public SchemaAndValue get(final ProcessingLogConfig config) {
final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA)
.put(ProcessingLogMessageSchema.TYPE,
MessageType.KAFKA_STREAMS_THREAD_ERROR.getTypeId())
.put(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR,
streamsThreadError());

return new SchemaAndValue(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA, struct);
}

private Struct streamsThreadError() {
final Struct threadError = new Struct(MessageType.KAFKA_STREAMS_THREAD_ERROR.getSchema())
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME,
thread.getName())
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE,
errorMsg)
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE,
ErrorMessageUtil.getErrorMessages(exception));

return threadError;
}
}
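A brief usage sketch, mirroring the call site added in PersistentQueryMetadata below; the wrapper class here is hypothetical:

```java
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.util.KafkaStreamsThreadError;

final class UncaughtHandlerSketch {

  // Mirrors PersistentQueryMetadata#uncaughtHandler below: on an uncaught
  // streams-thread exception, emit a structured processing-log record.
  static void onUncaughtException(
      final ProcessingLogger processingLogger,
      final Thread thread,
      final Throwable error
  ) {
    processingLogger.error(KafkaStreamsThreadError.of(
        "Unhandled exception caught in streams thread", thread, error));
  }
}
```

Note that the actual override first delegates to the base class handler, so the existing SLF4J error line and QueryError bookkeeping still run before the structured record is emitted.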
PersistentQueryMetadata.java

@@ -23,6 +23,7 @@
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
@@ -51,6 +52,7 @@ public class PersistentQueryMetadata extends QueryMetadata {
materializationProviderBuilder;

private Optional<MaterializationProvider> materializationProvider;
private final ProcessingLogger processingLogger;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public PersistentQueryMetadata(
@@ -72,7 +74,8 @@ public PersistentQueryMetadata(
final long closeTimeout,
final QueryErrorClassifier errorClassifier,
final ExecutionStep<?> physicalPlan,
final int maxQueryErrorsQueueSize
final int maxQueryErrorsQueueSize,
final ProcessingLogger processingLogger
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
@@ -98,9 +101,12 @@ public PersistentQueryMetadata(
this.physicalPlan = requireNonNull(physicalPlan, "physicalPlan");
this.materializationProviderBuilder =
requireNonNull(materializationProviderBuilder, "materializationProviderBuilder");
this.processingLogger = requireNonNull(processingLogger, "processingLogger");

this.materializationProvider = materializationProviderBuilder
.flatMap(builder -> builder.apply(getKafkaStreams()));

setUncaughtExceptionHandler(this::uncaughtHandler);
}

protected PersistentQueryMetadata(
@@ -114,6 +120,15 @@ protected PersistentQueryMetadata(
this.materializationProvider = other.materializationProvider;
this.physicalPlan = other.physicalPlan;
this.materializationProviderBuilder = other.materializationProviderBuilder;
this.processingLogger = other.processingLogger;
}

@Override
protected void uncaughtHandler(final Thread thread, final Throwable error) {
super.uncaughtHandler(thread, error);

processingLogger.error(KafkaStreamsThreadError.of(
      "Unhandled exception caught in streams thread", thread, error));
}

Review comment (Contributor): consider logging this at the fatal level, since this is a thread-killing error.

Reply (Member, author): How do I log at fatal level? The ProcessingLogger has only error, and the inner logger has only error, info, and debug.

public DataSourceType getDataSourceType() {
@@ -149,6 +164,11 @@ Optional<MaterializationProvider> getMaterializationProvider() {
return materializationProvider;
}

@VisibleForTesting
public ProcessingLogger getProcessingLogger() {
return processingLogger;
}

public Optional<Materialization> getMaterialization(
final QueryId queryId,
final QueryContext.Stacker contextStacker
QueryMetadata.java

@@ -154,7 +154,7 @@ public void onStop(final Consumer<Boolean> onStop) {
this.onStop = onStop;
}

private void uncaughtHandler(final Thread t, final Throwable e) {
protected void uncaughtHandler(final Thread t, final Throwable e) {
LOG.error("Unhandled exception caught in streams thread {}.", t.getName(), e);
final QueryError queryError =
new QueryError(
QueryExecutorTest.java

@@ -18,7 +18,9 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
@@ -170,6 +172,8 @@ public class QueryExecutorTest {
private SessionConfig config;
@Captor
private ArgumentCaptor<Map<String, Object>> propertyCaptor;
@Captor
private ArgumentCaptor<String> processingLoggerNameCaptor;

private QueryExecutor queryBuilder;
private final Stacker stacker = new Stacker();
@@ -197,9 +201,6 @@ public void setup() {
.thenReturn(PERSISTENT_PREFIX);
when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn(SERVICE_ID);
when(physicalPlan.build(any())).thenReturn(tableHolder);
when(topology.describe()).thenReturn(topoDesc);
when(topoDesc.subtopologies()).thenReturn(ImmutableSet.of());
when(serviceContext.getTopicClient()).thenReturn(topicClient);
when(streamsBuilder.build(any())).thenReturn(topology);
when(config.getConfig(true)).thenReturn(ksqlConfig);
when(config.getOverrides()).thenReturn(OVERRIDES);
@@ -247,6 +248,13 @@ public void shouldBuildTransientQueryCorrectly() {

@Test
public void shouldBuildPersistentQueryCorrectly() {
// Given:
final ProcessingLogger uncaughtProcessingLogger = mock(ProcessingLogger.class);
when(processingLoggerFactory.getLogger(
QueryLoggerUtil.queryLoggerName(QUERY_ID, new QueryContext.Stacker()
.push("ksql.logger.thread.exception.uncaught").getQueryContext())
)).thenReturn(uncaughtProcessingLogger);

// When:
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery(
STATEMENT_TEXT,
@@ -269,6 +277,7 @@ public void shouldBuildPersistentQueryCorrectly() {
assertThat(queryMetadata.getTopology(), is(topology));
assertThat(queryMetadata.getOverriddenProperties(), equalTo(OVERRIDES));
assertThat(queryMetadata.getStreamsProperties(), equalTo(capturedStreamsProperties()));
assertThat(queryMetadata.getProcessingLogger(), equalTo(uncaughtProcessingLogger));
}

@Test
Expand Down
Loading