From 80e42c1f75f7f681b1f559230fbd573d54247210 Mon Sep 17 00:00:00 2001 From: maosuhan Date: Thu, 7 Jan 2021 12:30:33 +0800 Subject: [PATCH] add comment --- .../protobuf/deserialize/PbCodegenArrayDeserializer.java | 2 ++ .../protobuf/deserialize/PbCodegenDeserializeFactory.java | 2 ++ .../formats/protobuf/deserialize/PbCodegenMapDeserializer.java | 2 ++ .../formats/protobuf/deserialize/PbCodegenRowDeserializer.java | 2 ++ .../protobuf/deserialize/PbCodegenSimpleDeserializer.java | 2 ++ 5 files changed, 10 insertions(+) diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java index 4e760358089d2..9fdceb706575e 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java @@ -45,6 +45,8 @@ public PbCodegenArrayDeserializer( public String codegen( String returnVarName, String messageGetStr) throws PbCodegenException { + // The type of messageGetStr is a native List object, + // it should be converted to ArrayData of flink internal type PbCodegenVarId varUid = PbCodegenVarId.getInstance(); int uid = varUid.getAndIncrement(); String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializeFactory.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializeFactory.java index 2db3c4d889054..02bac12ad8a6d 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializeFactory.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializeFactory.java @@ -32,6 +32,8 @@ public static PbCodegenDeserializer getPbCodegenDes( Descriptors.FieldDescriptor fd, LogicalType type, boolean readDefaultValues) throws PbCodegenException { + // We do not use FieldDescriptor to check because when FieldDescriptor is an element type in array, + // FieldDescriptor.isRepeated() is still true if (type instanceof RowType) { return new PbCodegenRowDeserializer(fd.getMessageType(), (RowType) type, readDefaultValues); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java index 2a9fb83d2b47a..855657c5b7cc2 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java @@ -46,6 +46,8 @@ public PbCodegenMapDeserializer( public String codegen( String returnVarName, String messageGetStr) throws PbCodegenException { + // The type of messageGetStr is a native Map object, + // it should be converted to MapData of flink internal type PbCodegenVarId varUid = PbCodegenVarId.getInstance(); int uid = varUid.getAndIncrement(); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java index 083c391008503..cc41f28c46b57 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java @@ -50,6 +50,8 @@ public PbCodegenRowDeserializer( public String codegen( String returnVarName, String messageGetStr) throws PbCodegenException { + // The type of messageGetStr is a native pb object, + // it should be converted to RowData of flink internal type PbCodegenVarId varUid = PbCodegenVarId.getInstance(); int uid = varUid.getAndIncrement(); String pbMessageVar = "message" + uid; diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenSimpleDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenSimpleDeserializer.java index cb5a26f946e65..c27fcf07e7219 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenSimpleDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenSimpleDeserializer.java @@ -30,6 +30,8 @@ public PbCodegenSimpleDeserializer( @Override public String codegen(String returnVarName, String messageGetStr) { + // the type of messageGetStr must not be primitive type, + // it should convert to internal flink row type like StringData. StringBuilder sb = new StringBuilder(); switch (fd.getJavaType()) { case INT: