-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-18202][PB format] New Format of protobuf #14376
Closed
+5,655
−0
Closed
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
888696d
[FLINK-18202][formats] introduce new format and add init version of f…
maosuhan 8e0745c
[FLINK-18202][flink-protobuf] add connector param write-null-string-l…
maosuhan 07d420e
[FLINK-18202][flink-protobuf] change class compile implementation
maosuhan 5711ab4
[FLINK-18202][flink-protobuf] add flink prootbuf format SQL integrati…
maosuhan 57a8e37
[FLINK-18202][flink protobuf] add IT test for write-null-string-literal
maosuhan e3fa43d
[FLINK-18202][flink-protobuf] support java_package option in proto fi…
maosuhan 0a051f0
[FLINK-18202][flink-protobuf] fix bug of ByteString cannot compile be…
maosuhan ea13b90
[FLINK-18202][flink-protobuf] simplify pom.xml
maosuhan 6a97696
[FLINK-18202][flink-protobuf] change project version to 1.15-SNAPSHOT
maosuhan 0e4a169
[FLINK-18202][flink-protobuf] remove flink-test-utils scala suffix ve…
maosuhan 159bed8
[FLINK-18202][flink-protobuf] add more test cases for read_default_va…
maosuhan 8ac2c7d
[FLINK-18202][flink-protobuf] add license files to fix IC error
maosuhan 73ab80f
[FLINK-18202][flink-protobuf] rebase master 1.16-SNAPSHOT
maosuhan c6238a5
[FLINK-18202][flink-protobuf] adaptively convert protobuf enum and nu…
maosuhan 0ea2b62
[FLINK-18202][flink-protobuf] fix bug of protobuf outerclassname
maosuhan 94d5c1d
[FLINK-18202][flink-protobuf] change to context classloader
maosuhan 80591ee
[FLINK-18202][flink-protobuf] add new module flink-sql-protobuf
maosuhan ccde9b6
[FLINK-18202][flink-protobuf] reformat according to PR suggestion
maosuhan da0b6bc
Add indent for PbCodegenAppender
libenchao 3f89d1e
Add indent for PbCodegenAppender of deserializer
maosuhan 40c4275
not detect codegen error in constructor of PbRowDataDeserializationSc…
maosuhan 5ac3ef8
simplify unit test
maosuhan 5a1f064
[FLINK-18202][flink-protobuf] fix code
maosuhan 9b62a54
[FLINK-18202][flink-protobuf] fix code
maosuhan c1399a5
[FLINK-18202][flink-protobuf] fix code
maosuhan 46001db
[FLINK-18202][flink-protobuf] fix classloader issue
maosuhan 5cf1de8
[FLINK-18202][flink-protobuf] fix flink-sql-protobuf NOTICE
maosuhan f15756e
Update flink-formats/flink-sql-protobuf/src/main/resources/META-INF/N…
maosuhan 9504205
[FLINK-18202][flink-protobuf] update protobuf bundle lisence
maosuhan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
|
||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-formats</artifactId> | ||
<version>1.16-SNAPSHOT</version> | ||
</parent> | ||
|
||
<artifactId>flink-protobuf</artifactId> | ||
<name>Flink : Formats : Protobuf</name> | ||
|
||
<packaging>jar</packaging> | ||
|
||
<properties> | ||
<!-- the same with flink-table/pom.xml --> | ||
<janino.version>3.0.11</janino.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<!-- core dependencies --> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-core</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-table-common</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.google.protobuf</groupId> | ||
<artifactId>protobuf-java</artifactId> | ||
<version>${protoc.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.codehaus.janino</groupId> | ||
<artifactId>janino</artifactId> | ||
<!-- this should be the same version of flink-table module --> | ||
<version>${janino.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- test dependencies --> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-table-planner_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-table-planner_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
<type>test-jar</type> | ||
</dependency> | ||
|
||
<!-- test utils dependency --> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-test-utils</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>com.github.os72</groupId> | ||
<artifactId>protoc-jar-maven-plugin</artifactId> | ||
<version>3.11.4</version> | ||
<executions> | ||
<execution> | ||
<phase>generate-sources</phase> | ||
<goals> | ||
<goal>run</goal> | ||
</goals> | ||
<configuration> | ||
<protocVersion>${protoc.version}</protocVersion> | ||
maosuhan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
<inputDirectories> | ||
<include>src/test/proto</include> | ||
</inputDirectories> | ||
<outputTargets> | ||
<outputTarget> | ||
<type>java</type> | ||
<addSources>none</addSources> | ||
<outputDirectory>target/test-proto-sources</outputDirectory> | ||
</outputTarget> | ||
</outputTargets> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.codehaus.mojo</groupId> | ||
<artifactId>build-helper-maven-plugin</artifactId> | ||
<version>3.0.0</version> | ||
<executions> | ||
<execution> | ||
<phase>generate-sources</phase> | ||
<goals> | ||
<goal>add-test-source</goal> | ||
</goals> | ||
<configuration> | ||
<sources> | ||
<source>target/test-proto-sources</source> | ||
maosuhan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
</sources> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> |
27 changes: 27 additions & 0 deletions
27
flink-formats/flink-protobuf/src/main/java/com/google/protobuf/ProtobufInternalUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.protobuf; | ||
maosuhan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** This class is to access internal method in protobuf package. */ | ||
public class ProtobufInternalUtils { | ||
/** convert underscore name to camel name. */ | ||
public static String underScoreToCamelCase(String name, boolean capNext) { | ||
return SchemaUtil.toCamelCase(name, capNext); | ||
} | ||
} |
36 changes: 36 additions & 0 deletions
36
...ts/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodegenException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.formats.protobuf; | ||
|
||
/** Exception represents codegen error in row and proto conversion which is probably a bug. */ | ||
public class PbCodegenException extends Exception { | ||
public PbCodegenException() {} | ||
|
||
public PbCodegenException(String message) { | ||
super(message); | ||
} | ||
|
||
public PbCodegenException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
public PbCodegenException(Throwable cause) { | ||
super(cause); | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.formats.protobuf; | ||
|
||
/** Keeps protobuf constants separately. */ | ||
public class PbConstant { | ||
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor"; | ||
public static final String PB_METHOD_PARSE_FROM = "parseFrom"; | ||
public static final String GENERATED_DECODE_METHOD = "decode"; | ||
public static final String GENERATED_ENCODE_METHOD = "encode"; | ||
public static final String PB_MAP_KEY_NAME = "key"; | ||
public static final String PB_MAP_VALUE_NAME = "value"; | ||
} |
52 changes: 52 additions & 0 deletions
52
...mats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbDecodingFormat.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.formats.protobuf; | ||
|
||
import org.apache.flink.api.common.serialization.DeserializationSchema; | ||
import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema; | ||
import org.apache.flink.table.connector.ChangelogMode; | ||
import org.apache.flink.table.connector.format.DecodingFormat; | ||
import org.apache.flink.table.connector.source.DynamicTableSource; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.types.DataType; | ||
import org.apache.flink.table.types.logical.RowType; | ||
|
||
/** {@link DecodingFormat} for protobuf decoding. */ | ||
public class PbDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> { | ||
private final PbFormatConfig formatConfig; | ||
|
||
public PbDecodingFormat(PbFormatConfig formatConfig) { | ||
this.formatConfig = formatConfig; | ||
} | ||
|
||
@Override | ||
public DeserializationSchema<RowData> createRuntimeDecoder( | ||
DynamicTableSource.Context context, DataType producedDataType) { | ||
final RowType rowType = (RowType) producedDataType.getLogicalType(); | ||
final TypeInformation<RowData> rowDataTypeInfo = | ||
context.createTypeInformation(producedDataType); | ||
return new PbRowDataDeserializationSchema(rowType, rowDataTypeInfo, formatConfig); | ||
} | ||
|
||
@Override | ||
public ChangelogMode getChangelogMode() { | ||
return ChangelogMode.insertOnly(); | ||
} | ||
} |
49 changes: 49 additions & 0 deletions
49
...mats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbEncodingFormat.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.formats.protobuf; | ||
|
||
import org.apache.flink.api.common.serialization.SerializationSchema; | ||
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; | ||
import org.apache.flink.table.connector.ChangelogMode; | ||
import org.apache.flink.table.connector.format.EncodingFormat; | ||
import org.apache.flink.table.connector.sink.DynamicTableSink; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.types.DataType; | ||
import org.apache.flink.table.types.logical.RowType; | ||
|
||
/** {@link EncodingFormat} for protobuf encoding. */ | ||
public class PbEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> { | ||
private final PbFormatConfig pbFormatConfig; | ||
|
||
public PbEncodingFormat(PbFormatConfig pbFormatConfig) { | ||
this.pbFormatConfig = pbFormatConfig; | ||
} | ||
|
||
@Override | ||
public ChangelogMode getChangelogMode() { | ||
return ChangelogMode.insertOnly(); | ||
} | ||
|
||
@Override | ||
public SerializationSchema<RowData> createRuntimeEncoder( | ||
DynamicTableSink.Context context, DataType consumedDataType) { | ||
RowType rowType = (RowType) consumedDataType.getLogicalType(); | ||
return new PbRowDataSerializationSchema(rowType, pbFormatConfig); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You added
flink-table-planner
twice.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One is jar and another is test jar, there will be exception if I remove one of them.