Skip to content

Commit

Permalink
fix(ingest): various BigQuery source fixes (#2836)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jul 7, 2021
1 parent 6b59cde commit 523fa32
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
37 changes: 36 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,36 @@
from typing import Optional, Tuple
from unittest.mock import patch

# This import verifies that the dependencies are available.
import pybigquery # noqa: F401
import pybigquery.sqlalchemy_bigquery

from .sql_common import SQLAlchemyConfig, SQLAlchemySource
from .sql_common import (
SQLAlchemyConfig,
SQLAlchemySource,
make_sqlalchemy_type,
register_custom_type,
)

# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/e0f1496c99dd627e0ed04a0c4e89ca5b14611be2/pybigquery/sqlalchemy_bigquery.py#L967-L974.
# The existing implementation does not use the schema parameter and hence
# does not properly resolve the view definitions. As such, we must monkey
# patch the implementation.


def get_view_definition(self, connection, view_name, schema=None, **kw):
    """Return the SQL query that defines ``view_name``.

    Unlike the upstream pybigquery implementation, ``schema`` is forwarded
    to the table lookup so the view is resolved in the requested schema.
    Extra keyword arguments are accepted for signature compatibility and
    are ignored.
    """
    return self._get_table(connection, view_name, schema).view_query


# Install the schema-aware replacement on the dialect class itself, so the
# override applies to every BigQueryDialect instance created afterwards.
pybigquery.sqlalchemy_bigquery.BigQueryDialect.get_view_definition = get_view_definition

# Handle the GEOGRAPHY type. We will temporarily patch the _type_map
# in the get_workunits method of the source.
GEOGRAPHY = make_sqlalchemy_type("GEOGRAPHY")
register_custom_type(GEOGRAPHY)
# Import-time sanity check: the patch in get_workunits targets
# pybigquery.sqlalchemy_bigquery._type_map, so verify that the attribute
# exists and is non-empty in the installed pybigquery version.
# NOTE(review): this check is skipped when Python runs with -O.
assert pybigquery.sqlalchemy_bigquery._type_map


class BigQueryConfig(SQLAlchemyConfig):
Expand Down Expand Up @@ -48,3 +75,11 @@ def __init__(self, config, ctx):
def create(cls, config_dict, ctx):
    """Build a source from a raw configuration dictionary.

    ``config_dict`` is parsed into a ``BigQueryConfig`` and handed to the
    constructor together with ``ctx``.
    """
    return cls(BigQueryConfig.parse_obj(config_dict), ctx)

def get_workunits(self):
    """Yield the parent source's work units with GEOGRAPHY support enabled.

    pybigquery's ``_type_map`` is temporarily extended with the custom
    ``GEOGRAPHY`` type while the work units are produced, and restored
    afterwards.

    Yields:
        Each work unit produced by the parent ``get_workunits``.
    """
    with patch.dict(
        "pybigquery.sqlalchemy_bigquery._type_map",
        {"GEOGRAPHY": GEOGRAPHY},
        clear=False,
    ):
        # Delegate with `yield from` rather than `return`: returning the
        # parent's iterable would leave this `with` block immediately, so
        # any work units the parent produces lazily would be generated
        # after the patch had already been undone. Yielding keeps the
        # patch active for as long as the caller is consuming work units.
        yield from super().get_workunits()
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigModel

from .sql_common import (
RecordTypeClass,
SQLAlchemyConfig,
SQLAlchemySource,
TimeTypeClass,
Expand All @@ -24,7 +25,7 @@
register_custom_type(custom_types.TIMESTAMP_TZ, TimeTypeClass)
register_custom_type(custom_types.TIMESTAMP_LTZ, TimeTypeClass)
register_custom_type(custom_types.TIMESTAMP_NTZ, TimeTypeClass)
register_custom_type(custom_types.VARIANT)
register_custom_type(custom_types.VARIANT, RecordTypeClass)

logger: logging.Logger = logging.getLogger(__name__)

Expand Down
24 changes: 21 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
MySqlDDL,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaFieldDataType,
SchemaMetadata,
Expand Down Expand Up @@ -154,6 +155,7 @@ class SqlWorkUnit(MetadataWorkUnit):
types.DateTime: TimeTypeClass,
types.DATETIME: TimeTypeClass,
types.TIMESTAMP: TimeTypeClass,
types.JSON: RecordTypeClass,
# When SQLAlchemy is unable to map a type into its internally hierarchy, it
# assigns the NullType by default. We want to carry this warning through.
types.NullType: NullTypeClass,
Expand All @@ -173,6 +175,24 @@ def register_custom_type(
_known_unknown_field_types.add(tp)


class _CustomSQLAlchemyDummyType(types.TypeDecorator):
    """Common base class for the placeholder types built by make_sqlalchemy_type."""

    # TypeDecorator needs an underlying implementation type.
    # NOTE(review): LargeBinary looks like an arbitrary placeholder here,
    # not a statement about how the data is stored — confirm.
    impl = types.LargeBinary


def make_sqlalchemy_type(name: str) -> Type[types.TypeEngine]:
    """Create a new SQLAlchemy type class called ``name``.

    The class is built dynamically with the three-argument form of
    ``type()`` (see https://docs.python.org/3/library/functions.html#type)
    and derives from ``_CustomSQLAlchemyDummyType``. Instances render as
    ``"<name>()"`` when passed to ``repr``.
    """

    def _repr(self) -> str:
        return f"{name}()"

    namespace = {"__repr__": _repr}
    custom_type: Type[types.TypeEngine] = type(
        name, (_CustomSQLAlchemyDummyType,), namespace
    )
    return custom_type


def get_column_type(
sql_report: SQLSourceReport, dataset_name: str, column_type: Any
) -> SchemaFieldDataType:
Expand Down Expand Up @@ -258,9 +278,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
for inspector in self.get_inspectors():
for schema in inspector.get_schema_names():
if not sql_config.schema_pattern.allowed(schema):
self.report.report_dropped(
".".join(sql_config.standardize_schema_table_names(schema, "*"))
)
self.report.report_dropped(f"{schema}.*")
continue

if sql_config.include_tables:
Expand Down

0 comments on commit 523fa32

Please sign in to comment.