Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add KSQL processing log message on uncaught streams exceptions #6253

Merged
merged 3 commits into from
Sep 23, 2020

Conversation

spena
Copy link
Member

@spena spena commented Sep 18, 2020

Description

Fixes #6130

Calls the ProcessingLogger on uncaught errors by Kafka streams. The processing log uncaught handler is registered by the PersistentQueryMetadata only. Transient queries do not need to log errors to the processing log (they're at sent back to the user instead).

The error sent to the logger uses a new KafkaStreamsThreadErrorMessage type that creates an structured error message. It contains the error message, the thread and the exception.

Testing done

Added unit tests

ksql> select * from KSQL_PROCESSING_LOG emit changes;
+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|LOGGER                                       |LEVEL                                        |TIME                                         |MESSAGE                                      |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|processing.CSAS_S1_0.ksql.logger.thread.excep|ERROR                                        |1600872731171                                |{TYPE=4, DESERIALIZATIONERROR=null, RECORDPRO|
|tion.uncaught                                |                                             |                                             |CESSINGERROR=null, PRODUCTIONERROR=null, SERI|
|                                             |                                             |                                             |ALIZATIONERROR=null, KAFKASTREAMSTHREADERROR=|
|                                             |                                             |                                             |{ERRORMESSAGE=Unhandled exception caught in s|
|                                             |                                             |                                             |treams thread, THREADNAME=_confluent-ksql-def|
|                                             |                                             |                                             |ault_query_CSAS_S1_0-22278e08-e1ab-475f-84fe-|
|                                             |                                             |                                             |c4c97815c5d4-StreamThread-4, CAUSE=[One or mo|
|                                             |                                             |                                             |re source topics were missing during rebalanc|
|                                             |                                             |                                             |e]}}                                         |

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena added enhancement P0 Denotes must-have for a given milestone labels Sep 18, 2020
@spena spena added this to the 0.13.0 milestone Sep 18, 2020
@spena spena requested review from rodesai and a team September 18, 2020 03:41
@spena spena force-pushed the add_processing_log_msg branch 2 times, most recently from 9219c6d to 430c7ac Compare September 21, 2020 14:36
Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good. Why are we only doing this for persistent queries?

@@ -76,11 +76,23 @@
.optional()
.build();

public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "name";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use the name "threadName"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -229,7 +229,9 @@ public PersistentQueryMetadata buildPersistentQuery(
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG),
classifier,
physicalPlan,
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE)
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE),
(ProcessingLogger) streamsProperties.get(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a different logger here. The logger is important because its name tells you a lot about the context of the error. In this case, the logger's name is ksql.logger.production.error, which tells you this is an error hit while trying to produce a record to Kafka. Other loggers' names include the node in the query topology. Eventually we'll let users adjust the log levels for some loggers at runtime, so you could see very detailed logging from just 1 UDF for example. In this case I'd name this something like ksql.logger.thread.exception.uncaught.

To create the logger, you can use the factory in the processingLogContext member of this class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

protected void uncaughtHandler(final Thread thread, final Throwable error) {
super.uncaughtHandler(thread, error);

processingLogger.error(KafkaStreamsThreadError.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider logging this at the fatal level, since this is a thread-killing error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do I log at fatal level?

The ProcessingLogger has only error, an the inner logger has only error, info, debug.

@spena
Copy link
Member Author

spena commented Sep 22, 2020

@rodesai I didn't do transient queries because seems they catch the uncaught errors differently (not by QueryMetadata::uncaughtHandler), and they send the error back to the client.

I assumed this is what we want, that users who are running push/pull queries see the error in their side 'cause they can do it. Contrary to persistent queries that they cannot see it unless the look in the processing log. Does it make sense? or should we also configure transient queries to use the internal QueryMetadata::uncaughtHandler() and processing log handler too?

@spena spena requested a review from JimGalasyn as a code owner September 22, 2020 16:50
@spena spena force-pushed the add_processing_log_msg branch from d03ddd5 to 79b471c Compare September 22, 2020 20:35
@rodesai
Copy link
Contributor

rodesai commented Sep 22, 2020

@rodesai I didn't do transient queries because seems they catch the uncaught errors differently (not by QueryMetadata::uncaughtHandler), and they send the error back to the client.

This makes sense. Thanks for the explainer.

Copy link
Member

@JimGalasyn JimGalasyn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@spena
Copy link
Member Author

spena commented Sep 23, 2020

@rodesai I added the queryId to the logger name. This is how it looks now:

ksql> select * from KSQL_PROCESSING_LOG emit changes;
+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|LOGGER                                       |LEVEL                                        |TIME                                         |MESSAGE                                      |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|processing.CSAS_S1_0.ksql.logger.thread.excep|ERROR                                        |1600872731171                                |{TYPE=4, DESERIALIZATIONERROR=null, RECORDPRO|
|tion.uncaught                                |                                             |                                             |CESSINGERROR=null, PRODUCTIONERROR=null, SERI|
|                                             |                                             |                                             |ALIZATIONERROR=null, KAFKASTREAMSTHREADERROR=|
|                                             |                                             |                                             |{ERRORMESSAGE=Unhandled exception caught in s|
|                                             |                                             |                                             |treams thread, THREADNAME=_confluent-ksql-def|
|                                             |                                             |                                             |ault_query_CSAS_S1_0-22278e08-e1ab-475f-84fe-|
|                                             |                                             |                                             |c4c97815c5d4-StreamThread-4, CAUSE=[One or mo|
|                                             |                                             |                                             |re source topics were missing during rebalanc|
|                                             |                                             |                                             |e]}}                                         |

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM pending Rohan's comments

Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@spena spena merged commit ac8875f into confluentinc:master Sep 23, 2020
@spena spena deleted the add_processing_log_msg branch September 23, 2020 21:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement P0 Denotes must-have for a given milestone
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add KSQL processing log message on uncaught exceptions from streams
4 participants