Skip to content

Commit

Permalink
dremio store variable fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iNikitaGricenko committed Oct 16, 2023
1 parent 81f9006 commit 35ff848
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 67 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AutoCommitInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.*;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
Expand All @@ -37,11 +40,15 @@
"sql: select * FROM source.database.table",
"fetchOne: true",
}
),
@Example(

)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output>, AutoCommitInterface {
protected final Boolean autoCommit = true;
private boolean store = false;

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
Expand All @@ -55,6 +62,25 @@ public void registerDriver() throws SQLException {

@Override
public Output run(RunContext runContext) throws Exception {
if (store) {
Output output = super.run(runContext);

File tempFile = runContext.tempFile(".ion").toFile();
saveIntoFile(output.getRows(), tempFile);

return Output.builder()
.size(output.getSize())
.uri(runContext.putTempFile(tempFile))
.build();
}
return super.run(runContext);
}

private void saveIntoFile(List<Map<String, Object>> rows, File tempFile) throws IOException {
try (OutputStream output = new FileOutputStream(tempFile)) {
rows.forEach(throwConsumer(row -> FileSerde.write(output, row)));
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.password(this.getPassword())
.timeZoneId(this.getTimeZoneId())
.sql(this.getSql())
.fetch(this.isFetch())
.store(this.isStore())
.fetch(this.isStore() || this.isFetch())
.fetchOne(this.isFetchOne())
.additionalVars(this.additionalVars)
.build();
Expand Down

0 comments on commit 35ff848

Please sign in to comment.