Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): kafka - properly picking doc from union type #6472

Merged
merged 10 commits into from
Nov 23, 2022
27 changes: 18 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,19 +239,24 @@ def _get_underlying_type_if_option_as_union(

class SchemaFieldEmissionContextManager:
"""Context Manager for MCE SchemaFiled emission
- handles prefix name stack management and AVRO record-field generation for non-complex types."""
- handles prefix name stack management and AVRO record-field generation for non-complex types.
- actual_schema contains the underlying no-null type's schema if the schema is a union
This way we can use the type/description of the non-null type if needed.
"""

def __init__(
self,
schema: avro.schema.Schema,
actual_schema: avro.schema.Schema,
converter: "AvroToMceSchemaConverter",
description: Optional[str] = None,
default_value: Optional[str] = None,
):
self._schema = schema
self._actual_schema = actual_schema
self._converter = converter
self._description = description
self._default_value = default_value

def __enter__(self):
type_annotation = self._converter._get_type_annotation(self._actual_schema)
Expand Down Expand Up @@ -289,8 +294,11 @@ def emit(self) -> Generator[SchemaField, None, None]:
)

description = self._description
if description is None:
description = schema.props.get("doc", None)
if not description and actual_schema.props.get("doc"):
description = actual_schema.props.get("doc")

if self._default_value is not None:
description = f"{description if description else ''}\nField default value: {self._default_value}"

native_data_type = self._converter._prefix_name_stack[-1]
if isinstance(schema, (avro.schema.Field, avro.schema.UnionSchema)):
Expand All @@ -314,6 +322,8 @@ def emit(self) -> Generator[SchemaField, None, None]:
description = (
f"<span style=\"color:red\">DEPRECATED: {merged_props['deprecated']}</span>\n"
+ description
if description
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this effectively do the same thing you are trying to prevent above?

If description is None we set it to "" ... so why not just set description to "" above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bunch of other integration test expects this property as None and not empty string in their golden files. (like Glue, Hive, etc...) and those broke when I had empty string.
This way the output is identical what we had before.

else ""
)
tags = GlobalTagsClass(
tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")]
Expand Down Expand Up @@ -407,13 +417,12 @@ def _gen_from_last_field(
last_field_schema = self._fields_stack[-1]
# Generate the custom-description for the field.
description = last_field_schema.doc if last_field_schema.doc else None
if last_field_schema.has_default and last_field_schema.default is not None:
description = (
f"{description}\nField default value: {last_field_schema.default}"
)

with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
last_field_schema, last_field_schema, self, description
last_field_schema,
last_field_schema,
self,
description,
last_field_schema.default,
) as f_emit:
yield from f_emit.emit()

Expand Down
25 changes: 24 additions & 1 deletion metadata-ingestion/tests/unit/test_schema_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,17 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
"namespace": "some.event.namespace",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "amount", "type": "double", "doc": "amountDoc"},
{"name": "name","type": "string","default": ""},
{"name": "phoneNumber",
"type": [{
"type": "record",
"name": "PhoneNumber",
"doc": "testDoc",
"fields": [{
"name": "areaCode",
"type": "string",
"doc": "areaCodeDoc",
"default": ""
}, {
"name": "countryCode",
Expand All @@ -298,6 +300,21 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
"null"
],
"default": "null"
},
{"name": "address",
"type": [{
"type": "record",
"name": "Address",
"fields": [{
"name": "street",
"type": "string",
"default": ""
}]
},
"null"
],
"doc": "addressDoc",
"default": "null"
}
]
}
Expand All @@ -312,8 +329,14 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
"[version=2.0].[type=Payment].[type=Address].address",
"[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
]
assert_field_paths_match(fields, expected_field_paths)
assert fields[1].description == "amountDoc"
assert fields[3].description == "testDoc\nField default value: null"
assert fields[4].description == "areaCodeDoc\nField default value: "
assert fields[8].description == "addressDoc\nField default value: null"


def test_avro_schema_to_mce_fields_with_nesting_across_records():
Expand Down