Skip to content

Commit

Permalink
fix: address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Sep 22, 2020
1 parent 430c7ac commit d03ddd5
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/developer-guide/ksqldb-reference/show-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Host Query Status : {192.168.1.6:8088=RUNNING, 192.168.1.6:8089=RUNNING}
LOGGER | VARCHAR(STRING)
LEVEL | VARCHAR(STRING)
TIME | BIGINT
MESSAGE | STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORDB64 VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORD VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING)>>
MESSAGE | STRUCT<TYPE INTEGER, DESERIALIZATIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORDB64 VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, RECORDPROCESSINGERROR STRUCT<ERRORMESSAGE VARCHAR(STRING), RECORD VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>, PRODUCTIONERROR STRUCT<ERRORMESSAGE VARCHAR(STRING)>, KAFKASTREAMSTHREADERROR STRUCT<THREADNAME VARCHAR(STRING), ERRORMESSAGE VARCHAR(STRING), CAUSE ARRAY<VARCHAR(STRING)>>>
-------------------------------------------------------------------------------------

Sources that this query reads from:
Expand Down
20 changes: 19 additions & 1 deletion docs/developer-guide/test-and-debug/processing-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ message.serializationError.topic (STRING)
: The {{ site.ak }} topic to which the ksqlDB row that failed to serialize
would have been produced.

message.kafkaStreamsThreadError.message (STRING)

: A string containing a human-readable error message detailing the
error encountered.

message.kafkaStreamsThreadError.name (STRING)

: A string containing the thread name.

message.kafkaStreamsThreadError.cause (LIST<STRING>)

: A list of strings containing human-readable error messages
for the chain of exceptions that caused the main error.

Log Stream
----------

Expand Down Expand Up @@ -307,7 +321,11 @@ ksql> CREATE STREAM PROCESSING_LOG_STREAM (
errorMessage STRING,
record STRING,
cause ARRAY<STRING>,
`topic` STRING>>)
`topic` STRING>>,
kafkaStreamsError STRUCT<
threadName STRING,
errorMessage STRING,
cause ARRAY<STRING>)
WITH (KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ public final class ProcessingLogMessageSchema {
.build();

public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "name";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "threadName";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE = "cause";

private static final Schema KAFKA_STREAMS_THREAD_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "KafkaStreamsThreadError")
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
.optional()
.build();

public enum MessageType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public final class QueryExecutor {
private static final String KSQL_THREAD_EXCEPTION_UNCAUGHT_LOGGER
= "ksql.logger.thread.exception.uncaught";

// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
private final KsqlConfig ksqlConfig;
private final Map<String, Object> overrides;
Expand Down Expand Up @@ -230,8 +233,7 @@ public PersistentQueryMetadata buildPersistentQuery(
classifier,
physicalPlan,
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE),
(ProcessingLogger) streamsProperties.get(
ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER)
processingLogContext.getLoggerFactory().getLogger(KSQL_THREAD_EXCEPTION_UNCAUGHT_LOGGER)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public SchemaAndValue get(final ProcessingLogConfig config) {

private Struct streamsThreadError() {
final Struct threadError = new Struct(MessageType.KAFKA_STREAMS_THREAD_ERROR.getSchema())
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE,
errorMsg)
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME,
thread.getName())
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE,
errorMsg)
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE,
ErrorMessageUtil.getErrorMessages(exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ Optional<MaterializationProvider> getMaterializationProvider() {
return materializationProvider;
}

@VisibleForTesting
public ProcessingLogger getProcessingLogger() {
return processingLogger;
}

public Optional<Materialization> getMaterialization(
final QueryId queryId,
final QueryContext.Stacker contextStacker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public class QueryExecutorTest {
private KStreamHolder<Struct> streamHolder;
@Captor
private ArgumentCaptor<Map<String, Object>> propertyCaptor;
@Captor
private ArgumentCaptor<String> processingLoggerNameCaptor;

private QueryExecutor queryBuilder;
private final Stacker stacker = new Stacker();
Expand Down Expand Up @@ -194,9 +196,6 @@ public void setup() {
.thenReturn(PERSISTENT_PREFIX);
when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn(SERVICE_ID);
when(physicalPlan.build(any())).thenReturn(tableHolder);
when(topology.describe()).thenReturn(topoDesc);
when(topoDesc.subtopologies()).thenReturn(ImmutableSet.of());
when(serviceContext.getTopicClient()).thenReturn(topicClient);
when(streamsBuilder.build(any())).thenReturn(topology);
queryBuilder = new QueryExecutor(
ksqlConfig,
Expand Down Expand Up @@ -244,6 +243,11 @@ public void shouldBuildTransientQueryCorrectly() {

@Test
public void shouldBuildPersistentQueryCorrectly() {
// Given:
final ProcessingLogger uncaughtProcessingLogger = mock(ProcessingLogger.class);
when(processingLoggerFactory.getLogger("ksql.logger.thread.exception.uncaught"))
.thenReturn(uncaughtProcessingLogger);

// When:
final PersistentQueryMetadata queryMetadata = queryBuilder.buildPersistentQuery(
STATEMENT_TEXT,
Expand All @@ -266,6 +270,7 @@ public void shouldBuildPersistentQueryCorrectly() {
assertThat(queryMetadata.getTopology(), is(topology));
assertThat(queryMetadata.getOverriddenProperties(), equalTo(OVERRIDES));
assertThat(queryMetadata.getStreamsProperties(), equalTo(capturedStreamsProperties()));
assertThat(queryMetadata.getProcessingLogger(), equalTo(uncaughtProcessingLogger));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void shouldBuildCorrectStreamCreateDDL() {
+ "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
+ "productionError STRUCT<errorMessage VARCHAR>, "
+ "serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, name VARCHAR, cause ARRAY<VARCHAR>>"
+ "kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, threadName VARCHAR, cause ARRAY<VARCHAR>>"
+ ">"
+ ") WITH(KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');"));
}
Expand Down

0 comments on commit d03ddd5

Please sign in to comment.