Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add queries to clickhouse #419

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.jdbc.clickhouse;

import com.clickhouse.data.value.UnsignedLong;
import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.net.Inet4Address;
Expand Down Expand Up @@ -85,6 +86,11 @@ public Object convertCell(int columnIndex, ResultSet rs, Connection connection)
return col.toString().substring(1);
}

if (columnTypeName.equals("UInt64")) {
UnsignedLong col = (UnsignedLong) columnVal;
return col.longValue();
}

return super.convert(columnIndex, rs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.kestra.plugin.jdbc.clickhouse;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on a Clickhouse database."
)
@Plugin(
examples = {
@Example(
title = "Queries on a Clickhouse database.",
full = true,
code = """
id: clickhouse_queries
namespace: company.team

tasks:
- id: queries
type: io.kestra.plugin.jdbc.clickhouse.Queries
url: jdbc:clickhouse://127.0.0.1:56982/
username: ch_user
password: ch_password
sql: select * from employee; select * from laptop;
fetchType: STORE
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ClickHouseCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.clickhouse.jdbc.ClickHouseDriver());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package io.kestra.plugin.jdbc.clickhouse;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.*;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static io.kestra.core.models.tasks.common.FetchType.FETCH;
import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
public class ClickHouseQueriesTest extends AbstractRdbmsTest {

@Test
void testMultiSelectWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

Map<String, Object> parameters = Map.of(
"age", 40,
"brand", "Apple",
"cpu_frequency", 1.5
);

Queries taskGet = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
SELECT firstName, lastName, age FROM employee where age > :age and age < :age + 10;
SELECT brand, model FROM laptop where brand = :brand and cpu_frequency > :cpu_frequency;
""")
.parameters(Property.of(parameters))
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = taskGet.run(runContext);
assertThat(runOutput.getOutputs().size(), is(2));

List<Map<String, Object>> employees = runOutput.getOutputs().getFirst().getRows();
assertThat("employees", employees, notNullValue());
assertThat("employees", employees.size(), is(1));
assertThat("employee selected", employees.getFirst().get("age"), is(45));
assertThat("employee selected", employees.getFirst().get("firstName"), is("John"));
assertThat("employee selected", employees.getFirst().get("lastName"), is("Doe"));

List<Map<String, Object>>laptops = runOutput.getOutputs().getLast().getRows();
assertThat("laptops", laptops, notNullValue());
assertThat("laptops", laptops.size(), is(1));
assertThat("selected laptop", laptops.getFirst().get("brand"), is("Apple"));
}

@Disabled("Test for rollback disabled : Savepoint and rollback are not supported by ClickHouse")
@Test
void testRollback() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

//Queries should pass in a transaction
Queries queriesPass = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
DROP TABLE IF EXISTS test_transaction;

CREATE TABLE test_transaction(id Int64)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;

INSERT INTO test_transaction (id) VALUES (1);
SELECT COUNT(id) as transaction_count FROM test_transaction;
""")
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = queriesPass.run(runContext);
assertThat(runOutput.getOutputs().size(), is(1));
assertThat(runOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1L));

//Queries should fail due to bad sql
Queries insertsFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
INSERT INTO test_transaction (id) VALUES (2);
INSERT INTO test_transaction (id) VALUES ('random');
""") //Try inserting before failing
.build();

assertThrows(Exception.class, () -> insertsFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
SELECT * FROM test_transaction;
""") //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
assertThat(verifyOutput.getOutputs().size(), is(1));
assertThat(verifyOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1L));
}

@Test
void testNonTransactionalShouldNotRollback() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

//Queries should pass in a transaction
Queries insertOneAndFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.transaction(Property.of(false))
.timeZoneId("Europe/Paris")
.sql("""
DROP TABLE IF EXISTS test_transaction;

CREATE TABLE test_transaction(id Int64)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;

INSERT INTO test_transaction (id) VALUES (1);
INSERT INTO test_transaction (id) VALUES ('random');
INSERT INTO test_transaction (id) VALUES (2);
""")
.build();

assertThrows(Exception.class, () -> insertOneAndFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
""") //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
assertThat(verifyOutput.getOutputs().size(), is(1));
assertThat(verifyOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1L));
}

@Override
protected String getUrl() {
return "jdbc:clickhouse://127.0.0.1:28123/default";
}

@Override
protected void initDatabase() throws SQLException, FileNotFoundException, URISyntaxException {
executeSqlScript("scripts/clickhouse_queries.sql");
}

@Override
@BeforeEach
public void init() throws IOException, URISyntaxException, SQLException {
initDatabase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
CREATE DATABASE IF NOT EXISTS kestra;

-- Create table employee
DROP TABLE IF EXISTS employee;

CREATE TABLE employee (
firstName String,
lastName String,
age Int8,
employee_id Int64
)
ENGINE = MergeTree()
ORDER BY (employee_id)
SETTINGS index_granularity = 8192;

INSERT INTO employee (firstName, lastName, age, employee_id)
VALUES
('John', 'Doe', 45, 1),
('Bryan', 'Grant', 33, 2),
('Jude', 'Philips', 25, 3),
('Michael', 'Page', 62, 4);

-- Create table laptop
DROP TABLE IF EXISTS laptop;

CREATE TABLE laptop
(
brand String,
model String,
cpu_frequency Decimal(3, 2),
laptop_id Int64
) ENGINE = MergeTree()
ORDER BY (laptop_id)
SETTINGS index_granularity = 8192;

INSERT INTO laptop (brand, model, cpu_frequency, laptop_id)
VALUES
('Apple', 'MacBookPro M1 13', 2.20, 1),
('Apple', 'MacBookPro M3 16', 1.50, 2),
('LG', 'Gram', 1.95, 3),
('Lenovo', 'ThinkPad', 1.05, 4);