This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit 717c296: support protobuf

jianyun8023 committed Feb 25, 2021
1 parent 8ecac83
Showing 4 changed files with 66 additions and 6 deletions.
@@ -23,5 +23,7 @@ public enum RecordSchemaType {
 
     JSON,
 
-    ATOMIC
+    ATOMIC,
+
+    PROTOBUF
 }
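
With the new constant, callers can ask the connector for a native protobuf schema. A minimal hedged sketch, assuming SimpleMessage is a hypothetical protoc-generated class (extending GeneratedMessageV3) and that buildSchemaForRecordClazz takes the record class and the schema type, as the SchemaUtils hunk further down suggests:

    // Hedged sketch: SimpleMessage is a hypothetical generated protobuf class.
    org.apache.pulsar.client.api.Schema<SimpleMessage> schema =
            SchemaUtils.buildSchemaForRecordClazz(SimpleMessage.class, RecordSchemaType.PROTOBUF);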
@@ -94,13 +94,14 @@ public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String format)
             throws PulsarAdminException, IncompatibleSchemaException {
         String topicName = objectPath2TopicName(tablePath);
         final TableSchema schema = table.getSchema();
-        pulsarMetadataReader.putSchema(topicName, tableSchemaToPulsarSchema(format, schema));
+        pulsarMetadataReader.putSchema(topicName, tableSchemaToPulsarSchema(format, schema, table.getOptions()));
     }
 
-    private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema) throws IncompatibleSchemaException {
+    private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema,
+                                                 Map<String, String> options) throws IncompatibleSchemaException {
         // Key exclusion logic is not handled correctly here when the user sets key-related fields via Pulsar.
         final DataType physicalRowDataType = schema.toPhysicalRowDataType();
-        return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType);
+        return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType, options);
     }
 
     private TableSchema pulsarSchemaToTableSchema(SchemaInfo pulsarSchema) throws IncompatibleSchemaException {
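
The catalog now threads the table options through to schema conversion, because a protobuf schema cannot be derived from the row type alone; it also needs the generated message class. A hedged sketch of the options such a table might carry (the class name is hypothetical; the option key comes from Flink's PbFormatOptions, imported in the next file):

    // Hedged sketch: table options for a protobuf-formatted table
    // (java.util.Map/HashMap; key from org.apache.flink.formats.protobuf.PbFormatOptions).
    Map<String, String> options = new HashMap<>();
    options.put("format", "protobuf");
    options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), "com.example.SimpleMessage");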
@@ -15,12 +15,14 @@
 package org.apache.flink.streaming.connectors.pulsar.internal;
 
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.protobuf.PbFormatOptions;
 import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 
+import com.google.protobuf.GeneratedMessageV3;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -50,6 +52,7 @@
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.apache.avro.Schema.Type.RECORD;
 import static org.apache.pulsar.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -243,18 +246,35 @@ public static <T> org.apache.pulsar.client.api.Schema<T> buildSchemaForRecordClazz
             return org.apache.pulsar.client.api.Schema.AVRO(recordClazz);
         case JSON:
             return org.apache.pulsar.client.api.Schema.JSON(recordClazz);
+        case PROTOBUF:
+            @SuppressWarnings("unchecked")
+            final org.apache.pulsar.client.api.Schema<T> tSchema =
+                    (org.apache.pulsar.client.api.Schema<T>) org.apache.pulsar.client.api.Schema
+                            .PROTOBUF_NATIVE(convertProtobuf(recordClazz));
+            return tSchema;
         default:
             throw new IllegalArgumentException("unsupported schema type " + recordSchemaType);
         }
     }
 
-    public static SchemaInfo tableSchemaToSchemaInfo(String format, DataType dataType)
+    @SuppressWarnings("unchecked")
+    private static <T extends GeneratedMessageV3> Class<T> convertProtobuf(Class<?> recordClazz) {
+        if (!GeneratedMessageV3.class.isAssignableFrom(recordClazz)) {
+            throw new IllegalArgumentException("not a protobuf message class: " + recordClazz);
+        }
+        return (Class<T>) recordClazz;
+    }
+
+    public static SchemaInfo tableSchemaToSchemaInfo(String format, DataType dataType,
+                                                     Map<String, String> options)
             throws IncompatibleSchemaException {
         switch (StringUtils.lowerCase(format)) {
             case "json":
                 return getSchemaInfo(SchemaType.JSON, dataType);
             case "avro":
                 return getSchemaInfo(SchemaType.AVRO, dataType);
+            case "protobuf":
+                return getProtobufSchemaInfo(SchemaType.PROTOBUF, dataType, options);
             case "atomic":
                 org.apache.pulsar.client.api.Schema pulsarSchema =
                         SimpleSchemaTranslator.sqlType2PulsarSchema(dataType.getChildren().get(0));
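
Note that the subtype check in convertProtobuf must run in this direction: GeneratedMessageV3.class.isAssignableFrom(recordClazz), i.e. the record class is a GeneratedMessageV3 subtype. A quick hedged sketch of what the PROTOBUF case produces, with SimpleMessage again standing in for a protoc-generated class and setName for one of its (hypothetical) fields:

    // Hedged sketch: encode a hypothetical generated message with a native protobuf schema.
    org.apache.pulsar.client.api.Schema<SimpleMessage> schema =
            org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE(SimpleMessage.class);
    byte[] payload = schema.encode(SimpleMessage.newBuilder().setName("a").build());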
@@ -265,6 +285,21 @@ public static SchemaInfo tableSchemaToSchemaInfo(String format, DataType dataType)
         }
     }
 
+    // TODO: load the message class through the user-code classloader
+    private static <T extends GeneratedMessageV3> SchemaInfo getProtobufSchemaInfo(SchemaType protobuf,
+                                                                                   DataType dataType,
+                                                                                   Map<String, String> options) {
+
+        final String messageClassName = options.get(PbFormatOptions.MESSAGE_CLASS_NAME.key());
+        try {
+            final org.apache.pulsar.client.api.Schema<T> tSchema = org.apache.pulsar.client.api.Schema
+                    .PROTOBUF_NATIVE(convertProtobuf(Class.forName(messageClassName)));
+            return tSchema.getSchemaInfo();
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("could not load protobuf class: " + messageClassName, e);
+        }
+    }
+
     public static SchemaInfo getSchemaInfo(SchemaType type, DataType dataType) {
         byte[] schemaBytes = getAvroSchema(dataType).toString().getBytes(StandardCharsets.UTF_8);
         return SchemaInfo.builder()
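
End to end, the catalog path now resolves a Pulsar SchemaInfo from the format string plus the table options. A hedged sketch of a direct call (the class name is hypothetical; physicalRowDataType is the table's physical row DataType from the previous file):

    // Hedged sketch: resolve a SchemaInfo for a protobuf-formatted table.
    Map<String, String> options = new HashMap<>();
    options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), "com.example.SimpleMessage");
    SchemaInfo info = SchemaUtils.tableSchemaToSchemaInfo("protobuf", physicalRowDataType, options);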
@@ -16,6 +16,8 @@
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.protobuf.PbFormatOptions;
+import org.apache.flink.formats.protobuf.serialize.PbRowSerializationSchema;
 import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
 import org.apache.flink.streaming.util.serialization.FlinkSchema;
 import org.apache.flink.streaming.util.serialization.PulsarContextAware;
@@ -28,13 +30,15 @@
 import org.apache.flink.types.RowKind;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -77,6 +81,8 @@ class DynamicPulsarSerializationSchema
 
     private String valueFormatType;
 
+    private ClassLoader userClassLoader;
+
     DynamicPulsarSerializationSchema(
             @Nullable SerializationSchema<RowData> keySerialization,
             SerializationSchema<RowData> valueSerialization,
@@ -108,6 +114,7 @@ public void open(SerializationSchema.InitializationContext context) throws Exception
             keySerialization.open(context);
         }
         valueSerialization.open(context);
+        userClassLoader = context.getUserCodeClassLoader().asClassLoader();
     }
 
     @Override
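
Capturing the user-code classloader in open() is what the TODO in SchemaUtils points at: generated protobuf classes normally live in the user jar, which the connector's own classloader may not see. A hedged one-line sketch of how getProtobufSchemaInfo could use it instead of the single-argument Class.forName:

    // Hedged sketch: resolve the message class via the captured user classloader.
    Class<?> clazz = Class.forName(messageClassName, true, userClassLoader);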
@@ -192,10 +199,25 @@ public Schema<RowData> getSchema() {
         if (StringUtils.isBlank(valueFormatType)) {
             return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), valueSerialization, null);
         }
-        SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType);
+        Map<String, String> options = new HashMap<>();
+        hackPbSerializationSchema(options);
+        SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, options);
         return new FlinkSchema<>(schemaInfo, valueSerialization, null);
     }
 
+    private void hackPbSerializationSchema(Map<String, String> options) {
+        // Reflectively read PbRowSerializationSchema#messageClassName into the options map.
+        if (valueSerialization instanceof PbRowSerializationSchema) {
+            try {
+                String messageClassName =
+                        (String) FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true);
+                options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), messageClassName);
+            } catch (IllegalAccessException e) {
+                throw new IllegalStateException("failed to read messageClassName", e);
+            }
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
 
     interface MetadataConverter extends Serializable {
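
The reflection hack exists because PbRowSerializationSchema does not expose its message class name. commons-lang3's FieldUtils.readDeclaredField(target, fieldName, forceAccess) can force access to the private field; a self-contained sketch of the same trick on a stand-in class (Holder and its field value are hypothetical):

    import org.apache.commons.lang3.reflect.FieldUtils;

    // Hedged sketch: force-read a private field the way hackPbSerializationSchema does.
    public class ReflectDemo {
        static class Holder {
            private final String messageClassName = "com.example.SimpleMessage";
        }

        public static void main(String[] args) throws IllegalAccessException {
            String name = (String) FieldUtils.readDeclaredField(new Holder(), "messageClassName", true);
            System.out.println(name); // prints com.example.SimpleMessage
        }
    }

A getter on the upstream format class would make this unnecessary; until then, the field name is an implementation detail that can break across Flink versions.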
