feat: add KSQL processing log message on uncaught streams exceptions #6253
Looking good. Why are we only doing this for persistent queries?
@@ -76,11 +76,23 @@
.optional()
.build();

public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "name";
let's use the name "threadName"
Done
@@ -229,7 +229,9 @@ public PersistentQueryMetadata buildPersistentQuery(
 ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG),
 classifier,
 physicalPlan,
-ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE)
+ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE),
+(ProcessingLogger) streamsProperties.get(
We should use a different logger here. The logger is important because its name tells you a lot about the context of the error. In this case, the logger's name is ksql.logger.production.error, which tells you this is an error hit while trying to produce a record to Kafka. Other loggers' names include the node in the query topology. Eventually we'll let users adjust the log levels for some loggers at runtime, so you could see very detailed logging from just one UDF, for example. In this case I'd name this something like ksql.logger.thread.exception.uncaught.
To create the logger, you can use the factory in the processingLogContext member of this class.
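To illustrate the naming idea in the comment above, here is a minimal sketch of hierarchical logger names resolved through a factory. Everything in it is a stand-in: `NamedLogger`, `LoggerFactory`, and `loggerName` are hypothetical and are not the ksqlDB `ProcessingLogger` API, and the query ID `CSAS_EXAMPLE_1` and its position at the end of the name are assumptions made only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProcessingLoggerNames {
  // Hypothetical stand-in for a processing logger; only its name matters here.
  interface NamedLogger {
    String name();
  }

  // Hypothetical stand-in for a logger factory: one logger per distinct name.
  static final class LoggerFactory {
    private final Map<String, NamedLogger> loggers = new ConcurrentHashMap<>();

    NamedLogger getLogger(final String name) {
      return loggers.computeIfAbsent(name, n -> () -> n);
    }
  }

  static final String PREFIX = "ksql.logger";

  // Joins the shared prefix and the context parts with dots.
  static String loggerName(final String... parts) {
    return PREFIX + "." + String.join(".", parts);
  }

  public static void main(final String[] args) {
    final LoggerFactory factory = new LoggerFactory();

    // Name taken from the review comment: errors while producing to Kafka.
    final NamedLogger produce =
        factory.getLogger(loggerName("production", "error"));

    // Suggested name for uncaught thread errors; the trailing query ID is
    // an assumed placement, not the format used in the actual change.
    final NamedLogger uncaught =
        factory.getLogger(loggerName("thread", "exception", "uncaught", "CSAS_EXAMPLE_1"));

    System.out.println(produce.name());
    System.out.println(uncaught.name());
  }
}
```

Because the name carries the context, a per-logger level setting could later target just one subtree (for example everything under `ksql.logger.thread`) without touching the others.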
Done
protected void uncaughtHandler(final Thread thread, final Throwable error) {
  super.uncaughtHandler(thread, error);

  processingLogger.error(KafkaStreamsThreadError.of(
consider logging this at the fatal level, since this is a thread-killing error.
How do I log at fatal level?
The ProcessingLogger has only error, and the inner logger has only error, info, debug.
@rodesai I didn't do transient queries because it seems they catch the uncaught errors differently (not via QueryMetadata::uncaughtHandler), and they send the error back to the client. I assumed this is what we want: users running push/pull queries can see the error on their side, whereas for persistent queries they cannot see it unless they look in the processing log. Does that make sense? Or should we also configure transient queries to use the internal QueryMetadata::uncaughtHandler() and processing log handler too?
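As a rough sketch of the persistent-query behaviour described here, the example below registers an uncaught exception handler on a plain Java thread and records a structured error. It is not the code from this change: `RecordingLogger`, `threadError`, and `loggingHandler` are hypothetical stand-ins, the thread name and message text are made up, and the `cause` field name is an assumption (only `errorMessage` and `threadName` appear in the discussion).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UncaughtHandlerSketch {
  // Hypothetical stand-in for the processing logger: collects structured records.
  static final class RecordingLogger {
    final List<Map<String, String>> records = new ArrayList<>();

    synchronized void error(final Map<String, String> record) {
      records.add(record);
    }
  }

  // Builds a structured record with the error message, thread name and exception.
  static Map<String, String> threadError(
      final String message, final Thread thread, final Throwable cause) {
    final Map<String, String> record = new LinkedHashMap<>();
    record.put("errorMessage", message);
    record.put("threadName", thread.getName());
    record.put("cause", String.valueOf(cause)); // field name is an assumption
    return record;
  }

  // Handler that forwards any uncaught error to the logger.
  static Thread.UncaughtExceptionHandler loggingHandler(final RecordingLogger logger) {
    return (thread, error) ->
        logger.error(threadError("Unhandled exception caught in streams thread", thread, error));
  }

  public static void main(final String[] args) throws InterruptedException {
    final RecordingLogger logger = new RecordingLogger();

    // A worker that dies with an exception, standing in for a streams thread.
    final Thread worker = new Thread(() -> {
      throw new IllegalStateException("boom");
    }, "stream-thread-1");
    worker.setUncaughtExceptionHandler(loggingHandler(logger));

    worker.start();
    worker.join(); // the handler runs on the dying thread before join() returns

    System.out.println(logger.records.get(0));
  }
}
```

A transient query would skip this registration under the reasoning above, since its error is already returned to the client that issued the query.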
This makes sense. Thanks for the explainer.
LGTM!
@rodesai I added the queryId to the logger name. This is how it looks now:
LGTM pending Rohan's comments
LGTM
Description
Fixes #6130
Calls the ProcessingLogger on uncaught errors thrown by Kafka Streams. The processing log uncaught handler is registered by the PersistentQueryMetadata only. Transient queries do not need to log errors to the processing log (they are sent back to the user instead).
The error sent to the logger uses a new KafkaStreamsThreadErrorMessage type that creates a structured error message. It contains the error message, the thread and the exception.
Testing done
Added unit tests
Reviewer checklist