Skip to content

Commit

Permalink
Add support for optional types alongside extensions
Browse files Browse the repository at this point in the history
The test confluentTagsWithImportProto3 would fail with the following error: [libprotobuf FATAL src/google/protobuf/compiler/java/file.cc:150] CHECK failed: CollectExtensions(*dynamic_file_proto, extensions):

Also used this as an opportunity to add support for including custom proto descriptors during the protoc compilation process.
  • Loading branch information
dmariassy committed Aug 20, 2024
1 parent 4d1249a commit c0e710a
Show file tree
Hide file tree
Showing 13 changed files with 319 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -48,6 +49,7 @@
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.BASIC_AUTH_USER_INFO;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.BEARER_AUTH_TOKEN;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.CUSTOM_PROTO_INCLUDES;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.MESSAGE_NAME;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.PACKAGE_NAME;
Expand All @@ -60,6 +62,7 @@
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.SUBJECT;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.URL;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.USE_DEFAULT_PROTO_INCLUDES;
import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.WRITE_NULL_STRING_LITERAL;

public class ProtobufConfluentFormatFactoryUtils {
Expand All @@ -81,7 +84,9 @@ public static ProtoRegistryDynamicDeserializationSchema createDynamicDeserializa
rowType,
rowDataTypeInfo,
formatOptions.get(IGNORE_PARSE_ERRORS),
formatOptions.get(READ_DEFAULT_VALUES));
formatOptions.get(READ_DEFAULT_VALUES),
formatOptions.get(USE_DEFAULT_PROTO_INCLUDES),
parseCustomProtoIncludes(formatOptions.get(CUSTOM_PROTO_INCLUDES)));
}

public static ProtoRegistryDynamicSerializationSchema createDynamicSerializationSchema(
Expand All @@ -96,7 +101,9 @@ public static ProtoRegistryDynamicSerializationSchema createDynamicSerialization
rowType,
formatOptions.get(SUBJECT),
schemaRegistryClientProvider,
schemaRegistryURL);
schemaRegistryURL,
formatOptions.get(USE_DEFAULT_PROTO_INCLUDES),
parseCustomProtoIncludes(formatOptions.get(CUSTOM_PROTO_INCLUDES)));
}

public static Set<ConfigOption<?>> requiredOptions() {
Expand All @@ -108,6 +115,8 @@ public static Set<ConfigOption<?>> requiredOptions() {
public static Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(REGISTRY_CLIENT_CACHE_CAPACITY);
options.add(USE_DEFAULT_PROTO_INCLUDES);
options.add(CUSTOM_PROTO_INCLUDES);
options.add(SUBJECT);
options.add(MESSAGE_NAME);
options.add(PACKAGE_NAME);
Expand All @@ -130,6 +139,8 @@ public static Set<ConfigOption<?>> forwardOptions() {
return Stream.of(
URL,
REGISTRY_CLIENT_CACHE_CAPACITY,
USE_DEFAULT_PROTO_INCLUDES,
CUSTOM_PROTO_INCLUDES,
SUBJECT,
MESSAGE_NAME,
PACKAGE_NAME,
Expand Down Expand Up @@ -176,6 +187,13 @@ public static void validateDynamicEncodingOptions(
buildOptionalPropertiesMap(formatOptions));
}

static List<String> parseCustomProtoIncludes(String customProtoIncludes) {
if (customProtoIncludes == null || customProtoIncludes.isEmpty()) {
return new ArrayList<>();
}
return Arrays.asList(customProtoIncludes.split(","));
}

private static @Nullable Map<String, String> buildOptionalPropertiesMap(
ReadableConfig formatOptions) {
final Map<String, String> properties = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,27 @@ public class ProtobufConfluentFormatOptions {
.withDescription(
"The number of schemas to cache in the Confluent Schema Registry client.");

public static final ConfigOption<Boolean> USE_DEFAULT_PROTO_INCLUDES =
ConfigOptions.key("use-default-proto-includes")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to include the following set of proto descriptors when calling protoc to generate the Java classes that are used for (de)-serialization: "
+ "confluent/meta.proto, confluent/types/decimal.proto, "
+ "/google/protobuf/any.proto, /google/protobuf/api.proto, "
+ "/google/protobuf/descriptor.proto, /google/protobuf/duration.proto, "
+ "/google/protobuf/empty.proto, /google/protobuf/field_mask.proto, "
+ "/google/protobuf/source_context.proto, /google/protobuf/struct.proto, "
+ "/google/protobuf/timestamp.proto, /google/protobuf/type.proto, "
+ "/google/protobuf/wrappers.proto");

public static final ConfigOption<String> CUSTOM_PROTO_INCLUDES =
ConfigOptions.key("custom-proto-includes")
.stringType()
.defaultValue("")
.withDescription(
"A comma-separated list of Java resource URLs that should be included when calling protoc to generate the Java classes that are used for (de)-serialization.");

// --------------------------------------------------------------------------------------------
// Serialization options
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -97,7 +118,7 @@ public class ProtobufConfluentFormatOptions {
.withDescription(
"Optional flag to read as default values instead of null when some field does not exist in deserialization; default to false."
+ "If proto syntax is proto3, users need to set this to true when using protobuf versions lower than 3.15 as older versions "
+ "do not support checking for field presence which can cause runtime compilation issues. Additionally, primtive types "
+ "do not support checking for field presence which can cause runtime compilation issues. Additionally, primitive types "
+ "will be set to default values instead of null as field presence cannot be checked for them. Please be aware that setting this"
+ " to true will cause the deserialization performance to be much slower depending on schema complexity and message size");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public TypeInformation<RowData> getProducedType() {
}

public static void updateRowType(MutableRowTypeSchema mutableRowTypeSchema) {
// Debezium Avro contains other information, e.g. "source", "ts_ms"
// Debezium Protobuf contains other information, e.g. "source", "ts_ms"
// but we don't need them
DataType originalDataType = fromLogicalToDataType(mutableRowTypeSchema.getRowType());
RowType newRowType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,19 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/** Service to expose the functionality of the Protoc compiler. */
public class ProtoCompiler {
// Avoid clashes with classes generated by other threads in the same JVM

private final String classSuffix;
// The directory where all the generated files will be written to
private final Path parentDir;
private final Path protosDir;
private final Path classesDir;
private final Path confluentProtosDir;
private final String protocVersion;

private static final String CONFLUENT = "confluent";
Expand All @@ -59,28 +58,68 @@ public class ProtoCompiler {
String.format("%s/%s/%s", CONFLUENT, "type", "decimal.proto");
// TODO: Hava constructor that can fetch the protocVersion from project properties
private static final String DEFAULT_PROTOC_VERSION = "3.21.7";
private static final List<String> DEFAULT_INCLUDES =
Arrays.asList(
"/" + META_PROTO,
"/" + DECIMAL_PROTO,
"/google/protobuf/any.proto",
"/google/protobuf/api.proto",
"/google/protobuf/descriptor.proto",
"/google/protobuf/duration.proto",
"/google/protobuf/empty.proto",
"/google/protobuf/field_mask.proto",
"/google/protobuf/source_context.proto",
"/google/protobuf/struct.proto",
"/google/protobuf/timestamp.proto",
"/google/protobuf/type.proto",
"/google/protobuf/wrappers.proto");

public ProtoCompiler(Path parentDir, String protocVersion, String classSuffix) {
/**
* @param parentDir The directory where the proto files and generated classes will be written to
* @param protocVersion The version of protoc to use
* @param classSuffix The suffix to append to the generated class name
* @param customIncludes An optional list of resource URLs to copy to the parent directory, and
* include when invoking protoc
* @param useDefaultIncludes Whether to include the default set of proto descriptors from {@link
* ProtoCompiler#DEFAULT_INCLUDES}
*/
public ProtoCompiler(
Path parentDir,
String protocVersion,
String classSuffix,
boolean useDefaultIncludes,
String... customIncludes) {
this.classSuffix = classSuffix;
this.parentDir = parentDir;
this.protosDir = createChildDir(parentDir, "protos");
this.classesDir = createChildDir(parentDir, "classes");
this.confluentProtosDir = createChildDir(parentDir, CONFLUENT);
this.protocVersion = protocVersion;
this.copyConfluentProto("/" + META_PROTO);
this.copyConfluentProto("/" + DECIMAL_PROTO);
List<String> resolvedIncludes = new ArrayList<>();
if (useDefaultIncludes) {
resolvedIncludes.addAll(DEFAULT_INCLUDES);
}
if (customIncludes != null && customIncludes.length > 0) {
resolvedIncludes.addAll(Arrays.asList(customIncludes));
}
for (String include : resolvedIncludes) {
copyIncludedProto(include);
}
}

public ProtoCompiler(String protocVersion, String classSuffix) {
this(createParentDir(), protocVersion, classSuffix);
public ProtoCompiler(
String protocVersion,
String classSuffix,
boolean useDefaultIncludes,
String... customIncludes) {
this(createParentDir(), protocVersion, classSuffix, useDefaultIncludes, customIncludes);
}

public ProtoCompiler() {
this(DEFAULT_PROTOC_VERSION, generateClassSuffix());
public ProtoCompiler(boolean useDefaultIncludes, String... customIncludes) {
this(DEFAULT_PROTOC_VERSION, generateClassSuffix(), useDefaultIncludes, customIncludes);
}

public ProtoCompiler(String classSuffix) {
this(DEFAULT_PROTOC_VERSION, classSuffix);
public ProtoCompiler(String classSuffix, boolean useDefaultIncludes, String... customIncludes) {
this(DEFAULT_PROTOC_VERSION, classSuffix, useDefaultIncludes, customIncludes);
}

/**
Expand Down Expand Up @@ -149,8 +188,7 @@ private Path getJavaClassDefPath(Path javaOutDir, String packageName, String cla
private Path generateJavaClassDef(Path schemaFilePath, String className) {
Path javaOutDir = createChildDir(classesDir, className);
String[] args = {
"--include_std_types", // include google types
"-I" + parentDir.toString(), // include confluent types
"-I" + parentDir.toString(), // include custom types
"-v" + protocVersion,
"--java_out=" + javaOutDir.toString(),
"--proto_path=" + protosDir.toString(),
Expand Down Expand Up @@ -271,26 +309,26 @@ private String getClassNameFromProto(ProtobufSchema protobufSchema) {
return classNameParts[classNameParts.length - 1];
}

private void copyConfluentProto(String resourceName) {
InputStream protoFileStream = ProtoCompiler.class.getResourceAsStream(resourceName);
private void copyIncludedProto(String includedProto) {
InputStream protoFileStream = ProtoCompiler.class.getResourceAsStream(includedProto);
if (protoFileStream == null) {
throw new IllegalArgumentException("Could not find the proto file: " + resourceName);
throw new IllegalArgumentException("Could not find the proto file: " + includedProto);
}
BufferedReader reader =
new BufferedReader(new InputStreamReader(protoFileStream, StandardCharsets.UTF_8));
String protoFileContents = reader.lines().collect(Collectors.joining("\n"));

// Create any necessary subdirectory
String[] resourceNameParts = resourceName.substring(1).split("/");
for (int i = 1; i < resourceNameParts.length - 1; i++) {
String[] resourceNameParts = includedProto.substring(1).split("/");
for (int i = 0; i < resourceNameParts.length - 1; i++) {
String childDirName = resourceNameParts[i];
Path currentParentDir =
parentDir.resolve(
String.join("/", Arrays.copyOfRange(resourceNameParts, 0, i)));
createChildDir(currentParentDir, childDirName);
}

Path protoFilePath = parentDir.resolve(resourceName.substring(1));
Path protoFilePath = parentDir.resolve(includedProto.substring(1));
try {
Files.write(protoFilePath, protoFileContents.getBytes());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -61,6 +62,8 @@ public class ProtoRegistryDynamicDeserializationSchema
private final boolean readDefaultValues;
private final String schemaRegistryUrl;
private final SchemaRegistryClientProvider schemaRegistryClientProvider;
private final boolean useDefaultProtoIncludes;
private final List<String> customProtoIncludes;

private transient SchemaRegistryClient schemaRegistryClient;
// Since these services operate on dynamically compiled and loaded classes, we need to
Expand All @@ -77,14 +80,18 @@ public ProtoRegistryDynamicDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean ignoreParseErrors,
boolean readDefaultValues) {
boolean readDefaultValues,
boolean useDefaultProtoIncludes,
List<String> customProtoIncludes) {
this.rowType = rowType;
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.readDefaultValues = readDefaultValues;
this.schemaRegistryClientProvider = schemaRegistryClientProvider;
this.schemaRegistryUrl = schemaRegistryUrl;
this.kafkaProtobufDeserializers = new HashMap<>();
this.useDefaultProtoIncludes = useDefaultProtoIncludes;
this.customProtoIncludes = customProtoIncludes;
}

@Override
Expand Down Expand Up @@ -123,7 +130,10 @@ public TypeInformation<RowData> getProducedType() {
public void open(InitializationContext context) throws Exception {
schemaRegistryClient = schemaRegistryClientProvider.createSchemaRegistryClient();
protoToRowConverters = new HashMap<>();
protoCompiler = new ProtoCompiler();
protoCompiler =
new ProtoCompiler(
this.useDefaultProtoIncludes,
this.customProtoIncludes.toArray(new String[0]));
generatedMessageClasses = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -51,6 +52,8 @@ public class ProtoRegistryDynamicSerializationSchema
private final String subjectName;
private final String schemaRegistryUrl;
private final SchemaRegistryClientProvider schemaRegistryClientProvider;
private final boolean useDefaultProtoIncludes;
private final List<String> customProtoIncludes;

private transient SchemaRegistryClient schemaRegistryClient;
private transient RowToProtoConverter rowToProtoConverter;
Expand All @@ -61,13 +64,17 @@ public ProtoRegistryDynamicSerializationSchema(
RowType rowType,
String subjectName,
SchemaRegistryClientProvider schemaRegistryClientProvider,
String schemaRegistryUrl) {
String schemaRegistryUrl,
boolean useDefaultProtoIncludes,
List<String> customProtoIncludes) {
this.generatedPackageName = generatedPackageName;
this.generatedClassName = generatedClassName;
this.rowType = rowType;
this.subjectName = subjectName;
this.schemaRegistryClientProvider = schemaRegistryClientProvider;
this.schemaRegistryUrl = schemaRegistryUrl;
this.useDefaultProtoIncludes = useDefaultProtoIncludes;
this.customProtoIncludes = customProtoIncludes;
}

@Override
Expand Down Expand Up @@ -106,7 +113,11 @@ private Class generateProtoClassForRowType() throws Exception {
new RowToProtobufSchemaConverter(generatedPackageName, generatedClassName, rowType);

ProtobufSchema protoSchema = rowToProtobufSchemaConverter.convert();
ProtoCompiler protoCompiler = new ProtoCompiler(PROTOBUF_OUTER_CLASS_NAME_SUFFIX);
ProtoCompiler protoCompiler =
new ProtoCompiler(
PROTOBUF_OUTER_CLASS_NAME_SUFFIX,
this.useDefaultProtoIncludes,
this.customProtoIncludes.toArray(new String[0]));
return protoCompiler.generateMessageClass(protoSchema, null);
}

Expand Down
Loading

0 comments on commit c0e710a

Please sign in to comment.