Improved the Batch tasks to not require an explicit columns specification #358

Merged: 5 commits, Aug 22, 2024
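In short: the Batch/BulkInsert tasks can now work with only a source file and a destination table, so neither a sql statement nor a columns list has to be spelled out. A minimal sketch of the new usage, modelled on the MySQL example added further down in this diff (connection details and the xref table name are purely illustrative, and "query" stands for an upstream Query task as in that example):

  - id: bulk_insert
    type: io.kestra.plugin.jdbc.mysql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:mysql://127.0.0.1:3306/
    username: mysql_user
    password: mysql_passwd
    table: xref
    # columns: [field1, field2]  # optional: still supported when only a subset of columns should be written

The explicit columns form remains covered by the new noSqlWithNamedColumnsForInsert tests below.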
@@ -49,6 +49,16 @@
"password: ch_passwd",
"sql: INSERT INTO YourTable ( field1, field2, field3 ) SETTINGS async_insert=1, wait_for_async_insert=1 values( ?, ?, ? )"
}
),
@Example(
title = "Insert data into specific columns via a SQL query to a ClickHouse database using asynchronous inserts.",
code = {
"from: \"{{ outputs.query.uri }}\"",
"url: jdbc:clickhouse://127.0.0.1:56982/",
"username: ch_user",
"password: ch_passwd",
"table: YourTable"
}
)
}
)
@@ -1,13 +1,22 @@
package io.kestra.plugin.jdbc.clickhouse;

import com.clickhouse.client.internal.google.protobuf.UInt32Value;
import com.clickhouse.client.internal.google.protobuf.UInt64Value;
import com.clickhouse.client.internal.google.type.Decimal;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.value.ClickHouseNestedValue;
import com.clickhouse.data.value.ClickHouseTupleValue;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import io.kestra.core.junit.annotations.KestraTest;
import org.h2.value.ValueTinyint;
import org.junit.jupiter.api.Test;
import reactor.util.function.Tuple2;

import java.io.*;
import java.math.BigDecimal;
@@ -17,6 +26,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
@@ -106,7 +117,7 @@ void updateBatch() throws Exception {
FileSerde.write(output, ImmutableMap.builder()
.put("String", "kestra")
.build()
);
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
@@ -140,6 +151,67 @@ void updateBatch() throws Exception {
assertThat(runOutput.getRows().stream().anyMatch(map -> map.get("String").equals("kestra")), is(true));
}

@Test
public void noSqlForInsert() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 1; i < 6; i++) {
FileSerde.write(output, Arrays.asList(
i,
2147483645.1234,
2147483645.1234,
BigDecimal.valueOf(2147483645.1234),
"four"
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

BulkInsert task = BulkInsert.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.from(uri.toString())
.table("clickhouse_ins")
.build();

AbstractJdbcBatch.Output runOutput = task.run(runContext);

assertThat(runOutput.getRowCount(), is(5L));
}

@Test
public void noSqlWithNamedColumnsForInsert() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 1; i < 6; i++) {
FileSerde.write(output, List.of(
"Mario"
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

BulkInsert task = BulkInsert.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.from(uri.toString())
.table("clickhouse_types")
.columns(List.of("String"))
.build();

AbstractJdbcBatch.Output runOutput = task.run(runContext);

assertThat(runOutput.getRowCount(), is(5L));
}

@Override
protected String getUrl() {
return "jdbc:clickhouse://127.0.0.1:28123/default";
plugin-jdbc-clickhouse/src/test/resources/scripts/clickhouse.sql (13 additions, 0 deletions)
@@ -84,3 +84,16 @@ VALUES (


select * from clickhouse_types;

DROP TABLE IF EXISTS clickhouse_ins;

CREATE TABLE clickhouse_ins (
Int64 Int64,
Float32 Float32,
Float64 Float64,
Decimal Decimal(14, 4),
String String
)
ENGINE = MergeTree()
ORDER BY (Int64)
SETTINGS index_granularity = 8192;
@@ -51,6 +51,30 @@
" sql: |",
" insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )",
}
),
@Example(
title = "Fetch rows from a table, and bulk insert them to another one, without using sql query.",
full = true,
code = {
"tasks:",
" - id: query",
" type: io.kestra.plugin.jdbc.mysql.Query",
" url: jdbc:mysql://127.0.0.1:3306/",
" username: mysql_user",
" password: mysql_passwd",
" sql: |",
" SELECT *",
" FROM xref",
" LIMIT 1500;",
" store: true",
" - id: update",
" type: io.kestra.plugin.jdbc.mysql.Batch",
" from: \"{{ outputs.query.uri }}\"",
" url: jdbc:mysql://127.0.0.1:3306/",
" username: mysql_user",
" password: mysql_passwd",
" table: xref"
}
)
}
)
@@ -19,6 +19,7 @@
import java.sql.SQLException;
import java.time.*;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -186,6 +187,83 @@ public void namedColumnsInsert() throws Exception {
assertThat(runOutput.getRowCount(), is(5L));
}

@Test
public void noSqlForInsert() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 1; i < 6; i++) {
FileSerde.write(output, Arrays.asList(
i,
true,
"four",
"Here is a varchar",
"Here is a text column data",
null,
-9223372036854775808L,
1844674407370955161L,
new byte[]{0b000101},
9223372036854776000F,
9223372036854776000D,
2147483645.1234D,
new BigDecimal("5.36"),
new BigDecimal("999.99"),
LocalDate.parse("2030-12-25"),
"2050-12-31 22:59:57.150150",
LocalTime.parse("04:05:30"),
"2004-10-19T10:23:54.999999",
"2025",
"{\"color\": \"red\", \"value\": \"#f00\"}",
Hex.decodeHex("DEADBEEF".toCharArray())
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.from(uri.toString())
.table("mysql_types")
.build();

AbstractJdbcBatch.Output runOutput = task.run(runContext);

assertThat(runOutput.getRowCount(), is(5L));
}

@Test
public void noSqlWithNamedColumnsForInsert() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 1; i < 6; i++) {
FileSerde.write(output, List.of(
"Mario"
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.from(uri.toString())
.table("namedInsert")
.columns(List.of("name"))
.build();

AbstractJdbcBatch.Output runOutput = task.run(runContext);

assertThat(runOutput.getRowCount(), is(5L));
}

@Override
protected String getUrl() {
return "jdbc:mysql://127.0.0.1:64790/kestra";
@@ -50,6 +50,30 @@
" sql: |",
" insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )",
}
),
@Example(
title = "Fetch rows from a table and bulk insert to another one, without using sql query",
full = true,
code = {
"tasks:",
" - id: query",
" type: io.kestra.plugin.jdbc.oracle.Query",
" url: jdbc:oracle:thin:@dev:49161:XE",
" username: oracle",
" password: oracle_passwd",
" sql: |",
" SELECT *",
" FROM xref",
" LIMIT 1500;",
" store: true",
" - id: update",
" type: io.kestra.plugin.jdbc.oracle.Batch",
" from: \"{{ outputs.query.uri }}\"",
" url: jdbc:oracle:thin:@prod:49161:XE",
" username: oracle",
" password: oracle_passwd",
" table: XREF"
}
)
}
)
@@ -20,6 +20,7 @@
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
@@ -121,7 +122,6 @@ public void namedColumnsInsert() throws Exception {
);
}


URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

ArrayList<String> columns = new ArrayList<>();
@@ -142,6 +142,83 @@ public void namedColumnsInsert() throws Exception {
assertThat(runOutput.getRowCount(), is(5L));
}

@Test
public void noSqlForInsert() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 0; i < 5; i++) {
FileSerde.write(output, List.of(
"aa",
"",
"bb",
"cc",
"dd",
"ee",
"ff".getBytes(StandardCharsets.UTF_8),
"gg",
"hh",
BigDecimal.valueOf(7456123.89),
BigDecimal.valueOf(7456123.9),
BigDecimal.valueOf(7456124),
BigDecimal.valueOf(7456123.89),
BigDecimal.valueOf(7456123.9),
BigDecimal.valueOf(7456100),
7456123.89F,
7456123.89D,
LocalDate.parse("1992-11-13"),
LocalDateTime.parse("1998-01-23T06:00:00"),
ZonedDateTime.parse("1998-01-23T06:00:00-05:00"),
LocalDateTime.parse("1998-01-23T12:00:00")
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.from(uri.toString())
.table("ORACLE_TYPES")
.build();

AbstractJdbcBatch.Output runOutput = task.run(runContext);

assertThat(runOutput.getRowCount(), is(5L));
}

@Test
public void noSqlWithNamedColumnsForInsert() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 1; i < 6; i++) {
FileSerde.write(output, List.of(
"It's-a me, Mario"
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.from(uri.toString())
.table("namedInsert")
.columns(List.of("t_name"))
.build();

AbstractJdbcBatch.Output runOutput = task.run(runContext);

assertThat(runOutput.getRowCount(), is(5L));
}

@Override
protected String getUrl() {
return "jdbc:oracle:thin:@localhost:49161:XE";