Skip to content

Commit

Permalink
feat: add KsqlUncaughtExceptionHandler and new KsqlRestConfig for ena…
Browse files Browse the repository at this point in the history
…bling it (#3425)

* feat: add KsqlUncaughtExceptionHandler and new KsqlRestConfig for enabling it

* change config name

* set UncaughtExceptionHandler for persistent query streams threads
  • Loading branch information
stevenpyzhang authored Sep 30, 2019
1 parent d77db50 commit d83c787
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaStreamsUncaughtExceptionHandler;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -281,6 +282,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
);

final KafkaStreams streams = kafkaStreamsBuilder.buildKafkaStreams(builder, streamsProperties);
streams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler());

final Topology topology = builder.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import io.confluent.ksql.engine.KsqlEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);

public void uncaughtException(final Thread t, final Throwable e) {
log.error("Unhandled exception caught in streams thread {}.", t.getName(), e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.confluent.ksql.rest.server.state.ServerStateDynamicBinding;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
Expand Down Expand Up @@ -107,6 +108,7 @@
import javax.ws.rs.core.Configurable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.log4j.LogManager;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.jsr356.server.ServerContainer;
import org.glassfish.hk2.utilities.Binder;
Expand Down Expand Up @@ -445,8 +447,14 @@ static KsqlRestApplication buildApplication(

final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();

if (restConfig.getBoolean(KsqlRestConfig.KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER)) {
Thread.setDefaultUncaughtExceptionHandler(
new KsqlUncaughtExceptionHandler(LogManager::shutdown));
}

final HybridQueryIdGenerator hybridQueryIdGenerator =
new HybridQueryIdGenerator();

final KsqlEngine ksqlEngine = new KsqlEngine(
serviceContext,
processingLogContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public class KsqlRestConfig extends RestConfig {
+ "will not start serving requests until all preconditions are satisfied. Until that time, "
+ "requests will return a 503 error";

static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER =
KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable";

private static final String KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC =
"Whether or not to set KsqlUncaughtExceptionHandler as the UncaughtExceptionHandler "
+ "for all threads in the application (this can be overridden). Default is false.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -97,6 +104,12 @@ public class KsqlRestConfig extends RestConfig {
"",
Importance.LOW,
KSQL_SERVER_PRECONDITIONS_DOC
).define(
KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER,
ConfigDef.Type.BOOLEAN,
false,
Importance.LOW,
KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public CommandRunner(
* {@link WakeupException} is thrown or the {@link #close()} method is called.
*/
public void start() {
executor.submit(new Runner());
executor.execute(new Runner());
executor.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.rest.server.KsqlServerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KsqlUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private final Runnable flusher;

private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class);

public KsqlUncaughtExceptionHandler(final Runnable flusher) {
this.flusher = flusher;
}

@SuppressFBWarnings
public void uncaughtException(final Thread t, final Throwable e) {
log.error("Unhandled exception caught in thread {}.", t.getName(), e);
System.err.println(
"Unhandled exception caught in thread: " + t.getName() + ". Exception:" + e.getMessage());

flusher.run();

System.exit(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void shouldSubmitTaskOnStart() {

// Then:
final InOrder inOrder = inOrder(executor);
inOrder.verify(executor).submit(any(Runnable.class));
inOrder.verify(executor).execute(any(Runnable.class));
inOrder.verify(executor).shutdown();
}

Expand Down

0 comments on commit d83c787

Please sign in to comment.