From b885d942d5863005d3a6b2c61ebf36477b9e2759 Mon Sep 17 00:00:00 2001 From: wolfhack Date: Fri, 16 Aug 2024 21:56:21 +0300 Subject: [PATCH 1/8] Added code, that allows users to pass table name, from which will be retrieved columns names for insert --- .../jdbc/clickhouse/ClickHouseTest.java | 38 ++++++++- .../io/kestra/plugin/jdbc/mysql/Batch.java | 24 ++++++ .../kestra/plugin/jdbc/mysql/BatchTest.java | 78 ++++++++++++++++++ .../io/kestra/plugin/jdbc/oracle/Batch.java | 24 ++++++ .../kestra/plugin/jdbc/oracle/BatchTest.java | 79 +++++++++++++++++- .../kestra/plugin/jdbc/postgresql/Batch.java | 25 ++++++ .../plugin/jdbc/postgresql/BatchTest.java | 80 ++++++++++++++++++ .../kestra/plugin/jdbc/sqlserver/Batch.java | 24 ++++++ .../plugin/jdbc/sqlserver/BatchTest.java | 82 +++++++++++++++++++ .../kestra/plugin/jdbc/vectorwise/Batch.java | 24 ++++++ .../plugin/jdbc/vectorwise/BatchTest.java | 78 ++++++++++++++++++ .../io/kestra/plugin/jdbc/vertica/Batch.java | 25 ++++++ .../kestra/plugin/jdbc/vertica/BatchTest.java | 58 +++++++++++++ .../kestra/plugin/jdbc/AbstractJdbcBatch.java | 57 +++++++++++-- 14 files changed, 686 insertions(+), 10 deletions(-) diff --git a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java index 5fa1972c..34b317d6 100644 --- a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java +++ b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java @@ -1,13 +1,18 @@ package io.kestra.plugin.jdbc.clickhouse; +import com.clickhouse.client.internal.google.protobuf.UInt32Value; +import com.clickhouse.client.internal.google.protobuf.UInt64Value; import com.google.common.collect.ImmutableMap; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; import io.kestra.core.utils.IdUtils; +import io.kestra.plugin.jdbc.AbstractJdbcBatch; import io.kestra.plugin.jdbc.AbstractJdbcQuery; import io.kestra.plugin.jdbc.AbstractRdbmsTest; -import io.kestra.core.junit.annotations.KestraTest; +import org.h2.value.ValueTinyint; import org.junit.jupiter.api.Test; +import reactor.util.function.Tuple2; import java.io.*; import java.math.BigDecimal; @@ -106,7 +111,7 @@ void updateBatch() throws Exception { FileSerde.write(output, ImmutableMap.builder() .put("String", "kestra") .build() - ); + ); } URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); @@ -140,6 +145,35 @@ void updateBatch() throws Exception { assertThat(runOutput.getRows().stream().anyMatch(map -> map.get("String").equals("kestra")), is(true)); } + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + "Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + BulkInsert task = BulkInsert.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("clickhouse_types") + .columns(List.of("String")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Override protected String getUrl() { return "jdbc:clickhouse://127.0.0.1:28123/default"; diff --git a/plugin-jdbc-mysql/src/main/java/io/kestra/plugin/jdbc/mysql/Batch.java b/plugin-jdbc-mysql/src/main/java/io/kestra/plugin/jdbc/mysql/Batch.java index 46c9dc1d..3d4ffb4d 100644 --- a/plugin-jdbc-mysql/src/main/java/io/kestra/plugin/jdbc/mysql/Batch.java +++ b/plugin-jdbc-mysql/src/main/java/io/kestra/plugin/jdbc/mysql/Batch.java @@ -51,6 +51,30 @@ " sql: |", " insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", } + ), + @Example( + title = "Fetch rows from a table, and bulk insert them to another one, without using sql query.", + full = true, + code = { + "tasks:", + " - id: query", + " type: io.kestra.plugin.jdbc.mysql.Query", + " url: jdbc:mysql://127.0.0.1:3306/", + " username: mysql_user", + " password: mysql_passwd", + " sql: |", + " SELECT *", + " FROM xref", + " LIMIT 1500;", + " store: true", + " - id: update", + " type: io.kestra.plugin.jdbc.mysql.Batch", + " from: \"{{ outputs.query.uri }}\"", + " url: jdbc:mysql://127.0.0.1:3306/", + " username: mysql_user", + " password: mysql_passwd", + " table: xref" + } ) } ) diff --git a/plugin-jdbc-mysql/src/test/java/io/kestra/plugin/jdbc/mysql/BatchTest.java b/plugin-jdbc-mysql/src/test/java/io/kestra/plugin/jdbc/mysql/BatchTest.java index 5e31d66a..34057dbf 100644 --- a/plugin-jdbc-mysql/src/test/java/io/kestra/plugin/jdbc/mysql/BatchTest.java +++ b/plugin-jdbc-mysql/src/test/java/io/kestra/plugin/jdbc/mysql/BatchTest.java @@ -19,6 +19,7 @@ import java.sql.SQLException; import java.time.*; import java.util.Arrays; +import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -186,6 +187,83 @@ public void namedColumnsInsert() throws Exception { assertThat(runOutput.getRowCount(), is(5L)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, Arrays.asList( + i, + true, + "four", + "Here is a varchar", + "Here is a text column data", + null, + -9223372036854775808L, + 1844674407370955161L, + new byte[]{0b000101}, + 9223372036854776000F, + 9223372036854776000D, + 2147483645.1234D, + new BigDecimal("5.36"), + new BigDecimal("999.99"), + LocalDate.parse("2030-12-25"), + "2050-12-31 22:59:57.150150", + LocalTime.parse("04:05:30"), + "2004-10-19T10:23:54.999999", + "2025", + "{\"color\": \"red\", \"value\": \"#f00\"}", + Hex.decodeHex("DEADBEEF".toCharArray()) + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("mysql_types") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + "Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("namedInsert") + .columns(List.of("name")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Override protected String getUrl() { return "jdbc:mysql://127.0.0.1:64790/kestra"; diff --git a/plugin-jdbc-oracle/src/main/java/io/kestra/plugin/jdbc/oracle/Batch.java b/plugin-jdbc-oracle/src/main/java/io/kestra/plugin/jdbc/oracle/Batch.java index 644c78a8..6780f726 100644 --- a/plugin-jdbc-oracle/src/main/java/io/kestra/plugin/jdbc/oracle/Batch.java +++ b/plugin-jdbc-oracle/src/main/java/io/kestra/plugin/jdbc/oracle/Batch.java @@ -50,6 +50,30 @@ " sql: |", " insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", } + ), + @Example( + title = "Fetch rows from a table and bulk insert to another one, without using sql query", + full = true, + code = { + "tasks:", + " - id: query", + " type: io.kestra.plugin.jdbc.oracle.Query", + " url: jdbc:oracle:thin:@dev:49161:XE", + " username: oracle", + " password: oracle_passwd", + " sql: |", + " SELECT *", + " FROM xref", + " LIMIT 1500;", + " store: true", + " - id: update", + " type: io.kestra.plugin.jdbc.oracle.Batch", + " from: \"{{ outputs.query.uri }}\"", + " url: jdbc:oracle:thin:@prod:49161:XE", + " username: oracle", + " password: oracle_passwd", + " table: XREF" + } ) } ) diff --git a/plugin-jdbc-oracle/src/test/java/io/kestra/plugin/jdbc/oracle/BatchTest.java b/plugin-jdbc-oracle/src/test/java/io/kestra/plugin/jdbc/oracle/BatchTest.java index 777b991e..34f4370f 100644 --- a/plugin-jdbc-oracle/src/test/java/io/kestra/plugin/jdbc/oracle/BatchTest.java +++ b/plugin-jdbc-oracle/src/test/java/io/kestra/plugin/jdbc/oracle/BatchTest.java @@ -20,6 +20,7 @@ import java.time.LocalDateTime; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; @@ -121,7 +122,6 @@ public void namedColumnsInsert() throws Exception { ); } - URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); ArrayList columns = new ArrayList<>(); @@ -142,6 +142,83 @@ public void namedColumnsInsert() throws Exception { assertThat(runOutput.getRowCount(), is(5L)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 0; i < 5; i++) { + FileSerde.write(output, List.of( + "aa", + "", + "bb", + "cc", + "dd", + "ee", + "ff".getBytes(StandardCharsets.UTF_8), + "gg", + "hh", + BigDecimal.valueOf(7456123.89), + BigDecimal.valueOf(7456123.9), + BigDecimal.valueOf(7456124), + BigDecimal.valueOf(7456123.89), + BigDecimal.valueOf(7456123.9), + BigDecimal.valueOf(7456100), + 7456123.89F, + 7456123.89D, + LocalDate.parse("1992-11-13"), + LocalDateTime.parse("1998-01-23T06:00:00"), + ZonedDateTime.parse("1998-01-23T06:00:00-05:00"), + LocalDateTime.parse("1998-01-23T12:00:00") + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("ORACLE_TYPES") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + "It's-a me, Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("namedInsert") + .columns(List.of("t_name")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Override protected String getUrl() { return "jdbc:oracle:thin:@localhost:49161:XE"; diff --git a/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/Batch.java b/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/Batch.java index 634aa0db..fd036aab 100644 --- a/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/Batch.java +++ b/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/Batch.java @@ -51,6 +51,31 @@ " sql: |", " insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", } + ), + + @Example( + title = "Fetch rows from a table, and bulk insert them to another one, without using sql query.", + full = true, + code = { + "tasks:", + " - id: query", + " type: io.kestra.plugin.jdbc.postgresql.Query", + " url: jdbc:postgresql://dev:56982/", + " username: pg_user", + " password: pg_passwd", + " sql: |", + " SELECT *", + " FROM xref", + " LIMIT 1500;", + " store: true", + " - id: update", + " type: io.kestra.plugin.jdbc.postgresql.Batch", + " from: \"{{ outputs.query.uri }}\"", + " url: jdbc:postgresql://prod:56982/", + " username: pg_user", + " password: pg_passwd", + " table: xref" + } ) } ) diff --git a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java index 6f8b43fc..6251a4d6 100644 --- a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java +++ b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java @@ -19,6 +19,7 @@ import java.sql.SQLException; import java.time.*; import java.util.Arrays; +import java.util.List; import java.util.Properties; import static org.hamcrest.MatcherAssert.assertThat; @@ -210,6 +211,85 @@ public void namedColumnsInsert() throws Exception { assertThat(runOutput.getRowCount(), is(5L)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, Arrays.asList( + i, + true, + "four", + "Here is a varchar", + "Here is a text column data", + null, + 32767, + 9223372036854775807L, + 9223372036854776000F, + 9223372036854776000d, + 9223372036854776000d, + new BigDecimal("2147483645.1234"), + LocalDate.parse("2030-12-25"), + LocalTime.parse("04:05:30"), + OffsetTime.parse("13:05:06+01:00"), + LocalDateTime.parse("2004-10-19T10:23:54.999999"), + ZonedDateTime.parse("2004-10-19T08:23:54.250+02:00[Europe/Paris]"), + "P10Y4M5DT0H0M10S", + new int[]{100, 200, 300}, + new String[][]{new String[]{"meeting", "lunch"}, new String[]{"training", "presentation"}}, + "{\"color\":\"red\",\"value\":\"#f00\"}", + "{\"color\":\"blue\",\"value\":\"#0f0\"}", + Hex.decodeHex("DEADBEEF".toCharArray()) + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("pgsql_types") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + "Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("namedInsert") + .columns(List.of("name")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Override protected String getUrl() { return TestUtils.url(); diff --git a/plugin-jdbc-sqlserver/src/main/java/io/kestra/plugin/jdbc/sqlserver/Batch.java b/plugin-jdbc-sqlserver/src/main/java/io/kestra/plugin/jdbc/sqlserver/Batch.java index 3ff6329a..ea071800 100644 --- a/plugin-jdbc-sqlserver/src/main/java/io/kestra/plugin/jdbc/sqlserver/Batch.java +++ b/plugin-jdbc-sqlserver/src/main/java/io/kestra/plugin/jdbc/sqlserver/Batch.java @@ -50,6 +50,30 @@ " sql: |", " insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", } + ), + @Example( + title = "Fetch rows from a table and bulk insert to another one, without using sql query.", + full = true, + code = { + "tasks:", + " - id: query", + " type: io.kestra.plugin.jdbc.sqlserver.Query", + " url: jdbc:sqlserver://dev:41433;trustServerCertificate=true", + " username: sql_server_user", + " password: sql_server_passwd", + " sql: |", + " SELECT *", + " FROM xref", + " LIMIT 1500;", + " store: true", + " - id: update", + " type: io.kestra.plugin.jdbc.sqlserver.Batch", + " from: \"{{ outputs.query.uri }}\"", + " url: jdbc:sqlserver://prod:41433;trustServerCertificate=true", + " username: sql_server_user", + " password: sql_server_passwd", + " table: xref" + } ) } ) diff --git a/plugin-jdbc-sqlserver/src/test/java/io/kestra/plugin/jdbc/sqlserver/BatchTest.java b/plugin-jdbc-sqlserver/src/test/java/io/kestra/plugin/jdbc/sqlserver/BatchTest.java index 90caf1a5..65c20ecc 100644 --- a/plugin-jdbc-sqlserver/src/test/java/io/kestra/plugin/jdbc/sqlserver/BatchTest.java +++ b/plugin-jdbc-sqlserver/src/test/java/io/kestra/plugin/jdbc/sqlserver/BatchTest.java @@ -145,6 +145,88 @@ public void namedColumnsInsert() throws Exception { assertThat(runOutput.getRowCount(), is(5L)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write( + output, + List.of( + 9223372036854775807L, + 2147483647, + 32767, + 255, + 12345.12345D, + 12345.12345F, + BigDecimal.valueOf(123.46), + BigDecimal.valueOf(12345.12345), + true, + BigDecimal.valueOf(3148.2929), + BigDecimal.valueOf(3148.1234), + "test ", + "test", + "test ", + "test", + "test", + "test", + LocalTime.parse("12:35:29"), + LocalDate.parse("2007-05-08"), + ZonedDateTime.parse("2007-05-08T12:35+02:00[Europe/Paris]"), + ZonedDateTime.parse("2007-05-08T12:35:29.123+02:00[Europe/Paris]"), + ZonedDateTime.parse("2007-05-08T12:35:29.123456700+02:00[Europe/Paris]"), + OffsetDateTime.parse("2007-05-08T12:35:29.123456700+12:15") + ) + ); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("sqlserver_types") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + "Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("namedInsert") + .columns(List.of("t_name")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Override protected String getUrl() { return "jdbc:sqlserver://localhost:41433;trustServerCertificate=true"; diff --git a/plugin-jdbc-vectorwise/src/main/java/io/kestra/plugin/jdbc/vectorwise/Batch.java b/plugin-jdbc-vectorwise/src/main/java/io/kestra/plugin/jdbc/vectorwise/Batch.java index fdeb43c0..6acdfc0e 100644 --- a/plugin-jdbc-vectorwise/src/main/java/io/kestra/plugin/jdbc/vectorwise/Batch.java +++ b/plugin-jdbc-vectorwise/src/main/java/io/kestra/plugin/jdbc/vectorwise/Batch.java @@ -49,6 +49,30 @@ " password: admin_passwd", " sql: insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", } + ), + @Example( + title = "Fetch rows from a table and bulk insert to another one without using sql query.", + full = true, + code = { + "tasks:", + " - id: query", + " type: io.kestra.plugin.jdbc.vectorwise.Query", + " url: jdbc:vectorwise://dev:port/base", + " username: admin", + " password: admin_passwd", + " sql: |", + " SELECT *", + " FROM xref", + " LIMIT 1500;", + " store: true", + " - id: update", + " type: io.kestra.plugin.jdbc.vectorwise.Batch", + " from: \"{{ outputs.query.uri }}\"", + " url: jdbc:vectorwise://prod:port/base", + " username: admin", + " password: admin_passwd", + " table: xref", + } ) } ) diff --git a/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java b/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java index 4aeaa2d9..43aa4c06 100644 --- a/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java +++ b/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java @@ -190,6 +190,84 @@ public void namedColumnsInsert() throws Exception { assertThat(runOutput.getRowCount(), is(5L)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + 125, + 32000, + 2147483640, + 9007199254740991L, + new BigDecimal("1.55"), + 1.12365125789541, + 1.2F, + "y", + "yoplait", + "tes", + "autret", + LocalDateTime.parse("1900-10-04T22:23:00.000"), + LocalDate.parse("2006-05-16"), + LocalDate.parse("2006-05-16"), + LocalTime.parse("04:05:30"), + LocalTime.parse("05:05:30"), + LocalTime.parse("05:05:30"), + LocalDateTime.parse("2006-05-16T00:00:00.000"), + ZonedDateTime.parse("2006-05-16T02:00:00.000+02:00[Europe/Paris]"), + ZonedDateTime.parse("2006-05-16T02:00:00.000+02:00[Europe/Paris]"), + true + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("ingres.abdt_test") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + 123, + "Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("ingres.abdt_test") + .columns(List.of("tinyint", "varchar")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Override protected String getUrl() { return this.url; diff --git a/plugin-jdbc-vertica/src/main/java/io/kestra/plugin/jdbc/vertica/Batch.java b/plugin-jdbc-vertica/src/main/java/io/kestra/plugin/jdbc/vertica/Batch.java index b8c140a3..85fa4aff 100644 --- a/plugin-jdbc-vertica/src/main/java/io/kestra/plugin/jdbc/vertica/Batch.java +++ b/plugin-jdbc-vertica/src/main/java/io/kestra/plugin/jdbc/vertica/Batch.java @@ -50,6 +50,31 @@ " password: vertica_passwd", " sql: insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", } + ), + @Example( + title = "Fetch rows from a table and bulk insert to another one, without using sql query.", + full = true, + code = { + "tasks:", + " - id: query", + " type: io.kestra.plugin.jdbc.vertica.Query", + " url: jdbc:vertica://dev:56982/db", + " username: vertica_user", + " password: vertica_passwd", + " sql: |", + " SELECT *", + " FROM xref", + " LIMIT 1500;", + " fetch: true", + " store: true", + " - id: update", + " type: io.kestra.plugin.jdbc.vertica.Batch", + " from: \"{{ outputs.query.uri }}\"", + " url: jdbc:vertica://prod:56982/db", + " username: vertica_user", + " password: vertica_passwd", + " table: xref", + } ) } ) diff --git a/plugin-jdbc-vertica/src/test/java/io/kestra/plugin/jdbc/vertica/BatchTest.java b/plugin-jdbc-vertica/src/test/java/io/kestra/plugin/jdbc/vertica/BatchTest.java index cfa03015..baad751b 100644 --- a/plugin-jdbc-vertica/src/test/java/io/kestra/plugin/jdbc/vertica/BatchTest.java +++ b/plugin-jdbc-vertica/src/test/java/io/kestra/plugin/jdbc/vertica/BatchTest.java @@ -152,6 +152,64 @@ public void namedColumnsInsert() throws Exception { assertThat(runOutput.getRowCount(), is(5L)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + i, + "It's-a me, Mario", + "Here" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("namedInsert") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + + @Test + public void noSqlWithNamedColumnsForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, List.of( + "Mario" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + Batch task = Batch.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("namedInsert") + .columns(List.of("name")) + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } @Override protected String getUrl() { diff --git a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java index df66275f..1b65ef08 100644 --- a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java +++ b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java @@ -1,27 +1,26 @@ package io.kestra.plugin.jdbc; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; import org.slf4j.Logger; +import reactor.core.publisher.Flux; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; -import java.sql.Connection; -import java.sql.ParameterMetaData; -import java.sql.PreparedStatement; -import java.sql.SQLException; +import java.sql.*; import java.time.ZoneId; import java.util.*; import java.util.concurrent.atomic.AtomicLong; -import jakarta.validation.constraints.NotNull; -import reactor.core.publisher.Flux; +import java.util.stream.Collectors; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -73,6 +72,15 @@ public abstract class AbstractJdbcBatch extends Task implements JdbcStatementInt @PluginProperty(dynamic = true) private List columns; + @Schema( + title = "The table from which columns names will be retrieved.", + description = + "This property specifies table name, which will be used to retrieve columns to specify in what columns will be inserted values. \n" + + "In That way columns names in insert statement will be match table schema." + ) + @PluginProperty(dynamic = true) + private String table; + protected abstract AbstractCellConverter getCellConverter(ZoneId zoneId); public Output run(RunContext runContext) throws Exception { @@ -83,7 +91,18 @@ public Output run(RunContext runContext) throws Exception { AbstractCellConverter cellConverter = this.getCellConverter(this.zoneId()); - String sql = runContext.render(this.sql); + List columnsToUse = this.columns; + if (columnsToUse == null && this.table != null) { + columnsToUse = fetchColumnsFromTable(runContext, this.table); + } + + String sql; + if (columnsToUse != null && this.sql == null) { + sql = constructInsertStatement(runContext, this.table, columnsToUse); + } else { + sql = runContext.render(this.sql); + } + logger.debug("Starting prepared statement: {}", sql); try ( @@ -131,6 +150,30 @@ public Output run(RunContext runContext) throws Exception { } } + private String constructInsertStatement(RunContext runContext, String table, List columns) throws IllegalVariableEvaluationException { + return String.format( + "INSERT INTO %s (%s) VALUES (%s)", + runContext.render(table), + String.join(", ", columns), + String.join(", ", Collections.nCopies(columns.size(), "?")) + ); + } + + private List fetchColumnsFromTable(RunContext runContext, String table) throws Exception { + List columns = new ArrayList<>(); + + try (Connection connection = this.connection(runContext)) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet resultSet = metaData.getColumns(null, null, table, null)) { + while (resultSet.next()) { + columns.add(resultSet.getString("COLUMN_NAME")); + } + } + } + + return columns; + } + @SuppressWarnings("unchecked") private PreparedStatement addRows( PreparedStatement ps, From fa86e557257b30e5513223b8e5244a64c6fd8223 Mon Sep 17 00:00:00 2001 From: wolfhack Date: Tue, 20 Aug 2024 15:42:23 +0300 Subject: [PATCH 2/8] Changed descriptions and titles. Added example to ClickHouse BulkInsert. Added test to ClickHouseTest --- .../plugin/jdbc/clickhouse/BulkInsert.java | 10 +++++ .../jdbc/clickhouse/ClickHouseTest.java | 38 +++++++++++++++++++ .../src/test/resources/scripts/clickhouse.sql | 13 +++++++ .../kestra/plugin/jdbc/AbstractJdbcBatch.java | 6 +-- 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/BulkInsert.java b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/BulkInsert.java index efc24a62..567c23db 100644 --- a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/BulkInsert.java +++ b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/BulkInsert.java @@ -49,6 +49,16 @@ "password: ch_passwd", "sql: INSERT INTO YourTable ( field1, field2, field3 ) SETTINGS async_insert=1, wait_for_async_insert=1 values( ?, ?, ? )" } + ), + @Example( + title = "Insert data into specific columns via a SQL query to a ClickHouse database using asynchronous inserts.", + code = { + "from: \"{{ outputs.query.uri }}\"", + "url: jdbc:clickhouse://127.0.0.1:56982/", + "username: ch_user", + "password: ch_passwd", + "table: YourTable" + } ) } ) diff --git a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java index 34b317d6..d3f2a679 100644 --- a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java +++ b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseTest.java @@ -2,6 +2,10 @@ import com.clickhouse.client.internal.google.protobuf.UInt32Value; import com.clickhouse.client.internal.google.protobuf.UInt64Value; +import com.clickhouse.client.internal.google.type.Decimal; +import com.clickhouse.data.ClickHouseColumn; +import com.clickhouse.data.value.ClickHouseNestedValue; +import com.clickhouse.data.value.ClickHouseTupleValue; import com.google.common.collect.ImmutableMap; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.runners.RunContext; @@ -22,6 +26,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; @@ -145,6 +151,38 @@ void updateBatch() throws Exception { assertThat(runOutput.getRows().stream().anyMatch(map -> map.get("String").equals("kestra")), is(true)); } + @Test + public void noSqlForInsert() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + for (int i = 1; i < 6; i++) { + FileSerde.write(output, Arrays.asList( + i, + 2147483645.1234, + 2147483645.1234, + BigDecimal.valueOf(2147483645.1234), + "four" + )); + } + + URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile)); + + BulkInsert task = BulkInsert.builder() + .url(getUrl()) + .username(getUsername()) + .password(getPassword()) + .from(uri.toString()) + .table("clickhouse_ins") + .build(); + + AbstractJdbcBatch.Output runOutput = task.run(runContext); + + assertThat(runOutput.getRowCount(), is(5L)); + } + @Test public void noSqlWithNamedColumnsForInsert() throws Exception { RunContext runContext = runContextFactory.of(ImmutableMap.of()); diff --git a/plugin-jdbc-clickhouse/src/test/resources/scripts/clickhouse.sql b/plugin-jdbc-clickhouse/src/test/resources/scripts/clickhouse.sql index 8a1260fb..4b9043e1 100644 --- a/plugin-jdbc-clickhouse/src/test/resources/scripts/clickhouse.sql +++ b/plugin-jdbc-clickhouse/src/test/resources/scripts/clickhouse.sql @@ -84,3 +84,16 @@ VALUES ( select * from clickhouse_types; + +DROP TABLE IF EXISTS clickhouse_ins; + +CREATE TABLE clickhouse_ins ( + Int64 Int64, + Float32 Float32, + Float64 Float64, + Decimal Decimal(14, 4), + String String, +) +ENGINE = MergeTree() +ORDER BY (Int64) +SETTINGS index_granularity = 8192; \ No newline at end of file diff --git a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java index 1b65ef08..fbbc39db 100644 --- a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java +++ b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java @@ -73,10 +73,10 @@ public abstract class AbstractJdbcBatch extends Task implements JdbcStatementInt private List columns; @Schema( - title = "The table from which columns names will be retrieved.", + title = "The table from which column names will be retrieved.", description = - "This property specifies table name, which will be used to retrieve columns to specify in what columns will be inserted values. \n" + - "In That way columns names in insert statement will be match table schema." + "This property specifies the table name which will be used to retrieve the columns for the inserted values.\n" + + "You can use it instead of specifying manually the columns in the `columns` property. In this case, the `sql` property can also be omitted, an INSERT statement would be generated automatically." ) @PluginProperty(dynamic = true) private String table; From ee6e5843ddd39beadd142ffce6b214bbdcd26438 Mon Sep 17 00:00:00 2001 From: wolfhack Date: Wed, 21 Aug 2024 13:17:20 +0300 Subject: [PATCH 3/8] Fixed postgres tests. Added ssl parameters for new postgres tests --- .../plugin/jdbc/postgresql/BatchTest.java | 21 ++++++++++---- .../resources/scripts/postgres_insert.sql | 28 +++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java index 6251a4d6..62a71e92 100644 --- a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java +++ b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java @@ -1,12 +1,12 @@ package io.kestra.plugin.jdbc.postgresql; import com.google.common.collect.ImmutableMap; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; import io.kestra.core.utils.IdUtils; import io.kestra.plugin.jdbc.AbstractJdbcBatch; import io.kestra.plugin.jdbc.AbstractRdbmsTest; -import io.kestra.core.junit.annotations.KestraTest; import org.apache.commons.codec.binary.Hex; import org.junit.jupiter.api.Test; @@ -83,7 +83,7 @@ void insert() throws Exception { " a,\n" + " b,\n" + " c,\n" + - " d,\n" + + " d,\n" + " play_time,\n" + " library_record,\n" + " floatn_test,\n" + @@ -241,7 +241,6 @@ public void noSqlForInsert() throws Exception { new int[]{100, 200, 300}, new String[][]{new String[]{"meeting", "lunch"}, new String[]{"training", "presentation"}}, "{\"color\":\"red\",\"value\":\"#f00\"}", - "{\"color\":\"blue\",\"value\":\"#0f0\"}", Hex.decodeHex("DEADBEEF".toCharArray()) )); } @@ -252,8 +251,14 @@ public void noSqlForInsert() throws Exception { .url(getUrl()) .username(getUsername()) .password(getPassword()) + .ssl(TestUtils.ssl()) + .sslMode(TestUtils.sslMode()) + .sslRootCert(TestUtils.ca()) + .sslCert(TestUtils.cert()) + .sslKey(TestUtils.key()) + .sslKeyPassword(TestUtils.keyPass()) .from(uri.toString()) - .table("pgsql_types") + .table("pgsql_nosql") .build(); AbstractJdbcBatch.Output runOutput = task.run(runContext); @@ -270,7 +275,7 @@ public void noSqlWithNamedColumnsForInsert() throws Exception { for (int i = 1; i < 6; i++) { FileSerde.write(output, List.of( - "Mario" + "Mario" )); } @@ -280,6 +285,12 @@ public void noSqlWithNamedColumnsForInsert() throws Exception { .url(getUrl()) .username(getUsername()) .password(getPassword()) + .ssl(TestUtils.ssl()) + .sslMode(TestUtils.sslMode()) + .sslRootCert(TestUtils.ca()) + .sslCert(TestUtils.cert()) + .sslKey(TestUtils.key()) + .sslKeyPassword(TestUtils.keyPass()) .from(uri.toString()) .table("namedInsert") .columns(List.of("name")) diff --git a/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql b/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql index ac2ba882..6dc48154 100644 --- a/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql +++ b/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql @@ -1,5 +1,6 @@ DROP TABLE IF EXISTS pgsql_types; +DROP TABLE IF EXISTS pgsql_nosql; DROP TABLE IF EXISTS namedInsert; CREATE TABLE pgsql_types ( @@ -30,6 +31,33 @@ CREATE TABLE pgsql_types ( ); +CREATE TABLE pgsql_nosql ( + concert_id serial NOT NULL, + available boolean not null, + a CHAR(4) not null, + b VARCHAR(30) not null, + c TEXT not null, + d VARCHAR(10), + play_time smallint not null, + library_record BIGINT not null, + -- money_type money not null, + floatn_test float8 not null, + double_test double precision not null, + real_test real not null, + numeric_test numeric not null, + date_type DATE not null, + time_type TIME not null, + timez_type TIME WITH TIME ZONE not null, + timestamp_type TIMESTAMP not null, + timestampz_type TIMESTAMP WITH TIME ZONE not null, + interval_type INTERVAL not null, + pay_by_quarter integer[] not null, + schedule text[][] not null, + json_type JSON not null, + blob_type bytea not null +); + + CREATE TABLE namedInsert ( id integer, name VARCHAR, From f4c9b00ea9c9b4326865d3538cf8b8e92f4ca0e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Thu, 22 Aug 2024 17:33:33 +0200 Subject: [PATCH 4/8] Update plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java --- .../test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java b/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java index 43aa4c06..f0fcf90b 100644 --- a/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java +++ b/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java @@ -191,6 +191,7 @@ public void namedColumnsInsert() throws Exception { } @Test + @Disabled public void noSqlForInsert() throws Exception { RunContext runContext = runContextFactory.of(ImmutableMap.of()); From 90df9ca1f090144af3457c6c02adf456a11a8023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Thu, 22 Aug 2024 17:33:39 +0200 Subject: [PATCH 5/8] Update plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java --- .../test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java b/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java index f0fcf90b..435a6113 100644 --- a/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java +++ b/plugin-jdbc-vectorwise/src/test/java/io/kestra/plugin/jdbc/vectorwise/BatchTest.java @@ -240,6 +240,7 @@ public void noSqlForInsert() throws Exception { } @Test + @Disabled public void noSqlWithNamedColumnsForInsert() throws Exception { RunContext runContext = runContextFactory.of(ImmutableMap.of()); From c4b29455dd450a962e1bcd88408008abed56ec3f Mon Sep 17 00:00:00 2001 From: wolfhack Date: Fri, 23 Aug 2024 17:05:09 +0300 Subject: [PATCH 6/8] Added support for tsvector for postgres --- .../jdbc/postgresql/PostgresCellConverter.java | 4 ++++ .../kestra/plugin/jdbc/postgresql/BatchTest.java | 9 ++++++--- .../kestra/plugin/jdbc/postgresql/PgsqlTest.java | 5 +++-- .../src/test/resources/scripts/postgres.sql | 15 ++++++++++----- .../test/resources/scripts/postgres_insert.sql | 3 ++- 5 files changed, 25 insertions(+), 11 deletions(-) diff --git a/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/PostgresCellConverter.java b/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/PostgresCellConverter.java index 42d99774..3c7d584f 100644 --- a/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/PostgresCellConverter.java +++ b/plugin-jdbc-postgres/src/main/java/io/kestra/plugin/jdbc/postgresql/PostgresCellConverter.java @@ -62,6 +62,10 @@ public Object convertCell(int columnIndex, ResultSet rs, Connection connection) PGobject o = ((PGobject) data); String type = o.getType(); switch (type.toLowerCase()) { + case "tsvector": + { + return o.getValue(); + } case "json": case "jsonb": try { diff --git a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java index 62a71e92..9595b6ca 100644 --- a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java +++ b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/BatchTest.java @@ -58,7 +58,8 @@ void insert() throws Exception { new String[][]{new String[]{"meeting", "lunch"}, new String[]{"training", "presentation"}}, "{\"color\":\"red\",\"value\":\"#f00\"}", "{\"color\":\"blue\",\"value\":\"#0f0\"}", - Hex.decodeHex("DEADBEEF".toCharArray()) + Hex.decodeHex("DEADBEEF".toCharArray()), + "a quick brown fox jumped over the lazy dog" )); } @@ -100,7 +101,8 @@ void insert() throws Exception { " schedule,\n" + " json_type,\n" + " jsonb_type,\n" + - " blob_type" + + " blob_type,\n" + + " tsvector_col" + ")\n" + "values\n" + "(\n " + @@ -126,7 +128,8 @@ void insert() throws Exception { " ?,\n" + " ?,\n" + " ?::jsonb,\n" + - " ?\n" + + " ?,\n" + + " ?::tsvector\n" + ")" ) .build(); diff --git a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/PgsqlTest.java b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/PgsqlTest.java index 6a2f7687..fa240312 100644 --- a/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/PgsqlTest.java +++ b/plugin-jdbc-postgres/src/test/java/io/kestra/plugin/jdbc/postgresql/PgsqlTest.java @@ -53,7 +53,7 @@ void selectAndFetchOne() throws Exception { .sslKey(TestUtils.keyNoPass()) .fetchOne(true) .timeZoneId("Europe/Paris") - .sql("select concert_id, available, a, b, c, d, play_time, library_record, floatn_test, double_test, real_test, numeric_test, date_type, time_type, timez_type, timestamp_type, timestampz_type, interval_type, pay_by_quarter, schedule, json_type, jsonb_type, blob_type from pgsql_types") + .sql("select concert_id, available, a, b, c, d, play_time, library_record, floatn_test, double_test, real_test, numeric_test, date_type, time_type, timez_type, timestamp_type, timestampz_type, interval_type, pay_by_quarter, schedule, json_type, jsonb_type, blob_type, tsvector_col from pgsql_types") .build(); AbstractJdbcQuery.Output runOutput = task.run(runContext); @@ -118,6 +118,7 @@ private void checkRow(Map row, int concertId) throws DecoderExce assertThat(row.get("json_type"), is(Map.of("color", "red", "value", "#f00"))); assertThat(row.get("jsonb_type"), is(Map.of("color", "blue", "value", "#0f0"))); assertThat(row.get("blob_type"), is(Hex.decodeHex("DEADBEEF".toCharArray()))); + assertThat(row.get("tsvector_col"), is("'brown':4 'dice':2 'dog':9 'fox':5 'fuzzi':1 'jump':6 'lazi':8 'quick':3")); } @Test @@ -135,7 +136,7 @@ void selectAndFetch() throws Exception { .sslKey(TestUtils.keyNoPass()) .fetch(true) .timeZoneId("Europe/Paris") - .sql("select concert_id, available, a, b, c, d, play_time, library_record, floatn_test, double_test, real_test, numeric_test, date_type, time_type, timez_type, timestamp_type, timestampz_type, interval_type, pay_by_quarter, schedule, json_type, jsonb_type, blob_type from pgsql_types") + .sql("select concert_id, available, a, b, c, d, play_time, library_record, floatn_test, double_test, real_test, numeric_test, date_type, time_type, timez_type, timestamp_type, timestampz_type, interval_type, pay_by_quarter, schedule, json_type, jsonb_type, blob_type, tsvector_col from pgsql_types") .build(); AbstractJdbcQuery.Output runOutput = task.run(runContext); diff --git a/plugin-jdbc-postgres/src/test/resources/scripts/postgres.sql b/plugin-jdbc-postgres/src/test/resources/scripts/postgres.sql index 308398a7..5e02b61d 100644 --- a/plugin-jdbc-postgres/src/test/resources/scripts/postgres.sql +++ b/plugin-jdbc-postgres/src/test/resources/scripts/postgres.sql @@ -52,7 +52,8 @@ CREATE TABLE pgsql_types ( json_type JSON not null, jsonb_type JSONB not null, item inventory_item not null, - blob_type bytea not null + blob_type bytea not null, + tsvector_col TSVECTOR not null ); @@ -82,7 +83,8 @@ INSERT INTO pgsql_types json_type, jsonb_type, item, - blob_type) + blob_type, + tsvector_col) VALUES ( DEFAULT, true, 'four', @@ -107,7 +109,8 @@ VALUES ( DEFAULT, '{"color":"red","value":"#f00"}', '{"color":"blue","value":"#0f0"}', Row('fuzzy dice', 42, 1.99), - '\xDEADBEEF' ); + '\xDEADBEEF', + to_tsvector('english', 'fuzzy dice quick brown fox jumps over lazy dog')); -- Insert @@ -136,7 +139,8 @@ INSERT INTO pgsql_types json_type, jsonb_type, item, - blob_type) + blob_type, + tsvector_col) VALUES ( DEFAULT, true, 'four', @@ -161,4 +165,5 @@ VALUES ( DEFAULT, '{"color":"red","value":"#f00"}', '{"color":"blue","value":"#0f0"}', Row('fuzzy dice', 42, 1.99), - '\xDEADBEEF' ); + '\xDEADBEEF', + to_tsvector('english', 'fuzzy dice quick brown fox jumps over lazy dog')); diff --git a/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql b/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql index 6dc48154..c550d41d 100644 --- a/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql +++ b/plugin-jdbc-postgres/src/test/resources/scripts/postgres_insert.sql @@ -27,7 +27,8 @@ CREATE TABLE pgsql_types ( schedule text[][] not null, json_type JSON not null, jsonb_type JSONB not null, - blob_type bytea not null + blob_type bytea not null, + tsvector_col TSVECTOR not null ); From 49cdd82367684c4a24e5dfa95fa105e5951c3f81 Mon Sep 17 00:00:00 2001 From: wolfhack Date: Thu, 29 Aug 2024 22:15:04 +0300 Subject: [PATCH 7/8] Implemented ClickHouse CLI task --- plugin-jdbc-clickhouse/build.gradle | 2 + .../jdbc/clickhouse/cli/ClickHouseCLI.java | 120 ++++++++++++++++++ .../jdbc/clickhouse/cli/package-info.java | 8 ++ .../io.kestra.plugin.jdbc.clickhouse.cli.svg | 1 + .../clickhouse/cli/ClickHouseCLITest.java | 39 ++++++ 5 files changed, 170 insertions(+) create mode 100644 plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java create mode 100644 plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java create mode 100644 plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg create mode 100644 plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java diff --git a/plugin-jdbc-clickhouse/build.gradle b/plugin-jdbc-clickhouse/build.gradle index 2cc297e0..f527c524 100644 --- a/plugin-jdbc-clickhouse/build.gradle +++ b/plugin-jdbc-clickhouse/build.gradle @@ -16,5 +16,7 @@ dependencies { implementation("com.clickhouse:clickhouse-jdbc:0.6.0:all") implementation project(':plugin-jdbc') + api group: "io.kestra", name: "script", version: kestraVersion + testImplementation project(':plugin-jdbc').sourceSets.test.output } diff --git a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java new file mode 100644 index 00000000..2ea00132 --- /dev/null +++ b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java @@ -0,0 +1,120 @@ +package io.kestra.plugin.jdbc.clickhouse.cli; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.tasks.*; +import io.kestra.core.models.tasks.runners.ScriptService; +import io.kestra.core.models.tasks.runners.TaskRunner; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; +import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper; +import io.kestra.plugin.scripts.runner.docker.Docker; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Runs a ClickHouse-local commands." +) +@Plugin( + examples = { + @Example( + title = "Query data in a Parquet file in AWS S3", + full = true, + code = { + """ + id: clickhouse-local + namespace: company.team + tasks: + - id: wdir + type: io.kestra.plugin.core.flow.WorkingDirectory + tasks: + - id: query + type: io.kestra.plugin.clickhouse.cli.ClickHouseCLI + commands: + - SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet') + """ + } + ) + } +) +public class ClickHouseCLI extends Task implements RunnableTask, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface { + + public static final String DEFAULT_IMAGE = "clickhouse/clickhouse-server:latest"; + + @Schema( + title = "The commands to run before main list of commands." + ) + @PluginProperty(dynamic = true) + protected List beforeCommands; + + @Schema( + title = "The commands to run in clickhouse-local." + ) + @PluginProperty(dynamic = true) + @NotEmpty + protected List commands; + + @Schema( + title = "Additional environment variables for the current process." + ) + @PluginProperty(dynamic = true) + protected Map env; + + @Schema( + title = "The task runner to use." + ) + @Valid + @PluginProperty + @Builder.Default + private TaskRunner taskRunner = Docker.instance(); + + @Schema( + title = "The Clickhouse container image." + ) + @PluginProperty(dynamic =true) + @Builder.Default + private String containerImage = DEFAULT_IMAGE; + + private NamespaceFiles namespaceFiles; + + private Object inputFiles; + + private List outputFiles; + + @Override + public ScriptOutput run(RunContext runContext) throws Exception { + return new CommandsWrapper(runContext) + .withWarningOnStdErr(true) + .withTaskRunner(this.taskRunner) + .withContainerImage(this.containerImage) + .withEnv(Optional.ofNullable(env).orElse(new HashMap<>())) + .withNamespaceFiles(namespaceFiles) + .withInputFiles(inputFiles) + .withOutputFiles(outputFiles) + .withCommands( + ScriptService.scriptCommands( + List.of("clickhouse-local"), + Optional.ofNullable(this.beforeCommands).map(throwFunction(runContext::render)).orElse(null), + runContext.render(this.commands) + ) + ) + .run(); + } + +} diff --git a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java new file mode 100644 index 00000000..70400803 --- /dev/null +++ b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java @@ -0,0 +1,8 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for using ClickHouse-local.\n " + + "ClickHouse-local is an easy-to-use version of ClickHouse that is ideal for developers who need to perform fast processing on local and remote files using SQL", + categories = {PluginSubGroup.PluginCategory.DATABASE, PluginSubGroup.PluginCategory.TOOL} +) +package io.kestra.plugin.jdbc.clickhouse.cli; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg b/plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg new file mode 100644 index 00000000..865b96d9 --- /dev/null +++ b/plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java new file mode 100644 index 00000000..34ec03d6 --- /dev/null +++ b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java @@ -0,0 +1,39 @@ +package io.kestra.plugin.jdbc.clickhouse.cli; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.is; + +@KestraTest +public class ClickHouseCLITest { + + @Inject + private RunContextFactory runContextFactory; + + @Test + void run() throws Exception { + ClickHouseCLI clickHouseCLI = ClickHouseCLI.builder() + .id(IdUtils.create()) + .type(ClickHouseCLI.class.getName()) + .commands(List.of("SELECT * FROM system.tables")) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, clickHouseCLI, Map.of()); + + ScriptOutput output = clickHouseCLI.run(runContext); + + assertThat(output.getExitCode(), is(0)); + } + +} From e6af3209998d1c778ee0c233502727c1673fe646 Mon Sep 17 00:00:00 2001 From: wolfhack Date: Fri, 30 Aug 2024 18:25:26 +0300 Subject: [PATCH 8/8] Apply suggestions --- plugin-jdbc-clickhouse/build.gradle | 2 +- ...kHouseCLI.java => ClickHouseLocalCLI.java} | 21 ++++++++----------- .../jdbc/clickhouse/cli/package-info.java | 8 ------- .../io.kestra.plugin.jdbc.clickhouse.cli.svg | 1 - ...ITest.java => ClickHouseLocalCLITest.java} | 14 ++++++------- 5 files changed, 17 insertions(+), 29 deletions(-) rename plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/{cli/ClickHouseCLI.java => ClickHouseLocalCLI.java} (78%) delete mode 100644 plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java delete mode 100644 plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg rename plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/{cli/ClickHouseCLITest.java => ClickHouseLocalCLITest.java} (69%) diff --git a/plugin-jdbc-clickhouse/build.gradle b/plugin-jdbc-clickhouse/build.gradle index f527c524..122a926c 100644 --- a/plugin-jdbc-clickhouse/build.gradle +++ b/plugin-jdbc-clickhouse/build.gradle @@ -16,7 +16,7 @@ dependencies { implementation("com.clickhouse:clickhouse-jdbc:0.6.0:all") implementation project(':plugin-jdbc') - api group: "io.kestra", name: "script", version: kestraVersion + compileOnly group: "io.kestra", name: "script", version: kestraVersion testImplementation project(':plugin-jdbc').sourceSets.test.output } diff --git a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseLocalCLI.java similarity index 78% rename from plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java rename to plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseLocalCLI.java index 2ea00132..35dbe809 100644 --- a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLI.java +++ b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseLocalCLI.java @@ -1,4 +1,4 @@ -package io.kestra.plugin.jdbc.clickhouse.cli; +package io.kestra.plugin.jdbc.clickhouse; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; @@ -29,31 +29,28 @@ @Getter @NoArgsConstructor @Schema( - title = "Runs a ClickHouse-local commands." + title = "Run clickhouse-local commands." ) @Plugin( examples = { @Example( - title = "Query data in a Parquet file in AWS S3", + title = "Run clickhouse-local commands", full = true, code = { """ id: clickhouse-local namespace: company.team tasks: - - id: wdir - type: io.kestra.plugin.core.flow.WorkingDirectory - tasks: - - id: query - type: io.kestra.plugin.clickhouse.cli.ClickHouseCLI - commands: - - SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet') + - id: query + type: io.kestra.plugin.clickhouse.ClickHouseLocalCLI + commands: + - SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet') """ } ) } ) -public class ClickHouseCLI extends Task implements RunnableTask, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface { +public class ClickHouseLocalCLI extends Task implements RunnableTask, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface { public static final String DEFAULT_IMAGE = "clickhouse/clickhouse-server:latest"; @@ -64,7 +61,7 @@ public class ClickHouseCLI extends Task implements RunnableTask, N protected List beforeCommands; @Schema( - title = "The commands to run in clickhouse-local." + title = "The commands to run." ) @PluginProperty(dynamic = true) @NotEmpty diff --git a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java b/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java deleted file mode 100644 index 70400803..00000000 --- a/plugin-jdbc-clickhouse/src/main/java/io/kestra/plugin/jdbc/clickhouse/cli/package-info.java +++ /dev/null @@ -1,8 +0,0 @@ -@PluginSubGroup( - description = "This sub-group of plugins contains tasks for using ClickHouse-local.\n " + - "ClickHouse-local is an easy-to-use version of ClickHouse that is ideal for developers who need to perform fast processing on local and remote files using SQL", - categories = {PluginSubGroup.PluginCategory.DATABASE, PluginSubGroup.PluginCategory.TOOL} -) -package io.kestra.plugin.jdbc.clickhouse.cli; - -import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg b/plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg deleted file mode 100644 index 865b96d9..00000000 --- a/plugin-jdbc-clickhouse/src/main/resources/icons/io.kestra.plugin.jdbc.clickhouse.cli.svg +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseLocalCLITest.java similarity index 69% rename from plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java rename to plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseLocalCLITest.java index 34ec03d6..976434a5 100644 --- a/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/cli/ClickHouseCLITest.java +++ b/plugin-jdbc-clickhouse/src/test/java/io/kestra/plugin/jdbc/clickhouse/ClickHouseLocalCLITest.java @@ -1,4 +1,4 @@ -package io.kestra.plugin.jdbc.clickhouse.cli; +package io.kestra.plugin.jdbc.clickhouse; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.runners.RunContext; @@ -12,26 +12,26 @@ import java.util.List; import java.util.Map; -import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @KestraTest -public class ClickHouseCLITest { +public class ClickHouseLocalCLITest { @Inject private RunContextFactory runContextFactory; @Test void run() throws Exception { - ClickHouseCLI clickHouseCLI = ClickHouseCLI.builder() + ClickHouseLocalCLI clickhouseLocalCLI = ClickHouseLocalCLI.builder() .id(IdUtils.create()) - .type(ClickHouseCLI.class.getName()) + .type(ClickHouseLocalCLI.class.getName()) .commands(List.of("SELECT * FROM system.tables")) .build(); - RunContext runContext = TestsUtils.mockRunContext(runContextFactory, clickHouseCLI, Map.of()); + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, clickhouseLocalCLI, Map.of()); - ScriptOutput output = clickHouseCLI.run(runContext); + ScriptOutput output = clickhouseLocalCLI.run(runContext); assertThat(output.getExitCode(), is(0)); }