Skip to content

Commit

Permalink
Support confluent decimal types
Browse files Browse the repository at this point in the history
  • Loading branch information
dmariassy committed Jul 26, 2024
1 parent 4044a8d commit 4d1249a
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.protobuf.Descriptors;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.protobuf.MetaProto;
import io.confluent.protobuf.type.DecimalProto;

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
Expand All @@ -37,6 +38,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

Expand All @@ -53,6 +55,8 @@ public class ProtoCompiler {

private static final String CONFLUENT = "confluent";
private static final String META_PROTO = String.format("%s/%s", CONFLUENT, "meta.proto");
private static final String DECIMAL_PROTO =
String.format("%s/%s/%s", CONFLUENT, "type", "decimal.proto");
// TODO: Hava constructor that can fetch the protocVersion from project properties
private static final String DEFAULT_PROTOC_VERSION = "3.21.7";

Expand All @@ -64,7 +68,7 @@ public ProtoCompiler(Path parentDir, String protocVersion, String classSuffix) {
this.confluentProtosDir = createChildDir(parentDir, CONFLUENT);
this.protocVersion = protocVersion;
this.copyConfluentProto("/" + META_PROTO);
this.copyConfluentProto(String.format("/%s/%s/%s", CONFLUENT, "type", "decimal.proto"));
this.copyConfluentProto("/" + DECIMAL_PROTO);
}

public ProtoCompiler(String protocVersion, String classSuffix) {
Expand Down Expand Up @@ -219,13 +223,8 @@ private ProtobufSchema setProtoOptions(ProtobufSchema protobufSchema, String cla
DescriptorProtos.FileDescriptorProto.Builder newProtoBuilder =
originalDescriptor.toProto().toBuilder().setOptions(newOptions);

// Debezium schemas sometimes contain references to "confluent/meta.proto" message types
// but don't contain the import statement.
if (!originalDescriptor.getDependencies().contains(MetaProto.getDescriptor())) {
newProtoBuilder.addDependency(META_PROTO);
}

DescriptorProtos.FileDescriptorProto newProto = newProtoBuilder.build();
DescriptorProtos.FileDescriptorProto newProto =
maybeAddConfluentImports(newProtoBuilder, originalDescriptor.getDependencies());
try {
return new ProtobufSchema(
Descriptors.FileDescriptor.buildFrom(
Expand All @@ -234,10 +233,39 @@ private ProtobufSchema setProtoOptions(ProtobufSchema protobufSchema, String cla
.getDependencies()
.toArray(new Descriptors.FileDescriptor[0])));
} catch (Exception e) {
throw new RuntimeException("Failed to set Java outer class name.", e);
throw new RuntimeException("Failed to set proto options.", e);
}
}

// Debezium schemas sometimes contain references to confluent message types
// but don't contain the necessary import statement. This can lead to
// protoc errors. We will add the necessary imports if they are missing.
private DescriptorProtos.FileDescriptorProto maybeAddConfluentImports(
DescriptorProtos.FileDescriptorProto.Builder newProtoBuilder,
List<Descriptors.FileDescriptor> originalDependencies) {
List<String> originalDependencyNames =
originalDependencies.stream()
.map(
dep -> {
if (dep.getPackage().isEmpty() || dep.getName().contains("/")) {
return dep.getName();
}
return String.format("%s/%s", dep.getPackage(), dep.getName());
})
.collect(Collectors.toList());

if (!originalDependencyNames.contains(META_PROTO)
&& !originalDependencies.contains(MetaProto.getDescriptor())) {
newProtoBuilder.addDependency(META_PROTO);
}
if (!originalDependencyNames.contains(DECIMAL_PROTO)
&& !originalDependencies.contains(DecimalProto.getDescriptor())) {
newProtoBuilder.addDependency(DECIMAL_PROTO);
}

return newProtoBuilder.build();
}

private String getClassNameFromProto(ProtobufSchema protobufSchema) {
String[] classNameParts = protobufSchema.name().split("\\.");
return classNameParts[classNameParts.length - 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,33 @@ public void confluentTagsWithMissingImportProto3() throws Exception {
protoCompiler.generateMessageClass(schema, DEFAULT_SCHEMA_ID);
}

@Test
public void confluentTagsWithImportProto3() throws Exception {
// the schemas registered by Debezium don't always seem to have the import statement in
// the definition
String schemaStr =
"syntax = \"proto3\";\n"
+ "package org.apache.flink.formats.protobuf.proto;\n"
+ "import \"confluent/meta.proto\";\n"
+ "\n"
+ "message ConfluentTagsWithImportProto3 {\n"
+ " int32 int = 1 [(confluent.field_meta) = {\n"
+ " params: [\n"
+ " {\n"
+ " key: \"connect.type\",\n"
+ " value: \"int16\"\n"
+ " }\n"
+ " ]\n"
+ " }];\n"
+ "}";

ProtobufSchema schema = new ProtobufSchema(schemaStr);
ProtoCompiler protoCompiler = new ProtoCompiler(DEFAULT_CLASS_SUFFIX);

// We just want to check that the compiler doesn't throw an exception
protoCompiler.generateMessageClass(schema, DEFAULT_SCHEMA_ID);
}

@Test
public void flatProto2() throws Exception {
FlatProto2OuterClass.FlatProto2 in =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.protobuf.registry.confluent.dynamic.deserializer;

import org.apache.flink.formats.protobuf.proto.AddressAndUser;
import org.apache.flink.formats.protobuf.proto.ConfluentDebeziumProto3;
import org.apache.flink.formats.protobuf.proto.FlatProto3OuterClass;
import org.apache.flink.formats.protobuf.proto.MapProto3;
import org.apache.flink.formats.protobuf.proto.NestedProto3OuterClass;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
Expand All @@ -42,6 +44,7 @@
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.protobuf.type.Decimal;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -54,6 +57,8 @@
import static org.apache.flink.protobuf.registry.confluent.TestUtils.FAKE_TOPIC;
import static org.apache.flink.protobuf.registry.confluent.TestUtils.IGNORE_PARSE_ERRORS;
import static org.apache.flink.protobuf.registry.confluent.TestUtils.READ_DEFAULT_VALUES;
import static org.apache.flink.protobuf.registry.confluent.TestUtils.TEST_BYTES;
import static org.apache.flink.protobuf.registry.confluent.TestUtils.TEST_INT;

class ProtoRegistryDynamicDeserializationSchemaTest {

Expand Down Expand Up @@ -286,6 +291,48 @@ TestUtils.SECONDS_FIELD, new BigIntType()),
Assertions.assertEquals(timestampValue, actual.getRow(0, 2));
}

@Test
public void deserializeConfluentDecimal() throws Exception {
ConfluentDebeziumProto3.DecimalProto3 in =
ConfluentDebeziumProto3.DecimalProto3.newBuilder()
.setDecimal(
Decimal.newBuilder()
.setValue(TEST_BYTES)
.setPrecision(TEST_INT * 2)
.setScale(TEST_INT)
.build())
.build();
byte[] inBytes = kafkaProtobufSerializer.serialize(FAKE_TOPIC, in);

RowType rowType =
TestUtils.createRowType(
new RowType.RowField(
"decimal",
TestUtils.createRowType(
new RowType.RowField("value", new BinaryType()),
new RowType.RowField("precision", new IntType()),
new RowType.RowField("scale", new IntType()))));

ProtoRegistryDynamicDeserializationSchema deser =
new ProtoRegistryDynamicDeserializationSchema(
mockSchemaRegistryClientProvider,
DUMMY_SCHEMA_REGISTRY_URL,
rowType,
null,
IGNORE_PARSE_ERRORS,
READ_DEFAULT_VALUES);
deser.open(null);

RowData actual = deser.deserialize(inBytes);
Assertions.assertEquals(1, actual.getArity());

GenericRowData decimalValue = new GenericRowData(3);
decimalValue.setField(0, TestUtils.TEST_BYTES.toByteArray());
decimalValue.setField(1, TestUtils.TEST_INT * 2);
decimalValue.setField(2, TestUtils.TEST_INT);
Assertions.assertEquals(decimalValue, actual.getRow(0, 3));
}

@Test
public void deserializeUsingSchemaWithReferences() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";
package org.apache.flink.formats.protobuf.proto;

import "confluent/type/decimal.proto";

message DecimalProto3 {
confluent.type.Decimal decimal = 1;
}

0 comments on commit 4d1249a

Please sign in to comment.