diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index faa035dbfb826..ad04d6352e466 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -507,22 +507,28 @@ def to_mce_fields( def avro_schema_to_mce_fields( - avro_schema_string: str, is_key_schema: bool = False, default_nullable: bool = False + avro_schema_string: str, + is_key_schema: bool = False, + default_nullable: bool = False, + swallow_exceptions: bool = True, ) -> List[SchemaField]: """ Converts an avro schema into schema fields compatible with MCE. :param avro_schema_string: String representation of the AVRO schema. :param is_key_schema: True if it is a key-schema. Default is False (value-schema). + :param swallow_exceptions: True if the caller wants exceptions to be suppressed :return: The list of MCE compatible SchemaFields. """ - schema_fields: List[SchemaField] = [] + try: - schema_fields = list( + return list( AvroToMceSchemaConverter.to_mce_fields( avro_schema_string, is_key_schema, default_nullable ) ) except Exception: - logger.exception(f"Failed to parse {avro_schema_string} to mce_fields.") - - return schema_fields + if swallow_exceptions: + logger.exception(f"Failed to parse {avro_schema_string} into mce fields.") + return [] + else: + raise diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py index fe59ad85fc0a5..ba85788adc61c 100644 --- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py +++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py @@ -1,10 +1,14 @@ import json +import logging import re import uuid from typing import Any, Dict, List, Optional, Union from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields from 
datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField +from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass + +logger: logging.Logger = logging.getLogger(__name__) class HiveColumnToAvroConverter: @@ -102,7 +106,7 @@ def _parse_datatype_string( @staticmethod def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]: parts = HiveColumnToAvroConverter._ignore_brackets_split(s, ",") - fields = [] + fields: List[Dict] = [] for part in parts: name_and_type = HiveColumnToAvroConverter._ignore_brackets_split( part.strip(), HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR @@ -123,7 +127,9 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]: field_type = HiveColumnToAvroConverter._parse_datatype_string( name_and_type[1] ) - fields.append({"name": field_name, "type": field_type}) + + if not any(field["name"] == field_name for field in fields): + fields.append({"name": field_name, "type": field_type}) if kwargs.get("ustruct_seqn") is not None: struct_name = f'__structn_{kwargs["ustruct_seqn"]}_{str(uuid.uuid4()).replace("-", "")}' @@ -259,13 +265,28 @@ def get_schema_fields_for_hive_column( default_nullable: bool = False, is_part_of_key: bool = False, ) -> List[SchemaField]: - avro_schema_json = get_avro_schema_for_hive_column( - hive_column_name=hive_column_name, hive_column_type=hive_column_type - ) - schema_fields = avro_schema_to_mce_fields( - avro_schema_string=json.dumps(avro_schema_json), - default_nullable=default_nullable, - ) + + try: + avro_schema_json = get_avro_schema_for_hive_column( + hive_column_name=hive_column_name, hive_column_type=hive_column_type + ) + schema_fields = avro_schema_to_mce_fields( + avro_schema_string=json.dumps(avro_schema_json), + default_nullable=default_nullable, + swallow_exceptions=False, + ) + except Exception as e: + logger.warning( + f"Unable to parse column {hive_column_name} and type {hive_column_type} the error was: {e}" + ) + schema_fields 
= [ + SchemaField( + fieldPath=hive_column_name, + type=SchemaFieldDataTypeClass(type=NullTypeClass()), + nativeDataType=hive_column_type, + ) + ] + assert schema_fields if HiveColumnToAvroConverter.is_primitive_hive_type(hive_column_type): # Primitive avro schema does not have any field names. Append it to fieldPath. diff --git a/metadata-ingestion/tests/unit/utilities/__init__.py b/metadata-ingestion/tests/unit/utilities/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py b/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py new file mode 100644 index 0000000000000..56b48a130936c --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_hive_schema_to_avro.py @@ -0,0 +1,37 @@ +from datahub.metadata.schema_classes import ( + NullTypeClass, + NumberTypeClass, + RecordTypeClass, +) +from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column + + +def test_get_avro_schema_for_hive_column(): + schema_fields = get_schema_fields_for_hive_column("test", "int") + assert schema_fields[0].type.type == NumberTypeClass() + # A primitive type maps to exactly one schema field + assert len(schema_fields) == 1 + + +def test_get_avro_schema_for_struct_hive_column(): + schema_fields = get_schema_fields_for_hive_column("test", "struct<test:int>") + assert schema_fields[0].type.type == RecordTypeClass() + assert len(schema_fields) == 2 + + +def test_get_avro_schema_for_struct_hive_with_duplicate_column(): + schema_fields = get_schema_fields_for_hive_column( + "test", "struct<test:int, test2:int, test:int>" + ) + assert schema_fields[0].type.type == RecordTypeClass() + # Len will be the struct + 2 keys which should remain after the deduplication + assert len(schema_fields) == 3 + + +def test_get_avro_schema_for_struct_hive_with_duplicate_column2(): + invalid_schema: str = "struct!test:intdsfs, test2:int, test:int>" + schema_fields = 
get_schema_fields_for_hive_column("test", invalid_schema) + assert len(schema_fields) == 1 + assert schema_fields[0].type.type == NullTypeClass() + assert schema_fields[0].fieldPath == "test" + assert schema_fields[0].nativeDataType == invalid_schema