From 2267f77e4ca2b7f17729c2eab008df0ef13cad18 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Tue, 4 Oct 2022 23:26:45 +0200
Subject: [PATCH 1/5] - Do not fail on Hive type parsing errors
 - Remove duplicate entries from Hive struct types when parsing

---
 .../ingestion/extractor/schema_util.py        | 12 ++----
 .../datahub/utilities/hive_schema_to_avro.py  | 39 ++++++++++++++-----
 2 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
index faa035dbfb8262..0bd35b6ed7587b 100644
--- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
@@ -515,14 +515,10 @@ def avro_schema_to_mce_fields(
     :param is_key_schema: True if it is a key-schema. Default is False (value-schema).
     :return: The list of MCE compatible SchemaFields.
     """
-    schema_fields: List[SchemaField] = []
-    try:
-        schema_fields = list(
-            AvroToMceSchemaConverter.to_mce_fields(
-                avro_schema_string, is_key_schema, default_nullable
-            )
+    schema_fields: List[SchemaField] = list(
+        AvroToMceSchemaConverter.to_mce_fields(
+            avro_schema_string, is_key_schema, default_nullable
         )
-    except Exception:
-        logger.exception(f"Failed to parse {avro_schema_string} to mce_fields.")
+    )
     return schema_fields
 
diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
index fe59ad85fc0a59..6857fb706ce6e4 100644
--- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
+++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
@@ -1,10 +1,14 @@
 import json
+import logging
 import re
 import uuid
 from typing import Any, Dict, List, Optional, Union
 
 from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
 from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
+from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass
+
+logger: logging.Logger = logging.getLogger(__name__)
 
 
 class HiveColumnToAvroConverter:
@@ -13,7 +17,6 @@ class HiveColumnToAvroConverter:
 
     _PRIVIMITE_HIVE_TYPE_TO_AVRO_TYPE = {
         "string": "string",
-        "int": "int",
         "integer": "int",
         "double": "double",
         "double precision": "double",
@@ -102,7 +105,7 @@ def _parse_datatype_string(
     @staticmethod
     def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
         parts = HiveColumnToAvroConverter._ignore_brackets_split(s, ",")
-        fields = []
+        fields: List[Dict] = []
         for part in parts:
             name_and_type = HiveColumnToAvroConverter._ignore_brackets_split(
                 part.strip(), HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR
@@ -123,7 +126,9 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
             field_type = HiveColumnToAvroConverter._parse_datatype_string(
                 name_and_type[1]
             )
-            fields.append({"name": field_name, "type": field_type})
+
+            if not any(field["name"] == field_name for field in fields):
+                fields.append({"name": field_name, "type": field_type})
 
         if kwargs.get("ustruct_seqn") is not None:
             struct_name = f'__structn_{kwargs["ustruct_seqn"]}_{str(uuid.uuid4()).replace("-", "")}'
@@ -259,13 +264,27 @@ def get_schema_fields_for_hive_column(
     default_nullable: bool = False,
     is_part_of_key: bool = False,
 ) -> List[SchemaField]:
-    avro_schema_json = get_avro_schema_for_hive_column(
-        hive_column_name=hive_column_name, hive_column_type=hive_column_type
-    )
-    schema_fields = avro_schema_to_mce_fields(
-        avro_schema_string=json.dumps(avro_schema_json),
-        default_nullable=default_nullable,
-    )
+
+    try:
+        avro_schema_json = get_avro_schema_for_hive_column(
+            hive_column_name=hive_column_name, hive_column_type=hive_column_type
+        )
+        schema_fields = avro_schema_to_mce_fields(
+            avro_schema_string=json.dumps(avro_schema_json),
+            default_nullable=default_nullable,
+        )
+    except Exception as e:
+        logger.warning(
+            f"Unable to parse column {hive_column_name} and type {hive_column_type} the error was: {e}"
+        )
+        schema_fields = [
+            SchemaField(
+                fieldPath=hive_column_name,
+                type=SchemaFieldDataTypeClass(type=NullTypeClass()),
+                nativeDataType=hive_column_type,
+            )
+        ]
+    assert schema_fields
 
     if HiveColumnToAvroConverter.is_primitive_hive_type(hive_column_type):
         # Primitive avro schema does not have any field names. Append it to fieldPath.

From 0d4d4e71b4ba7b71a04c1df7f2fbbbd263b97732 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Tue, 4 Oct 2022 23:46:02 +0200
Subject: [PATCH 2/5] Adding tests

---
 .../tests/unit/utilities/__init__.py          |  0
 .../utilities/test_hive_schema_to_avro.py     | 39 +++++++++++++++++++
 2 files changed, 39 insertions(+)
 create mode 100644 metadata-ingestion/tests/unit/utilities/__init__.py
 create mode 100644 metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py

diff --git a/metadata-ingestion/tests/unit/utilities/__init__.py b/metadata-ingestion/tests/unit/utilities/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py b/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py
new file mode 100644
index 00000000000000..62ef29743e708c
--- /dev/null
+++ b/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py
@@ -0,0 +1,39 @@
+from datahub.metadata.schema_classes import (
+    NullTypeClass,
+    RecordTypeClass,
+    NumberTypeClass,
+)
+from datahub.utilities.hive_schema_to_avro import (
+    get_schema_fields_for_hive_column,
+)
+
+
+def test_get_avro_schema_for_hive_column():
+    schema_fields = get_schema_fields_for_hive_column("test", "int")
+    assert schema_fields[0].type.type == NumberTypeClass()
+    # A primitive column is converted into a single schema field
+    assert len(schema_fields) == 1
+
+
+def test_get_avro_schema_for_struct_hive_column():
+    schema_fields = get_schema_fields_for_hive_column("test", "struct<test:int>")
+    assert schema_fields[0].type.type == RecordTypeClass()
+    assert len(schema_fields) == 2
+
+
+def test_get_avro_schema_for_struct_hive_with_duplicate_column():
+    schema_fields = get_schema_fields_for_hive_column(
+        "test", "struct<test:int, test2:int, test:int>"
+    )
+    assert schema_fields[0].type.type == RecordTypeClass()
+    # Len will be the struct plus the 2 keys that should remain after the deduplication
+    assert len(schema_fields) == 3
+
+
+def test_get_avro_schema_for_struct_hive_with_duplicate_column2():
+    invalid_schema: str = "struct!test:intdsfs, test2:int, test:int>"
+    schema_fields = get_schema_fields_for_hive_column("test", invalid_schema)
+    assert len(schema_fields) == 1
+    assert schema_fields[0].type.type == NullTypeClass()
+    assert schema_fields[0].fieldPath == "test"
+    assert schema_fields[0].nativeDataType == invalid_schema

From 05519691be28d899595a6bc7e17f37f3db15a2ac Mon Sep 17 00:00:00 2001
From: treff7es
Date: Tue, 4 Oct 2022 23:58:10 +0200
Subject: [PATCH 3/5] Isorting

---
 .../tests/unit/utilities/test_hive_schema_to_avro.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py b/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py
index 62ef29743e708c..56b48a130936ca 100644
--- a/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py
+++ b/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py
@@ -1,11 +1,9 @@
 from datahub.metadata.schema_classes import (
     NullTypeClass,
-    RecordTypeClass,
     NumberTypeClass,
+    RecordTypeClass,
 )
-from datahub.utilities.hive_schema_to_avro import (
-    get_schema_fields_for_hive_column,
-)
+from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
 
 
 def test_get_avro_schema_for_hive_column():

From 525bc3830151c74e9c455af89739ad819feeb2f9 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Wed, 5 Oct 2022 00:43:35 +0200
Subject: [PATCH 4/5] Fixing tests

---
 metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
index 6857fb706ce6e4..71acd8a73fe426 100644
--- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
+++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
@@ -17,6 +17,7 @@ class HiveColumnToAvroConverter:
 
     _PRIVIMITE_HIVE_TYPE_TO_AVRO_TYPE = {
         "string": "string",
+        "int": "int",
         "integer": "int",
         "double": "double",
         "double precision": "double",

From 59f28612b71f94f9cc7fe0928166ba3e6861480a Mon Sep 17 00:00:00 2001
From: Shirshanka Das
Date: Tue, 4 Oct 2022 20:19:22 -0700
Subject: [PATCH 5/5] keep backcompat with old behavior

---
 .../ingestion/extractor/schema_util.py        | 24 +++++++++++++------
 .../datahub/utilities/hive_schema_to_avro.py  |  1 +
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
index 0bd35b6ed7587b..ad04d6352e466e 100644
--- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
@@ -507,18 +507,28 @@ def to_mce_fields(
 
 
 def avro_schema_to_mce_fields(
-    avro_schema_string: str, is_key_schema: bool = False, default_nullable: bool = False
+    avro_schema_string: str,
+    is_key_schema: bool = False,
+    default_nullable: bool = False,
+    swallow_exceptions: bool = True,
 ) -> List[SchemaField]:
     """
     Converts an avro schema into schema fields compatible with MCE.
     :param avro_schema_string: String representation of the AVRO schema.
     :param is_key_schema: True if it is a key-schema. Default is False (value-schema).
+    :param swallow_exceptions: True if the caller wants exceptions to be suppressed
     :return: The list of MCE compatible SchemaFields.
""" - schema_fields: List[SchemaField] = list( - AvroToMceSchemaConverter.to_mce_fields( - avro_schema_string, is_key_schema, default_nullable - ) - ) - return schema_fields + try: + return list( + AvroToMceSchemaConverter.to_mce_fields( + avro_schema_string, is_key_schema, default_nullable + ) + ) + except Exception: + if swallow_exceptions: + logger.exception(f"Failed to parse {avro_schema_string} into mce fields.") + return [] + else: + raise diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py index 71acd8a73fe426..ba85788adc61c6 100644 --- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py +++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py @@ -273,6 +273,7 @@ def get_schema_fields_for_hive_column( schema_fields = avro_schema_to_mce_fields( avro_schema_string=json.dumps(avro_schema_json), default_nullable=default_nullable, + swallow_exceptions=False, ) except Exception as e: logger.warning(