From 971a259515297bb4de6f3148579211c6f21ec5f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Thu, 4 Apr 2024 17:39:00 +0200 Subject: [PATCH] feat: AS400 plugin Fixes #248 --- plugin-jdbc-as400/build.gradle | 20 +++++ .../plugin/jdbc/db2/As400CellConverter.java | 41 +++++++++ .../java/io/kestra/plugin/jdbc/db2/Query.java | 72 ++++++++++++++++ .../io/kestra/plugin/jdbc/db2/Trigger.java | 83 +++++++++++++++++++ .../kestra/plugin/jdbc/db2/package-info.java | 7 ++ .../plugin/jdbc/db2/As400DriverTest.java | 15 ++++ settings.gradle | 2 +- 7 files changed, 239 insertions(+), 1 deletion(-) create mode 100644 plugin-jdbc-as400/build.gradle create mode 100644 plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/As400CellConverter.java create mode 100644 plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Query.java create mode 100644 plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Trigger.java create mode 100644 plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/package-info.java create mode 100644 plugin-jdbc-as400/src/test/java/io/kestra/plugin/jdbc/db2/As400DriverTest.java diff --git a/plugin-jdbc-as400/build.gradle b/plugin-jdbc-as400/build.gradle new file mode 100644 index 00000000..f7d9bc4b --- /dev/null +++ b/plugin-jdbc-as400/build.gradle @@ -0,0 +1,20 @@ +project.description = 'Query AS400 databases using the Kestra JDBC plugin.' + +jar { + manifest { + attributes( + "X-Kestra-Name": project.name, + "X-Kestra-Title": "AS400", + "X-Kestra-Group": project.group + ".jdbc.as400", + "X-Kestra-Description": project.description, + "X-Kestra-Version": project.version + ) + } +} + +dependencies { + implementation 'net.sf.jt400:jt400:20.0.7' + implementation project(':plugin-jdbc') + + testImplementation project(':plugin-jdbc').sourceSets.test.output +} diff --git a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/As400CellConverter.java b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/As400CellConverter.java new file mode 100644 index 00000000..a3650e2a --- /dev/null +++ b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/As400CellConverter.java @@ -0,0 +1,41 @@ +package io.kestra.plugin.jdbc.db2; + +import io.kestra.plugin.jdbc.AbstractCellConverter; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.ZoneId; + +/** + * Copied from the DB2 code as we cannot test AS400 we assume it works like DB2 + */ +public class As400CellConverter extends AbstractCellConverter { + public As400CellConverter(ZoneId zoneId) { + super(zoneId); + } + + @Override + public Object convertCell(int columnIndex, ResultSet rs, Connection connection) throws SQLException { + Object data = rs.getObject(columnIndex); + + if (data == null) { + return null; + } + + String columnTypeName = rs.getMetaData().getColumnTypeName(columnIndex); + + return switch (columnTypeName.toLowerCase()) { + case "char", "varchar" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getString(columnIndex); + case "date" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getDate(columnIndex).toLocalDate(); + case "time" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getTime(columnIndex).toLocalTime(); + case "timestamp" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getTimestamp(columnIndex).toInstant(); + case "blob" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getBlob(columnIndex); + case "clob" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getClob(columnIndex); + case "nclob" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getNClob(columnIndex); + case "xml" -> ((com.ibm.as400.access.AS400JDBCResultSet) rs).getSQLXML(columnIndex); + default -> super.convert(columnIndex, rs); + }; + + } +} diff --git a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Query.java b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Query.java new file mode 100644 index 00000000..1714d9be --- /dev/null +++ b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Query.java @@ -0,0 +1,72 @@ +package io.kestra.plugin.jdbc.db2; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractCellConverter; +import io.kestra.plugin.jdbc.AbstractJdbcQuery; +import io.kestra.plugin.jdbc.AutoCommitInterface; +import io.micronaut.http.uri.UriBuilder; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.net.URI; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.Properties; + +/** + * Copied from the DB2 code as we cannot test AS400 we assume it works like DB2 + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Query a AS400 database." +) +@Plugin( + examples = { + @Example( + title = "Send a SQL query to a AS400 Database and fetch a row as output.", + code = { + "url: jdbc:as400://127.0.0.1:50000/", + "username: as400_user", + "password: as400_passwd", + "sql: select * from as400_types", + "fetchOne: true", + } + ) + } +) +public class Query extends AbstractJdbcQuery implements RunnableTask, AutoCommitInterface { + protected final Boolean autoCommit = true; + + @Override + protected AbstractCellConverter getCellConverter(ZoneId zoneId) { + return new As400CellConverter(zoneId); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new com.ibm.as400.access.AS400JDBCDriver()); + } + + @Override + public Properties connectionProperties(RunContext runContext) throws Exception { + Properties props = super.connectionProperties(runContext); + + URI url = URI.create((String) props.get("jdbc.url")); + url = URI.create(url.getSchemeSpecificPart()); + + UriBuilder builder = UriBuilder.of(url).scheme("jdbc:as400"); + + props.put("jdbc.url", builder.build().toString()); + + return props; + } +} diff --git a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Trigger.java b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Trigger.java new file mode 100644 index 00000000..63dd05b7 --- /dev/null +++ b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/Trigger.java @@ -0,0 +1,83 @@ +package io.kestra.plugin.jdbc.db2; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractJdbcQuery; +import io.kestra.plugin.jdbc.AbstractJdbcTrigger; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * Copied from the DB2 code as we cannot test AS400 we assume it works like DB2 + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Wait for query on a AS400 database." +) +@Plugin( + examples = { + @Example( + title = "Wait for a SQL query to return results, and then iterate through rows.", + full = true, + code = { + "id: jdbc-trigger", + "namespace: io.kestra.tests", + "", + "tasks:", + " - id: each", + " type: io.kestra.core.tasks.flows.EachSequential", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{ json(taskrun.value) }}\"", + " value: \"{{ trigger.rows }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.jdbc.as400.Trigger", + " interval: \"PT5M\"", + " url: jdbc:as400://127.0.0.1:50000/", + " username: as400_user", + " password: as400_passwd", + " sql: \"SELECT * FROM my_table\"", + " fetch: true", + } + ) + } +) +public class Trigger extends AbstractJdbcTrigger { + + @Override + protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception { + + var query = Query.builder() + .id(this.id) + .type(Query.class.getName()) + .url(this.getUrl()) + .username(this.getUsername()) + .password(this.getPassword()) + .timeZoneId(this.getTimeZoneId()) + .sql(this.getSql()) + .fetch(this.isFetch()) + .store(this.isStore()) + .fetchOne(this.isFetchOne()) + .additionalVars(this.additionalVars) + .build(); + + return query.run(runContext); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new com.ibm.as400.access.AS400JDBCDriver()); + } +} diff --git a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/package-info.java b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/package-info.java new file mode 100644 index 00000000..6ad255ae --- /dev/null +++ b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/db2/package-info.java @@ -0,0 +1,7 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for accessing the AS400 database.", + categories = PluginSubGroup.PluginCategory.DATABASE +) +package io.kestra.plugin.jdbc.db2; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/plugin-jdbc-as400/src/test/java/io/kestra/plugin/jdbc/db2/As400DriverTest.java b/plugin-jdbc-as400/src/test/java/io/kestra/plugin/jdbc/db2/As400DriverTest.java new file mode 100644 index 00000000..5415c544 --- /dev/null +++ b/plugin-jdbc-as400/src/test/java/io/kestra/plugin/jdbc/db2/As400DriverTest.java @@ -0,0 +1,15 @@ +package io.kestra.plugin.jdbc.db2; + +import io.kestra.plugin.jdbc.AbstractJdbcDriverTest; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; + +import java.sql.Driver; + +@MicronautTest +public class As400DriverTest extends AbstractJdbcDriverTest { + + @Override + protected Class getDriverClass() { + return com.ibm.as400.access.AS400JDBCDriver.class; + } +} diff --git a/settings.gradle b/settings.gradle index b1baa53d..43918740 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,4 +19,4 @@ include 'plugin-jdbc-dremio' include 'plugin-jdbc-arrow-flight' include 'plugin-jdbc-sqlite' include 'plugin-jdbc-db2' - +include 'plugin-jdbc-as400'