diff --git a/docs/en/connector-v2/sink/Kudu.md b/docs/en/connector-v2/sink/Kudu.md index b6e4eee24c1..f885240498f 100644 --- a/docs/en/connector-v2/sink/Kudu.md +++ b/docs/en/connector-v2/sink/Kudu.md @@ -123,6 +123,78 @@ sink { } ``` +### Multiple Table + +```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "kudu_sink_1" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + }, + { + schema = { + table = "kudu_sink_2" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } + ] + } +} + + +sink { + kudu{ + kudu_masters = "kudu-master-multiple:7051" + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java index 0cc827f1ed5..898016b5cf8 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java @@ -19,6 +19,7 @@ import 
org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -28,18 +29,16 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo; import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState; -import com.google.auto.service.AutoService; - import java.io.IOException; /** * Kudu Sink implementation by using SeaTunnel sink API. This class contains the method to create * {@link AbstractSimpleSink}. */ -@AutoService(SeaTunnelSink.class) public class KuduSink implements SeaTunnelSink< - SeaTunnelRow, KuduSinkState, KuduCommitInfo, KuduAggregatedCommitInfo> { + SeaTunnelRow, KuduSinkState, KuduCommitInfo, KuduAggregatedCommitInfo>, + SupportMultiTableSink { private KuduSinkConfig kuduSinkConfig; private SeaTunnelRowType seaTunnelRowType; @@ -51,7 +50,7 @@ public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) { @Override public String getPluginName() { - return "kudu"; + return "Kudu"; } @Override diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java index fc94e94b883..3917d1cd62a 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java @@ -29,6 +29,8 @@ import com.google.auto.service.AutoService; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static 
org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND; import static org.apache.kudu.client.SessionConfiguration.FlushMode.MANUAL_FLUSH; @@ -43,7 +45,8 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME) + .required(KuduSinkConfig.MASTER) + .optional(KuduSinkConfig.TABLE_NAME) .optional(KuduSinkConfig.WORKER_COUNT) .optional(KuduSinkConfig.OPERATION_TIMEOUT) .optional(KuduSinkConfig.ADMIN_OPERATION_TIMEOUT) @@ -73,6 +76,13 @@ public OptionRule optionRule() { public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig config = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); + if (!config.getOptional(KuduSinkConfig.TABLE_NAME).isPresent()) { + Map map = config.toMap(); + map.put( + KuduSinkConfig.TABLE_NAME.key(), + catalogTable.getTableId().toTablePath().getFullName()); + config = ReadonlyConfig.fromMap(new HashMap<>(map)); + } KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config); return () -> new KuduSink(kuduSinkConfig, catalogTable); } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java index acf5b4ca2e3..82a28c0451a 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.sink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import 
org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig; @@ -32,7 +33,9 @@ import java.util.Optional; @Slf4j -public class KuduSinkWriter implements SinkWriter { +public class KuduSinkWriter + implements SinkWriter, + SupportMultiTableSinkWriter { private SeaTunnelRowType seaTunnelRowType; private KuduOutputFormat fileWriter; diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java index 57b7925b0d9..9490208af59 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java @@ -28,14 +28,12 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceTableConfig; import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.stream.Collectors; @Slf4j -@AutoService(SeaTunnelSource.class) public class KuduSource implements SeaTunnelSource, SupportParallelism { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java index dd45471060f..733e0d39c94 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java @@ 
-36,6 +36,7 @@ import org.apache.kudu.client.RowResult; import org.apache.kudu.client.RowResultIterator; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; @@ -273,6 +274,7 @@ public List> readData(String tableName) throws KuduException { } @Override + @AfterAll public void tearDown() throws Exception { if (kuduClient != null) { kuduClient.close(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java index ae26aff7969..b0c8cba1fe3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java @@ -39,6 +39,7 @@ import org.apache.kudu.client.RowResult; import org.apache.kudu.client.RowResultIterator; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; @@ -274,6 +275,7 @@ public List readData(String tableName) throws KuduException { } @Override + @AfterAll public void tearDown() throws Exception { if (kuduClient != null) { kuduClient.close(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java index 11cd819b6c0..cc05653d5c3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java @@ -32,11 +32,15 @@ import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.OperationResponse; import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; @@ -62,9 +66,11 @@ import java.util.Enumeration; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.lang.String.format; +import static org.awaitility.Awaitility.await; @Slf4j @DisabledOnContainer( @@ -158,12 +164,11 @@ private void batchInsertData(String tableName) throws KuduException { row.addObject("val_decimal", new BigDecimal("1.1212")); row.addObject("val_string", "test"); row.addObject("val_unixtime_micros", new java.sql.Timestamp(1693477266998L)); - row.addObject("val_binary", "NEW".getBytes()); OperationResponse response = kuduSession.apply(insert); } } - private void initializeKuduTable() throws KuduException { + private void initializeKuduTable(String tableName) throws KuduException { List columns = new ArrayList(); @@ -210,10 +215,6 @@ private void initializeKuduTable() throws KuduException { new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", Type.UNIXTIME_MICROS) .nullable(true) .build()); - columns.add( - new ColumnSchema.ColumnSchemaBuilder("val_binary", Type.BINARY) - .nullable(true) - .build()); Schema schema = new Schema(columns); @@ -222,8 +223,7 @@ private void initializeKuduTable() throws 
KuduException { tableOptions.addHashPartitions(hashKeys, 2); tableOptions.setNumReplicas(1); - kuduClient.createTable("kudu_source_table_1", schema, tableOptions); - kuduClient.createTable("kudu_source_table_2", schema, tableOptions); + kuduClient.createTable(tableName, schema, tableOptions); } private void getKuduClient() { @@ -238,8 +238,10 @@ private void getKuduClient() { } @TestTemplate - public void testKudu(TestContainer container) throws IOException, InterruptedException { - initializeKuduTable(); + public void testKuduMultipleRead(TestContainer container) + throws IOException, InterruptedException { + initializeKuduTable("kudu_source_table_1"); + initializeKuduTable("kudu_source_table_2"); batchInsertData("kudu_source_table_1"); batchInsertData("kudu_source_table_2"); Container.ExecResult execResult = @@ -249,7 +251,62 @@ public void testKudu(TestContainer container) throws IOException, InterruptedExc kuduClient.deleteTable("kudu_source_table_2"); } + @TestTemplate + public void testKuduMultipleWrite(TestContainer container) + throws IOException, InterruptedException { + initializeKuduTable("kudu_sink_1"); + initializeKuduTable("kudu_sink_2"); + Container.ExecResult execResult = + container.executeJob("/fake_to_kudu_with_multipletable.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertAll( + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + "1", + "true", + "1", + "2", + "3", + "4", + "4.3", + "5.3", + "6.30000", + "NEW", + "2020-02-02 02:02:02.0")) + .collect(Collectors.toList()), + readData("kudu_sink_1")); + }, + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + "1", + "true", + "1", + "2", + "3", + "4", + "4.3", + "5.3", + "6.30000", + "NEW", + "2020-02-02 02:02:02.0")) + .collect(Collectors.toList()), + readData("kudu_sink_2")); + })); + + kuduClient.deleteTable("kudu_sink_1"); + 
kuduClient.deleteTable("kudu_sink_2"); + } + @Override + @AfterAll public void tearDown() throws Exception { if (kuduClient != null) { kuduClient.close(); @@ -264,6 +321,26 @@ public void tearDown() throws Exception { } } + public List> readData(String tableName) throws KuduException { + List> result = new ArrayList<>(); + KuduTable kuduTable = kuduClient.openTable(tableName); + KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build(); + while (scanner.hasMoreRows()) { + RowResultIterator rowResults = scanner.nextRows(); + List row = new ArrayList<>(); + while (rowResults.hasNext()) { + RowResult rowResult = rowResults.next(); + for (int i = 0; i < rowResult.getSchema().getColumns().size(); i++) { + row.add(rowResult.getObject(i).toString()); + } + } + if (!row.isEmpty()) { + result.add(row); + } + } + return result; + } + private static String getHostIPAddress() { try { Enumeration networkInterfaceEnumeration = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf new file mode 100644 index 00000000000..61dd950214d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "kudu_sink_1" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + }, + { + schema = { + table = "kudu_sink_2" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } + ] + } +} + + +sink { + kudu{ + kudu_masters = "kudu-master-multiple:7051" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf index 8e1fb641a73..93c7114850d 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf @@ -21,8 +21,7 @@ env { # You can set engine configuration here execution.parallelism = 1 - job.mode = "STREAMING" - execution.checkpoint.interval = 5000 + job.mode = "BATCH" } source {