Skip to content

Commit

Permalink
feat: Mysql - Batch implementation (#293)
Browse files Browse the repository at this point in the history
* Add test for query with aliases

* #109

* #124 Implemented batch

* Fixed flow issue. Changed datetime type
  • Loading branch information
iNikitaGricenko authored Apr 29, 2024
1 parent eeaa34a commit c427086
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.kestra.plugin.jdbc.mysql;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;
import io.micronaut.http.uri.UriBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.net.URI;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Properties;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Execute a batch query to a MySQL server."
)
@Plugin(
examples = {
@Example(
title = "Fetch rows from a table, and bulk insert them to another one.",
full = true,
code = {
"tasks:",
" - id: query",
" type: io.kestra.plugin.jdbc.mysql.Query",
" url: jdbc:mysql://127.0.0.1:3306/",
" username: mysql_user",
" password: mysql_passwd",
" sql: |",
" SELECT *",
" FROM xref",
" LIMIT 1500;",
" store: true",
" - id: update",
" type: io.kestra.plugin.jdbc.mysql.Batch",
" from: \"{{ outputs.query.uri }}\"",
" url: jdbc:mysql://127.0.0.1:3306/",
" username: mysql_user",
" password: mysql_passwd",
" sql: |",
" insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )",
}
)
}
)
public class Batch extends AbstractJdbcBatch implements RunnableTask<AbstractJdbcBatch.Output> {

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new MysqlCellConverter(zoneId);
}

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties props = super.connectionProperties(runContext);

URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url);

builder.scheme("jdbc:mysql");

props.put("generateSimpleParameterMetadata", "true");

props.put("jdbc.url", builder.build().toString());

return props;
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import com.mysql.cj.jdbc.result.ResultSetImpl;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.*;
import java.time.*;
import java.util.Calendar;
import java.util.Collection;
import java.util.TimeZone;
import java.util.UUID;

public class MysqlCellConverter extends AbstractCellConverter {
public MysqlCellConverter(ZoneId zoneId) {
Expand All @@ -23,14 +28,76 @@ public Object convertCell(int columnIndex, ResultSet rs, Connection connection)

String columnTypeName = rs.getMetaData().getColumnTypeName(columnIndex);

switch (columnTypeName.toLowerCase()) {
case "time":
return ((ResultSetImpl) rs).getLocalTime(columnIndex);
case "datetime":
case "timestamp":
return rs.getTimestamp(columnIndex).toInstant();
return switch (columnTypeName.toLowerCase()) {
case "time" -> ((ResultSetImpl) rs).getLocalTime(columnIndex);
case "datetime" -> rs.getTimestamp(columnIndex).toLocalDateTime();
case "timestamp" -> ((ResultSetImpl) rs).getLocalDateTime(columnIndex).toInstant(ZoneOffset.UTC);
default -> super.convert(columnIndex, rs);
};

}

@Override
public PreparedStatement addPreparedStatementValue(PreparedStatement ps, AbstractJdbcBatch.ParameterType parameterType, Object value, int index, Connection connection) throws Exception {
if (value == null) {
ps.setNull(index, parameterType.getType(index));
return ps;
}

return super.convert(columnIndex, rs);
Class<?> cls = value.getClass();

try {
switch (cls.getSimpleName()) {
case "Integer", "int" -> ps.setInt(index, (Integer) value);
case "Short", "short" -> ps.setShort(index, Short.parseShort(value.toString()));
case "String" -> ps.setString(index, (String) value);
case "UUID" -> ps.setObject(index, value);
case "Long", "long", "BigInteger" -> ps.setLong(index, ((Number) value).longValue());
case "Double", "double" -> ps.setDouble(index, (Double) value);
case "Float", "float" -> ps.setFloat(index, (Float) value);
case "BigDecimal" -> ps.setBigDecimal(index, (BigDecimal) value);
case "LocalDate" -> ps.setDate(index, Date.valueOf((LocalDate) value));
case "LocalTime" -> ps.setTime(index, Time.valueOf((LocalTime) value));
case "OffsetTime" -> {
OffsetTime current = (OffsetTime) value;
ps.setTime(index, Time.valueOf(current.toLocalTime()), Calendar.getInstance(TimeZone.getTimeZone(current.getOffset())));
}
case "Instant" -> {
Instant current = (Instant) value;
ps.setTime(index, Time.valueOf(LocalTime.from(current.atZone(ZoneId.systemDefault()))));
}
case "LocalDateTime" -> ps.setTimestamp(index, Timestamp.valueOf((LocalDateTime) value));
case "ZonedDateTime" -> {
ZonedDateTime current = (ZonedDateTime) value;
ps.setTimestamp(index, Timestamp.valueOf(current.toLocalDateTime()), Calendar.getInstance(TimeZone.getTimeZone(current.getZone())));
}
case "OffsetDateTime" -> {
OffsetDateTime current = (OffsetDateTime) value;
ps.setTimestamp(index, Timestamp.valueOf(current.toLocalDateTime()), Calendar.getInstance(TimeZone.getTimeZone(current.toZonedDateTime().getZone())));
}
case "Boolean", "boolean" -> ps.setBoolean(index, (Boolean) value);
case "[B", "byte[]" -> ps.setBytes(index, (byte[]) value);
default -> {
if (Blob.class.isAssignableFrom(cls)) {
Blob blob = connection.createBlob();
if (value instanceof byte[]) {
blob.setBytes(1, (byte[]) value);
}
ps.setBlob(index, blob);
} else if (Clob.class.isAssignableFrom(cls)) {
Clob clob = connection.createClob();
clob.setString(1, (String) value);
ps.setClob(index, clob);
} else if (NClob.class.isAssignableFrom(cls)) {
NClob nclob = connection.createNClob();
nclob.setString(1, (String) value);
ps.setNClob(index, nclob);
}
}
}
return ps;
} catch (SQLException e) {
throw addPreparedStatementException(parameterType, index, value, e);
}
}
}
Loading

0 comments on commit c427086

Please sign in to comment.