diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 729bdedc6cb19..66e423ddd8918 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -1116,6 +1116,10 @@ First, you need to include the `quarkus-jackson` extension. There is an existing `ObjectMapperSerializer` that can be used to serialize all data objects via Jackson. You may create an empty subclass if you want to use <>. +NOTE: By default, the `ObjectMapperSerializer` serializes null as the `"null"` String, this can be customized by setting the Kafka configuration +property `json.serialize.null-as-null=true` which will serialize null as `null`. +This is handy when using a compacted topic, as `null` is used as a tombstone to know which messages delete during compaction phase. + The corresponding deserializer class needs to be subclassed. So, let's create a `FruitDeserializer` that extends the `ObjectMapperDeserializer`. @@ -1150,6 +1154,25 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali Now, your Kafka messages will contain a Jackson serialized representation of your `Fruit` data object. In this case, the `deserializer` configuration is not necessary as the <> is enabled by default. +If you want to deserialize a list of fruits, you need to create a deserializer with a Jackson `TypeReference` denoted the generic collection used. + +[source,java] +---- +package com.acme.fruit.jackson; + +import java.util.List; +import com.fasterxml.jackson.core.type.TypeReference; +import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; + +public class ListOfFruitDeserializer extends ObjectMapperDeserializer> { + public ListOfFruitDeserializer() { + TypeReference> listType = new TypeReference<>() { + }; + super(listType); + } +} +---- + [[jsonb-serialization]] === Serializing via JSON-B @@ -1166,6 +1189,10 @@ First, you need to include the `quarkus-jsonb` extension. There is an existing `JsonbSerializer` that can be used to serialize all data objects via JSON-B. You may create an empty subclass if you want to use <>. +NOTE: By default, the `JsonbSerializer` serializes null as the `"null"` String, this can be customized by setting the Kafka configuration +property `json.serialize.null-as-null=true` which will serialize null as `null`. +This is handy when using a compacted topic, as `null` is used as a tombstone to know which messages delete during compaction phase. + The corresponding deserializer class needs to be subclassed. So, let's create a `FruitDeserializer` that extends the generic `JsonbDeserializer`. @@ -1199,6 +1226,24 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali Now, your Kafka messages will contain a JSON-B serialized representation of your `Fruit` data object. +If you want to deserialize a list of fruits, you need to create a deserializer with a `Type` denoted the generic collection used. + +[source,java] +---- +package com.acme.fruit.jsonb; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import io.quarkus.kafka.client.serialization.JsonbDeserializer; + +public class ListOfFruitDeserializer extends JsonbDeserializer> { + public ListOfFruitDeserializer() { + Type listType = new ArrayList() {}.getClass().getGenericSuperclass(); + super(listType); + } +} +---- + NOTE: If you don't want to create a deserializer for each data object, you can use the generic `io.vertx.kafka.client.serialization.JsonObjectDeserializer` that will deserialize to a `io.vertx.core.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`. diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java index 1518f851afb15..17ba60dcdcd4a 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java @@ -12,10 +12,13 @@ * A {@link Serializer} that serializes to JSON using JSON-B. */ public class JsonbSerializer implements Serializer { + public static final String NULL_AS_NULL_CONFIG = "json.serialize.null-as-null"; private final Jsonb jsonb; private final boolean jsonbNeedsClosing; + private boolean nullAsNull = false; + public JsonbSerializer() { this(JsonbProducer.get(), true); } @@ -31,10 +34,17 @@ private JsonbSerializer(Jsonb jsonb, boolean jsonbNeedsClosing) { @Override public void configure(Map configs, boolean isKey) { + if (configs.containsKey(NULL_AS_NULL_CONFIG) && Boolean.parseBoolean((String) configs.get(NULL_AS_NULL_CONFIG))) { + nullAsNull = true; + } } @Override public byte[] serialize(String topic, T data) { + if (nullAsNull && data == null) { + return null; + } + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { jsonb.toJson(data, output); return output.toByteArray(); diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializer.java index 12f63947e10a5..ebeb1a95649da 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializer.java @@ -13,9 +13,12 @@ * A {@link Deserializer} that deserializes JSON using Jackson's ObjectMapper. */ public class ObjectMapperSerializer implements Serializer { + public static final String NULL_AS_NULL_CONFIG = "json.serialize.null-as-null"; private final ObjectMapper objectMapper; + private boolean nullAsNull = false; + public ObjectMapperSerializer() { this(ObjectMapperProducer.get()); } @@ -26,10 +29,17 @@ public ObjectMapperSerializer(ObjectMapper objectMapper) { @Override public void configure(Map configs, boolean isKey) { + if (configs.containsKey(NULL_AS_NULL_CONFIG) && Boolean.parseBoolean((String) configs.get(NULL_AS_NULL_CONFIG))) { + nullAsNull = true; + } } @Override public byte[] serialize(String topic, T data) { + if (nullAsNull && data == null) { + return null; + } + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { objectMapper.writeValue(output, data); return output.toByteArray(); diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonbDeserializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonbDeserializerTest.java new file mode 100644 index 0000000000000..0ad197ba438fd --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonbDeserializerTest.java @@ -0,0 +1,47 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +class JsonbDeserializerTest { + @Test + void shouldDeserializeEntity() { + MyEntity expected = new MyEntity(1, "entity1"); + JsonbDeserializer deserializer = new JsonbDeserializer<>(MyEntity.class); + MyEntity actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes()); + assertNotNull(actual); + assertEquals(expected, actual); + } + + @Test + void shouldDeserializeListOfEntities() { + Type listType = new ArrayList() { + }.getClass().getGenericSuperclass(); + JsonbDeserializer> deserializer = new JsonbDeserializer<>(listType); + List actuals = deserializer.deserialize("topic", + "[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]".getBytes()); + assertNotNull(actuals); + assertEquals(2, actuals.size()); + } + + @Test + void shouldDeserializeNullAsNullString() { + JsonbDeserializer deserializer = new JsonbDeserializer<>(MyEntity.class); + MyEntity results = deserializer.deserialize("topic", "null".getBytes()); + assertNull(results); + } + + @Test + void shouldDeserializeNullAsNull() { + JsonbDeserializer deserializer = new JsonbDeserializer<>(MyEntity.class); + MyEntity results = deserializer.deserialize("topic", null); + assertNull(results); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonbSerializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonbSerializerTest.java new file mode 100644 index 0000000000000..2d8ac060123c2 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonbSerializerTest.java @@ -0,0 +1,49 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +class JsonbSerializerTest { + + @Test + void shouldSerializeEntity() { + JsonbSerializer serializer = new JsonbSerializer<>(); + MyEntity entity = new MyEntity(1, "entity1"); + byte[] result = serializer.serialize("topic", entity); + assertNotNull(result); + assertEquals("{\"id\":1,\"name\":\"entity1\"}", new String(result)); + } + + @Test + void shouldSerializeListOfEntities() { + JsonbSerializer> serializer = new JsonbSerializer<>(); + MyEntity entity1 = new MyEntity(1, "entity1"); + MyEntity entity2 = new MyEntity(2, "entity2"); + byte[] result = serializer.serialize("topic", List.of(entity1, entity2)); + assertNotNull(result); + assertEquals("[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]", new String(result)); + } + + @Test + void shouldSerializeNullAsNullString() { + JsonbSerializer serializer = new JsonbSerializer<>(); + byte[] results = serializer.serialize("topic", null); + assertNotNull(results); + assertEquals("null", new String(results)); + } + + @Test + void shouldSerializeNullAsNull() { + JsonbSerializer serializer = new JsonbSerializer<>(); + serializer.configure(Map.of(JsonbSerializer.NULL_AS_NULL_CONFIG, "true"), false); + byte[] results = serializer.serialize("topic", null); + assertNull(results); + } + +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/MyEntity.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/MyEntity.java new file mode 100644 index 0000000000000..aae6ac2cbb5d9 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/MyEntity.java @@ -0,0 +1,32 @@ +package io.quarkus.kafka.client.serialization; + +import java.util.Objects; + +public class MyEntity { + public long id; + public String name; + + // used by deserializers + public MyEntity() { + } + + public MyEntity(long id, String name) { + this.id = id; + this.name = name; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + MyEntity myEntity = (MyEntity) o; + return id == myEntity.id && Objects.equals(name, myEntity.name); + } + + @Override + public int hashCode() { + return Objects.hash(id, name); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/ObjectMapperDeserializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/ObjectMapperDeserializerTest.java new file mode 100644 index 0000000000000..0f30ed079960c --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/ObjectMapperDeserializerTest.java @@ -0,0 +1,47 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.type.TypeReference; + +class ObjectMapperDeserializerTest { + @Test + void shouldDeserializeEntity() { + MyEntity expected = new MyEntity(1, "entity1"); + ObjectMapperDeserializer deserializer = new ObjectMapperDeserializer<>(MyEntity.class); + MyEntity actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes()); + assertNotNull(actual); + assertEquals(expected, actual); + } + + @Test + void shouldDeserializeListOfEntities() { + TypeReference> listType = new TypeReference<>() { + }; + ObjectMapperDeserializer> deserializer = new ObjectMapperDeserializer<>(listType); + List actuals = deserializer.deserialize("topic", + "[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]".getBytes()); + assertNotNull(actuals); + assertEquals(2, actuals.size()); + } + + @Test + void shouldDeserializeNullAsNullString() { + ObjectMapperDeserializer deserializer = new ObjectMapperDeserializer<>(MyEntity.class); + MyEntity results = deserializer.deserialize("topic", "null".getBytes()); + assertNull(results); + } + + @Test + void shouldDeserializeNullAsNull() { + ObjectMapperDeserializer deserializer = new ObjectMapperDeserializer<>(MyEntity.class); + MyEntity results = deserializer.deserialize("topic", null); + assertNull(results); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializerTest.java new file mode 100644 index 0000000000000..02c9389f1bae2 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/ObjectMapperSerializerTest.java @@ -0,0 +1,49 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +class ObjectMapperSerializerTest { + + @Test + void shouldSerializeEntity() { + ObjectMapperSerializer serializer = new ObjectMapperSerializer<>(); + MyEntity entity = new MyEntity(1, "entity1"); + byte[] result = serializer.serialize("topic", entity); + assertNotNull(result); + assertEquals("{\"id\":1,\"name\":\"entity1\"}", new String(result)); + } + + @Test + void shouldSerializeListOfEntities() { + ObjectMapperSerializer> serializer = new ObjectMapperSerializer<>(); + MyEntity entity1 = new MyEntity(1, "entity1"); + MyEntity entity2 = new MyEntity(2, "entity2"); + byte[] result = serializer.serialize("topic", List.of(entity1, entity2)); + assertNotNull(result); + assertEquals("[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]", new String(result)); + } + + @Test + void shouldSerializeNullAsNullString() { + ObjectMapperSerializer serializer = new ObjectMapperSerializer<>(); + byte[] results = serializer.serialize("topic", null); + assertNotNull(results); + assertEquals("null", new String(results)); + } + + @Test + void shouldSerializeNullAsNull() { + ObjectMapperSerializer serializer = new ObjectMapperSerializer<>(); + serializer.configure(Map.of(ObjectMapperSerializer.NULL_AS_NULL_CONFIG, "true"), false); + byte[] results = serializer.serialize("topic", null); + assertNull(results); + } + +}