Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several fixes and additions #153

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions target_clickhouse/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,52 @@ class ClickhouseConnector(SQLConnector):
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.

def to_sql_type_array(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
    """Convert a JSON Schema property definition to a SQLAlchemy type.

    Unlike the stock singer-sdk converter, this recurses into ``array``
    schemas (producing ClickHouse ``Array(...)`` types) and wraps any
    null-able property in ClickHouse's ``Nullable(...)`` modifier.

    Args:
        jsonschema_type: The JSON Schema object for one property.

    Returns:
        The SQLAlchemy type representation of the data type.
    """
    import typing as t

    def _wrap_nullable(
        schema: dict, sql_type: sqlalchemy.types.TypeEngine,
    ) -> sqlalchemy.types.TypeEngine:
        # ClickHouse columns reject NULL unless explicitly declared Nullable.
        if th._jsonschema_type_check(schema, ("null",)):
            return clickhouse_sqlalchemy_types.Nullable(sql_type)
        return sql_type

    if th._jsonschema_type_check(jsonschema_type, ("string",)):
        datelike_type = th.get_datelike_property_type(jsonschema_type)
        if datelike_type == "date-time":
            return _wrap_nullable(
                jsonschema_type,
                t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.DATETIME()),
            )
        # Fixed: was `datelike_type in "time"`, a substring check that also
        # matched "" and "ti"; exact equality is what is intended here.
        if datelike_type == "time":
            return _wrap_nullable(
                jsonschema_type,
                t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.TIME()),
            )
        if datelike_type == "date":
            # Date32 (clickhouse-sqlalchemy >= 0.3.2) supports pre-epoch
            # dates, which the plain ClickHouse Date type cannot store.
            return _wrap_nullable(
                jsonschema_type,
                t.cast(
                    sqlalchemy.types.TypeEngine,
                    clickhouse_sqlalchemy_types.Date32(),
                ),
            )
        # Non-datelike strings fall through to the VARCHAR default below.

    if th._jsonschema_type_check(jsonschema_type, ("integer",)):
        return _wrap_nullable(
            jsonschema_type,
            t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.INTEGER()),
        )
    if th._jsonschema_type_check(jsonschema_type, ("number",)):
        if "multipleOf" not in jsonschema_type:  # default to float
            return _wrap_nullable(
                jsonschema_type,
                t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.FLOAT()),
            )
        default_precision = 10
        # NOTE(review): assumes `multipleOf` is a decimal.Decimal (as produced
        # by the SDK's schema parsing) — confirm against upstream singer-sdk.
        scale = abs(jsonschema_type["multipleOf"].as_tuple().exponent)
        # Precision must be at least as large as the scale.
        precision = max(default_precision, scale)
        return _wrap_nullable(
            jsonschema_type,
            t.cast(
                sqlalchemy.types.TypeEngine,
                clickhouse_sqlalchemy_types.Decimal(precision, scale),
            ),
        )
    if th._jsonschema_type_check(jsonschema_type, ("boolean",)):
        return _wrap_nullable(
            jsonschema_type,
            t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.BOOLEAN()),
        )

    if th._jsonschema_type_check(jsonschema_type, ("object",)):
        return _wrap_nullable(
            jsonschema_type,
            t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()),
        )
    if th._jsonschema_type_check(jsonschema_type, ("array",)):
        # Recurse for the element type; tolerate a missing "items" schema
        # (an empty schema resolves to the VARCHAR default below).
        return _wrap_nullable(
            jsonschema_type,
            t.cast(
                sqlalchemy.types.TypeEngine,
                clickhouse_sqlalchemy_types.Array(
                    self.to_sql_type_array(jsonschema_type.get("items", {})),
                ),
            ),
        )

    # Fallback for anything unrecognized: plain string column.
    return _wrap_nullable(
        jsonschema_type,
        t.cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()),
    )

def get_sqlalchemy_url(self, config: dict) -> str:
"""Generates a SQLAlchemy URL for clickhouse.

Expand Down Expand Up @@ -88,7 +134,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
Returns:
The SQLAlchemy type representation of the data type.
"""
sql_type = th.to_sql_type(jsonschema_type)
sql_type = self.to_sql_type_array(jsonschema_type)

# Clickhouse does not support the DECIMAL type without providing precision,
# so we need to use the FLOAT type.
Expand All @@ -100,14 +146,6 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
sql_type = typing.cast(
sqlalchemy.types.TypeEngine, clickhouse_sqlalchemy_types.Int64(),
)
# All date and time types should be flagged as Nullable to allow for NULL value.
elif type(sql_type) in [
sqlalchemy.types.DATE,
sqlalchemy.types.TIMESTAMP,
sqlalchemy.types.TIME,
sqlalchemy.types.DATETIME,
]:
sql_type = clickhouse_sqlalchemy_types.Nullable(sql_type)

return sql_type

Expand Down
11 changes: 11 additions & 0 deletions target_clickhouse/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class ClickhouseSink(SQLSink):

# Investigate larger batch sizes without OOM.
MAX_SIZE_DEFAULT = 10000
def conform_name(
    self,
    name: str,
    object_type: str | None = None,  # noqa: ARG002
) -> str:
    """Return ``name`` unchanged, disabling the SDK's default name conforming.

    The base ``SQLSink.conform_name`` normalizes stream/property names;
    this override passes them through verbatim so the target table and
    column names match the incoming stream exactly.

    Args:
        name: The raw stream or property name.
        object_type: Unused; kept for interface compatibility with the base
            class (hence the ``noqa: ARG002``).

    Returns:
        The name, unmodified.
    """
    return name

@property
def max_size(self) -> int:
Expand Down Expand Up @@ -92,6 +98,8 @@ def bulk_insert_records(
for key, value in record.items():
if isinstance(value, (dict, list)):
record[key] = json.dumps(value)
if isinstance(value, list):
record[key] = str(value)

res = super().bulk_insert_records(full_table_name, schema, records)

Expand All @@ -111,6 +119,9 @@ def activate_version(self, new_version: int) -> None:
"""
# There's nothing to do if the table doesn't exist yet
# (which it won't the first time the stream is processed)
if not self.config.get("add_record_metadata", True):
return

if not self.connector.table_exists(self.full_table_name):
return

Expand Down
18 changes: 18 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Local ClickHouse server for integration tests.
version: '3'

services:
  clickhouse-server:
    # Pinned image tag for reproducible test runs.
    image: "clickhouse/clickhouse-server:22.5.1.2079"
    container_name: test-clickhouse-server
    environment:
      CLICKHOUSE_USER: default
      # Intentionally empty password — local test container only.
      CLICKHOUSE_PASSWORD:
      CLICKHOUSE_DB: default
    volumes:
      # SQL files here are executed by the image's entrypoint on first start.
      - type: bind
        source: ./integration-db
        target: /docker-entrypoint-initdb.d
    ports:
      # Bound to loopback only: native protocol (9000) and HTTP (8123).
      - "127.0.0.1:9000:9000"
      - "127.0.0.1:8123:8123"

10 changes: 10 additions & 0 deletions tests/integration-db/initdb.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Fixture schema for the test ClickHouse server
-- (mounted into /docker-entrypoint-initdb.d by docker-compose).
CREATE TABLE simple_table (
    id INTEGER,
    updated_at DATE,
    name VARCHAR
)
ENGINE = MergeTree()
PRIMARY KEY id;

-- NOTE(review): these literals carry a time component but the column is
-- DATE — presumably ClickHouse drops the time on parse; confirm, or use
-- plain 'YYYY-MM-DD' values.
INSERT INTO simple_table VALUES (1, '2023-10-22 10:00:00', 'test1');
INSERT INTO simple_table VALUES (2, '2023-10-22 11:00:00', 'test3');
Loading