chore: support PROTOBUF for PRINT TOPIC
agavra committed Feb 19, 2020
1 parent 41277c2 commit b0ee347
Showing 3 changed files with 113 additions and 10 deletions.
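In short, PRINT TOPIC can now recognize PROTOBUF-encoded records: the RecordFormatter gains a KafkaProtobufDeserializer and renders each message on a single line via TextFormat.printer().shortDebugString(...). As a rough illustration of that rendering (a minimal sketch, not part of the commit; it assumes only protobuf-java on the classpath):

import com.google.protobuf.StringValue;
import com.google.protobuf.TextFormat;

public final class ShortDebugStringSketch {
  public static void main(final String[] args) {
    // Any com.google.protobuf.Message works; StringValue is just a convenient built-in type.
    final StringValue msg = StringValue.newBuilder().setValue("My first string").build();
    // Prints the message on one line, e.g.: value: "My first string"
    System.out.println(TextFormat.printer().shortDebugString(msg));
  }
}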
InsertValuesExecutor.java
@@ -76,11 +76,14 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class InsertValuesExecutor {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Logger LOG = LoggerFactory.getLogger(InsertValuesExecutor.class);
private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);

private final LongSupplier clock;
@@ -453,6 +456,7 @@ private byte[] serializeValue(
}
}

LOG.error("Could not serialize row.", e);
throw new KsqlException("Could not serialize row: " + row, e);
}
}
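Taken together, the two hunks above mean that a row which cannot be serialized during INSERT VALUES is now logged at ERROR level before the existing KsqlException is rethrown. A schematic of that log-and-rethrow pattern (hypothetical names, not the actual InsertValuesExecutor):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LogAndRethrowSketch {

  private static final Logger LOG = LoggerFactory.getLogger(LogAndRethrowSketch.class);

  byte[] serializeValue(final Object row) {
    try {
      return doSerialize(row);
    } catch (final Exception e) {
      // New behaviour: record the failure in the server log, then surface it to the caller.
      LOG.error("Could not serialize row.", e);
      throw new RuntimeException("Could not serialize row: " + row, e);
    }
  }

  private byte[] doSerialize(final Object row) {
    throw new UnsupportedOperationException("stand-in for the real serializer");
  }
}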
TopicStream.java
@@ -22,8 +22,11 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.kafka.KafkaSerdeFactory;
@@ -59,6 +62,7 @@ public static class RecordFormatter {
private static final Logger log = LoggerFactory.getLogger(RecordFormatter.class);

private final KafkaAvroDeserializer avroDeserializer;
private final KafkaProtobufDeserializer<Message> protobufDeserializer;
private final String topicName;
private final DateFormat dateFormat;

@@ -85,6 +89,7 @@ public RecordFormatter(
this.topicName = requireNonNull(topicName, "topicName");
this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
this.dateFormat = requireNonNull(dateFormat, "dateFormat");
this.protobufDeserializer = new KafkaProtobufDeserializer<>(schemaRegistryClient);
}

public List<Supplier<String>> format(final Iterable<ConsumerRecord<Bytes, Bytes>> records) {
@@ -189,13 +194,13 @@ private Optional<Formatter> findFormatter(final Stream<Bytes> dataStream) {

default:
// Mixed format topic:
- return Format.MIXED.maybeGetFormatter(topicName, null, avroDeserializer);
+ return Format.MIXED.maybeGetFormatter(topicName, null, this);
}
}

private Formatter findFormatter(final Bytes data) {
return Arrays.stream(Format.values())
- .map(f -> f.maybeGetFormatter(topicName, data, avroDeserializer))
+ .map(f -> f.maybeGetFormatter(topicName, data, this))
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst()
@@ -216,7 +221,7 @@ enum Format {
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final Bytes data,
- final KafkaAvroDeserializer avroDeserializer
+ final RecordFormatter formatter
) {
return Optional.empty();
}
@@ -226,11 +231,11 @@ public Optional<Formatter> maybeGetFormatter(
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final Bytes data,
- final KafkaAvroDeserializer avroDeserializer
+ final RecordFormatter formatter
) {
try {
- avroDeserializer.deserialize(topicName, data.get());
- return Optional.of(createFormatter(topicName, avroDeserializer));
+ formatter.avroDeserializer.deserialize(topicName, data.get());
+ return Optional.of(createFormatter(topicName, formatter.avroDeserializer));
} catch (final Exception t) {
return Optional.empty();
}
@@ -254,12 +259,46 @@ public String getFormat() {
};
}
},
PROTOBUF {
@Override
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final Bytes data,
final RecordFormatter formatter
) {
try {
formatter.protobufDeserializer.deserialize(topicName, data.get());
return Optional.of(createFormatter(topicName, formatter.protobufDeserializer));
} catch (final Exception t) {
return Optional.empty();
}
}

private Formatter createFormatter(
final String topicName,
final KafkaProtobufDeserializer<Message> protobufDeserializer
) {
return new Formatter() {
@Override
public String print(final Bytes data) {
return TextFormat.printer().shortDebugString(
protobufDeserializer.deserialize(topicName, data.get())
);
}

@Override
public String getFormat() {
return PROTOBUF.toString();
}
};
}
},
JSON {
@Override
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final Bytes data,
- final KafkaAvroDeserializer avroDeserializer
+ final RecordFormatter formatter
) {
try {
final JsonNode jsonNode = JsonMapper.INSTANCE.mapper.readTree(data.toString());
@@ -299,7 +338,7 @@ public String getFormat() {
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final Bytes data,
- final KafkaAvroDeserializer avroDeserializer
+ final RecordFormatter formatter
) {
return KafkaSerdeFactory.SQL_SERDE.entrySet().stream()
.map(e -> trySerde(e.getKey(), e.getValue(), topicName, data))
@@ -353,7 +392,7 @@ public String getFormat() {
public Optional<Formatter> maybeGetFormatter(
final String topicName,
final Bytes data,
- final KafkaAvroDeserializer avroDeserializer
+ final RecordFormatter formatter
) {
// Mixed mode defaults to string values:
return Optional.of(createStringFormatter(MIXED.toString()));
@@ -363,7 +402,7 @@ public Optional<Formatter> maybeGetFormatter(
abstract Optional<Formatter> maybeGetFormatter(
String topicName,
Bytes data,
- KafkaAvroDeserializer avroDeserializer
+ RecordFormatter formatter
);

private static Formatter createStringFormatter(final String format) {
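For orientation, the detection flow these signature changes feed into is: probe a sample record with each format's deserializer and keep the first formatter that does not throw, otherwise fall back to a plain string rendering. A simplified, self-contained sketch of that try-and-fall-back pattern follows (hypothetical names, not the actual RecordFormatter):

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public final class FormatDetectionSketch {

  interface Formatter {
    String print(byte[] data);
  }

  // Probe the sample with one candidate deserializer; keep it only if it parses.
  static Optional<Formatter> tryCandidate(
      final Function<byte[], String> deserializer,
      final byte[] sample
  ) {
    try {
      deserializer.apply(sample);
      return Optional.of(deserializer::apply);
    } catch (final Exception e) {
      return Optional.empty();
    }
  }

  // Walk the candidates in order, falling back to a raw UTF-8 rendering (the "MIXED" case).
  static Formatter detect(final List<Function<byte[], String>> candidates, final byte[] sample) {
    return candidates.stream()
        .map(c -> tryCandidate(c, sample))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .findFirst()
        .orElse(data -> new String(data, StandardCharsets.UTF_8));
  }

  public static void main(final String[] args) {
    final byte[] sample = "not avro, not protobuf".getBytes(StandardCharsets.UTF_8);
    final Function<byte[], String> failingCandidate =
        bytes -> { throw new IllegalArgumentException("failed to deserialize"); };
    final Formatter formatter = detect(Collections.singletonList(failingCandidate), sample);
    System.out.println(formatter.print(sample)); // falls back to the raw string
  }
}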
TopicStreamTest.java
@@ -29,9 +29,13 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import com.google.protobuf.Message;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.ksql.rest.server.resources.streaming.TopicStream.Format;
import io.confluent.ksql.rest.server.resources.streaming.TopicStream.RecordFormatter;
import java.text.DateFormat;
@@ -58,6 +62,8 @@
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -82,6 +88,9 @@ public class TopicStreamTest {
" ]" +
"}");
private static final byte[] SERIALIZED_AVRO_RECORD = serializedAvroRecord();
private static final ProtobufSchema PROTOBUF_SCHEMA = new ProtobufSchema(
"syntax = \"proto3\"; message MyRecord {string str1 = 1;}");
private static final byte[] SERIALIZED_PROTOBUF_RECORD = serializedProtoRecord();
private static final int KAFKA_INT = 24;
private static final byte[] SERIZALIZED_KAFKA_INT = serialize(KAFKA_INT, new IntegerSerializer());
private static final long KAFKA_BIGINT = 199L;
@@ -345,6 +354,30 @@ public void shouldFormatAvroValue() {
assertThat(formatted, endsWith(", value: {\"str1\": \"My first string\"}"));
}

@Test
public void shouldFormatProtobufKey() {
// Given:
givenProtobufSchemaRegistered();

// When:
final String formatted = formatSingle(SERIALIZED_PROTOBUF_RECORD, null);

// Then:
assertThat(formatted, containsString(", key: str1: \"My first string\""));
}

@Test
public void shouldFormatProtobufValue() {
// Given:
givenProtobufSchemaRegistered();

// When:
final String formatted = formatSingle(NULL, SERIALIZED_PROTOBUF_RECORD);

// Then:
assertThat(formatted, endsWith(", value: str1: \"My first string\""));
}

@Test
public void shouldFormatJsonObjectKey() {
// When:
@@ -549,6 +582,15 @@ private void givenAvroSchemaRegistered() {
}
}

private void givenProtobufSchemaRegistered() {
try {
when(schemaRegistryClient.getSchemaById(anyInt()))
.thenReturn(PROTOBUF_SCHEMA);
} catch (final Exception e) {
fail("invalid test");
}
}

/*
No way to tell between a double and a long once serialized.
KSQL defaults to long. So doubles are output as longs:
@@ -572,6 +614,24 @@ private static Schema parseAvroSchema(final String avroSchema) {
return parser.parse(avroSchema);
}

private static byte[] serializedProtoRecord() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
final org.apache.kafka.connect.data.Schema schema = SchemaBuilder.struct()
.field("str1", SchemaBuilder.OPTIONAL_STRING_SCHEMA)
.build();

return new KafkaProtobufSerializer<>(schemaRegistryClient, props).serialize(
"topic",
(Message) new ProtobufData().fromConnectData(
schema,
new Struct(schema).put("str1", "My first string")
).getValue()
);
}

private static byte[] serializedAvroRecord() {
final GenericData.Record avroRecord = new GenericData.Record(AVRO_SCHEMA);
avroRecord.put("str1", "My first string");
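As a side note on the new test fixture, serializedProtoRecord() above builds its payload by converting a Connect Struct into a protobuf Message with Connect's ProtobufData and then serializing it with KafkaProtobufSerializer against a mocked Schema Registry client. A stripped-down, standalone version of just the Struct-to-Message conversion (a sketch assuming kafka-connect-protobuf-converter and protobuf-java on the classpath, not the test itself):

import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import io.confluent.connect.protobuf.ProtobufData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public final class ConnectToProtobufSketch {
  public static void main(final String[] args) {
    final Schema schema = SchemaBuilder.struct()
        .field("str1", SchemaBuilder.OPTIONAL_STRING_SCHEMA)
        .build();

    // fromConnectData returns the protobuf schema together with the converted value.
    final Message message = (Message) new ProtobufData()
        .fromConnectData(schema, new Struct(schema).put("str1", "My first string"))
        .getValue();

    // Rendered the same way the new PROTOBUF formatter does, e.g.: str1: "My first string"
    System.out.println(TextFormat.printer().shortDebugString(message));
  }
}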
