Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add KsqlUncaughtExceptionHandler and new KsqlRestConfig for enabling it #3425

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaStreamsUncaughtExceptionHandler;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -281,6 +282,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
);

final KafkaStreams streams = kafkaStreamsBuilder.buildKafkaStreams(builder, streamsProperties);
streams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler());

final Topology topology = builder.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import io.confluent.ksql.engine.KsqlEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);

public void uncaughtException(final Thread t, final Throwable e) {
log.error("Unhandled exception caught in streams thread {}.", t.getName(), e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.confluent.ksql.rest.server.state.ServerStateDynamicBinding;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
Expand Down Expand Up @@ -107,6 +108,7 @@
import javax.ws.rs.core.Configurable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.log4j.LogManager;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.jsr356.server.ServerContainer;
import org.glassfish.hk2.utilities.Binder;
Expand Down Expand Up @@ -445,8 +447,14 @@ static KsqlRestApplication buildApplication(

final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();

if (restConfig.getBoolean(KsqlRestConfig.KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER)) {
Thread.setDefaultUncaughtExceptionHandler(
new KsqlUncaughtExceptionHandler(LogManager::shutdown));
}

final HybridQueryIdGenerator hybridQueryIdGenerator =
new HybridQueryIdGenerator();

final KsqlEngine ksqlEngine = new KsqlEngine(
serviceContext,
processingLogContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public class KsqlRestConfig extends RestConfig {
+ "will not start serving requests until all preconditions are satisfied. Until that time, "
+ "requests will return a 503 error";

static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER =
KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable";

private static final String KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC =
"Whether or not to set KsqlUncaughtExceptionHandler as the UncaughtExceptionHandler "
+ "for all threads in the application (this can be overridden). Default is false.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -97,6 +104,12 @@ public class KsqlRestConfig extends RestConfig {
"",
Importance.LOW,
KSQL_SERVER_PRECONDITIONS_DOC
).define(
KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER,
ConfigDef.Type.BOOLEAN,
false,
Importance.LOW,
KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public CommandRunner(
* {@link WakeupException} is thrown or the {@link #close()} method is called.
*/
public void start() {
executor.submit(new Runner());
executor.execute(new Runner());
executor.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.rest.server.KsqlServerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KsqlUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private final Runnable flusher;

private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class);

public KsqlUncaughtExceptionHandler(final Runnable flusher) {
this.flusher = flusher;
}

@SuppressFBWarnings
public void uncaughtException(final Thread t, final Throwable e) {
log.error("Unhandled exception caught in thread {}.", t.getName(), e);
System.err.println(
"Unhandled exception caught in thread: " + t.getName() + ". Exception:" + e.getMessage());

flusher.run();

System.exit(-1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple things:

Can we also write the exception to stderr here?

We should call org.apache.log4j.LogManager.shutdown() to ensure that all the buffered logs (including the one above) get flushed. Ideally this would get passed in the constructor as a Runnable:

class KsqlUncaughtExceptionHandler implements UncaughtExceptionHandler {
    private final Runnable flusher;

    public void KsqlUncaughtExceptionHandler(final Runnable flusher) {
        this.flusher = flusher;
    }
}
...
new KsqlUncaughtExceptionHandler(LogManager::flush);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it, but is there a reason why we don't call LogManager.shutdown() currently when the server exits? I don't see any other uses of it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should definitely call it when the server exits cleanly as well. It's more critical here because we might lose the exact information we need to help debug the issue that made the thread crash.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void shouldSubmitTaskOnStart() {

// Then:
final InOrder inOrder = inOrder(executor);
inOrder.verify(executor).submit(any(Runnable.class));
inOrder.verify(executor).execute(any(Runnable.class));
inOrder.verify(executor).shutdown();
}

Expand Down