diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java index 1c9306fdde977..f1e0d49cfd388 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java @@ -59,10 +59,11 @@ public void testSource() { } @Test - public void testSink() throws Exception { + public void testSourceWithDefaultValue() { + MapTest mapTest = MapTest.newBuilder().build(); + TestProtobufSourceFunction.messages.clear(); - TestProtobufSourceFunction.messages.add(getProtoTestObject()); - TestProtobufSinkFunction.results.clear(); + TestProtobufSourceFunction.messages.add(mapTest); env().setParallelism(1); String sql = @@ -73,10 +74,23 @@ public void testSink() throws Exception { + ") with (" + " 'connector' = 'protobuf-test-connector', " + " 'format' = 'protobuf', " - + " 'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.MapTest'" + + " 'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.MapTest', " + + " 'protobuf.read-default-values' = 'true' " + ")"; tEnv().executeSql(sql); - sql = + TableResult result = tEnv().executeSql("select * from bigdata_source"); + Row row = result.collect().next(); + assertEquals(0, (int) row.getField(0)); + } + + @Test + public void testSink() throws Exception { + TestProtobufSourceFunction.messages.clear(); + TestProtobufSourceFunction.messages.add(getProtoTestObject()); + TestProtobufSinkFunction.results.clear(); + + env().setParallelism(1); + String sql = "create table bigdata_sink ( " + " a int, " + " map1 map," @@ -88,7 +102,8 @@ public void testSink() throws Exception { + ")"; tEnv().executeSql(sql); TableResult tableResult = - tEnv().executeSql("insert into bigdata_sink select * from bigdata_source"); + tEnv().executeSql( + "insert into bigdata_sink select 1, map['a', 'b', 'c', 'd'], map['f', row(1,cast(2 as bigint))] "); tableResult.await(); byte[] bytes = TestProtobufSinkFunction.results.get(0); @@ -100,4 +115,36 @@ public void testSink() throws Exception { assertEquals(1, innerMessageTest.getA()); assertEquals(2L, innerMessageTest.getB()); } + + @Test + public void testSinkWithNullLiteral() throws Exception { + TestProtobufSourceFunction.messages.clear(); + TestProtobufSourceFunction.messages.add(getProtoTestObject()); + TestProtobufSinkFunction.results.clear(); + + env().setParallelism(1); + String sql = + "create table bigdata_sink ( " + + " a int, " + + " map1 map," + + " map2 map>" + + ") with (" + + " 'connector' = 'protobuf-test-connector', " + + " 'format' = 'protobuf', " + + " 'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.MapTest', " + + " 'protobuf.write-null-string-literal' = 'NULL' " + + ")"; + tEnv().executeSql(sql); + TableResult tableResult = + tEnv().executeSql( + "insert into bigdata_sink select 1, map['a', null], map['b', cast(null as row)]"); + tableResult.await(); + + byte[] bytes = TestProtobufSinkFunction.results.get(0); + MapTest mapTest = MapTest.parseFrom(bytes); + assertEquals(1, mapTest.getA()); + assertEquals("NULL", mapTest.getMap1Map().get("a")); + MapTest.InnerMessageTest innerMessageTest = mapTest.getMap2Map().get("b"); + assertEquals(MapTest.InnerMessageTest.getDefaultInstance(), innerMessageTest); + } } diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableFactory.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableFactory.java index 71a4ac72630a1..d3ee7980b589d 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableFactory.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableFactory.java @@ -18,7 +18,7 @@ import java.util.HashSet; import java.util.Set; -/** */ +/** Test protobuf table factory. Only used in flink ptotobuf test module. */ public class TestProtobufTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSink.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSink.java index f32a684041a1d..435e96850922c 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSink.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSink.java @@ -8,7 +8,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; -/** */ +/** Table sink for protobuf table factory test. */ public class TestProtobufTableSink implements DynamicTableSink { private final EncodingFormat> encodingFormat; private final DataType dataType; diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSource.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSource.java index 150e8d65d1906..610a38ddbeea5 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSource.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/table/TestProtobufTableSource.java @@ -9,7 +9,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; -/** */ +/** Table source for protobuf table factory test. */ public class TestProtobufTableSource implements ScanTableSource { private final DecodingFormat> decodingFormat; private final DataType producedDataType;