Skip to content

Commit

Permalink
Add Sqlite jdbc sub-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
iNikitaGricenko committed Oct 30, 2023
1 parent 2587e19 commit 9f4bd16
Show file tree
Hide file tree
Showing 14 changed files with 474 additions and 0 deletions.
23 changes: 23 additions & 0 deletions plugin-jdbc-sqlite/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
project.description = 'Access SQLite databases using Kestra\'s JDBC plugin integration.'

jar {
manifest {
attributes(
"X-Kestra-Name": project.name,
"X-Kestra-Title": "SQLite",
"X-Kestra-Group": project.group + ".jdbc.sqlite",
"X-Kestra-Description": project.description,
"X-Kestra-Version": project.version
)
}
}

dependencies {
jdbcDriver 'org.xerial:sqlite-jdbc:3.42.0.0'
implementation project(':plugin-jdbc')
api 'org.bouncycastle:bcprov-jdk15on:1.70'
api 'org.bouncycastle:bcpkix-jdk15on:1.70'
api 'name.neuhalfen.projects.crypto.bouncycastle.openpgp:bouncy-gpg:2.3.0'

testImplementation project(':plugin-jdbc').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AutoCommitInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Properties;


@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a SQLite"
)
@Plugin(
examples = {
@Example(
full = true,
title = "Execute a query and fetch results on another task to update another table",
code = {
"tasks:",
"- id: update",
" type: io.kestra.plugin.jdbc.sqlite.Query",
" url: jdbc:sqlite:myfile:db",
" sql: select concert_id, available, a, b, c, d, play_time, library_record, floatn_test, double_test, real_test, numeric_test, date_type, time_type, timez_type, timestamp_type, timestampz_type, interval_type, pay_by_quarter, schedule, json_type, blob_type from pgsql_types",
" fetch: true",
"- id: use-fetched-data",
" type: io.kestra.plugin.jdbc.sqlite.Query",
" url: jdbc:sqlite:myfile:db",
" sql: \"{% for row in outputs.update.rows %} INSERT INTO pl_store_distribute (year_month,store_code, update_date) values ({{row.play_time}}, {{row.concert_id}}, TO_TIMESTAMP('{{row.timestamp_type}}', 'YYYY-MM-DDTHH:MI:SS.US') ); {% endfor %}\""}
)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output>, AutoCommitInterface {
protected final Boolean autoCommit = true;

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties properties = super.connectionProperties(runContext);

return properties;
}

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new SqliteCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new org.sqlite.JDBC());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;

public class SqliteCellConverter extends AbstractCellConverter {

public SqliteCellConverter(ZoneId zoneId) {
super(zoneId);
}

@Override
public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException {
Object data = resultSet.getObject(columnIndex);

if (data == null) {
return null;
}

String columnTypeName = resultSet.getMetaData().getColumnTypeName(columnIndex);

return switch (columnTypeName.toLowerCase()) {
case "date" -> LocalDate.parse(resultSet.getString(columnIndex));
case "datetime", "timestamp" -> resultSet.getTimestamp(columnIndex).toInstant();
case "time" -> LocalTime.parse(resultSet.getString(columnIndex));
default -> super.convert(columnIndex, resultSet);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Wait for query on a SQLite database."
)
@Plugin(
examples = {
@Example(
title = "Wait for a sql query to return results and iterate through rows",
full = true,
code = {
"id: jdbc-trigger",
"namespace: io.kestra.tests",
"",
"tasks:",
" - id: each",
" type: io.kestra.core.tasks.flows.EachSequential",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{json(taskrun.value)}}\"",
" value: \"{{ trigger.rows }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.jdbc.sqlite.Trigger",
" interval: \"PT5M\"",
" sql: \"SELECT * FROM my_table\""
}
)
}
)
public class Trigger extends AbstractJdbcTrigger {

@Override
protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
var query = Query.builder()
.id(this.id)
.type(Query.class.getName())
.url(this.getUrl())
.username(this.getUsername())
.password(this.getPassword())
.timeZoneId(this.getTimeZoneId())
.sql(this.getSql())
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.additionalVars(this.additionalVars)
.build();
return query.run(runContext);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new org.sqlite.JDBC());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
@PluginSubGroup(
description = "This sub-group of plugins contains tasks for accessing the SQLite database.",
categories = PluginSubGroup.PluginCategory.DATABASE
)
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.core.models.annotations.PluginSubGroup;
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.plugin.jdbc.AbstractJdbcDriverTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;

import java.sql.Driver;

@MicronautTest
class SqliteDriverTest extends AbstractJdbcDriverTest {

@Override
protected Class<? extends Driver> getDriverClass() {
return org.sqlite.JDBC.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.kestra.plugin.jdbc.sqlite;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.apache.commons.codec.binary.Hex;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Properties;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@MicronautTest
public class SqliteTest extends AbstractRdbmsTest {
@Test
void select() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Query task = Query.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.timeZoneId("Europe/Paris")
.sql("select * from lite_types")
.build();

AbstractJdbcQuery.Output runOutput = task.run(runContext);
assertThat(runOutput.getRow(), notNullValue());

assertThat(runOutput.getRow().get("id"), is(1));

// May be boolean
assertThat(runOutput.getRow().get("boolean_column"), is(1));
assertThat(runOutput.getRow().get("text_column"), is("Sample Text"));
assertThat(runOutput.getRow().get("d"), nullValue());

assertThat(runOutput.getRow().get("float_column"), is(3.14));
assertThat(runOutput.getRow().get("double_column"), is(3.14159265359d));
assertThat(runOutput.getRow().get("int_column"), is(42));

assertThat(runOutput.getRow().get("date_column"), is(LocalDate.parse("2023-10-30")));
assertThat(runOutput.getRow().get("datetime_column"), is(Instant.parse("2023-10-30T21:02:27.150Z")));
assertThat(runOutput.getRow().get("time_column"), is(LocalTime.parse("14:30:00")));
assertThat(runOutput.getRow().get("timestamp_column"), is(Instant.parse("2023-10-30T12:30:00.0Z")));
assertThat(runOutput.getRow().get("year_column"), is(2023));

assertThat(runOutput.getRow().get("json_column"), is("{\"key\": \"value\"}"));
assertThat(runOutput.getRow().get("blob_column"), is(Hex.decodeHex("0102030405060708".toCharArray())));
}

@Test
void update() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Query taskUpdate = Query.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.timeZoneId("Europe/Paris")
.sql("update lite_types set d = 'D'")
.build();

taskUpdate.run(runContext);

Query taskGet = Query.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.timeZoneId("Europe/Paris")
.sql("select d from lite_types")
.build();

AbstractJdbcQuery.Output runOutput = taskGet.run(runContext);
assertThat(runOutput.getRow(), notNullValue());
assertThat(runOutput.getRow().get("d"), is("D"));
}

@Override
protected String getUrl() {
return TestUtils.url();
}

@Override
protected String getUsername() {
return TestUtils.username();
}

@Override
protected String getPassword() {
return TestUtils.password();
}

protected Connection getConnection() throws SQLException {
Properties props = new Properties();
props.put("jdbc.url", getUrl());
props.put("user", getUsername());
props.put("password", getPassword());

return DriverManager.getConnection(props.getProperty("jdbc.url"), props);
}

@Override
protected void initDatabase() throws SQLException, FileNotFoundException, URISyntaxException {
executeSqlScript("scripts/sqlite.sql");
}
}
Loading

0 comments on commit 9f4bd16

Please sign in to comment.