From 88409edfe1e59f0bed76bd71a9ca5401335624b0 Mon Sep 17 00:00:00 2001 From: Phillip Kruger Date: Mon, 12 Feb 2024 18:25:42 +1100 Subject: [PATCH] Encode Kafka messages with UTF8 Signed-off-by: Phillip Kruger --- .../kafka/client/runtime/devui/KafkaTopicClient.java | 5 +++-- .../devui/model/converter/KafkaModelConverter.java | 8 ++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaTopicClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaTopicClient.java index 85d672c2ea45f..2cfd5b990dd7f 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaTopicClient.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaTopicClient.java @@ -249,8 +249,9 @@ private void assertRequestedPartitionsExist(String topicName, Collection(request.getTopic(), request.getPartition(), Bytes.wrap(request.getKey().getBytes()), - Bytes.wrap(request.getValue().getBytes())); + var record = new ProducerRecord<>(request.getTopic(), request.getPartition(), + Bytes.wrap(request.getKey().getBytes(StandardCharsets.UTF_8)), + Bytes.wrap(request.getValue().getBytes(StandardCharsets.UTF_8))); Optional.ofNullable(request.getHeaders()) .orElseGet(Collections::emptyMap) diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/converter/KafkaModelConverter.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/converter/KafkaModelConverter.java index 59101e9c11d9e..2b14ee5c8483a 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/converter/KafkaModelConverter.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/converter/KafkaModelConverter.java @@ -19,8 +19,12 @@ public KafkaMessage convert(ConsumerRecord message) { message.partition(), message.offset(), message.timestamp(), - Optional.ofNullable(message.key()).map(Bytes::toString).orElse(null), - Optional.ofNullable(message.value()).map(Bytes::toString).orElse(null), + Optional.ofNullable(message.key()).map((t) -> { + return new String(t.get(), StandardCharsets.UTF_8); + }).orElse(null), + Optional.ofNullable(message.value()).map((t) -> { + return new String(t.get(), StandardCharsets.UTF_8); + }).orElse(null), headers(message)); }