Skip to content

Commit

Permalink
Dynamic serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
dmariassy committed Jul 24, 2024
1 parent 0211d39 commit c40a4e4
Show file tree
Hide file tree
Showing 8 changed files with 764 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.protobuf.registry.confluent.dynamic.serializer;

import com.google.protobuf.AbstractMessage;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

import org.apache.flink.formats.protobuf.serialize.MessageSerializer;

/**
 * {@link MessageSerializer} implementation that delegates serialization to a Confluent
 * {@link KafkaProtobufSerializer}, always using a single, fixed subject name.
 *
 * <p>The delegate handles schema registration/lookup against the Schema Registry as part
 * of {@code serialize}; this class only supplies the subject under which that happens.
 */
public class ConfluentMessageSerializer implements MessageSerializer {

    // Confluent serializer doing the actual wire-format encoding.
    private final KafkaProtobufSerializer delegate;
    // Subject name passed to the delegate on every call.
    private final String subject;

    public ConfluentMessageSerializer(KafkaProtobufSerializer kafkaProtobufSerializer, String subjectName) {
        this.delegate = kafkaProtobufSerializer;
        this.subject = subjectName;
    }

    /**
     * Serializes {@code message} via the configured Confluent serializer under the
     * fixed subject name.
     *
     * @param message the Protobuf message to encode
     * @return the serialized bytes as produced by {@link KafkaProtobufSerializer#serialize}
     */
    @Override
    public byte[] serialize(AbstractMessage message) {
        return delegate.serialize(subject, message);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.protobuf.registry.confluent.dynamic.serializer;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

import java.util.Map;

/**
* {@link io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy} implementation that
* treats the provided topic as the subject name without appending a "-key" or "-value" suffix.
*/
/**
 * {@link SubjectNameStrategy} that uses the topic name verbatim as the subject, i.e.
 * without the "-key"/"-value" suffix appended by Confluent's default TopicNameStrategy.
 */
public class NoSuffixTopicNameStrategy implements SubjectNameStrategy {

    /**
     * Returns {@code topic} unchanged; {@code isKey} and {@code schema} are ignored.
     */
    @Override
    public String subjectName(String topic, boolean isKey, ParsedSchema schema) {
        return topic;
    }

    /** This strategy requires no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.protobuf.registry.confluent.dynamic.serializer;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.protobuf.PbFormatConfig;
import org.apache.flink.formats.protobuf.serialize.MessageSerializer;
import org.apache.flink.formats.protobuf.serialize.RowToProtoConverter;
import org.apache.flink.protobuf.registry.confluent.dynamic.ProtoCompiler;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;

import java.util.HashMap;
import java.util.Map;

/**
 * {@link SerializationSchema} that serializes Flink {@link RowData} to Confluent Schema
 * Registry compatible Protobuf bytes.
 *
 * <p>On {@link #open}, a Protobuf schema is derived from the configured {@link RowType},
 * compiled into a generated message class, and a Confluent serializer is configured to
 * auto-register that schema under {@code subjectName} (using
 * {@link NoSuffixTopicNameStrategy}, so the subject is used verbatim).
 *
 * <p>NOTE(review): this class is Java-serialized by Flink when the job graph is shipped;
 * {@code schemaRegistryClient} must therefore be serializable — confirm the concrete
 * client implementation used by callers supports this.
 */
public class ProtoRegistryDynamicSerializationSchema implements SerializationSchema<RowData> {

    private static final String PROTOBUF_OUTER_CLASS_NAME_SUFFIX = "OuterClass";

    // Confluent serializer config keys, inlined as literals to avoid a hard
    // compile-time dependency on Confluent config classes.
    private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
    private static final String AUTO_REGISTER_SCHEMAS_CONFIG = "auto.register.schemas";
    private static final String VALUE_SUBJECT_NAME_STRATEGY_CONFIG = "value.subject.name.strategy";

    private final String generatedPackageName;
    private final String generatedClassName;
    private final RowType rowType;
    private final String subjectName;
    private final SchemaRegistryClient schemaRegistryClient;
    private final String schemaRegistryUrl;

    // Built in open(); not serializable, hence transient.
    private transient RowToProtoConverter rowToProtoConverter;

    /**
     * @param generatedPackageName Java package for the generated Protobuf message class
     * @param generatedClassName simple name for the generated Protobuf message class
     * @param rowType Flink row type describing the records to serialize
     * @param subjectName Schema Registry subject under which the schema is registered
     * @param schemaRegistryClient client used for schema registration/lookup
     * @param schemaRegistryUrl Schema Registry URL passed to the Confluent serializer config
     */
    public ProtoRegistryDynamicSerializationSchema(
            String generatedPackageName,
            String generatedClassName,
            RowType rowType,
            String subjectName,
            SchemaRegistryClient schemaRegistryClient,
            String schemaRegistryUrl) {
        this.generatedPackageName = generatedPackageName;
        this.generatedClassName = generatedClassName;
        this.rowType = rowType;
        this.subjectName = subjectName;
        this.schemaRegistryClient = schemaRegistryClient;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    /**
     * Serializes a row to Registry-framed Protobuf bytes.
     *
     * @throws FlinkRuntimeException wrapping any conversion/serialization failure
     */
    @Override
    public byte[] serialize(RowData element) {
        try {
            return rowToProtoConverter.convertRowToProtoBinary(element);
        } catch (Exception e) {
            // Preserve the cause; Flink surfaces this as a task failure.
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Generates the Protobuf message class for {@code rowType} and wires up the
     * row-to-proto converter with a Confluent-aware message serializer.
     */
    @Override
    public void open(InitializationContext context) throws Exception {
        Class<?> generatedClass = generateProtoClassForRowType();
        KafkaProtobufSerializer kafkaProtobufSerializer = createKafkaSerializer();
        MessageSerializer messageSerializer =
                new ConfluentMessageSerializer(kafkaProtobufSerializer, subjectName);
        PbFormatConfig formatConfig = new PbFormatConfig(generatedClass.getName(), false, true, null);
        rowToProtoConverter = new RowToProtoConverter(rowType, formatConfig, messageSerializer);
    }

    /** Converts {@code rowType} to a Protobuf schema and compiles it into a message class. */
    private Class<?> generateProtoClassForRowType() throws Exception {
        RowToProtobufSchemaConverter rowToProtobufSchemaConverter =
                new RowToProtobufSchemaConverter(generatedPackageName, generatedClassName, rowType);

        ProtobufSchema protoSchema = rowToProtobufSchemaConverter.convert();
        ProtoCompiler protoCompiler = new ProtoCompiler(PROTOBUF_OUTER_CLASS_NAME_SUFFIX);
        return protoCompiler.generateMessageClass(protoSchema, null);
    }

    /**
     * Builds a Confluent Protobuf serializer that auto-registers schemas and uses the
     * subject name verbatim (no "-value" suffix).
     */
    private KafkaProtobufSerializer createKafkaSerializer() {
        Map<String, String> opts = new HashMap<>();
        opts.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        opts.put(AUTO_REGISTER_SCHEMAS_CONFIG, "true");
        opts.put(VALUE_SUBJECT_NAME_STRATEGY_CONFIG, NoSuffixTopicNameStrategy.class.getName());
        boolean isKey = false;

        KafkaProtobufSerializer ser = new KafkaProtobufSerializer(schemaRegistryClient);
        ser.configure(opts, isKey);

        return ser;
    }
}
Loading

0 comments on commit c40a4e4

Please sign in to comment.