Skip to content

Commit

Permalink
[Improve][format] Using number format for Decimal type in `seatunnel-…
Browse files Browse the repository at this point in the history
…format-compatible-debezium-json` (#5803)
  • Loading branch information
hailin0 authored Nov 8, 2023
1 parent 9d1b258 commit 7b3a5d3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.utils.ReflectionUtils;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -30,7 +31,8 @@
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@RequiredArgsConstructor
public class DebeziumJsonConverter implements Serializable {
Expand Down Expand Up @@ -68,10 +70,12 @@ private void tryInit() {
synchronized (this) {
if (keyConverter == null) {
keyConverter = new JsonConverter();
keyConverter.configure(
Collections.singletonMap(
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable),
true);
Map<String, Object> configs = new HashMap<>();
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable);
configs.put(
JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
DecimalFormat.NUMERIC.name());
keyConverter.configure(configs, true);
keyConverterMethod =
ReflectionUtils.getDeclaredMethod(
JsonConverter.class,
Expand All @@ -88,10 +92,12 @@ private void tryInit() {
synchronized (this) {
if (valueConverter == null) {
valueConverter = new JsonConverter();
valueConverter.configure(
Collections.singletonMap(
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable),
false);
Map<String, Object> configs = new HashMap<>();
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable);
configs.put(
JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
DecimalFormat.NUMERIC.name());
valueConverter.configure(configs, false);
valueConverterMethod =
ReflectionUtils.getDeclaredMethod(
JsonConverter.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.format.compatible.debezium.json;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.util.Collections;

public class TestDebeziumJsonConverter {

@Test
public void testSerializeDecimalToNumber()
throws InvocationTargetException, IllegalAccessException, JsonProcessingException {
String key = "k";
String value = "v";
Struct keyStruct =
new Struct(SchemaBuilder.struct().field(key, Decimal.builder(2).build()).build());
keyStruct.put(key, BigDecimal.valueOf(1101, 2));
Struct valueStruct =
new Struct(SchemaBuilder.struct().field(value, Decimal.builder(2).build()).build());
valueStruct.put(value, BigDecimal.valueOf(1101, 2));

SourceRecord sourceRecord =
new SourceRecord(
Collections.emptyMap(),
Collections.emptyMap(),
null,
keyStruct.schema(),
keyStruct,
valueStruct.schema(),
valueStruct);

DebeziumJsonConverter converter = new DebeziumJsonConverter(false, false);
Assertions.assertEquals("{\"k\":11.01}", converter.serializeKey(sourceRecord));
Assertions.assertEquals("{\"v\":11.01}", converter.serializeValue(sourceRecord));
}
}

0 comments on commit 7b3a5d3

Please sign in to comment.