From f436b75b26ddbbc383b846c8377a45ee234c5fba Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Wed, 29 Jan 2025 15:41:37 +0100 Subject: [PATCH] chore(data-warehouse): Remove old sql source and rename the new one (#27985) --- mypy-baseline.txt | 65 ++- .../pipelines/sql_database/__init__.py | 306 +++++++---- .../_json.py | 0 .../arrow_helpers.py | 0 .../pipelines/sql_database/helpers.py | 279 ++++++++--- .../schema_types.py | 2 +- .../pipelines/sql_database/settings.py | 4 +- .../test/test_arrow_helpers.py | 2 +- .../sql_database/test/test_sql_database.py | 45 -- .../pipelines/sql_database_v2/__init__.py | 473 ------------------ .../pipelines/sql_database_v2/helpers.py | 312 ------------ .../pipelines/sql_database_v2/settings.py | 1 - .../workflow_activities/import_data_sync.py | 10 +- .../tests/batch_exports/test_import_data.py | 14 +- 14 files changed, 462 insertions(+), 1051 deletions(-) rename posthog/temporal/data_imports/pipelines/{sql_database_v2 => sql_database}/_json.py (100%) rename posthog/temporal/data_imports/pipelines/{sql_database_v2 => sql_database}/arrow_helpers.py (100%) rename posthog/temporal/data_imports/pipelines/{sql_database_v2 => sql_database}/schema_types.py (98%) rename posthog/temporal/data_imports/pipelines/{sql_database_v2 => sql_database}/test/test_arrow_helpers.py (89%) delete mode 100644 posthog/temporal/data_imports/pipelines/sql_database/test/test_sql_database.py delete mode 100644 posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py delete mode 100644 posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py delete mode 100644 posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 5bc01c51f956d..433902cd00910 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -1,21 +1,20 @@ posthog/warehouse/models/ssh_tunnel.py:0: error: Incompatible types in assignment (expression has type "NoEncryption", variable has type "BestAvailableEncryption") [assignment] -posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py:0: error: Statement is unreachable [unreachable] -posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py:0: error: Non-overlapping equality check (left operand type: "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'json', 'decimal', 'wei', 'date', 'time'] | None", right operand type: "Literal['interval']") [comparison-overlap] -posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, ndarray[Any, dtype[Any]]]"; expected type "str" [index] -posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, ndarray[Any, dtype[Any]]]"; expected type "str" [index] -posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, TColumnSchema]"; expected type "str" [index] +posthog/temporal/data_imports/pipelines/sql_database/schema_types.py:0: error: Statement is unreachable [unreachable] +posthog/temporal/data_imports/pipelines/sql_database/schema_types.py:0: error: Non-overlapping equality check (left operand type: "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'json', 'decimal', 'wei', 'date', 'time'] | None", right operand type: "Literal['interval']") [comparison-overlap] 
+posthog/temporal/data_imports/pipelines/sql_database/arrow_helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/sql_database/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, ndarray[Any, dtype[Any]]]"; expected type "str" [index] +posthog/temporal/data_imports/pipelines/sql_database/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, ndarray[Any, dtype[Any]]]"; expected type "str" [index] +posthog/temporal/data_imports/pipelines/sql_database/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, TColumnSchema]"; expected type "str" [index] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Item "None" of "Incremental[Any] | None" has no attribute "row_order" [union-attr] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Incompatible types in assignment (expression has type "Literal['asc', 'desc'] | Any | None", variable has type "Literal['asc', 'desc']") [assignment] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "Column[Any]") [assignment] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "Literal['asc', 'desc']") [assignment] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Item "None" of "dict[str, Any] | None" has no attribute "get" [union-attr] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Argument "primary_key" to "make_hints" has incompatible type "list[str] | None"; expected "str | Sequence[str] | Callable[[Any], str | Sequence[str]]" [arg-type] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Item "None" of "Incremental[Any] | None" has no attribute "row_order" [union-attr] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Incompatible types in assignment (expression has type "Literal['asc', 'desc'] | Any | None", variable has type "Literal['asc', 'desc']") [assignment] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "Column[Any]") [assignment] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "Literal['asc', 'desc']") [assignment] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Item "None" of "dict[str, Any] | None" has no attribute "get" [union-attr] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Argument "primary_key" to "make_hints" has incompatible type "list[str] | None"; expected "str | Sequence[str] | Callable[[Any], 
str | Sequence[str]]" [arg-type] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Dict entry 2 has incompatible type "Literal['auto']": "None"; expected "Literal['json_response', 'header_link', 'auto', 'single_page', 'cursor', 'offset', 'page_number']": "type[BasePaginator]" [dict-item] posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "AuthConfigBase") [assignment] posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Argument 1 to "get_auth_class" has incompatible type "Literal['bearer', 'api_key', 'http_basic'] | None"; expected "Literal['bearer', 'api_key', 'http_basic']" [arg-type] @@ -440,23 +439,23 @@ posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "typ posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: error: No overload variant of "with_only_columns" of "Select" matches argument type "ReadOnlyColumnCollection[str, Column[Any]]" [call-overload] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: Possible overload variants: -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], /) -> Select[tuple[_T0]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], /) -> Select[tuple[_T0, _T1]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], /) -> Select[tuple[_T0, _T1, _T2]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], /) -> Select[tuple[_T0, _T1, _T2, _T3]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | 
type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5, _T6] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], TypedColumnsClauseRole[_T6] | SQLCoreOperations[_T6] | type[_T6], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], TypedColumnsClauseRole[_T6] | SQLCoreOperations[_T6] | type[_T6], TypedColumnsClauseRole[_T7] | SQLCoreOperations[_T7] | type[_T7], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def with_only_columns(self, *entities: TypedColumnsClauseRole[Any] | ColumnsClauseRole | SQLCoreOperations[Any] | Literal['*', 1] | type[Any] | Inspectable[_HasClauseElement[Any]] | _HasClauseElement[Any], maintain_column_froms: bool = ..., **Any) -> Select[Any] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: error: No overload variant of "resource" matches argument types "Callable[[Engine, Table, int, Literal['sqlalchemy', 'pyarrow', 'pandas', 'connectorx'], Incremental[Any] | None, Any | None, bool, Callable[[Table], None] | None, Literal['minimal', 'full', 'full_with_precision'], dict[str, Any] | None, Callable[[TypeEngine[Any]], TypeEngine[Any] | type[TypeEngine[Any]] | None] | None, list[str] | None, Callable[[Select[Any], Table], Select[Any]] | None, list[str] | None], Iterator[Any]]", "str", "list[str] | None", "list[str] | None", "dict[str, TColumnSchema]", "Collection[str]", "str" [call-overload] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: Possible overload variants: -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TResourceFunParams`-1, TDltResourceImpl: DltResource] resource(Callable[TResourceFunParams, Any], /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 
'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) -> TDltResourceImpl -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(None = ..., /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) 
-> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(None = ..., /, name: str | Callable[[Any], str] = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ..., standalone: Literal[True] = ...) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, TDltResourceImpl]] -posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(list[Any] | tuple[Any] | Iterator[Any], /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) 
-> TDltResourceImpl +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: error: No overload variant of "with_only_columns" of "Select" matches argument type "ReadOnlyColumnCollection[str, Column[Any]]" [call-overload] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: Possible overload variants: +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], /) -> Select[tuple[_T0]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], /) -> Select[tuple[_T0, _T1]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1, _T2] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], /) -> Select[tuple[_T0, _T1, _T2]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1, _T2, _T3] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], /) -> Select[tuple[_T0, _T1, _T2, _T3]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5, _T6] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], TypedColumnsClauseRole[_T6] | SQLCoreOperations[_T6] | type[_T6], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], 
TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], TypedColumnsClauseRole[_T6] | SQLCoreOperations[_T6] | type[_T6], TypedColumnsClauseRole[_T7] | SQLCoreOperations[_T7] | type[_T7], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def with_only_columns(self, *entities: TypedColumnsClauseRole[Any] | ColumnsClauseRole | SQLCoreOperations[Any] | Literal['*', 1] | type[Any] | Inspectable[_HasClauseElement[Any]] | _HasClauseElement[Any], maintain_column_froms: bool = ..., **Any) -> Select[Any] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: error: No overload variant of "resource" matches argument types "Callable[[Engine, Table, int, Literal['sqlalchemy', 'pyarrow', 'pandas', 'connectorx'], Incremental[Any] | None, Any | None, bool, Callable[[Table], None] | None, Literal['minimal', 'full', 'full_with_precision'], dict[str, Any] | None, Callable[[TypeEngine[Any]], TypeEngine[Any] | type[TypeEngine[Any]] | None] | None, list[str] | None, Callable[[Select[Any], Table], Select[Any]] | None, list[str] | None], Iterator[Any]]", "str", "list[str] | None", "list[str] | None", "dict[str, TColumnSchema]", "Collection[str]", "str" [call-overload] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: Possible overload variants: +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [TResourceFunParams`-1, TDltResourceImpl: DltResource] resource(Callable[TResourceFunParams, Any], /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) 
-> TDltResourceImpl +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(None = ..., /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(None = ..., /, name: str | Callable[[Any], str] = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ..., standalone: Literal[True] = ...) 
-> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, TDltResourceImpl]] +posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(list[Any] | tuple[Any] | Iterator[Any], /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) -> TDltResourceImpl posthog/tasks/test/test_update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr] posthog/tasks/test/test_stop_surveys_reached_target.py:0: error: No overload variant of "__sub__" of "datetime" matches argument type "None" [operator] posthog/tasks/test/test_stop_surveys_reached_target.py:0: note: Possible overload variants: diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py index ae81f9fa61fe6..e7faa8de79583 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py @@ -1,36 +1,53 @@ """Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads.""" from datetime import datetime, date -from typing import Any, Optional, Union, List, cast # noqa: UP035 -from collections.abc import Iterable -from zoneinfo import ZoneInfo +from typing import Optional, Union, Any +from collections.abc import Callable, Iterable + from sqlalchemy import MetaData, Table, create_engine -from sqlalchemy.engine import Engine, CursorResult +from sqlalchemy.engine import Engine +from zoneinfo import ZoneInfo import dlt from dlt.sources import DltResource, DltSource -from dlt.common.schema.typing import TColumnSchema - - -from dlt.sources.credentials import ConnectionStringCredentials from urllib.parse import quote +from dlt.common.libs.pyarrow import pyarrow as pa +from dlt.sources.credentials import ConnectionStringCredentials from posthog.settings.utils import get_from_env +from posthog.temporal.data_imports.pipelines.sql_database.settings import DEFAULT_CHUNK_SIZE +from 
posthog.temporal.data_imports.pipelines.sql_database._json import BigQueryJSON from posthog.utils import str_to_bool +from posthog.warehouse.models import ExternalDataSource from posthog.warehouse.types import IncrementalFieldType -from posthog.warehouse.models.external_data_source import ExternalDataSource -from sqlalchemy.sql import text from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from .helpers import ( + SelectAny, table_rows, engine_from_credentials, + TableBackend, + SqlTableResourceConfiguration, + _detect_precision_hints_deprecated, +) +from .schema_types import ( + default_table_adapter, + table_to_columns, get_primary_key, - SqlDatabaseTableConfiguration, + ReflectionLevel, + TTypeAdapter, ) +from sqlalchemy_bigquery import BigQueryDialect, __all__ +from sqlalchemy_bigquery._types import _type_map + +# Workaround to get JSON support in the BigQuery Dialect +BigQueryDialect.JSON = BigQueryJSON +_type_map["JSON"] = BigQueryJSON +__all__.append("JSON") + def incremental_type_to_initial_value(field_type: IncrementalFieldType) -> Any: if field_type == IncrementalFieldType.Integer or field_type == IncrementalFieldType.Numeric: @@ -104,9 +121,10 @@ def sql_source_for_type( schema=schema, table_names=table_names, incremental=incremental, + db_incremental_field_last_value=db_incremental_field_last_value, team_id=team_id, connect_args=connect_args, - db_incremental_field_last_value=db_incremental_field_last_value, + chunk_size=DEFAULT_CHUNK_SIZE, ) return db_source @@ -181,6 +199,7 @@ def snowflake_source( table_names=table_names, incremental=incremental, db_incremental_field_last_value=db_incremental_field_last_value, + chunk_size=DEFAULT_CHUNK_SIZE, ) return db_source @@ -226,38 +245,32 @@ def bigquery_source( table_names=[table_name], incremental=incremental, db_incremental_field_last_value=db_incremental_field_last_value, + chunk_size=DEFAULT_CHUNK_SIZE, ) -# Temp while DLT doesn't support `interval` columns -def remove_columns(columns_to_drop: list[str], team_id: Optional[int]): - def internal_remove(doc: dict) -> dict: - if team_id == 1 or team_id == 2: - if "sync_frequency_interval" in doc: - del doc["sync_frequency_interval"] - - for col in columns_to_drop: - if col in doc: - del doc[col] - - return doc - - return internal_remove - - @dlt.source(max_table_nesting=0) def sql_database( db_incremental_field_last_value: Optional[Any], credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, schema: Optional[str] = dlt.config.value, metadata: Optional[MetaData] = None, - table_names: Optional[List[str]] = dlt.config.value, # noqa: UP006 + table_names: Optional[list[str]] = dlt.config.value, + chunk_size: int = DEFAULT_CHUNK_SIZE, + backend: TableBackend = "pyarrow", + detect_precision_hints: Optional[bool] = False, + reflection_level: Optional[ReflectionLevel] = "full", + defer_table_reflect: Optional[bool] = None, + table_adapter_callback: Optional[Callable[[Table], None]] = None, + backend_kwargs: Optional[dict[str, Any]] = None, + include_views: bool = False, + type_adapter_callback: Optional[TTypeAdapter] = None, incremental: Optional[dlt.sources.incremental] = None, team_id: Optional[int] = None, connect_args: Optional[list[str]] = None, ) -> Iterable[DltResource]: """ - A DLT source which loads data from an SQL database using SQLAlchemy. + A dlt source which loads data from an SQL database using SQLAlchemy. 
Resources are automatically created for each table in the schema or from the given list of tables. Args: @@ -265,105 +278,196 @@ def sql_database( schema (Optional[str]): Name of the database schema to load (if different from default). metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used. table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. + chunk_size (int): Number of rows yielded in one batch. SQLAlchemy will create an additional internal row buffer of twice the chunk size. + backend (TableBackend): Type of backend to generate table data. One of: "sqlalchemy", "pyarrow", "pandas" and "connectorx". + "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as arrow tables, "pandas" yields pandas DataFrames. + "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types, + "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself. + detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables. + This is disabled by default. + reflection_level (ReflectionLevel): Specifies how much information should be reflected from the source database schema. + "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. + "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option. + "full_with_precision": Sets precision and scale on supported data types (i.e. decimal, text, binary). Creates big and regular integer types. + defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Requires table_names to be explicitly passed. + Enable this option when running on Airflow. Available on dlt 0.4.4 and later. + table_adapter_callback (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected. + backend_kwargs (**kwargs): kwargs passed to the table backend, e.g. "conn" is used to pass a specialized connection string to connectorx. + include_views (bool): Reflect views as well as tables. Note view names included in `table_names` are always included regardless of this setting. + type_adapter_callback (Optional[Callable]): Callable to override type inference when reflecting columns. + Argument is a single sqlalchemy data type (`TypeEngine` instance) and it should return another sqlalchemy data type, or `None` (type will be inferred from data). + query_adapter_callback (Optional[Callable[[Select, Table], Select]]): Callable to override the SELECT query used to fetch data from the table. + The callback receives the sqlalchemy `Select` and corresponding `Table` objects and should return the modified `Select`. Returns: Iterable[DltResource]: A list of DLT resources for each table to be loaded.
""" + # detect precision hints is deprecated + _detect_precision_hints_deprecated(detect_precision_hints) + + if detect_precision_hints: + reflection_level = "full_with_precision" + else: + reflection_level = reflection_level or "minimal" # set up alchemy engine engine = engine_from_credentials(credentials) - engine.execution_options(stream_results=True) + engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) metadata = metadata or MetaData(schema=schema) # use provided tables or all tables if table_names: - tables = [Table(name, metadata, autoload_with=engine) for name in table_names] + tables = [Table(name, metadata, autoload_with=None if defer_table_reflect else engine) for name in table_names] else: - metadata.reflect(bind=engine) + if defer_table_reflect: + raise ValueError("You must pass table names to defer table reflection") + metadata.reflect(bind=engine, views=include_views) tables = list(metadata.tables.values()) for table in tables: - # TODO(@Gilbert09): Read column types, convert them to DLT types - # and pass them in here to get empty table materialization - binary_columns_to_drop = get_binary_columns(engine, schema or "", table.name) - - yield ( - dlt.resource( - table_rows, - name=table.name, - primary_key=get_primary_key(table), - merge_key=get_primary_key(table), - write_disposition={ - "disposition": "merge", - "strategy": "upsert", - } - if incremental - else "replace", - spec=SqlDatabaseTableConfiguration, - table_format="delta", - columns=get_column_hints(engine, schema or "", table.name), - ).add_map(remove_columns(binary_columns_to_drop, team_id))( - engine=engine, - table=table, - incremental=incremental, - connect_args=connect_args, - db_incremental_field_last_value=db_incremental_field_last_value, - ) + yield sql_table( + credentials=engine, + table=table.name, + schema=table.schema, + metadata=metadata, + chunk_size=chunk_size, + backend=backend, + reflection_level=reflection_level, + defer_table_reflect=defer_table_reflect, + table_adapter_callback=table_adapter_callback, + backend_kwargs=backend_kwargs, + type_adapter_callback=type_adapter_callback, + incremental=incremental, + db_incremental_field_last_value=db_incremental_field_last_value, + team_id=team_id, + connect_args=connect_args, ) -def get_binary_columns(engine: Engine, schema_name: str, table_name: str) -> list[str]: - # TODO(@Gilbert09): Add support for querying bigquery here - if engine.url.drivername == "bigquery": - return [] +# Temp while we dont support binary columns in HogQL +def remove_columns(columns_to_drop: list[str], team_id: Optional[int]): + col_len = len(columns_to_drop) - with engine.connect() as conn: - execute_result: CursorResult = conn.execute( - text( - "SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = :schema_name AND table_name = :table_name" - ), - {"schema_name": schema_name, "table_name": table_name}, - ) + def internal_remove(table: pa.Table) -> pa.Table: + if col_len == 0: + return table - cursor_result = cast(CursorResult, execute_result) - results = cursor_result.fetchall() + table_cols = [n for n in columns_to_drop if n in table.column_names] + if len(table_cols) > 0: + return table.drop(columns_to_drop) - binary_cols: list[str] = [] + return table - for column_name, data_type in results: - lower_data_type = data_type.lower() - if lower_data_type == "bytea" or lower_data_type == "binary" or lower_data_type == "varbinary": - binary_cols.append(column_name) + return internal_remove - return binary_cols 
+@dlt.resource(name=lambda args: args["table"], standalone=True, spec=SqlTableResourceConfiguration) +def sql_table( + db_incremental_field_last_value: Optional[Any], + credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, + table: str = dlt.config.value, + schema: Optional[str] = dlt.config.value, + metadata: Optional[MetaData] = None, + incremental: Optional[dlt.sources.incremental[Any]] = None, + chunk_size: int = DEFAULT_CHUNK_SIZE, + backend: TableBackend = "sqlalchemy", + detect_precision_hints: Optional[bool] = None, + reflection_level: Optional[ReflectionLevel] = "full", + defer_table_reflect: Optional[bool] = None, + table_adapter_callback: Optional[Callable[[Table], None]] = None, + backend_kwargs: Optional[dict[str, Any]] = None, + type_adapter_callback: Optional[TTypeAdapter] = None, + included_columns: Optional[list[str]] = None, + team_id: Optional[int] = None, + connect_args: Optional[list[str]] = None, +) -> DltResource: + """ + A dlt resource which loads data from an SQL database table using SQLAlchemy. -def get_column_hints(engine: Engine, schema_name: str, table_name: str) -> dict[str, TColumnSchema]: - # TODO(@Gilbert09): Add support for querying bigquery here - if engine.url.drivername == "bigquery": - return {} + Args: + credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `Engine` instance representing the database connection. + table (str): Name of the table or view to load. + schema (Optional[str]): Optional name of the schema the table belongs to. + metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored. + incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table. + E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))` + chunk_size (int): Number of rows yielded in one batch. SQL Alchemy will create additional internal rows buffer twice the chunk size. + backend (TableBackend): Type of backend to generate table data. One of: "sqlalchemy", "pyarrow", "pandas" and "connectorx". + "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as arrow tables, "pandas" yields panda frames. + "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types, + "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself. + reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema. + "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. + "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option. + "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types. + detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables. + This is disabled by default. + defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Enable this option when running on Airflow. 
Available + on dlt 0.4.4 and later + table_adapter_callback: (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected. + backend_kwargs (**kwargs): kwargs passed to table backend ie. "conn" is used to pass specialized connection string to connectorx. + type_adapter_callback(Optional[Callable]): Callable to override type inference when reflecting columns. + Argument is a single sqlalchemy data type (`TypeEngine` instance) and it should return another sqlalchemy data type, or `None` (type will be inferred from data) + included_columns (Optional[List[str]): List of column names to select from the table. If not provided, all columns are loaded. + query_adapter_callback(Optional[Callable[Select, Table], Select]): Callable to override the SELECT query used to fetch data from the table. + The callback receives the sqlalchemy `Select` and corresponding `Table` objects and should return the modified `Select`. - with engine.connect() as conn: - execute_result: CursorResult = conn.execute( - text( - "SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = :schema_name AND table_name = :table_name" - ), - {"schema_name": schema_name, "table_name": table_name}, - ) + Returns: + DltResource: The dlt resource for loading data from the SQL database table. + """ + _detect_precision_hints_deprecated(detect_precision_hints) - cursor_result = cast(CursorResult, execute_result) - results = cursor_result.fetchall() + if detect_precision_hints: + reflection_level = "full_with_precision" + else: + reflection_level = reflection_level or "minimal" - columns: dict[str, TColumnSchema] = {} + engine = engine_from_credentials(credentials, may_dispose_after_use=True) + engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) + metadata = metadata or MetaData(schema=schema) - for column_name, data_type, numeric_precision, numeric_scale in results: - if data_type != "numeric": - continue + table_obj: Table | None = metadata.tables.get("table") + if table_obj is None: + table_obj = Table(table, metadata, autoload_with=None if defer_table_reflect else engine) - columns[column_name] = { - "data_type": "decimal", - "precision": numeric_precision or 76, - "scale": numeric_scale or 32, - } + if not defer_table_reflect: + default_table_adapter(table_obj, included_columns) + if table_adapter_callback: + table_adapter_callback(table_obj) + + columns = table_to_columns(table_obj, reflection_level, type_adapter_callback) + + def query_adapter_callback(query: SelectAny, table: Table): + cols_to_select = list(columns.keys()) + + return query.with_only_columns(table.c[*cols_to_select]) - return columns + return dlt.resource( + table_rows, + name=table_obj.name, + primary_key=get_primary_key(table_obj), + merge_key=get_primary_key(table_obj), + columns=columns, + write_disposition={ + "disposition": "merge", + "strategy": "upsert", + } + if incremental + else "replace", + table_format="delta", + )( + engine=engine, + table=table_obj, + chunk_size=chunk_size, + backend=backend, + incremental=incremental, + db_incremental_field_last_value=db_incremental_field_last_value, + reflection_level=reflection_level, + defer_table_reflect=defer_table_reflect, + table_adapter_callback=table_adapter_callback, + backend_kwargs=backend_kwargs, + type_adapter_callback=type_adapter_callback, + included_columns=included_columns, + query_adapter_callback=query_adapter_callback, + connect_args=connect_args, + ) diff --git 
a/posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py b/posthog/temporal/data_imports/pipelines/sql_database/_json.py similarity index 100% rename from posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py rename to posthog/temporal/data_imports/pipelines/sql_database/_json.py diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py b/posthog/temporal/data_imports/pipelines/sql_database/arrow_helpers.py similarity index 100% rename from posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py rename to posthog/temporal/data_imports/pipelines/sql_database/arrow_helpers.py diff --git a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py index 9bf72a26f3c1e..d202fde94ba43 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py @@ -1,161 +1,312 @@ """SQL database source helpers""" +import warnings from typing import ( Any, + Literal, Optional, Union, ) -from collections.abc import Iterator +from collections.abc import Callable, Iterator import operator import dlt -from dlt.sources.credentials import ConnectionStringCredentials from dlt.common.configuration.specs import BaseConfiguration, configspec -from dlt.common.typing import TDataItem -from .settings import DEFAULT_CHUNK_SIZE +from dlt.common.exceptions import MissingDependencyException +from dlt.common.schema import TTableSchemaColumns +from dlt.common.typing import TDataItem, TSortOrder + +from dlt.sources.credentials import ConnectionStringCredentials -from sqlalchemy import Table, create_engine, Column, text +from posthog.temporal.data_imports.pipelines.sql_database.settings import DEFAULT_CHUNK_SIZE + +from .arrow_helpers import row_tuples_to_arrow +from .schema_types import ( + default_table_adapter, + table_to_columns, + get_primary_key, + SelectAny, + ReflectionLevel, + TTypeAdapter, +) + +from sqlalchemy import Table, create_engine, text from sqlalchemy.engine import Engine -from sqlalchemy.sql import Select +from sqlalchemy.exc import CompileError + + +TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] +TQueryAdapter = Callable[[SelectAny, Table], SelectAny] class TableLoader: def __init__( self, engine: Engine, + backend: TableBackend, table: Table, + columns: TTableSchemaColumns, chunk_size: int = DEFAULT_CHUNK_SIZE, incremental: Optional[dlt.sources.incremental[Any]] = None, - connect_args: Optional[list[str]] = None, db_incremental_field_last_value: Optional[Any] = None, + query_adapter_callback: Optional[TQueryAdapter] = None, + connect_args: Optional[list[str]] = None, ) -> None: self.engine = engine + self.backend = backend self.table = table + self.columns = columns self.chunk_size = chunk_size + self.query_adapter_callback = query_adapter_callback self.incremental = incremental self.connect_args = connect_args if incremental: try: - self.cursor_column: Optional[Column[Any]] = table.c[incremental.cursor_path] + self.cursor_column = table.c[incremental.cursor_path] except KeyError as e: - try: - self.cursor_column = table.c[incremental.cursor_path.lower()] - except KeyError: - raise KeyError( - f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'" - ) from e + raise KeyError( + f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'" + ) from e self.last_value = ( db_incremental_field_last_value if 
db_incremental_field_last_value is not None else incremental.last_value ) + self.end_value = incremental.end_value + self.row_order: TSortOrder = self.incremental.row_order else: self.cursor_column = None self.last_value = None + self.end_value = None + self.row_order = None - def make_query(self) -> Select[Any]: + def _make_query(self) -> SelectAny: table = self.table query = table.select() if not self.incremental: return query last_value_func = self.incremental.last_value_func + + # generate where if last_value_func is max: # Query ordered and filtered according to last_value function - order_by = self.cursor_column.asc() # type: ignore - if self.last_value == self.incremental.initial_value: - filter_op = operator.ge - else: - filter_op = operator.gt + filter_op = operator.ge + filter_op_end = operator.lt elif last_value_func is min: - order_by = self.cursor_column.desc() # type: ignore - if self.last_value == self.incremental.initial_value: - filter_op = operator.le - else: - filter_op = operator.lt + filter_op = operator.le + filter_op_end = operator.gt else: # Custom last_value, load everything and let incremental handle filtering return query - query = query.order_by(order_by) - if self.last_value is None: - return query - return query.where(filter_op(self.cursor_column, self.last_value)) # type: ignore - def load_rows(self) -> Iterator[list[TDataItem]]: + if self.last_value is not None: + query = query.where(filter_op(self.cursor_column, self.last_value)) + if self.end_value is not None: + query = query.where(filter_op_end(self.cursor_column, self.end_value)) + + # generate order by from declared row order + order_by = None + if (self.row_order == "asc" and last_value_func is max) or ( + self.row_order == "desc" and last_value_func is min + ): + order_by = self.cursor_column.asc() + elif (self.row_order == "asc" and last_value_func is min) or ( + self.row_order == "desc" and last_value_func is max + ): + order_by = self.cursor_column.desc() + if order_by is not None: + query = query.order_by(order_by) + + return query + + def make_query(self) -> SelectAny: + if self.query_adapter_callback: + return self.query_adapter_callback(self._make_query(), self.table) + return self._make_query() + + def load_rows(self, backend_kwargs: Optional[dict[str, Any]] = None) -> Iterator[TDataItem]: + # make copy of kwargs + backend_kwargs = dict(backend_kwargs or {}) query = self.make_query() + if self.backend == "connectorx": + yield from self._load_rows_connectorx(query, backend_kwargs) + else: + yield from self._load_rows(query, backend_kwargs) + + def _load_rows(self, query: SelectAny, backend_kwargs: Optional[dict[str, Any]]) -> TDataItem: with self.engine.connect() as conn: if self.connect_args: for stmt in self.connect_args: conn.execute(text(stmt)) result = conn.execution_options(yield_per=self.chunk_size).execute(query) + # NOTE: cursor returns not normalized column names! 
may be quite useful in case of Oracle dialect + # that normalizes columns + # columns = [c[0] for c in result.cursor.description] + columns = list(result.keys()) for partition in result.partitions(size=self.chunk_size): - yield [dict(row._mapping) for row in partition] + if self.backend == "sqlalchemy": + yield [dict(row._mapping) for row in partition] + elif self.backend == "pandas": + from dlt.common.libs.pandas_sql import _wrap_result + + df = _wrap_result( + partition, + columns, + **{"dtype_backend": "pyarrow", **(backend_kwargs or {})}, + ) + yield df + elif self.backend == "pyarrow": + yield row_tuples_to_arrow(partition, self.columns, tz=backend_kwargs.get("tz", "UTC")) + + def _load_rows_connectorx(self, query: SelectAny, backend_kwargs: Optional[dict[str, Any]]) -> Iterator[TDataItem]: + try: + import connectorx as cx # type: ignore + except ImportError: + raise MissingDependencyException("Connector X table backend", ["connectorx"]) + + # default settings + backend_kwargs = { + "return_type": "arrow2", + "protocol": "binary", + **(backend_kwargs or {}), + } + conn = backend_kwargs.pop( + "conn", + self.engine.url._replace(drivername=self.engine.url.get_backend_name()).render_as_string( + hide_password=False + ), + ) + try: + query_str = str(query.compile(self.engine, compile_kwargs={"literal_binds": True})) + except CompileError as ex: + raise NotImplementedError( + f"Query for table {self.table.name} could not be compiled to string to execute it on ConnectorX. If you are on SQLAlchemy 1.4.x the causing exception is due to literals that cannot be rendered, upgrade to 2.x: {str(ex)}" + ) from ex + df = cx.read_sql(conn, query_str, **backend_kwargs) + yield df def table_rows( engine: Engine, table: Table, - chunk_size: int = DEFAULT_CHUNK_SIZE, + chunk_size: int, + backend: TableBackend, incremental: Optional[dlt.sources.incremental[Any]] = None, - connect_args: Optional[list[str]] = None, db_incremental_field_last_value: Optional[Any] = None, + defer_table_reflect: bool = False, + table_adapter_callback: Optional[Callable[[Table], None]] = None, + reflection_level: ReflectionLevel = "minimal", + backend_kwargs: Optional[dict[str, Any]] = None, + type_adapter_callback: Optional[TTypeAdapter] = None, + included_columns: Optional[list[str]] = None, + query_adapter_callback: Optional[TQueryAdapter] = None, + connect_args: Optional[list[str]] = None, ) -> Iterator[TDataItem]: - """ - A DLT source which loads data from an SQL database using SQLAlchemy. - Resources are automatically created for each table in the schema or from the given list of tables. + columns: TTableSchemaColumns | None = None + if defer_table_reflect: + table = Table(table.name, table.metadata, autoload_with=engine, extend_existing=True) + default_table_adapter(table, included_columns) + if table_adapter_callback: + table_adapter_callback(table) + columns = table_to_columns(table, reflection_level, type_adapter_callback) - Args: - credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance. - schema (Optional[str]): Name of the database schema to load (if different from default). - metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used. - table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. 
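A minimal sketch of how the reworked TableLoader above composes the incremental query, assuming a toy "events" table with an "updated_at" cursor, last_value_func=max and row_order="asc" (table and values here are illustrative, not from the patch):

    import operator
    import sqlalchemy as sa

    metadata = sa.MetaData()
    events = sa.Table(
        "events",
        metadata,
        sa.Column("id", sa.BigInteger, primary_key=True),
        sa.Column("updated_at", sa.DateTime),
    )

    last_value = "2024-01-01 00:00:00"  # stands in for db_incremental_field_last_value
    end_value = None                    # incremental.end_value, unset in this sketch

    query = events.select()
    # last_value_func is max -> lower bound with >= and optional upper bound with <
    query = query.where(operator.ge(events.c.updated_at, last_value))
    if end_value is not None:
        query = query.where(operator.lt(events.c.updated_at, end_value))
    # row_order "asc" with last_value_func max -> ascending cursor order
    query = query.order_by(events.c.updated_at.asc())

    print(query)
    # SELECT events.id, events.updated_at FROM events
    # WHERE events.updated_at >= :updated_at_1 ORDER BY events.updated_at ASC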
+ # set the primary_key in the incremental + if incremental and incremental.primary_key is None: + primary_key = get_primary_key(table) + if primary_key is not None: + incremental.primary_key = primary_key + + # yield empty record to set hints + yield dlt.mark.with_hints( + [], + dlt.mark.make_hints( + primary_key=get_primary_key(table), + columns=columns, + ), + ) + else: + # table was already reflected + columns = table_to_columns(table, reflection_level, type_adapter_callback) - Returns: - Iterable[DltResource]: A list of DLT resources for each table to be loaded. - """ yield dlt.mark.materialize_table_schema() # type: ignore loader = TableLoader( engine, + backend, table, + columns, incremental=incremental, + db_incremental_field_last_value=db_incremental_field_last_value, chunk_size=chunk_size, + query_adapter_callback=query_adapter_callback, connect_args=connect_args, - db_incremental_field_last_value=db_incremental_field_last_value, ) - yield from loader.load_rows() + + yield from loader.load_rows(backend_kwargs) engine.dispose() -def engine_from_credentials(credentials: Union[ConnectionStringCredentials, Engine, str]) -> Engine: +def engine_from_credentials( + credentials: Union[ConnectionStringCredentials, Engine, str], + may_dispose_after_use: bool = False, + **backend_kwargs: Any, +) -> Engine: if isinstance(credentials, Engine): return credentials if isinstance(credentials, ConnectionStringCredentials): credentials = credentials.to_native_representation() - return create_engine(credentials, pool_pre_ping=True) + engine = create_engine(credentials, **backend_kwargs) + setattr(engine, "may_dispose_after_use", may_dispose_after_use) # noqa + return engine + + +def unwrap_json_connector_x(field: str) -> TDataItem: + """Creates a transform function to be added with `add_map` that will unwrap JSON columns + ingested via connectorx. Such columns are additionally quoted and translate SQL NULL to json "null" + """ + import pyarrow.compute as pc + import pyarrow as pa + def _unwrap(table: TDataItem) -> TDataItem: + col_index = table.column_names.index(field) + # remove quotes + column = pc.replace_substring_regex(table[field], '"(.*)"', "\\1") + # convert json null to null + column = pc.replace_with_mask( + column, + pc.equal(column, "null").combine_chunks(), + pa.scalar(None, pa.large_string()), + ) + return table.set_column(col_index, table.schema.field(col_index), column) -def get_primary_key(table: Table) -> list[str]: - primary_keys = [c.name for c in table.primary_key] - if len(primary_keys) > 0: - return primary_keys + return _unwrap - column_names = [c.name for c in table.columns] - if "id" in column_names: - return ["id"] - return [] +def _detect_precision_hints_deprecated(value: Optional[bool]) -> None: + if value is None: + return + + msg = "`detect_precision_hints` argument is deprecated and will be removed in a future release. " + if value: + msg += "Use `reflection_level='full_with_precision'` which has the same effect instead." 
+ + warnings.warn(msg, DeprecationWarning, stacklevel=1) @configspec class SqlDatabaseTableConfiguration(BaseConfiguration): - incremental: Optional[dlt.sources.incremental] = None + incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] + included_columns: Optional[list[str]] = None @configspec class SqlTableResourceConfiguration(BaseConfiguration): - credentials: ConnectionStringCredentials - table: str - schema: Optional[str] - incremental: Optional[dlt.sources.incremental] = None - - -__source_name__ = "sql_database" + credentials: Optional[Union[ConnectionStringCredentials, Engine, str]] = None + table: Optional[str] = None + schema: Optional[str] = None + incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] + chunk_size: int = DEFAULT_CHUNK_SIZE + backend: TableBackend = "sqlalchemy" + detect_precision_hints: Optional[bool] = None + defer_table_reflect: Optional[bool] = False + reflection_level: Optional[ReflectionLevel] = "full" + included_columns: Optional[list[str]] = None diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py b/posthog/temporal/data_imports/pipelines/sql_database/schema_types.py similarity index 98% rename from posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py rename to posthog/temporal/data_imports/pipelines/sql_database/schema_types.py index 6f6e883b3dc0d..4c9dab5997d94 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/schema_types.py @@ -17,7 +17,7 @@ from dlt.common import logger from dlt.common.schema.typing import TColumnSchema, TTableSchemaColumns -from posthog.temporal.data_imports.pipelines.sql_database_v2._json import BigQueryJSON +from posthog.temporal.data_imports.pipelines.sql_database._json import BigQueryJSON from sqlalchemy_bigquery import STRUCT as BigQueryStruct ReflectionLevel = Literal["minimal", "full", "full_with_precision"] diff --git a/posthog/temporal/data_imports/pipelines/sql_database/settings.py b/posthog/temporal/data_imports/pipelines/sql_database/settings.py index 21439aa0d6bee..d730961c096e8 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database/settings.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/settings.py @@ -1,3 +1 @@ -"""Sql Database source settings and constants""" - -DEFAULT_CHUNK_SIZE = 5000 +DEFAULT_CHUNK_SIZE = 10_000 diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/test/test_arrow_helpers.py b/posthog/temporal/data_imports/pipelines/sql_database/test/test_arrow_helpers.py similarity index 89% rename from posthog/temporal/data_imports/pipelines/sql_database_v2/test/test_arrow_helpers.py rename to posthog/temporal/data_imports/pipelines/sql_database/test/test_arrow_helpers.py index 8f26dd3f18dd6..978fb8770fd65 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/test/test_arrow_helpers.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/test/test_arrow_helpers.py @@ -1,6 +1,6 @@ import pytest import pyarrow as pa -from posthog.temporal.data_imports.pipelines.sql_database_v2.arrow_helpers import json_dumps +from posthog.temporal.data_imports.pipelines.sql_database.arrow_helpers import json_dumps from dlt.common.json import json diff --git a/posthog/temporal/data_imports/pipelines/sql_database/test/test_sql_database.py b/posthog/temporal/data_imports/pipelines/sql_database/test/test_sql_database.py deleted file mode 100644 index edf217c4a67a4..0000000000000 
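The unwrap_json_connector_x helper carried into the renamed helpers module above strips the extra quoting that connectorx applies to JSON columns and maps the literal string "null" back to a real NULL. A minimal sketch of that pyarrow transform on assumed example data (the "payload" column name and values are hypothetical):

    import pyarrow as pa
    import pyarrow.compute as pc

    table = pa.table({"payload": pa.array(['"hello"', "null", '"{}"'], type=pa.large_string())})

    # drop the outer quotes added by connectorx
    column = pc.replace_substring_regex(table["payload"], '"(.*)"', "\\1")
    # map the string "null" to a real NULL
    column = pc.replace_with_mask(
        column,
        pc.equal(column, "null").combine_chunks(),
        pa.scalar(None, pa.large_string()),
    )
    table = table.set_column(0, table.schema.field(0), column)
    print(table["payload"])  # ["hello", null, "{}"]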
--- a/posthog/temporal/data_imports/pipelines/sql_database/test/test_sql_database.py +++ /dev/null @@ -1,45 +0,0 @@ -from unittest.mock import MagicMock - -from posthog.temporal.data_imports.pipelines.sql_database import get_column_hints - - -def _setup(return_value): - mock_engine = MagicMock() - mock_engine_enter = MagicMock() - mock_connection = MagicMock() - mock_result = MagicMock() - - mock_engine.configure_mock(**{"connect.return_value": mock_engine_enter}) - mock_engine_enter.configure_mock(**{"__enter__.return_value": mock_connection}) - mock_connection.configure_mock(**{"execute.return_value": mock_result}) - mock_result.configure_mock(**{"fetchall.return_value": return_value}) - - return mock_engine - - -def test_get_column_hints_numeric_no_results(): - mock_engine = _setup([]) - - assert get_column_hints(mock_engine, "some_schema", "some_table") == {} - - -def test_get_column_hints_numeric_with_scale_and_precision(): - mock_engine = _setup([("column", "numeric", 10, 2)]) - - assert get_column_hints(mock_engine, "some_schema", "some_table") == { - "column": {"data_type": "decimal", "precision": 10, "scale": 2} - } - - -def test_get_column_hints_numeric_with_missing_scale_and_precision(): - mock_engine = _setup([("column", "numeric", None, None)]) - - assert get_column_hints(mock_engine, "some_schema", "some_table") == { - "column": {"data_type": "decimal", "precision": 76, "scale": 32} - } - - -def test_get_column_hints_numeric_with_no_numeric(): - mock_engine = _setup([("column", "bigint", None, None)]) - - assert get_column_hints(mock_engine, "some_schema", "some_table") == {} diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py deleted file mode 100644 index c5bc6db6674ab..0000000000000 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py +++ /dev/null @@ -1,473 +0,0 @@ -"""Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads.""" - -from datetime import datetime, date -from typing import Optional, Union, Any -from collections.abc import Callable, Iterable - -from sqlalchemy import MetaData, Table, create_engine -from sqlalchemy.engine import Engine -from zoneinfo import ZoneInfo - -import dlt -from dlt.sources import DltResource, DltSource -from urllib.parse import quote -from dlt.common.libs.pyarrow import pyarrow as pa -from dlt.sources.credentials import ConnectionStringCredentials - -from posthog.settings.utils import get_from_env -from posthog.temporal.data_imports.pipelines.sql_database_v2.settings import DEFAULT_CHUNK_SIZE -from posthog.temporal.data_imports.pipelines.sql_database_v2._json import BigQueryJSON -from posthog.utils import str_to_bool -from posthog.warehouse.models import ExternalDataSource -from posthog.warehouse.types import IncrementalFieldType - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization - -from .helpers import ( - SelectAny, - table_rows, - engine_from_credentials, - TableBackend, - SqlTableResourceConfiguration, - _detect_precision_hints_deprecated, -) -from .schema_types import ( - default_table_adapter, - table_to_columns, - get_primary_key, - ReflectionLevel, - TTypeAdapter, -) - -from sqlalchemy_bigquery import BigQueryDialect, __all__ -from sqlalchemy_bigquery._types import _type_map - -# Workaround to get JSON support in the BigQuery Dialect -BigQueryDialect.JSON = BigQueryJSON -_type_map["JSON"] 
= BigQueryJSON -__all__.append("JSON") - - -def incremental_type_to_initial_value(field_type: IncrementalFieldType) -> Any: - if field_type == IncrementalFieldType.Integer or field_type == IncrementalFieldType.Numeric: - return 0 - if field_type == IncrementalFieldType.DateTime or field_type == IncrementalFieldType.Timestamp: - return datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=ZoneInfo("UTC")) - if field_type == IncrementalFieldType.Date: - return date(1970, 1, 1) - - -def sql_source_for_type( - source_type: ExternalDataSource.Type, - host: str, - port: int, - user: str, - password: str, - database: str, - sslmode: str, - schema: str, - table_names: list[str], - db_incremental_field_last_value: Optional[Any], - using_ssl: Optional[bool] = True, - team_id: Optional[int] = None, - incremental_field: Optional[str] = None, - incremental_field_type: Optional[IncrementalFieldType] = None, -) -> DltSource: - host = quote(host) - user = quote(user) - password = quote(password) - database = quote(database) - sslmode = quote(sslmode) - - if incremental_field is not None and incremental_field_type is not None: - incremental: dlt.sources.incremental | None = dlt.sources.incremental( - cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type) - ) - else: - incremental = None - - connect_args = [] - - if source_type == ExternalDataSource.Type.POSTGRES: - credentials = ConnectionStringCredentials( - f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}" - ) - elif source_type == ExternalDataSource.Type.MYSQL: - query_params = "" - - if using_ssl: - # We have to get DEBUG in temporal workers cos we're not loading Django in the same way as the app - is_debug = get_from_env("DEBUG", False, type_cast=str_to_bool) - ssl_ca = "/etc/ssl/cert.pem" if is_debug else "/etc/ssl/certs/ca-certificates.crt" - query_params = f"ssl_ca={ssl_ca}&ssl_verify_cert=false" - - credentials = ConnectionStringCredentials( - f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?{query_params}" - ) - - # PlanetScale needs this to be set - if host.endswith("psdb.cloud"): - connect_args = ["SET workload = 'OLAP';"] - elif source_type == ExternalDataSource.Type.MSSQL: - credentials = ConnectionStringCredentials( - f"mssql+pyodbc://{user}:{password}@{host}:{port}/{database}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes" - ) - else: - raise Exception("Unsupported source_type") - - db_source = sql_database( - credentials=credentials, - schema=schema, - table_names=table_names, - incremental=incremental, - db_incremental_field_last_value=db_incremental_field_last_value, - team_id=team_id, - connect_args=connect_args, - chunk_size=DEFAULT_CHUNK_SIZE, - ) - - return db_source - - -def snowflake_source( - account_id: str, - user: Optional[str], - password: Optional[str], - passphrase: Optional[str], - private_key: Optional[str], - auth_type: str, - database: str, - warehouse: str, - schema: str, - table_names: list[str], - db_incremental_field_last_value: Optional[Any], - role: Optional[str] = None, - incremental_field: Optional[str] = None, - incremental_field_type: Optional[IncrementalFieldType] = None, -) -> DltSource: - if incremental_field is not None and incremental_field_type is not None: - incremental: dlt.sources.incremental | None = dlt.sources.incremental( - cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type) - ) - else: - incremental = None - - if auth_type == "password" and user is not None 
and password is not None: - account_id = quote(account_id) - user = quote(user) - password = quote(password) - database = quote(database) - warehouse = quote(warehouse) - role = quote(role) if role else None - - credentials = create_engine( - f"snowflake://{user}:{password}@{account_id}/{database}/{schema}?warehouse={warehouse}{f'&role={role}' if role else ''}" - ) - else: - assert private_key is not None - assert user is not None - - account_id = quote(account_id) - user = quote(user) - database = quote(database) - warehouse = quote(warehouse) - role = quote(role) if role else None - - p_key = serialization.load_pem_private_key( - private_key.encode("utf-8"), - password=passphrase.encode() if passphrase is not None else None, - backend=default_backend(), - ) - - pkb = p_key.private_bytes( - encoding=serialization.Encoding.DER, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) - - credentials = create_engine( - f"snowflake://{user}@{account_id}/{database}/{schema}?warehouse={warehouse}{f'&role={role}' if role else ''}", - connect_args={ - "private_key": pkb, - }, - ) - - db_source = sql_database( - credentials=credentials, - schema=schema, - table_names=table_names, - incremental=incremental, - db_incremental_field_last_value=db_incremental_field_last_value, - chunk_size=DEFAULT_CHUNK_SIZE, - ) - - return db_source - - -def bigquery_source( - dataset_id: str, - project_id: str, - private_key: str, - private_key_id: str, - client_email: str, - token_uri: str, - table_name: str, - bq_destination_table_id: str, - db_incremental_field_last_value: Optional[Any], - incremental_field: Optional[str] = None, - incremental_field_type: Optional[IncrementalFieldType] = None, -) -> DltSource: - if incremental_field is not None and incremental_field_type is not None: - incremental: dlt.sources.incremental | None = dlt.sources.incremental( - cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type) - ) - else: - incremental = None - - credentials_info = { - "type": "service_account", - "project_id": project_id, - "private_key": private_key, - "private_key_id": private_key_id, - "client_email": client_email, - "token_uri": token_uri, - } - - engine = create_engine( - f"bigquery://{project_id}/{dataset_id}?create_disposition=CREATE_IF_NEEDED&allowLargeResults=true&destination={bq_destination_table_id}", - credentials_info=credentials_info, - ) - - return sql_database( - credentials=engine, - schema=None, - table_names=[table_name], - incremental=incremental, - db_incremental_field_last_value=db_incremental_field_last_value, - chunk_size=DEFAULT_CHUNK_SIZE, - ) - - -@dlt.source(max_table_nesting=0) -def sql_database( - db_incremental_field_last_value: Optional[Any], - credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, - schema: Optional[str] = dlt.config.value, - metadata: Optional[MetaData] = None, - table_names: Optional[list[str]] = dlt.config.value, - chunk_size: int = DEFAULT_CHUNK_SIZE, - backend: TableBackend = "pyarrow", - detect_precision_hints: Optional[bool] = False, - reflection_level: Optional[ReflectionLevel] = "full", - defer_table_reflect: Optional[bool] = None, - table_adapter_callback: Optional[Callable[[Table], None]] = None, - backend_kwargs: Optional[dict[str, Any]] = None, - include_views: bool = False, - type_adapter_callback: Optional[TTypeAdapter] = None, - incremental: Optional[dlt.sources.incremental] = None, - team_id: Optional[int] = None, - 
connect_args: Optional[list[str]] = None, -) -> Iterable[DltResource]: - """ - A dlt source which loads data from an SQL database using SQLAlchemy. - Resources are automatically created for each table in the schema or from the given list of tables. - - Args: - credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance. - schema (Optional[str]): Name of the database schema to load (if different from default). - metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used. - table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. - chunk_size (int): Number of rows yielded in one batch. SQL Alchemy will create additional internal rows buffer twice the chunk size. - backend (TableBackend): Type of backend to generate table data. One of: "sqlalchemy", "pyarrow", "pandas" and "connectorx". - "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as arrow tables, "pandas" yields panda frames. - "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types, - "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself. - detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables. - This is disabled by default. - reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema. - "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. - "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option. - "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types. - defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Requires table_names to be explicitly passed. - Enable this option when running on Airflow. Available on dlt 0.4.4 and later. - table_adapter_callback: (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected. - backend_kwargs (**kwargs): kwargs passed to table backend ie. "conn" is used to pass specialized connection string to connectorx. - include_views (bool): Reflect views as well as tables. Note view names included in `table_names` are always included regardless of this setting. - type_adapter_callback(Optional[Callable]): Callable to override type inference when reflecting columns. - Argument is a single sqlalchemy data type (`TypeEngine` instance) and it should return another sqlalchemy data type, or `None` (type will be inferred from data) - query_adapter_callback(Optional[Callable[Select, Table], Select]): Callable to override the SELECT query used to fetch data from the table. - The callback receives the sqlalchemy `Select` and corresponding `Table` objects and should return the modified `Select`. - - Returns: - Iterable[DltResource]: A list of DLT resources for each table to be loaded. 
- """ - # detect precision hints is deprecated - _detect_precision_hints_deprecated(detect_precision_hints) - - if detect_precision_hints: - reflection_level = "full_with_precision" - else: - reflection_level = reflection_level or "minimal" - - # set up alchemy engine - engine = engine_from_credentials(credentials) - engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) - metadata = metadata or MetaData(schema=schema) - - # use provided tables or all tables - if table_names: - tables = [Table(name, metadata, autoload_with=None if defer_table_reflect else engine) for name in table_names] - else: - if defer_table_reflect: - raise ValueError("You must pass table names to defer table reflection") - metadata.reflect(bind=engine, views=include_views) - tables = list(metadata.tables.values()) - - for table in tables: - yield sql_table( - credentials=engine, - table=table.name, - schema=table.schema, - metadata=metadata, - chunk_size=chunk_size, - backend=backend, - reflection_level=reflection_level, - defer_table_reflect=defer_table_reflect, - table_adapter_callback=table_adapter_callback, - backend_kwargs=backend_kwargs, - type_adapter_callback=type_adapter_callback, - incremental=incremental, - db_incremental_field_last_value=db_incremental_field_last_value, - team_id=team_id, - connect_args=connect_args, - ) - - -# Temp while we dont support binary columns in HogQL -def remove_columns(columns_to_drop: list[str], team_id: Optional[int]): - col_len = len(columns_to_drop) - - def internal_remove(table: pa.Table) -> pa.Table: - if col_len == 0: - return table - - table_cols = [n for n in columns_to_drop if n in table.column_names] - if len(table_cols) > 0: - return table.drop(columns_to_drop) - - return table - - return internal_remove - - -@dlt.resource(name=lambda args: args["table"], standalone=True, spec=SqlTableResourceConfiguration) -def sql_table( - db_incremental_field_last_value: Optional[Any], - credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, - table: str = dlt.config.value, - schema: Optional[str] = dlt.config.value, - metadata: Optional[MetaData] = None, - incremental: Optional[dlt.sources.incremental[Any]] = None, - chunk_size: int = DEFAULT_CHUNK_SIZE, - backend: TableBackend = "sqlalchemy", - detect_precision_hints: Optional[bool] = None, - reflection_level: Optional[ReflectionLevel] = "full", - defer_table_reflect: Optional[bool] = None, - table_adapter_callback: Optional[Callable[[Table], None]] = None, - backend_kwargs: Optional[dict[str, Any]] = None, - type_adapter_callback: Optional[TTypeAdapter] = None, - included_columns: Optional[list[str]] = None, - team_id: Optional[int] = None, - connect_args: Optional[list[str]] = None, -) -> DltResource: - """ - A dlt resource which loads data from an SQL database table using SQLAlchemy. - - Args: - credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `Engine` instance representing the database connection. - table (str): Name of the table or view to load. - schema (Optional[str]): Optional name of the schema the table belongs to. - metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored. - incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table. - E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))` - chunk_size (int): Number of rows yielded in one batch. 
SQL Alchemy will create additional internal rows buffer twice the chunk size. - backend (TableBackend): Type of backend to generate table data. One of: "sqlalchemy", "pyarrow", "pandas" and "connectorx". - "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as arrow tables, "pandas" yields panda frames. - "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types, - "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself. - reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema. - "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data. - "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option. - "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types. - detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables. - This is disabled by default. - defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Enable this option when running on Airflow. Available - on dlt 0.4.4 and later - table_adapter_callback: (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected. - backend_kwargs (**kwargs): kwargs passed to table backend ie. "conn" is used to pass specialized connection string to connectorx. - type_adapter_callback(Optional[Callable]): Callable to override type inference when reflecting columns. - Argument is a single sqlalchemy data type (`TypeEngine` instance) and it should return another sqlalchemy data type, or `None` (type will be inferred from data) - included_columns (Optional[List[str]): List of column names to select from the table. If not provided, all columns are loaded. - query_adapter_callback(Optional[Callable[Select, Table], Select]): Callable to override the SELECT query used to fetch data from the table. - The callback receives the sqlalchemy `Select` and corresponding `Table` objects and should return the modified `Select`. - - Returns: - DltResource: The dlt resource for loading data from the SQL database table. 
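As noted in this docstring, incremental loading is driven by a dlt cursor; the new TableLoader above accepts the same pair of inputs, with db_incremental_field_last_value overriding incremental.last_value when it is set. A minimal sketch of the cursor declaration, with an assumed field name and initial value:

    import dlt
    from datetime import datetime, timezone

    # Hypothetical cursor: field name and epoch initial value are assumptions.
    incremental = dlt.sources.incremental(
        cursor_path="updated_at",
        initial_value=datetime(1970, 1, 1, tzinfo=timezone.utc),
    )

    # Inside TableLoader, db_incremental_field_last_value (when not None) takes
    # precedence over incremental.last_value as the lower bound of the query.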
- """ - _detect_precision_hints_deprecated(detect_precision_hints) - - if detect_precision_hints: - reflection_level = "full_with_precision" - else: - reflection_level = reflection_level or "minimal" - - engine = engine_from_credentials(credentials, may_dispose_after_use=True) - engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) - metadata = metadata or MetaData(schema=schema) - - table_obj: Table | None = metadata.tables.get("table") - if table_obj is None: - table_obj = Table(table, metadata, autoload_with=None if defer_table_reflect else engine) - - if not defer_table_reflect: - default_table_adapter(table_obj, included_columns) - if table_adapter_callback: - table_adapter_callback(table_obj) - - columns = table_to_columns(table_obj, reflection_level, type_adapter_callback) - - def query_adapter_callback(query: SelectAny, table: Table): - cols_to_select = list(columns.keys()) - - return query.with_only_columns(table.c[*cols_to_select]) - - return dlt.resource( - table_rows, - name=table_obj.name, - primary_key=get_primary_key(table_obj), - merge_key=get_primary_key(table_obj), - columns=columns, - write_disposition={ - "disposition": "merge", - "strategy": "upsert", - } - if incremental - else "replace", - table_format="delta", - )( - engine=engine, - table=table_obj, - chunk_size=chunk_size, - backend=backend, - incremental=incremental, - db_incremental_field_last_value=db_incremental_field_last_value, - reflection_level=reflection_level, - defer_table_reflect=defer_table_reflect, - table_adapter_callback=table_adapter_callback, - backend_kwargs=backend_kwargs, - type_adapter_callback=type_adapter_callback, - included_columns=included_columns, - query_adapter_callback=query_adapter_callback, - connect_args=connect_args, - ) diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py deleted file mode 100644 index 74a79650caa15..0000000000000 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py +++ /dev/null @@ -1,312 +0,0 @@ -"""SQL database source helpers""" - -import warnings -from typing import ( - Any, - Literal, - Optional, - Union, -) -from collections.abc import Callable, Iterator -import operator - -import dlt -from dlt.common.configuration.specs import BaseConfiguration, configspec -from dlt.common.exceptions import MissingDependencyException -from dlt.common.schema import TTableSchemaColumns -from dlt.common.typing import TDataItem, TSortOrder - -from dlt.sources.credentials import ConnectionStringCredentials - -from posthog.temporal.data_imports.pipelines.sql_database_v2.settings import DEFAULT_CHUNK_SIZE - -from .arrow_helpers import row_tuples_to_arrow -from .schema_types import ( - default_table_adapter, - table_to_columns, - get_primary_key, - SelectAny, - ReflectionLevel, - TTypeAdapter, -) - -from sqlalchemy import Table, create_engine, text -from sqlalchemy.engine import Engine -from sqlalchemy.exc import CompileError - - -TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] -TQueryAdapter = Callable[[SelectAny, Table], SelectAny] - - -class TableLoader: - def __init__( - self, - engine: Engine, - backend: TableBackend, - table: Table, - columns: TTableSchemaColumns, - chunk_size: int = DEFAULT_CHUNK_SIZE, - incremental: Optional[dlt.sources.incremental[Any]] = None, - db_incremental_field_last_value: Optional[Any] = None, - query_adapter_callback: Optional[TQueryAdapter] = None, - connect_args: Optional[list[str]] = 
None, - ) -> None: - self.engine = engine - self.backend = backend - self.table = table - self.columns = columns - self.chunk_size = chunk_size - self.query_adapter_callback = query_adapter_callback - self.incremental = incremental - self.connect_args = connect_args - if incremental: - try: - self.cursor_column = table.c[incremental.cursor_path] - except KeyError as e: - raise KeyError( - f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'" - ) from e - self.last_value = ( - db_incremental_field_last_value - if db_incremental_field_last_value is not None - else incremental.last_value - ) - self.end_value = incremental.end_value - self.row_order: TSortOrder = self.incremental.row_order - else: - self.cursor_column = None - self.last_value = None - self.end_value = None - self.row_order = None - - def _make_query(self) -> SelectAny: - table = self.table - query = table.select() - if not self.incremental: - return query - last_value_func = self.incremental.last_value_func - - # generate where - if last_value_func is max: # Query ordered and filtered according to last_value function - filter_op = operator.ge - filter_op_end = operator.lt - elif last_value_func is min: - filter_op = operator.le - filter_op_end = operator.gt - else: # Custom last_value, load everything and let incremental handle filtering - return query - - if self.last_value is not None: - query = query.where(filter_op(self.cursor_column, self.last_value)) - if self.end_value is not None: - query = query.where(filter_op_end(self.cursor_column, self.end_value)) - - # generate order by from declared row order - order_by = None - if (self.row_order == "asc" and last_value_func is max) or ( - self.row_order == "desc" and last_value_func is min - ): - order_by = self.cursor_column.asc() - elif (self.row_order == "asc" and last_value_func is min) or ( - self.row_order == "desc" and last_value_func is max - ): - order_by = self.cursor_column.desc() - if order_by is not None: - query = query.order_by(order_by) - - return query - - def make_query(self) -> SelectAny: - if self.query_adapter_callback: - return self.query_adapter_callback(self._make_query(), self.table) - return self._make_query() - - def load_rows(self, backend_kwargs: Optional[dict[str, Any]] = None) -> Iterator[TDataItem]: - # make copy of kwargs - backend_kwargs = dict(backend_kwargs or {}) - query = self.make_query() - if self.backend == "connectorx": - yield from self._load_rows_connectorx(query, backend_kwargs) - else: - yield from self._load_rows(query, backend_kwargs) - - def _load_rows(self, query: SelectAny, backend_kwargs: Optional[dict[str, Any]]) -> TDataItem: - with self.engine.connect() as conn: - if self.connect_args: - for stmt in self.connect_args: - conn.execute(text(stmt)) - result = conn.execution_options(yield_per=self.chunk_size).execute(query) - # NOTE: cursor returns not normalized column names! 
may be quite useful in case of Oracle dialect - # that normalizes columns - # columns = [c[0] for c in result.cursor.description] - columns = list(result.keys()) - for partition in result.partitions(size=self.chunk_size): - if self.backend == "sqlalchemy": - yield [dict(row._mapping) for row in partition] - elif self.backend == "pandas": - from dlt.common.libs.pandas_sql import _wrap_result - - df = _wrap_result( - partition, - columns, - **{"dtype_backend": "pyarrow", **(backend_kwargs or {})}, - ) - yield df - elif self.backend == "pyarrow": - yield row_tuples_to_arrow(partition, self.columns, tz=backend_kwargs.get("tz", "UTC")) - - def _load_rows_connectorx(self, query: SelectAny, backend_kwargs: Optional[dict[str, Any]]) -> Iterator[TDataItem]: - try: - import connectorx as cx # type: ignore - except ImportError: - raise MissingDependencyException("Connector X table backend", ["connectorx"]) - - # default settings - backend_kwargs = { - "return_type": "arrow2", - "protocol": "binary", - **(backend_kwargs or {}), - } - conn = backend_kwargs.pop( - "conn", - self.engine.url._replace(drivername=self.engine.url.get_backend_name()).render_as_string( - hide_password=False - ), - ) - try: - query_str = str(query.compile(self.engine, compile_kwargs={"literal_binds": True})) - except CompileError as ex: - raise NotImplementedError( - f"Query for table {self.table.name} could not be compiled to string to execute it on ConnectorX. If you are on SQLAlchemy 1.4.x the causing exception is due to literals that cannot be rendered, upgrade to 2.x: {str(ex)}" - ) from ex - df = cx.read_sql(conn, query_str, **backend_kwargs) - yield df - - -def table_rows( - engine: Engine, - table: Table, - chunk_size: int, - backend: TableBackend, - incremental: Optional[dlt.sources.incremental[Any]] = None, - db_incremental_field_last_value: Optional[Any] = None, - defer_table_reflect: bool = False, - table_adapter_callback: Optional[Callable[[Table], None]] = None, - reflection_level: ReflectionLevel = "minimal", - backend_kwargs: Optional[dict[str, Any]] = None, - type_adapter_callback: Optional[TTypeAdapter] = None, - included_columns: Optional[list[str]] = None, - query_adapter_callback: Optional[TQueryAdapter] = None, - connect_args: Optional[list[str]] = None, -) -> Iterator[TDataItem]: - columns: TTableSchemaColumns | None = None - if defer_table_reflect: - table = Table(table.name, table.metadata, autoload_with=engine, extend_existing=True) - default_table_adapter(table, included_columns) - if table_adapter_callback: - table_adapter_callback(table) - columns = table_to_columns(table, reflection_level, type_adapter_callback) - - # set the primary_key in the incremental - if incremental and incremental.primary_key is None: - primary_key = get_primary_key(table) - if primary_key is not None: - incremental.primary_key = primary_key - - # yield empty record to set hints - yield dlt.mark.with_hints( - [], - dlt.mark.make_hints( - primary_key=get_primary_key(table), - columns=columns, - ), - ) - else: - # table was already reflected - columns = table_to_columns(table, reflection_level, type_adapter_callback) - - yield dlt.mark.materialize_table_schema() # type: ignore - - loader = TableLoader( - engine, - backend, - table, - columns, - incremental=incremental, - db_incremental_field_last_value=db_incremental_field_last_value, - chunk_size=chunk_size, - query_adapter_callback=query_adapter_callback, - connect_args=connect_args, - ) - - yield from loader.load_rows(backend_kwargs) - - engine.dispose() - - -def 
engine_from_credentials( - credentials: Union[ConnectionStringCredentials, Engine, str], - may_dispose_after_use: bool = False, - **backend_kwargs: Any, -) -> Engine: - if isinstance(credentials, Engine): - return credentials - if isinstance(credentials, ConnectionStringCredentials): - credentials = credentials.to_native_representation() - engine = create_engine(credentials, **backend_kwargs) - setattr(engine, "may_dispose_after_use", may_dispose_after_use) # noqa - return engine - - -def unwrap_json_connector_x(field: str) -> TDataItem: - """Creates a transform function to be added with `add_map` that will unwrap JSON columns - ingested via connectorx. Such columns are additionally quoted and translate SQL NULL to json "null" - """ - import pyarrow.compute as pc - import pyarrow as pa - - def _unwrap(table: TDataItem) -> TDataItem: - col_index = table.column_names.index(field) - # remove quotes - column = pc.replace_substring_regex(table[field], '"(.*)"', "\\1") - # convert json null to null - column = pc.replace_with_mask( - column, - pc.equal(column, "null").combine_chunks(), - pa.scalar(None, pa.large_string()), - ) - return table.set_column(col_index, table.schema.field(col_index), column) - - return _unwrap - - -def _detect_precision_hints_deprecated(value: Optional[bool]) -> None: - if value is None: - return - - msg = "`detect_precision_hints` argument is deprecated and will be removed in a future release. " - if value: - msg += "Use `reflection_level='full_with_precision'` which has the same effect instead." - - warnings.warn(msg, DeprecationWarning, stacklevel=1) - - -@configspec -class SqlDatabaseTableConfiguration(BaseConfiguration): - incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] - included_columns: Optional[list[str]] = None - - -@configspec -class SqlTableResourceConfiguration(BaseConfiguration): - credentials: Optional[Union[ConnectionStringCredentials, Engine, str]] = None - table: Optional[str] = None - schema: Optional[str] = None - incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] - chunk_size: int = DEFAULT_CHUNK_SIZE - backend: TableBackend = "sqlalchemy" - detect_precision_hints: Optional[bool] = None - defer_table_reflect: Optional[bool] = False - reflection_level: Optional[ReflectionLevel] = "full" - included_columns: Optional[list[str]] = None diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py deleted file mode 100644 index d730961c096e8..0000000000000 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py +++ /dev/null @@ -1 +0,0 @@ -DEFAULT_CHUNK_SIZE = 10_000 diff --git a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py index 3daa83dd8caaa..fcf851c490134 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py @@ -181,7 +181,7 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs): ExternalDataSource.Type.MYSQL, ExternalDataSource.Type.MSSQL, ]: - from posthog.temporal.data_imports.pipelines.sql_database_v2 import sql_source_for_type + from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type host = model.pipeline.job_inputs.get("host") port = model.pipeline.job_inputs.get("port") @@ -277,7 +277,7 @@ def import_data_activity_sync(inputs: 
ImportDataActivityInputs): reset_pipeline=reset_pipeline, ) elif model.pipeline.source_type == ExternalDataSource.Type.SNOWFLAKE: - from posthog.temporal.data_imports.pipelines.sql_database_v2 import ( + from posthog.temporal.data_imports.pipelines.sql_database import ( snowflake_source, ) @@ -411,7 +411,7 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs): reset_pipeline=reset_pipeline, ) elif model.pipeline.source_type == ExternalDataSource.Type.BIGQUERY: - from posthog.temporal.data_imports.pipelines.sql_database_v2 import bigquery_source + from posthog.temporal.data_imports.pipelines.sql_database import bigquery_source dataset_id = model.pipeline.job_inputs.get("dataset_id") project_id = model.pipeline.job_inputs.get("project_id") @@ -518,7 +518,3 @@ def _run( pipeline = PipelineNonDLT(source, logger, job_inputs.run_id, schema.is_incremental, reset_pipeline) pipeline.run() del pipeline - - schema = ExternalDataSchema.objects.get(id=inputs.schema_id) - schema.sync_type_config.pop("reset_pipeline", None) - schema.save() diff --git a/posthog/temporal/tests/batch_exports/test_import_data.py b/posthog/temporal/tests/batch_exports/test_import_data.py index 7d484ad4b159b..e0163f41d2161 100644 --- a/posthog/temporal/tests/batch_exports/test_import_data.py +++ b/posthog/temporal/tests/batch_exports/test_import_data.py @@ -68,9 +68,7 @@ def test_job_inputs_with_whitespace(activity_environment, team, **kwargs): activity_inputs = _setup(team, job_inputs) with ( - mock.patch( - "posthog.temporal.data_imports.pipelines.sql_database_v2.sql_source_for_type" - ) as sql_source_for_type, + mock.patch("posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type") as sql_source_for_type, mock.patch("posthog.temporal.data_imports.workflow_activities.import_data_sync._run"), ): activity_environment.run(import_data_activity_sync, activity_inputs) @@ -107,9 +105,7 @@ def test_postgres_source_without_ssh_tunnel(activity_environment, team, **kwargs activity_inputs = _setup(team, job_inputs) with ( - mock.patch( - "posthog.temporal.data_imports.pipelines.sql_database_v2.sql_source_for_type" - ) as sql_source_for_type, + mock.patch("posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type") as sql_source_for_type, mock.patch("posthog.temporal.data_imports.workflow_activities.import_data_sync._run"), ): activity_environment.run(import_data_activity_sync, activity_inputs) @@ -149,9 +145,7 @@ def test_postgres_source_with_ssh_tunnel_disabled(activity_environment, team, ** activity_inputs = _setup(team, job_inputs) with ( - mock.patch( - "posthog.temporal.data_imports.pipelines.sql_database_v2.sql_source_for_type" - ) as sql_source_for_type, + mock.patch("posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type") as sql_source_for_type, mock.patch("posthog.temporal.data_imports.workflow_activities.import_data_sync._run"), ): activity_environment.run(import_data_activity_sync, activity_inputs) @@ -209,7 +203,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback): with ( mock.patch( - "posthog.temporal.data_imports.pipelines.sql_database_v2.sql_source_for_type" + "posthog.temporal.data_imports.pipelines.sql_database.sql_source_for_type" ) as sql_source_for_type_v2, mock.patch("posthog.temporal.data_imports.workflow_activities.import_data_sync._run"), mock.patch.object(SSHTunnel, "get_tunnel", mock_get_tunnel),