Skip to content

Commit

Permalink
[flink protobuf] add IT test
Browse files Browse the repository at this point in the history
  • Loading branch information
maosuhan committed Apr 28, 2021
1 parent 50cb883 commit e3f953d
Show file tree
Hide file tree
Showing 16 changed files with 465 additions and 66 deletions.
8 changes: 0 additions & 8 deletions flink-formats/flink-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ under the License.

<!-- test dependencies -->

<!-- JSON table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- JSON RowData schema test dependency -->
<dependency>
Expand Down
13 changes: 12 additions & 1 deletion flink-formats/flink-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,27 @@ under the License.
</dependency>

<!-- test dependencies -->
<!-- JSON RowData schema test dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- Json filesystem format factory ITCase test dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- test utils dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@

package org.apache.flink.formats.protobuf;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializeFactory;
import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializer;
import org.apache.flink.table.types.logical.LogicalType;

import com.google.protobuf.Descriptors.FieldDescriptor;
import org.codehaus.janino.SimpleCompiler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Codegen utils only used in protobuf format. */
public class PbCodegenUtils {
private static final Logger LOG = LoggerFactory.getLogger(PbCodegenUtils.class);

/**
* @param dataGetter code phrase which represent flink container type like row/array in codegen
* sections
Expand Down Expand Up @@ -238,4 +244,18 @@ public static String generateArrElementCodeWithDefaultValue(
appender.appendSegment("}");
return appender.code();
}

public static Class compileClass(ClassLoader classloader, String className, String code)
throws ClassNotFoundException {
SimpleCompiler simpleCompiler = new SimpleCompiler();
simpleCompiler.setParentClassLoader(classloader);
try {
simpleCompiler.cook(code);
} catch (Throwable t) {
LOG.error("Protobuf codegen compile error: \n" + code);
throw new InvalidProgramException(
"Program cannot be compiled. This is a bug. Please file an issue.", t);
}
return simpleCompiler.getClassLoader().loadClass(className);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
public class PbConstant {
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
public static final String PB_METHOD_PARSE_FROM = "parseFrom";
public static final String GENERATED_DECODE_METHOD = "decode";
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
public static final String PB_MAP_VALUE_NAME = "value";
public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.apache.flink.formats.protobuf;

import java.io.Serializable;
import java.util.Objects;

import static org.apache.flink.formats.protobuf.PbFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.protobuf.PbFormatOptions.READ_DEFAULT_VALUES;
import static org.apache.flink.formats.protobuf.PbFormatOptions.WRITE_NULL_STRING_LITERAL;

/** Config of protobuf configs. */
public class PbFormatConfig {
public class PbFormatConfig implements Serializable {
private String messageClassName;
private boolean ignoreParseErrors;
private boolean readDefaultValues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public static String getFullJavaName(Descriptors.Descriptor descriptor) {
}
}

public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
if (null != enumDescriptor.getContainingType()) {
return getFullJavaName(enumDescriptor.getContainingType())
+ "."
+ enumDescriptor.getName();
} else {
return enumDescriptor.getFullName();
}
}

public static boolean isSimpleType(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
Expand Down Expand Up @@ -126,16 +136,6 @@ public static Descriptors.Descriptor getDescriptor(String className) {
}
}

public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
if (null != enumDescriptor.getContainingType()) {
return getFullJavaName(enumDescriptor.getContainingType())
+ "."
+ enumDescriptor.getName();
} else {
return enumDescriptor.getFullName();
}
}

public static boolean isRepeatedType(LogicalType type) {
return type instanceof MapType || type instanceof ArrayType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.formats.protobuf.PbCodegenAppender;
import org.apache.flink.formats.protobuf.PbCodegenException;
import org.apache.flink.formats.protobuf.PbCodegenUtils;
import org.apache.flink.formats.protobuf.PbConstant;
import org.apache.flink.formats.protobuf.PbFormatConfig;
import org.apache.flink.formats.protobuf.PbFormatUtils;
Expand All @@ -33,7 +34,6 @@

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
import org.codehaus.janino.ScriptEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,15 +42,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* {@link ProtoToRowConverter} can convert binary protobuf message data to flink row data by codegen
* process.
*/
public class ProtoToRowConverter {
private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class);
private final ScriptEvaluator se;
private final Method parseFromMethod;
private final Method decodeMethod;

public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
Expand All @@ -67,34 +68,48 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
true,
formatConfig.getWriteNullStringLiterals());
}
se = new ScriptEvaluator();
se.setParameters(new String[] {"message"}, new Class[] {messageClass});
se.setReturnType(RowData.class);
se.setDefaultImports(
RowData.class.getName(),
ArrayData.class.getName(),
BinaryStringData.class.getName(),
GenericRowData.class.getName(),
GenericMapData.class.getName(),
GenericArrayData.class.getName(),
ArrayList.class.getName(),
List.class.getName(),
Map.class.getName(),
HashMap.class.getName());

PbCodegenAppender codegenAppender = new PbCodegenAppender();
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
String generatedClassName = "GeneratedProtoToRow_" + uuid;
String generatedPackageName = ProtoToRowConverter.class.getPackage().getName();
codegenAppender.appendLine("package " + generatedPackageName);
codegenAppender.appendLine("import " + RowData.class.getName());
codegenAppender.appendLine("import " + ArrayData.class.getName());
codegenAppender.appendLine("import " + BinaryStringData.class.getName());
codegenAppender.appendLine("import " + GenericRowData.class.getName());
codegenAppender.appendLine("import " + GenericMapData.class.getName());
codegenAppender.appendLine("import " + GenericArrayData.class.getName());
codegenAppender.appendLine("import " + ArrayList.class.getName());
codegenAppender.appendLine("import " + List.class.getName());
codegenAppender.appendLine("import " + Map.class.getName());
codegenAppender.appendLine("import " + HashMap.class.getName());

codegenAppender.appendSegment("public class " + generatedClassName + "{");
codegenAppender.appendSegment(
"public static RowData "
+ PbConstant.GENERATED_DECODE_METHOD
+ "("
+ PbFormatUtils.getFullJavaName(descriptor)
+ " message){");
codegenAppender.appendLine("RowData rowData=null");
PbCodegenDeserializer codegenDes =
PbCodegenDeserializeFactory.getPbCodegenTopRowDes(
descriptor, rowType, formatConfig);
String genCode = codegenDes.codegen("rowData", "message");
codegenAppender.appendSegment(genCode);
codegenAppender.appendLine("return rowData");
codegenAppender.appendSegment("}");
codegenAppender.appendSegment("}");

String printCode = codegenAppender.printWithLineNumber();
LOG.debug("Protobuf decode codegen: \n" + printCode);

se.cook(codegenAppender.code());
Class generatedClass =
PbCodegenUtils.compileClass(
this.getClass().getClassLoader(),
generatedPackageName + "." + generatedClassName,
codegenAppender.code());
decodeMethod =
generatedClass.getMethod(PbConstant.GENERATED_DECODE_METHOD, messageClass);
parseFromMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSE_FROM, byte[].class);
} catch (Exception ex) {
throw new PbCodegenException(ex);
Expand All @@ -103,6 +118,6 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)

public RowData convertProtoBinaryToRow(byte[] data) throws Exception {
Object messageObj = parseFromMethod.invoke(null, data);
return (RowData) se.evaluate(new Object[] {messageObj});
return (RowData) decodeMethod.invoke(null, messageObj);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public PbCodegenSimpleSerializer(

/**
* @param internalDataGetStr the real value of {@code internalDataGetStr} may be String, int,
* long, double, float, boolean, byte[] {@code internalDataGetStr} must not be null.
* long, double, float, boolean, byte[], enum value {@code internalDataGetStr} must not be
* null.
*/
@Override
public String codegen(String returnPbVarName, String internalDataGetStr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.formats.protobuf.PbCodegenAppender;
import org.apache.flink.formats.protobuf.PbCodegenException;
import org.apache.flink.formats.protobuf.PbCodegenUtils;
import org.apache.flink.formats.protobuf.PbConstant;
import org.apache.flink.formats.protobuf.PbFormatConfig;
import org.apache.flink.formats.protobuf.PbFormatUtils;
import org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter;
Expand All @@ -31,66 +33,79 @@
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import org.codehaus.janino.ScriptEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* {@link RowToProtoConverter} can convert flink row data to binary protobuf message data by codegen
* process.
*/
public class RowToProtoConverter {
private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class);
private final ScriptEvaluator se;

public RowToProtoConverter(RowType rowType, PbFormatConfig pbFormatConfig)
private final Method encodeMethod;

public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(pbFormatConfig.getMessageClassName());
se = new ScriptEvaluator();
se.setParameters(new String[] {"rowData"}, new Class[] {RowData.class});
se.setReturnType(AbstractMessage.class);
se.setDefaultImports(
// pb
AbstractMessage.class.getName(),
Descriptors.class.getName(),
// flink row
RowData.class.getName(),
ArrayData.class.getName(),
StringData.class.getName(),
ByteString.class.getName(),
// java common
List.class.getName(),
ArrayList.class.getName(),
Map.class.getName(),
HashMap.class.getName());
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
Class<?> messageClass = Class.forName(formatConfig.getMessageClassName());

PbCodegenAppender codegenAppender = new PbCodegenAppender();
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
String generatedClassName = "GeneratedRowToProto_" + uuid;
String generatedPackageName = RowToProtoConverter.class.getPackage().getName();
codegenAppender.appendLine("package " + generatedPackageName);
codegenAppender.appendLine("import " + AbstractMessage.class.getName());
codegenAppender.appendLine("import " + Descriptors.class.getName());
codegenAppender.appendLine("import " + RowData.class.getName());
codegenAppender.appendLine("import " + ArrayData.class.getName());
codegenAppender.appendLine("import " + StringData.class.getName());
codegenAppender.appendLine("import " + ByteString.class.getName());
codegenAppender.appendLine("import " + List.class.getName());
codegenAppender.appendLine("import " + ArrayList.class.getName());
codegenAppender.appendLine("import " + Map.class.getName());
codegenAppender.appendLine("import " + HashMap.class.getName());

codegenAppender.appendSegment("public class " + generatedClassName + "{");
codegenAppender.appendSegment(
"public static AbstractMessage "
+ PbConstant.GENERATED_ENCODE_METHOD
+ "(RowData rowData){");
codegenAppender.appendLine("AbstractMessage message = null");
PbCodegenSerializer codegenSer =
PbCodegenSerializeFactory.getPbCodegenTopRowSer(
descriptor, rowType, pbFormatConfig);
descriptor, rowType, formatConfig);
String genCode = codegenSer.codegen("message", "rowData");
codegenAppender.appendSegment(genCode);
codegenAppender.appendLine("return message");
codegenAppender.appendSegment("}");
codegenAppender.appendSegment("}");

String printCode = codegenAppender.printWithLineNumber();
LOG.debug("Protobuf encode codegen: \n" + printCode);

se.cook(codegenAppender.code());
Class generatedClass =
PbCodegenUtils.compileClass(
this.getClass().getClassLoader(),
generatedPackageName + "." + generatedClassName,
codegenAppender.code());
encodeMethod =
generatedClass.getMethod(PbConstant.GENERATED_ENCODE_METHOD, RowData.class);
} catch (Exception ex) {
throw new PbCodegenException(ex);
}
}

public byte[] convertRowToProtoBinary(RowData rowData) throws Exception {
AbstractMessage message = (AbstractMessage) se.evaluate(new Object[] {rowData});
AbstractMessage message = (AbstractMessage) encodeMethod.invoke(null, rowData);
return message.toByteArray();
}
}
Loading

0 comments on commit e3f953d

Please sign in to comment.