diff --git a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv index 9023699f38e85..d92dcbf7172fc 100644 --- a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv +++ b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv @@ -1,4 +1,4 @@ -resource,subresource,glossary_terms,tags,owners,ownership_type,description -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description +resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? 
\ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 9b069f148f7f9..f79e792b54f65 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -15,6 +15,7 @@ from datahub.emitter.serialization_helper import post_json_transform from datahub.metadata.schema_classes import ( DatasetUsageStatisticsClass, + DomainsClass, GlobalTagsClass, GlossaryTermsClass, OwnershipClass, @@ -196,6 +197,13 @@ def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]: aspect_type=GlossaryTermsClass, ) + def get_domain(self, entity_urn: str) -> Optional[DomainsClass]: + return self.get_aspect_v2( + entity_urn=entity_urn, + aspect="domains", + aspect_type=DomainsClass, + ) + def get_usage_aspects_from_urn( self, entity_urn: str, start_timestamp: int, end_timestamp: int ) -> Optional[List[DatasetUsageStatisticsClass]]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 8a1827926018f..1ac80968739f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -18,6 +18,7 @@ from datahub.metadata.schema_classes import ( AuditStampClass, ChangeTypeClass, + DomainsClass, EditableDatasetPropertiesClass, EditableSchemaFieldInfoClass, EditableSchemaMetadataClass, @@ -39,6 +40,7 @@ OWNERSHIP_ASPECT_NAME = "ownership" EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties" ACTOR = "urn:li:corpuser:ingestion" +DOMAIN_ASPECT_NAME = "domains" def get_audit_stamp() -> AuditStampClass: @@ -69,6 +71,7 @@ class SubResourceRow: term_associations: List[GlossaryTermAssociationClass] tag_associations: List[TagAssociationClass] description: Optional[str] + domain: Optional[str] @dataclass @@ -78,6 +81,7 @@ class 
CSVEnricherReport(SourceReport): num_owners_workunits_produced: int = 0 num_description_workunits_produced: int = 0 num_editable_schema_metadata_workunits_produced: int = 0 + num_domain_workunits_produced: int = 0 @platform_name("CSV") @@ -85,17 +89,17 @@ class CSVEnricherReport(SourceReport): @support_status(SupportStatus.INCUBATING) class CSVEnricherSource(Source): """ - This plugin is used to apply glossary terms, tags and owners at the entity level. It can also be used to apply tags + This plugin is used to apply glossary terms, tags, owners and domain at the entity level. It can also be used to apply tags and glossary terms at the column level. These values are read from a CSV file and can be used to either overwrite or append the above aspects to entities. The format of the CSV must be like so, with a few example rows. - |resource |subresource|glossary_terms |tags |owners |ownership_type |description | - |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------| - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description| - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? 
| + |resource |subresource|glossary_terms |tags |owners |ownership_type |description |domain | + |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|---------------------------| + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|urn:li:domain:Engineering | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | | Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary @@ -254,6 +258,38 @@ def get_resource_owners_work_unit( ) return owners_wu + def get_resource_domain_work_unit( + self, + entity_urn: str, + entity_type: str, + domain: Optional[str], + ) -> Optional[MetadataWorkUnit]: + # Check if there is a domain to add. If not, return None. 
+ if not domain: + return None + + current_domain: Optional[DomainsClass] = None + if self.ctx.graph and not self.should_overwrite: + # Get the existing domain for the entity from the DataHub graph + current_domain = self.ctx.graph.get_domain(entity_urn=entity_urn) + + if not current_domain: + # If we want to overwrite or there is no existing domain, create a new object + current_domain = DomainsClass([domain]) + + domain_mcpw: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityType=entity_type, + entityUrn=entity_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName=DOMAIN_ASPECT_NAME, + aspect=current_domain, + ) + domain_wu: MetadataWorkUnit = MetadataWorkUnit( + id=f"{entity_urn}-{DOMAIN_ASPECT_NAME}", + mcp=domain_mcpw, + ) + return domain_wu + def get_resource_description_work_unit( self, entity_urn: str, @@ -308,6 +344,7 @@ def get_resource_workunits( term_associations: List[GlossaryTermAssociationClass], tag_associations: List[TagAssociationClass], owners: List[OwnerClass], + domain: Optional[str], description: Optional[str], ) -> Iterable[MetadataWorkUnit]: maybe_terms_wu: Optional[ @@ -344,6 +381,18 @@ def get_resource_workunits( self.report.report_workunit(maybe_owners_wu) yield maybe_owners_wu + maybe_domain_wu: Optional[ + MetadataWorkUnit + ] = self.get_resource_domain_work_unit( + entity_urn=entity_urn, + entity_type=entity_type, + domain=domain, + ) + if maybe_domain_wu: + self.report.num_domain_workunits_produced += 1 + self.report.report_workunit(maybe_domain_wu) + yield maybe_domain_wu + maybe_description_wu: Optional[ MetadataWorkUnit ] = self.get_resource_description_work_unit( @@ -512,6 +561,7 @@ def maybe_extract_glossary_terms( # Sanitizing the terms string to just get the list of term urns terms_array_string = sanitize_array_string(row["glossary_terms"]) term_urns: List[str] = terms_array_string.split(self.config.array_delimiter) + term_associations: List[GlossaryTermAssociationClass] = [ 
GlossaryTermAssociationClass(term) for term in term_urns ] @@ -524,6 +574,7 @@ def maybe_extract_tags(self, row: Dict[str, str]) -> List[TagAssociationClass]: # Sanitizing the tags string to just get the list of tag urns tags_array_string = sanitize_array_string(row["tags"]) tag_urns: List[str] = tags_array_string.split(self.config.array_delimiter) + tag_associations: List[TagAssociationClass] = [ TagAssociationClass(tag) for tag in tag_urns ] @@ -546,6 +597,7 @@ def maybe_extract_owners( # Sanitizing the owners string to just get the list of owner urns owners_array_string: str = sanitize_array_string(row["owners"]) owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter) + owners: List[OwnerClass] = [ OwnerClass(owner_urn, type=ownership_type) for owner_urn in owner_urns ] @@ -576,6 +628,12 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: row, is_resource_row ) + domain: Optional[str] = ( + row["domain"] + if row["domain"] and entity_type == DATASET_ENTITY_TYPE + else None + ) + description: Optional[str] = ( row["description"] if row["description"] and entity_type == DATASET_ENTITY_TYPE @@ -589,6 +647,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: term_associations=term_associations, tag_associations=tag_associations, owners=owners, + domain=domain, description=description, ): yield wu @@ -611,6 +670,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: term_associations=term_associations, tag_associations=tag_associations, description=description, + domain=domain, ) ) diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json index 610e083e6510a..bd3f390f2b869 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json @@ -56,6 +56,25 @@ "properties": null } }, +{ + "auditHeader": null, + "entityType": 
"dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "value": "{\"domains\":[\"urn:li:domain:Engineering\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, { "auditHeader": null, "entityType": "dataset", diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv index 9023699f38e85..d92dcbf7172fc 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv @@ -1,4 +1,4 @@ -resource,subresource,glossary_terms,tags,owners,ownership_type,description -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description +resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? 
\ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py index d331f127c0ed2..c294605f72de4 100644 --- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py +++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py @@ -185,3 +185,21 @@ def test_get_resource_description_work_unit_produced(): DATASET_URN, DATASET_ENTITY_TYPE, new_description ) assert maybe_description_wu + + +def test_get_resource_domain_no_domain(): + source = create_mocked_csv_enricher_source() + new_domain = None + maybe_domain_wu = source.get_resource_domain_work_unit( + DATASET_URN, DATASET_ENTITY_TYPE, new_domain + ) + assert not maybe_domain_wu + + +def test_get_resource_domain_work_unit_produced(): + source = create_mocked_csv_enricher_source() + new_domain = "domain" + maybe_domain_wu = source.get_resource_domain_work_unit( + DATASET_URN, DATASET_ENTITY_TYPE, new_domain + ) + assert maybe_domain_wu