Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sqlite #292

Merged
merged 3 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.tasks.PluginUtilsService;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AutoCommitInterface;
import io.micronaut.http.uri.UriBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;


Expand All @@ -37,23 +44,93 @@
" url: jdbc:sqlite:myfile.db",
" sql: select concert_id, available, a, b, c, d, play_time, library_record, floatn_test, double_test, real_test, numeric_test, date_type, time_type, timez_type, timestamp_type, timestampz_type, interval_type, pay_by_quarter, schedule, json_type, blob_type from pgsql_types",
" fetch: true",
"",
"- id: use-fetched-data",
" type: io.kestra.plugin.jdbc.sqlite.Query",
" url: jdbc:sqlite:myfile.db",
" sql: \"{% for row in outputs.update.rows %} INSERT INTO pl_store_distribute (year_month,store_code, update_date) values ({{row.play_time}}, {{row.concert_id}}, TO_TIMESTAMP('{{row.timestamp_type}}', 'YYYY-MM-DDTHH:MI:SS.US') ); {% endfor %}\""}
),
@Example(
full = true,
title = "Execute a query, using existing sqlite file, and pass the results to another task.",
code = {
"tasks:",
"- id: update",
" type: io.kestra.plugin.jdbc.sqlite.Query",
" url: jdbc:sqlite:myfile.db",
" sqliteFile: {{ outputs.get.outputFiles['myfile.sqlite'] }}",
" sql: select * from pgsql_types",
" fetch: true",
"",
"- id: use-fetched-data",
" type: io.kestra.plugin.jdbc.sqlite.Query",
" url: jdbc:sqlite:myfile.db",
" sqliteFile: {{ outputs.get.outputFiles['myfile.sqlite'] }}",
" sql: \"{% for row in outputs.update.rows %} INSERT INTO pl_store_distribute (year_month,store_code, update_date) values ({{row.play_time}}, {{row.concert_id}}, TO_TIMESTAMP('{{row.timestamp_type}}', 'YYYY-MM-DDTHH:MI:SS.US') ); {% endfor %}\""
}
)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output>, AutoCommitInterface {
protected final Boolean autoCommit = true;

@Schema(
title = "Add sqlite file.",
description = "The file must be from Kestra's internal storage"
)
@PluginProperty(dynamic = true)
protected String sqliteFile;

@Getter(AccessLevel.NONE)
protected transient Path workingDirectory;

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties properties = super.connectionProperties(runContext);

URI url = URI.create((String) properties.get("jdbc.url"));

// get file name from url scheme parts
String filename = url.getSchemeSpecificPart().split(":")[1];

Path path = runContext.resolve(Path.of(filename));
if (path.toFile().exists()) {
url = URI.create(path.toString());

UriBuilder builder = UriBuilder.of(url);

builder.scheme("jdbc:sqlite");

properties.put("jdbc.url", builder.build().toString());
}

return properties;
}

@Override
public AbstractJdbcQuery.Output run(RunContext runContext) throws Exception {
Properties properties = super.connectionProperties(runContext);

URI url = URI.create((String) properties.get("jdbc.url"));

this.workingDirectory = runContext.tempDir();

if (this.sqliteFile != null) {

// Get file name from url scheme parts, to be equally same as in connection url
String filename = url.getSchemeSpecificPart().split(":")[1];

PluginUtilsService.createInputFiles(
runContext,
workingDirectory,
Map.of(filename, this.sqliteFile),
additionalVars
);
}

return super.run(runContext);
}

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new SqliteCellConverter(zoneId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@
import org.apache.commons.codec.binary.Hex;
import org.junit.jupiter.api.Test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -61,6 +68,43 @@ void select() throws Exception {
assertThat(runOutput.getRow().get("blob_column"), is(Hex.decodeHex("0102030405060708".toCharArray())));
}

@Test
void selectFromExistingDatabase() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

URL resource = SqliteTest.class.getClassLoader().getResource("db/Chinook_Sqlite.sqlite");

URI input = storageInterface.put(
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
);

Query task = Query.builder()
.url("jdbc:sqlite:Chinook_Sqlite.sqlite")
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.timeZoneId("Europe/Paris")
.sqliteFile(input.toString())
.sql("""
SELECT Genre.Name, COUNT(InvoiceLine.InvoiceLineId) AS TracksPurchased
FROM Genre
JOIN Track ON Genre.GenreId = Track.GenreId
JOIN InvoiceLine ON Track.TrackId = InvoiceLine.TrackId
GROUP BY Genre.Name
ORDER BY TracksPurchased DESC
""")
.build();

AbstractJdbcQuery.Output runOutput = task.run(runContext);

assertThat(runOutput.getRow(), notNullValue());

assertThat(runOutput.getRow().get("Name"), is("Rock"));
assertThat(runOutput.getRow().get("TracksPurchased"), is(835));
}

@Test
void update() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());
Expand Down
Binary file not shown.
Loading