Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the topic name to deserialization errors #4573

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public final class ProcessingLogMessageSchema {
public static final String DESERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String DESERIALIZATION_ERROR_FIELD_RECORD_B64 = "recordB64";
public static final String DESERIALIZATION_ERROR_FIELD_CAUSE = "cause";
public static final String DESERIALIZATION_ERROR_FIELD_TOPIC = "topic";

private static final Schema DESERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "DeserializationError")
.field(DESERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_RECORD_B64, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_TOPIC, Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void shouldBuildCorrectStreamCreateDDL() {
+ "time BIGINT, "
+ "message STRUCT<"
+ "type INT, "
+ "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, "
+ "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
+ "productionError STRUCT<errorMessage VARCHAR>"
+ ">"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public T deserialize(final String topic, final byte[] bytes) {
return delegate.deserialize(topic, bytes);
} catch (final RuntimeException e) {
processingLogger.error(
SerdeProcessingLogMessageFactory.deserializationErrorMsg(e, Optional.ofNullable(bytes))
SerdeProcessingLogMessageFactory.deserializationErrorMsg(
e, Optional.ofNullable(bytes), topic)
);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ private SerdeProcessingLogMessageFactory() {

public static Function<ProcessingLogConfig, SchemaAndValue> deserializationErrorMsg(
final Throwable exception,
final Optional<byte[]> record
final Optional<byte[]> record,
final String topic
) {
Objects.requireNonNull(exception);
return (config) -> {
Expand All @@ -48,6 +49,10 @@ public static Function<ProcessingLogConfig, SchemaAndValue> deserializationError
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE,
cause
);
deserializationError.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_TOPIC,
topic
);
if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
deserializationError.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_RECORD_B64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void shouldLogOnException() {
final SchemaAndValue result = errorCaptor.getValue().apply(LOG_CONFIG);

assertThat(result, is(SerdeProcessingLogMessageFactory
.deserializationErrorMsg(e, Optional.of(SOME_BYTES)).apply(LOG_CONFIG)));
.deserializationErrorMsg(e, Optional.of(SOME_BYTES), "t").apply(LOG_CONFIG)));

throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@

package io.confluent.ksql.serde.util;

import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_MESSAGE;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_RECORD_B64;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.TYPE;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -54,7 +49,8 @@ public void shouldSetNullRecordToNull() {
// When:
final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.empty()
Optional.empty(),
"topic"
).apply(config);

// Then:
Expand All @@ -68,7 +64,8 @@ public void shouldBuildDeserializationError() {
// When:
final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.of(record)
Optional.of(record),
"topic"
).apply(config);

// Then:
Expand All @@ -91,6 +88,10 @@ public void shouldBuildDeserializationError() {
deserializationError.get(DESERIALIZATION_ERROR_FIELD_RECORD_B64),
equalTo(Base64.getEncoder().encodeToString(record))
);
assertThat(
deserializationError.get(DESERIALIZATION_ERROR_FIELD_TOPIC),
equalTo("topic")
);
schema.fields().forEach(
f -> {
if (!ImmutableList.of(TYPE, DESERIALIZATION_ERROR).contains(f.name())) {
Expand All @@ -110,7 +111,8 @@ public void shouldBuildDeserializationErrorWithNullRecordIfIncludeRowFalse() {
// When:
final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.of(record)
Optional.of(record),
"topic"
).apply(config);

// Then:
Expand Down