Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18202][PB format] New Format of protobuf #14376

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
888696d
[FLINK-18202][formats] introduce new format and add init version of f…
maosuhan Dec 12, 2020
8e0745c
[FLINK-18202][flink-protobuf] add connector param write-null-string-l…
maosuhan Apr 27, 2021
07d420e
[FLINK-18202][flink-protobuf] change class compile implementation
maosuhan Nov 19, 2021
5711ab4
[FLINK-18202][flink-protobuf] add flink prootbuf format SQL integrati…
maosuhan Nov 19, 2021
57a8e37
[FLINK-18202][flink protobuf] add IT test for write-null-string-literal
maosuhan Apr 28, 2021
e3fa43d
[FLINK-18202][flink-protobuf] support java_package option in proto fi…
maosuhan Jun 16, 2021
0a051f0
[FLINK-18202][flink-protobuf] fix bug of ByteString cannot compile be…
maosuhan Nov 19, 2021
ea13b90
[FLINK-18202][flink-protobuf] simplify pom.xml
maosuhan Nov 19, 2021
6a97696
[FLINK-18202][flink-protobuf] change project version to 1.15-SNAPSHOT
maosuhan Nov 18, 2021
0e4a169
[FLINK-18202][flink-protobuf] remove flink-test-utils scala suffix ve…
maosuhan Nov 21, 2021
159bed8
[FLINK-18202][flink-protobuf] add more test cases for read_default_va…
maosuhan Nov 21, 2021
8ac2c7d
[FLINK-18202][flink-protobuf] add license files to fix IC error
maosuhan Nov 21, 2021
73ab80f
[FLINK-18202][flink-protobuf] rebase master 1.16-SNAPSHOT
maosuhan Jun 20, 2022
c6238a5
[FLINK-18202][flink-protobuf] adaptively convert protobuf enum and nu…
maosuhan Jun 20, 2022
0ea2b62
[FLINK-18202][flink-protobuf] fix bug of protobuf outerclassname
maosuhan Dec 11, 2021
94d5c1d
[FLINK-18202][flink-protobuf] change to context classloader
maosuhan Jun 20, 2022
80591ee
[FLINK-18202][flink-protobuf] add new module flink-sql-protobuf
maosuhan Jun 20, 2022
ccde9b6
[FLINK-18202][flink-protobuf] reformat according to PR suggestion
maosuhan Jun 25, 2022
da0b6bc
Add indent for PbCodegenAppender
libenchao Jun 27, 2022
3f89d1e
Add indent for PbCodegenAppender of deserializer
maosuhan Jul 5, 2022
40c4275
not detect codegen error in constructor of PbRowDataDeserializationSc…
maosuhan Jul 5, 2022
5ac3ef8
simplify unit test
maosuhan Jul 5, 2022
5a1f064
[FLINK-18202][flink-protobuf] fix code
maosuhan Jul 15, 2022
9b62a54
[FLINK-18202][flink-protobuf] fix code
maosuhan Jul 15, 2022
c1399a5
[FLINK-18202][flink-protobuf] fix code
maosuhan Jul 18, 2022
46001db
[FLINK-18202][flink-protobuf] fix classloader issue
maosuhan Jul 18, 2022
5cf1de8
[FLINK-18202][flink-protobuf] fix flink-sql-protobuf NOTICE
maosuhan Jul 20, 2022
f15756e
Update flink-formats/flink-sql-protobuf/src/main/resources/META-INF/N…
maosuhan Jul 26, 2022
9504205
[FLINK-18202][flink-protobuf] update protobuf bundle lisence
maosuhan Jul 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions flink-formats/flink-protobuf/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-formats</artifactId>
<version>1.16-SNAPSHOT</version>
</parent>

<artifactId>flink-protobuf</artifactId>
<name>Flink : Formats : Protobuf</name>

<packaging>jar</packaging>

<properties>
<!-- the same with flink-table/pom.xml -->
<janino.version>3.0.11</janino.version>
</properties>

<dependencies>
<!-- core dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protoc.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<!-- this should be the same version of flink-table module -->
<version>${janino.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You added flink-table-planner twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One is jar and another is test jar, there will be exception if I remove one of them.

<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- test utils dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>${protoc.version}</protocVersion>
maosuhan marked this conversation as resolved.
Show resolved Hide resolved
<inputDirectories>
<include>src/test/proto</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
<addSources>none</addSources>
<outputDirectory>target/test-proto-sources</outputDirectory>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>target/test-proto-sources</source>
maosuhan marked this conversation as resolved.
Show resolved Hide resolved
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.protobuf;
maosuhan marked this conversation as resolved.
Show resolved Hide resolved

/** This class is to access internal method in protobuf package. */
public class ProtobufInternalUtils {
/** convert underscore name to camel name. */
public static String underScoreToCamelCase(String name, boolean capNext) {
return SchemaUtil.toCamelCase(name, capNext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

/** Exception represents codegen error in row and proto conversion which is probably a bug. */
public class PbCodegenException extends Exception {
public PbCodegenException() {}

public PbCodegenException(String message) {
super(message);
}

public PbCodegenException(String message, Throwable cause) {
super(message, cause);
}

public PbCodegenException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

/** Keeps protobuf constants separately. */
public class PbConstant {
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
public static final String PB_METHOD_PARSE_FROM = "parseFrom";
public static final String GENERATED_DECODE_METHOD = "decode";
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
public static final String PB_MAP_VALUE_NAME = "value";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

/** {@link DecodingFormat} for protobuf decoding. */
public class PbDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
private final PbFormatConfig formatConfig;

public PbDecodingFormat(PbFormatConfig formatConfig) {
this.formatConfig = formatConfig;
}

@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
return new PbRowDataDeserializationSchema(rowType, rowDataTypeInfo, formatConfig);
}

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

/** {@link EncodingFormat} for protobuf encoding. */
public class PbEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
private final PbFormatConfig pbFormatConfig;

public PbEncodingFormat(PbFormatConfig pbFormatConfig) {
this.pbFormatConfig = pbFormatConfig;
}

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}

@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
RowType rowType = (RowType) consumedDataType.getLogicalType();
return new PbRowDataSerializationSchema(rowType, pbFormatConfig);
}
}
Loading