Skip to content

Commit

Permalink
Merge pull request #1 from gallejesus/fix-arrays
Browse files Browse the repository at this point in the history
Fix arrays
  • Loading branch information
gallejesus authored Feb 26, 2024
2 parents 278d94c + 045da20 commit ba343ba
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 8 deletions.
53 changes: 50 additions & 3 deletions target_clickhouse/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import contextlib
import typing
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional, Union

import sqlalchemy.types
from clickhouse_sqlalchemy import (
Expand All @@ -11,10 +11,12 @@
from clickhouse_sqlalchemy import (
types as clickhouse_sqlalchemy_types,
)

from pkg_resources import get_distribution, parse_version
from singer_sdk import typing as th
from singer_sdk.connectors import SQLConnector
from sqlalchemy import Column, MetaData, create_engine
import copy

from target_clickhouse.engine_class import SupportedEngines, create_engine_wrapper

Expand All @@ -35,6 +37,51 @@ class ClickhouseConnector(SQLConnector):
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.

def to_sql_type_array(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
    """Convert a JSON Schema property definition to a ClickHouse SQL type.

    Unlike the stock ``th.to_sql_type``, this handles ``array`` schemas by
    recursing into the ``items`` sub-schema and wrapping the element type in
    a ClickHouse ``Array(...)``. Schemas that allow ``null`` are wrapped in
    ``Nullable(...)``, which ClickHouse requires to store NULLs.

    Args:
        jsonschema_type: The JSON Schema object describing one property.

    Returns:
        The SQLAlchemy/ClickHouse type representation of the property.
    """

    def _nullable(schema: dict, sql_type: sqlalchemy.types.TypeEngine):
        # ClickHouse needs an explicit Nullable(...) wrapper for NULL support.
        if th._jsonschema_type_check(schema, ("null",)):
            return clickhouse_sqlalchemy_types.Nullable(sql_type)
        return sql_type

    # The module already imports `typing` at file level; no local import needed.
    cast = typing.cast

    if th._jsonschema_type_check(jsonschema_type, ("string",)):
        datelike_type = th.get_datelike_property_type(jsonschema_type)
        if datelike_type == "date-time":
            return _nullable(
                jsonschema_type,
                cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.DATETIME()),
            )
        # Fixed: the original used `datelike_type in "time"`, a substring
        # test that matched "time" only by coincidence; equality is intended.
        if datelike_type == "time":
            return _nullable(
                jsonschema_type,
                cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.TIME()),
            )
        if datelike_type == "date":
            return _nullable(
                jsonschema_type,
                cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.DATE()),
            )

    if th._jsonschema_type_check(jsonschema_type, ("integer",)):
        return _nullable(
            jsonschema_type,
            cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.INTEGER()),
        )
    if th._jsonschema_type_check(jsonschema_type, ("number",)):
        return _nullable(
            jsonschema_type,
            cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.DECIMAL()),
        )
    if th._jsonschema_type_check(jsonschema_type, ("boolean",)):
        return _nullable(
            jsonschema_type,
            cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.BOOLEAN()),
        )

    if th._jsonschema_type_check(jsonschema_type, ("object",)):
        # Objects are serialized to JSON strings by the sink, so store VARCHAR.
        return _nullable(
            jsonschema_type,
            cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()),
        )
    if th._jsonschema_type_check(jsonschema_type, ("array",)):
        # Recurse into the element schema. Fixed: the original indexed
        # jsonschema_type["items"] directly and raised KeyError for array
        # schemas that omit "items"; default to string elements instead.
        items_schema = jsonschema_type.get("items", {"type": "string"})
        return _nullable(
            jsonschema_type,
            cast(
                sqlalchemy.types.TypeEngine,
                clickhouse_sqlalchemy_types.Array(self.to_sql_type_array(items_schema)),
            ),
        )

    # Fallback for anything unrecognized: store as a string.
    return _nullable(
        jsonschema_type,
        cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()),
    )





def get_sqlalchemy_url(self, config: dict) -> str:
"""Generates a SQLAlchemy URL for clickhouse.
Expand Down Expand Up @@ -88,7 +135,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
Returns:
The SQLAlchemy type representation of the data type.
"""
sql_type = th.to_sql_type(jsonschema_type)
sql_type = self.to_sql_type_array(jsonschema_type)

# Clickhouse does not support the DECIMAL type without providing precision,
# so we need to use the FLOAT type.
Expand All @@ -99,7 +146,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
elif type(sql_type) == sqlalchemy.types.INTEGER:
sql_type = typing.cast(
sqlalchemy.types.TypeEngine, clickhouse_sqlalchemy_types.Int64(),
)
)

return sql_type

Expand Down
8 changes: 8 additions & 0 deletions target_clickhouse/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class ClickhouseSink(SQLSink):

# Investigate larger batch sizes without OOM.
MAX_SIZE_DEFAULT = 10000
def conform_name(
self,
name: str,
object_type: str | None = None, # noqa: ARG002
) -> str:
return name

@property
def max_size(self) -> int:
Expand Down Expand Up @@ -85,6 +91,8 @@ def bulk_insert_records(
for key, value in record.items():
if isinstance(value, (dict, list)):
record[key] = json.dumps(value)
if isinstance(value, list):
record[key] = str(value)

res = super().bulk_insert_records(full_table_name, schema, records)

Expand Down
18 changes: 18 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Integration-test stack: a single ClickHouse server for the target's tests.
version: '3'

services:
  clickhouse-server:
    # Pinned server version so test results are reproducible.
    image: "clickhouse/clickhouse-server:22.5.1.2079"
    container_name: test-clickhouse-server
    environment:
      # Default user with an empty password — test-only credentials.
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD:
      CLICKHOUSE_DB: default
    volumes:
      # SQL files in ./integration-db run once at container init
      # (docker-entrypoint-initdb.d convention) to seed fixture tables.
      - type: bind
        source: ./integration-db
        target: /docker-entrypoint-initdb.d
    ports:
      # Bound to loopback only: native protocol (9000) and HTTP (8123).
      - "127.0.0.1:9000:9000"
      - "127.0.0.1:8123:8123"

10 changes: 10 additions & 0 deletions tests/integration-db/initdb.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Test fixture: minimal table seeded into the integration ClickHouse server.
CREATE TABLE simple_table (
    id INTEGER,
    updated_at DATE,
    name VARCHAR
)
ENGINE = MergeTree()
PRIMARY KEY id;

-- NOTE(review): date-time literals inserted into a DATE column — presumably
-- ClickHouse truncates these to the date part; confirm that is intended.
INSERT INTO simple_table VALUES (1, '2023-10-22 10:00:00', 'test1');
INSERT INTO simple_table VALUES (2, '2023-10-22 11:00:00', 'test3');
Loading

0 comments on commit ba343ba

Please sign in to comment.