Skip to content

Commit

Permalink
fix(ingest): kafka - properly picking doc from union type (#6472)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Nov 23, 2022
1 parent 055196c commit 8d525d6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
27 changes: 18 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,19 +239,24 @@ def _get_underlying_type_if_option_as_union(

class SchemaFieldEmissionContextManager:
"""Context Manager for MCE SchemaField emission
- handles prefix name stack management and AVRO record-field generation for non-complex types."""
- handles prefix name stack management and AVRO record-field generation for non-complex types.
- actual_schema contains the underlying non-null type's schema if the schema is a union
This way we can use the type/description of the non-null type if needed.
"""

# Capture the schema being emitted together with the context needed to emit it.
# - schema: the Avro schema (or field) this context manager is opened for.
# - actual_schema: the underlying non-null type's schema when `schema` is a
#   union; callers may pass the same object for both arguments.
# - converter: the owning AvroToMceSchemaConverter; its type-annotation helper
#   and prefix-name stack are used on enter/emit.
# - description: optional explicit description; when it is not provided, emit()
#   falls back to the "doc" property of the schemas.
# - default_value: optional field default; when it is not None, emit() appends
#   a "Field default value: ..." line to the description.
def __init__(
self,
schema: avro.schema.Schema,
actual_schema: avro.schema.Schema,
converter: "AvroToMceSchemaConverter",
description: Optional[str] = None,
default_value: Optional[str] = None,
):
# Plain attribute capture only; no validation or work happens at construction time.
self._schema = schema
self._actual_schema = actual_schema
self._converter = converter
self._description = description
self._default_value = default_value

def __enter__(self):
type_annotation = self._converter._get_type_annotation(self._actual_schema)
Expand Down Expand Up @@ -289,8 +294,11 @@ def emit(self) -> Generator[SchemaField, None, None]:
)

description = self._description
if description is None:
description = schema.props.get("doc", None)
if not description and actual_schema.props.get("doc"):
description = actual_schema.props.get("doc")

if self._default_value is not None:
description = f"{description if description else ''}\nField default value: {self._default_value}"

native_data_type = self._converter._prefix_name_stack[-1]
if isinstance(schema, (avro.schema.Field, avro.schema.UnionSchema)):
Expand All @@ -314,6 +322,8 @@ def emit(self) -> Generator[SchemaField, None, None]:
description = (
f"<span style=\"color:red\">DEPRECATED: {merged_props['deprecated']}</span>\n"
+ description
if description
else ""
)
tags = GlobalTagsClass(
tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")]
Expand Down Expand Up @@ -407,13 +417,12 @@ def _gen_from_last_field(
last_field_schema = self._fields_stack[-1]
# Generate the custom-description for the field.
description = last_field_schema.doc if last_field_schema.doc else None
if last_field_schema.has_default and last_field_schema.default is not None:
description = (
f"{description}\nField default value: {last_field_schema.default}"
)

with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
last_field_schema, last_field_schema, self, description
last_field_schema,
last_field_schema,
self,
description,
last_field_schema.default,
) as f_emit:
yield from f_emit.emit()

Expand Down
25 changes: 24 additions & 1 deletion metadata-ingestion/tests/unit/test_schema_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,17 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
"namespace": "some.event.namespace",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "amount", "type": "double", "doc": "amountDoc"},
{"name": "name","type": "string","default": ""},
{"name": "phoneNumber",
"type": [{
"type": "record",
"name": "PhoneNumber",
"doc": "testDoc",
"fields": [{
"name": "areaCode",
"type": "string",
"doc": "areaCodeDoc",
"default": ""
}, {
"name": "countryCode",
Expand All @@ -298,6 +300,21 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
"null"
],
"default": "null"
},
{"name": "address",
"type": [{
"type": "record",
"name": "Address",
"fields": [{
"name": "street",
"type": "string",
"default": ""
}]
},
"null"
],
"doc": "addressDoc",
"default": "null"
}
]
}
Expand All @@ -312,8 +329,14 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
"[version=2.0].[type=Payment].[type=Address].address",
"[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
]
assert_field_paths_match(fields, expected_field_paths)
assert fields[1].description == "amountDoc"
assert fields[3].description == "testDoc\nField default value: null"
assert fields[4].description == "areaCodeDoc\nField default value: "
assert fields[8].description == "addressDoc\nField default value: null"


def test_avro_schema_to_mce_fields_with_nesting_across_records():
Expand Down

0 comments on commit 8d525d6

Please sign in to comment.