Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support avro format & support run action on fork repo #5064

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
name: Backend
on:
push:
pull_request:
branches:
- dev
pull_request:
paths-ignore:
- 'docs/**'
- '**/*.md'
Expand All @@ -32,7 +32,7 @@ concurrency:

jobs:
license-header:
if: github.repository == 'apache/seatunnel'
if: github.repository == '${{github.actor}}/seatunnel'
name: License header
runs-on: ubuntu-latest
timeout-minutes: 10
Expand All @@ -44,7 +44,7 @@ jobs:
uses: apache/skywalking-eyes@985866ce7e324454f61e22eb2db2e998db09d6f3

code-style:
if: github.repository == 'apache/seatunnel'
if: github.repository == '${{github.actor}}/seatunnel'
name: Code style
runs-on: ubuntu-latest
timeout-minutes: 10
Expand All @@ -56,7 +56,7 @@ jobs:
run: ./mvnw --batch-mode --quiet --no-snapshot-updates clean spotless:check

dead-link:
if: github.repository == 'apache/seatunnel'
if: github.repository == '${{github.actor}}/seatunnel'
name: Dead links
runs-on: ubuntu-latest
timeout-minutes: 30
Expand All @@ -69,7 +69,7 @@ jobs:
done

sanity-check:
if: github.repository == 'apache/seatunnel'
if: github.repository == '${{github.actor}}/seatunnel'
name: Sanity check results
needs: [ license-header, code-style, dead-link ]
runs-on: ubuntu-latest
Expand All @@ -83,8 +83,7 @@ jobs:

changes:
runs-on: ubuntu-latest
# To prevent error when there's no base branch
if: github.repository == 'apache/seatunnel'
if: github.repository == '${{github.actor}}/seatunnel'
timeout-minutes: 10
outputs:
api: ${{ steps.filter.outputs.api }}
Expand Down
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on

Kafka Topic.

Currently two formats are supported:
Currently, two formats are supported:

1. Fill in the name of the topic.

Expand Down Expand Up @@ -108,7 +108,7 @@ Kafka distinguishes different transactions by different transactionId. This para

### format

Data format. The default format is json. Optional text format. The default field separator is ",".
Data format. The default format is json. Optional text, avro format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.

### field_delimiter
Expand Down
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ The structure of the data, including field names and field types.

## format

Data format. The default format is json. Optional text format. The default field separator is ", ".
Data format. The default format is json. Optional text, avro format. The default field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.

## format_error_handle_way
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-avro</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
COMPATIBLE_DEBEZIUM_JSON
COMPATIBLE_DEBEZIUM_JSON,
AVRO
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.format.avro.AvroSerializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
Expand Down Expand Up @@ -221,6 +222,8 @@ private static SerializationSchema createSerializationSchema(
return new CanalJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
case AVRO:
return new AvroSerializationSchema(rowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public OptionRule optionRule() {
.conditional(
Config.FORMAT,
Arrays.asList(
MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
MessageFormat.JSON,
MessageFormat.CANAL_JSON,
MessageFormat.TEXT,
MessageFormat.AVRO),
Config.TOPIC)
.optional(
Config.KAFKA_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
Expand Down Expand Up @@ -266,6 +267,9 @@ private void setDeserialization(Config config) {
.setIgnoreParseErrors(true)
.build();
break;
case AVRO:
deserializationSchema = new AvroDeserializationSchema(typeInfo);
break;
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
Expand Down
3 changes: 2 additions & 1 deletion seatunnel-dist/release-docs/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ The text of each license is the standard Apache 2.0 license.
(Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - http://github.com/airlift/aircompressor)
(Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
(The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.8.2 - http://avro.apache.org)
(Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.10.2 - http://avro.apache.org)
(Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
(Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
(The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,28 @@ public void testSourceKafkaStartConfig(TestContainer container)
testKafkaGroupOffsetsToConsole(container);
}

@TestTemplate
public void testFakeSourceToKafkaAvroFormat(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

@TestTemplate
public void testKafkaAvroToConsole(TestContainer container)
throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer =
DefaultSeaTunnelRowSerializer.create(
"test_avro_topic",
SEATUNNEL_ROW_TYPE,
MessageFormat.AVRO,
DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

public void testKafkaLatestToConsole(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
result_table_name = "fake"
}
}

sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
format = avro
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
format = avro
format_error_handle_way = skip
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

sink {
Console {
source_table_name = "kafka_table"
}
Assert {
source_table_name = "kafka_table"
rules =
{
field_rules = [
{
field_name = id
field_type = long
field_value = [
{
rule_type = NOT_NULL
},
{
rule_type = MIN
rule_value = 0
},
{
rule_type = MAX
rule_value = 99
}
]
}
]
}
}
}
1 change: 1 addition & 0 deletions seatunnel-formats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<module>seatunnel-format-json</module>
<module>seatunnel-format-text</module>
<module>seatunnel-format-compatible-debezium-json</module>
<module>seatunnel-format-avro</module>
</modules>

</project>
Loading