From 73dc6586d4806cce507886f460b0487c3a3617be Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 27 Sep 2022 14:33:31 +0200 Subject: [PATCH 1/3] Handling complex types properly Complex type descriptions added properly --- .../ingestion/source/bigquery_v2/bigquery.py | 67 ++++++++++++++----- .../source/bigquery_v2/bigquery_schema.py | 7 +- .../datahub/utilities/hive_schema_to_avro.py | 7 +- 3 files changed, 62 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 93f6db2c5feb53..2dd1ad84e488c1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -90,6 +90,10 @@ GlobalTagsClass, TagAssociationClass, ) +from datahub.utilities.hive_schema_to_avro import ( + HiveColumnToAvroConverter, + get_schema_fields_for_hive_column, +) from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.registries.domain_registry import DomainRegistry @@ -785,20 +789,35 @@ def gen_dataset_urn(self, dataset_name: str, project_id: str, table: str) -> str ) return dataset_urn - def gen_schema_metadata( - self, - dataset_urn: str, - table: Union[BigqueryTable, BigqueryView], - dataset_name: str, - ) -> MetadataWorkUnit: - schema_metadata = SchemaMetadata( - schemaName=dataset_name, - platform=make_data_platform_urn(self.platform), - version=0, - hash="", - platformSchema=MySqlDDL(tableSchema=""), - fields=[ - SchemaField( + def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]: + schema_fields: List[SchemaField] = [] + + HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR = " " + _COMPLEX_TYPE = re.compile("^(struct|array)") + last_id = -1 + for col in columns: + + if _COMPLEX_TYPE.match(col.data_type.lower()): + # If the we have seen the ordinal position that 
most probably means we already processed this complex type + if last_id != col.ordinal_position: + schema_fields.extend( + get_schema_fields_for_hive_column( + col.name, col.data_type.lower(), description=col.comment + ) + ) + + # We have to add complex type comments to the correct level + if col.comment: + for idx, field in enumerate(schema_fields): + # Remove all the [version=2.0].[type=struct]. tags to get the field path + if ( + re.sub(r"\[.*?\]\.", "", field.fieldPath, 0, re.MULTILINE) + == col.field_path + ): + field.description = col.comment + schema_fields[idx] = field + else: + field = SchemaField( fieldPath=col.name, type=SchemaFieldDataType( self.BIGQUERY_FIELD_TYPE_MAPPINGS.get(col.data_type, NullType)() @@ -817,8 +836,24 @@ def gen_schema_metadata( if col.is_partition_column else GlobalTagsClass(tags=[]), ) - for col in table.columns - ], + schema_fields.append(field) + last_id = col.ordinal_position + return schema_fields + + def gen_schema_metadata( + self, + dataset_urn: str, + table: Union[BigqueryTable, BigqueryView], + dataset_name: str, + ) -> MetadataWorkUnit: + + schema_metadata = SchemaMetadata( + schemaName=dataset_name, + platform=make_data_platform_urn(self.platform), + version=0, + hash="", + platformSchema=MySqlDDL(tableSchema=""), + fields=self.gen_schema_fields(table.columns), ) wu = wrap_aspect_as_workunit( "dataset", dataset_urn, "schemaMetadata", schema_metadata diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 58446ff9287912..80b2d210b8abae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -12,10 +12,11 @@ logger: logging.Logger = logging.getLogger(__name__) -@dataclass +@dataclass(frozen=True, eq=True) class BigqueryColumn: name: str ordinal_position: int + field_path: str 
is_nullable: bool is_partition_column: bool data_type: str @@ -175,6 +176,7 @@ class BigqueryQuery: c.table_name as table_name, c.column_name as column_name, c.ordinal_position as ordinal_position, + cfp.field_path as field_path, c.is_nullable as is_nullable, c.data_type as data_type, description as comment, @@ -194,6 +196,7 @@ class BigqueryQuery: c.table_name as table_name, c.column_name as column_name, c.ordinal_position as ordinal_position, + cfp.field_path as field_path, c.is_nullable as is_nullable, c.data_type as data_type, c.is_hidden as is_hidden, @@ -355,6 +358,7 @@ def get_columns_for_dataset( BigqueryColumn( name=column.column_name, ordinal_position=column.ordinal_position, + field_path=column.field_path, is_nullable=column.is_nullable == "YES", data_type=column.data_type, comment=column.comment, @@ -379,6 +383,7 @@ def get_columns_for_table( name=column.column_name, ordinal_position=column.ordinal_position, is_nullable=column.is_nullable == "YES", + field_path=column.field_path, data_type=column.data_type, comment=column.comment, is_partition_column=column.is_partitioning_column == "YES", diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py index 6e8d8da5f3fb82..fe59ad85fc0a59 100644 --- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py +++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py @@ -22,7 +22,6 @@ class HiveColumnToAvroConverter: "float": "float", "tinyint": "int", "smallint": "int", - "int": "int", "bigint": "long", "varchar": "string", "char": "string", @@ -34,6 +33,8 @@ class HiveColumnToAvroConverter: _FIXED_STRING = re.compile(r"(var)?char\(\s*(\d+)\s*\)") + _STRUCT_TYPE_SEPARATOR = ":" + @staticmethod def _parse_datatype_string( s: str, **kwargs: Any @@ -103,7 +104,9 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]: parts = HiveColumnToAvroConverter._ignore_brackets_split(s, ",") fields 
= [] for part in parts: - name_and_type = HiveColumnToAvroConverter._ignore_brackets_split(part, ":") + name_and_type = HiveColumnToAvroConverter._ignore_brackets_split( + part.strip(), HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR + ) if len(name_and_type) != 2: raise ValueError( ( From 43aff2a0bbd72fae5590a8fa4f58df9b5e3eb314 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 27 Sep 2022 14:51:13 +0200 Subject: [PATCH 2/3] Fixing sharded regexp pattern, as in BigQuery a sharded table is a table whose name ends with _YYYYMMDD --- .../src/datahub/ingestion/source_config/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py index 21173ca11f0fd9..8ca1296d819c1c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py @@ -4,7 +4,7 @@ from datahub.configuration.common import ConfigModel, ConfigurationError -_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = "((.+)[_$])?(\\d{4,10})$" +_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = "((.+)[_$])?(\\d{8})$" class BigQueryBaseConfig(ConfigModel): From 7f334cefb833473f2bc257185ccde065b797f0fb Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 27 Sep 2022 19:05:30 +0200 Subject: [PATCH 3/3] Fixing table sanitization --- .../ingestion/source/bigquery_v2/bigquery_audit.py | 11 ++--------- .../datahub/ingestion/source/usage/bigquery_usage.py | 9 +-------- .../tests/unit/test_bigquery_usage_source.py | 2 +- .../tests/unit/test_bigqueryv2_usage_source.py | 2 +- 4 files changed, 5 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py index 72094371d1f15d..eac9c16cc54096 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py @@ -78,7 +78,7 @@ class BigqueryTableIdentifier: table: str invalid_chars: ClassVar[Set[str]] = {"$", "@"} - _BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: ClassVar[str] = "((.+)[_$])?(\\d{4,10})$" + _BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: ClassVar[str] = "((.+)[_$])?(\\d{8})$" @staticmethod def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]: @@ -101,17 +101,10 @@ def from_string_name(cls, table: str) -> "BigqueryTableIdentifier": def raw_table_name(self): return f"{self.project_id}.{self.dataset}.{self.table}" - @staticmethod - def _remove_suffix(input_string: str, suffixes: List[str]) -> str: - for suffix in suffixes: - if input_string.endswith(suffix): - return input_string[: -len(suffix)] - return input_string - def get_table_display_name(self) -> str: shortened_table_name = self.table # if table name ends in _* or * then we strip it as that represents a query on a sharded table - shortened_table_name = self._remove_suffix(shortened_table_name, ["_*", "*"]) + shortened_table_name = re.sub("(_(.+)?\\*)|\\*$", "", shortened_table_name) table_name, _ = self.get_table_and_shard(shortened_table_name) if not table_name: diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 59cc90261a84d9..b9052c35d465dd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -268,19 +268,12 @@ def is_temporary_table(self, prefix: str) -> bool: # Temporary tables will have a dataset that begins with an underscore. 
return self.dataset.startswith(prefix) - @staticmethod - def remove_suffix(input_string, suffix): - if suffix and input_string.endswith(suffix): - return input_string[: -len(suffix)] - return input_string - def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef": # Handle partitioned and sharded tables. table_name: Optional[str] = None shortened_table_name = self.table # if table name ends in _* or * then we strip it as that represents a query on a sharded table - shortened_table_name = self.remove_suffix(shortened_table_name, "_*") - shortened_table_name = self.remove_suffix(shortened_table_name, "*") + shortened_table_name = re.sub("(_(.+)?\\*)|\\*$", "", shortened_table_name) matches = re.match(sharded_table_regex, shortened_table_name) if matches: diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py index dc28751d0996b8..5dd412553cc6a3 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py @@ -179,7 +179,7 @@ def test_bigquery_ref_extra_removal(): table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_2022") new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX) - assert new_table_ref.table == "foo" + assert new_table_ref.table == "foo_2022" assert new_table_ref.project == table_ref.project assert new_table_ref.dataset == table_ref.dataset diff --git a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py index cb2a3dc1904d95..1dd04d0bc5d053 100644 --- a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py @@ -188,7 +188,7 @@ def test_bigquery_table_sanitasitation(): new_table_ref = BigqueryTableIdentifier.from_string_name( table_ref.table_identifier.get_table_name() ) - assert new_table_ref.table == 
"foo" + assert new_table_ref.table == "foo_2022" assert new_table_ref.project_id == "project-1234" assert new_table_ref.dataset == "dataset-4567"