Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Support multi-table sink feature for kudu #5951

Merged
merged 7 commits into from
Dec 14, 2023
72 changes: 72 additions & 0 deletions docs/en/connector-v2/sink/Kudu.md
Original file line number Diff line number Diff line change
@@ -123,6 +123,78 @@ sink {
}
```

### Multiple Table

```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "kudu_sink_1"
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
},
{
schema = {
table = "kudu_sink_2"
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
}
]
}
}


sink {
kudu{
kudu_masters = "kudu-master-multiple:7051"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@

import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,18 +29,16 @@
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;

import com.google.auto.service.AutoService;

import java.io.IOException;

/**
* Kudu Sink implementation by using SeaTunnel sink API. This class contains the method to create
* {@link AbstractSimpleSink}.
*/
@AutoService(SeaTunnelSink.class)
public class KuduSink
implements SeaTunnelSink<
SeaTunnelRow, KuduSinkState, KuduCommitInfo, KuduAggregatedCommitInfo> {
SeaTunnelRow, KuduSinkState, KuduCommitInfo, KuduAggregatedCommitInfo>,
SupportMultiTableSink {

private KuduSinkConfig kuduSinkConfig;
private SeaTunnelRowType seaTunnelRowType;
@@ -51,7 +50,7 @@ public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {

@Override
public String getPluginName() {
return "kudu";
return "Kudu";
}

@Override
Original file line number Diff line number Diff line change
@@ -29,6 +29,8 @@
import com.google.auto.service.AutoService;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
import static org.apache.kudu.client.SessionConfiguration.FlushMode.MANUAL_FLUSH;
@@ -43,7 +45,8 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME)
.required(KuduSinkConfig.MASTER)
.optional(KuduSinkConfig.TABLE_NAME)
.optional(KuduSinkConfig.WORKER_COUNT)
.optional(KuduSinkConfig.OPERATION_TIMEOUT)
.optional(KuduSinkConfig.ADMIN_OPERATION_TIMEOUT)
@@ -73,6 +76,13 @@ public OptionRule optionRule() {
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
if (!config.getOptional(KuduSinkConfig.TABLE_NAME).isPresent()) {
Map<String, String> map = config.toMap();
map.put(
KuduSinkConfig.TABLE_NAME.key(),
catalogTable.getTableId().toTablePath().getFullName());
config = ReadonlyConfig.fromMap(new HashMap<>(map));
}
KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config);
return () -> new KuduSink(kuduSinkConfig, catalogTable);
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.sink;

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
@@ -32,7 +33,9 @@
import java.util.Optional;

@Slf4j
public class KuduSinkWriter implements SinkWriter<SeaTunnelRow, KuduCommitInfo, KuduSinkState> {
public class KuduSinkWriter
implements SinkWriter<SeaTunnelRow, KuduCommitInfo, KuduSinkState>,
SupportMultiTableSinkWriter<Void> {

private SeaTunnelRowType seaTunnelRowType;
private KuduOutputFormat fileWriter;
Original file line number Diff line number Diff line change
@@ -28,14 +28,12 @@
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@AutoService(SeaTunnelSource.class)
public class KuduSource
implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState>,
SupportParallelism {
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -273,6 +274,7 @@ public List<List<Object>> readData(String tableName) throws KuduException {
}

@Override
@AfterAll
public void tearDown() throws Exception {
if (kuduClient != null) {
kuduClient.close();
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -274,6 +275,7 @@ public List<String> readData(String tableName) throws KuduException {
}

@Override
@AfterAll
public void tearDown() throws Exception {
if (kuduClient != null) {
kuduClient.close();
Original file line number Diff line number Diff line change
@@ -32,11 +32,15 @@
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -62,9 +66,11 @@
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.awaitility.Awaitility.await;

@Slf4j
@DisabledOnContainer(
@@ -158,12 +164,11 @@ private void batchInsertData(String tableName) throws KuduException {
row.addObject("val_decimal", new BigDecimal("1.1212"));
row.addObject("val_string", "test");
row.addObject("val_unixtime_micros", new java.sql.Timestamp(1693477266998L));
row.addObject("val_binary", "NEW".getBytes());
OperationResponse response = kuduSession.apply(insert);
}
}

private void initializeKuduTable() throws KuduException {
private void initializeKuduTable(String tableName) throws KuduException {

List<ColumnSchema> columns = new ArrayList();

@@ -210,10 +215,6 @@ private void initializeKuduTable() throws KuduException {
new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", Type.UNIXTIME_MICROS)
.nullable(true)
.build());
columns.add(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove binary type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

binary type, difficult to generate in fake, and difficult to compare,KuduIT contains all types of checks

new ColumnSchema.ColumnSchemaBuilder("val_binary", Type.BINARY)
.nullable(true)
.build());

Schema schema = new Schema(columns);

@@ -222,8 +223,7 @@ private void initializeKuduTable() throws KuduException {

tableOptions.addHashPartitions(hashKeys, 2);
tableOptions.setNumReplicas(1);
kuduClient.createTable("kudu_source_table_1", schema, tableOptions);
kuduClient.createTable("kudu_source_table_2", schema, tableOptions);
kuduClient.createTable(tableName, schema, tableOptions);
}

private void getKuduClient() {
@@ -238,8 +238,10 @@ private void getKuduClient() {
}

@TestTemplate
public void testKudu(TestContainer container) throws IOException, InterruptedException {
initializeKuduTable();
public void testKuduMultipleRead(TestContainer container)
throws IOException, InterruptedException {
initializeKuduTable("kudu_source_table_1");
initializeKuduTable("kudu_source_table_2");
batchInsertData("kudu_source_table_1");
batchInsertData("kudu_source_table_2");
Container.ExecResult execResult =
@@ -249,7 +251,62 @@ public void testKudu(TestContainer container) throws IOException, InterruptedExc
kuduClient.deleteTable("kudu_source_table_2");
}

@TestTemplate
public void testKuduMultipleWrite(TestContainer container)
throws IOException, InterruptedException {
initializeKuduTable("kudu_sink_1");
initializeKuduTable("kudu_sink_2");
Container.ExecResult execResult =
container.executeJob("/fake_to_kudu_with_multipletable.conf");
Assertions.assertEquals(0, execResult.getExitCode());

await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertAll(
() -> {
Assertions.assertIterableEquals(
Stream.<List<Object>>of(
Arrays.asList(
"1",
"true",
"1",
"2",
"3",
"4",
"4.3",
"5.3",
"6.30000",
"NEW",
"2020-02-02 02:02:02.0"))
.collect(Collectors.toList()),
readData("kudu_sink_1"));
},
() -> {
Assertions.assertIterableEquals(
Stream.<List<Object>>of(
Arrays.asList(
"1",
"true",
"1",
"2",
"3",
"4",
"4.3",
"5.3",
"6.30000",
"NEW",
"2020-02-02 02:02:02.0"))
.collect(Collectors.toList()),
readData("kudu_sink_2"));
}));

kuduClient.deleteTable("kudu_sink_1");
kuduClient.deleteTable("kudu_sink_2");
}

@Override
@AfterAll
public void tearDown() throws Exception {
if (kuduClient != null) {
kuduClient.close();
@@ -264,6 +321,26 @@ public void tearDown() throws Exception {
}
}

public List<List<Object>> readData(String tableName) throws KuduException {
List<List<Object>> result = new ArrayList<>();
KuduTable kuduTable = kuduClient.openTable(tableName);
KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
while (scanner.hasMoreRows()) {
RowResultIterator rowResults = scanner.nextRows();
List<Object> row = new ArrayList<>();
while (rowResults.hasNext()) {
RowResult rowResult = rowResults.next();
for (int i = 0; i < rowResult.getSchema().getColumns().size(); i++) {
row.add(rowResult.getObject(i).toString());
}
}
if (!row.isEmpty()) {
result.add(row);
}
}
return result;
}

private static String getHostIPAddress() {
try {
Enumeration<NetworkInterface> networkInterfaceEnumeration =
Loading