diff --git a/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java b/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java index 53b91ee4a051..49c1d4bf0afa 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java @@ -27,12 +27,14 @@ public final class ProcessingLogMessageSchema { public static final String DESERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage"; public static final String DESERIALIZATION_ERROR_FIELD_RECORD_B64 = "recordB64"; public static final String DESERIALIZATION_ERROR_FIELD_CAUSE = "cause"; + public static final String DESERIALIZATION_ERROR_FIELD_TOPIC = "topic"; private static final Schema DESERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct() .name(NAMESPACE + "DeserializationError") .field(DESERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA) .field(DESERIALIZATION_ERROR_FIELD_RECORD_B64, Schema.OPTIONAL_STRING_SCHEMA) .field(DESERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA) + .field(DESERIALIZATION_ERROR_FIELD_TOPIC, Schema.OPTIONAL_STRING_SCHEMA) .optional() .build(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java index 232e668ca0ba..d02b81befbf8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java @@ -191,7 +191,7 @@ public void shouldBuildCorrectStreamCreateDDL() { + "time BIGINT, " + "message STRUCT<" + "type INT, " - + "deserializationError STRUCT>, " + + "deserializationError STRUCT, `topic` VARCHAR>, " + "recordProcessingError STRUCT>, " + "productionError STRUCT" + ">" diff --git a/ksql-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java b/ksql-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java index 8469f419017f..4832588ebd63 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java @@ -46,7 +46,8 @@ public T deserialize(final String topic, final byte[] bytes) { return delegate.deserialize(topic, bytes); } catch (final RuntimeException e) { processingLogger.error( - SerdeProcessingLogMessageFactory.deserializationErrorMsg(e, Optional.ofNullable(bytes)) + SerdeProcessingLogMessageFactory.deserializationErrorMsg( + e, Optional.ofNullable(bytes), topic) ); throw e; } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactory.java index ecbe72fe77a4..87e4fd6268ce 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactory.java @@ -33,7 +33,8 @@ private SerdeProcessingLogMessageFactory() { public static Function deserializationErrorMsg( final Throwable exception, - final Optional record + final Optional record, + final String topic ) { Objects.requireNonNull(exception); return (config) -> { @@ -48,6 +49,10 @@ public static Function deserializationError ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE, cause ); + deserializationError.put( + ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_TOPIC, + topic + ); if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) { deserializationError.put( ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_RECORD_B64, diff --git a/ksql-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java index 181a26bc083a..394b486c313e 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java @@ -129,7 +129,7 @@ public void shouldLogOnException() { final SchemaAndValue result = errorCaptor.getValue().apply(LOG_CONFIG); assertThat(result, is(SerdeProcessingLogMessageFactory - .deserializationErrorMsg(e, Optional.of(SOME_BYTES)).apply(LOG_CONFIG))); + .deserializationErrorMsg(e, Optional.of(SOME_BYTES), "t").apply(LOG_CONFIG))); throw e; } diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactoryTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactoryTest.java index 2ab1aca6cb4a..355ffc6a4367 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactoryTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/util/SerdeProcessingLogMessageFactoryTest.java @@ -15,12 +15,7 @@ package io.confluent.ksql.serde.util; -import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR; -import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE; -import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_MESSAGE; -import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_RECORD_B64; -import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA; -import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.TYPE; +import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.*; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -54,7 +49,8 @@ public void shouldSetNullRecordToNull() { // When: final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg( error, - Optional.empty() + Optional.empty(), + "topic" ).apply(config); // Then: @@ -68,7 +64,8 @@ public void shouldBuildDeserializationError() { // When: final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg( error, - Optional.of(record) + Optional.of(record), + "topic" ).apply(config); // Then: @@ -91,6 +88,10 @@ public void shouldBuildDeserializationError() { deserializationError.get(DESERIALIZATION_ERROR_FIELD_RECORD_B64), equalTo(Base64.getEncoder().encodeToString(record)) ); + assertThat( + deserializationError.get(DESERIALIZATION_ERROR_FIELD_TOPIC), + equalTo("topic") + ); schema.fields().forEach( f -> { if (!ImmutableList.of(TYPE, DESERIALIZATION_ERROR).contains(f.name())) { @@ -110,7 +111,8 @@ public void shouldBuildDeserializationErrorWithNullRecordIfIncludeRowFalse() { // When: final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg( error, - Optional.of(record) + Optional.of(record), + "topic" ).apply(config); // Then: