From d2f645c6a205a4ff4682cb9146f526bf191579cc Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 11 Jul 2022 21:13:31 +0100 Subject: [PATCH 1/8] Adds support for Domains in CSV source --- .../recipes/example_to_datahub_rest.dhub.yaml | 1 + .../src/datahub/ingestion/graph/client.py | 8 ++ .../datahub/ingestion/source/csv_enricher.py | 89 +++++++++++++++++-- .../csv-enricher/csv_enricher_golden.json | 19 ++++ .../csv-enricher/csv_enricher_test_data.csv | 4 +- .../tests/unit/test_csv_enricher_source.py | 17 ++++ 6 files changed, 128 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml b/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml index 38cf31a61d5eaf..e2a2f59055aa82 100644 --- a/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml +++ b/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml @@ -9,3 +9,4 @@ sink: type: "datahub-rest" config: server: "http://localhost:8080" + token: eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImV4cCI6MTY1MzUxMTYzOSwianRpIjoiNDg2ZTU2MTMtM2FhNi00Y2MyLTk2YzItZDYzMzY0MTc5MmQxIiwic3ViIjoiZGF0YWh1YiIsImlzcyI6ImRhdGFodWItbWV0YWRhdGEtc2VydmljZSJ9.gt4mWqTPB2EsgxnZc6GB_AKNm9LxS4p7v5QJgQn6kWo \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 04f792f7244714..5354963a924b2e 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -15,6 +15,7 @@ from datahub.emitter.serialization_helper import post_json_transform from datahub.metadata.schema_classes import ( DatasetUsageStatisticsClass, + DomainsClass, GlobalTagsClass, GlossaryTermsClass, OwnershipClass, @@ -196,6 +197,13 @@ def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]: aspect_type=GlossaryTermsClass, ) + def get_domains(self, entity_urn: str) -> Optional[DomainsClass]: + return self.get_aspect_v2( + entity_urn=entity_urn, + aspect="domains", + aspect_type=DomainsClass, + ) + def get_usage_aspects_from_urn( self, entity_urn: str, start_timestamp: int, end_timestamp: int ) -> Optional[List[DatasetUsageStatisticsClass]]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 8a1827926018fb..fa9c6b842eb31c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -18,6 +18,7 @@ from datahub.metadata.schema_classes import ( AuditStampClass, ChangeTypeClass, + DomainsClass, EditableDatasetPropertiesClass, EditableSchemaFieldInfoClass, EditableSchemaMetadataClass, @@ -39,7 +40,7 @@ OWNERSHIP_ASPECT_NAME = "ownership" EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties" ACTOR = "urn:li:corpuser:ingestion" - +DOMAIN_ASPECT_NAME = "domains" def get_audit_stamp() -> AuditStampClass: now = int(time.time() * 1000) @@ -69,6 +70,7 @@ class SubResourceRow: term_associations: List[GlossaryTermAssociationClass] tag_associations: List[TagAssociationClass] description: Optional[str] + domain: Optional[str] @dataclass @@ -78,6 +80,7 @@ class CSVEnricherReport(SourceReport): num_owners_workunits_produced: int = 0 num_description_workunits_produced: int = 0 num_editable_schema_metadata_workunits_produced: int = 0 + num_domain_workunits_produced: int = 0 @platform_name("CSV") @@ -91,11 +94,11 @@ class CSVEnricherSource(Source): The format of the CSV must be like so, with a few example rows. - |resource |subresource|glossary_terms |tags |owners |ownership_type |description | - |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------| - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description| - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | + |resource |subresource|glossary_terms |tags |owners |ownership_type |description |domain | + |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|-------------| + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|Engineering | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | | Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary @@ -117,7 +120,7 @@ def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext): "Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe." ) - def get_resource_glossary_terms_work_unit( + def get_resource_glossary_terms_work_unit( self, entity_urn: str, entity_type: str, @@ -160,7 +163,7 @@ def get_resource_glossary_terms_work_unit( ) terms_wu: MetadataWorkUnit = MetadataWorkUnit( id=f"{entity_urn}-{GLOSSARY_TERMS_ASPECT_NAME}", - mcp=terms_mcpw, + mcp=terms_mcpw,glossaryTerm ) return terms_wu @@ -254,6 +257,38 @@ def get_resource_owners_work_unit( ) return owners_wu + def get_resource_domain_work_unit( + self, + entity_urn: str, + entity_type: str, + domain: Optional[DomainsClass], + ) -> Optional[MetadataWorkUnit]: + # Check if there is a domain to add. If not, return None. + if not domain: + return None + + current_domain: Optional[DomainsClass] = None + if self.ctx.graph and not self.should_overwrite: + # Get the existing domain for the entity from the DataHub graph + current_domain = self.ctx.graph.get_domain(entity_urn=entity_urn) + + if not current_domain: + # If we want to overwrite or there is no existing domain, create a new object + current_domain = DomainsClass(domain, get_audit_stamp()) + + domain_mcpw: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityType=entity_type, + entityUrn=entity_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName=DOMAIN_ASPECT_NAME, + aspect=current_domain, + ) + domain_wu: MetadataWorkUnit = MetadataWorkUnit( + id=f"{entity_urn}-{DOMAIN_ASPECT_NAME}", + mcp=domain_mcpw, + ) + return domain_wu + def get_resource_description_work_unit( self, entity_urn: str, @@ -308,6 +343,7 @@ def get_resource_workunits( term_associations: List[GlossaryTermAssociationClass], tag_associations: List[TagAssociationClass], owners: List[OwnerClass], + domain: Optional[str], description: Optional[str], ) -> Iterable[MetadataWorkUnit]: maybe_terms_wu: Optional[ @@ -344,6 +380,18 @@ def get_resource_workunits( self.report.report_workunit(maybe_owners_wu) yield maybe_owners_wu + maybe_domain_wu: Optional[ + MetadataWorkUnit + ] = self.get_resource_domain_work_unit( + entity_urn=entity_urn, + entity_type=entity_type, + domain=domain, + ) + if maybe_domain_wu: + self.report.num_domain_workunits_produced += 1 + self.report.report_workunit(maybe_domain_wu) + yield maybe_domain_wu + maybe_description_wu: Optional[ MetadataWorkUnit ] = self.get_resource_description_work_unit( @@ -367,6 +415,7 @@ def process_sub_resource_row( GlossaryTermAssociationClass ] = sub_resource_row.term_associations tag_associations: List[TagAssociationClass] = sub_resource_row.tag_associations + domain: Optional[str] = sub_resource_row.domain description: Optional[str] = sub_resource_row.description has_terms: bool = len(term_associations) > 0 has_tags: bool = len(tag_associations) > 0 @@ -551,6 +600,17 @@ def maybe_extract_owners( ] return owners + def maybe_extract_domain( + self, row: Dict[str, str], is_resource_row: bool + ) -> List[DomainsClass]: + if not row["domain"]: + return [] + + # Get domain + domain_name = row["domain"] + domain: List[DomainsClass] = [DomainsClass([domain_name])] + return domain + def get_workunits(self) -> Iterable[MetadataWorkUnit]: with open(self.config.filename, "r") as f: rows = csv.DictReader(f, delimiter=self.config.delimiter) @@ -576,6 +636,17 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: row, is_resource_row ) + domains: List[DomainsClass] = self.maybe_extract_domain( + row, is_resource_row + ) + + # Does this make sense? + domain: Optional[DomainsClass] = ( + row["domain"] + if row["domain"] and entity_type == DATASET_ENTITY_TYPE + else None + ) + description: Optional[str] = ( row["description"] if row["description"] and entity_type == DATASET_ENTITY_TYPE @@ -589,6 +660,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: term_associations=term_associations, tag_associations=tag_associations, owners=owners, + domain=domain, description=description, ): yield wu @@ -611,6 +683,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: term_associations=term_associations, tag_associations=tag_associations, description=description, + domain=domain ) ) diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json index 610e083e6510aa..bd3f390f2b8695 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json @@ -56,6 +56,25 @@ "properties": null } }, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "value": "{\"domains\":[\"urn:li:domain:Engineering\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, { "auditHeader": null, "entityType": "dataset", diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv index 9023699f38e858..61626dbd571db2 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv @@ -1,4 +1,4 @@ -resource,subresource,glossary_terms,tags,owners,ownership_type,description -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description +resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,Engineering "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py index d331f127c0ed2f..e26313156cb6c6 100644 --- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py +++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py @@ -185,3 +185,20 @@ def test_get_resource_description_work_unit_produced(): DATASET_URN, DATASET_ENTITY_TYPE, new_description ) assert maybe_description_wu + +def test_get_resource_description_no_description(): + source = create_mocked_csv_enricher_source() + new_domain = None + maybe_domain_wu = source.get_resource_domain_work_unit( + DATASET_URN, DATASET_ENTITY_TYPE, new_domain + ) + assert not maybe_domain_wu + + +def test_get_resource_domain_work_unit_produced(): + source = create_mocked_csv_enricher_source() + new_domain = "domain" + maybe_domain_wu = source.get_resource_domain_work_unit( + DATASET_URN, DATASET_ENTITY_TYPE, new_domain + ) + assert maybe_domain_wu \ No newline at end of file From e8166ff67f8bcd5ee136e12b2f4303af54d93190 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 11 Jul 2022 21:15:54 +0100 Subject: [PATCH 2/8] Fix lint --- metadata-ingestion/tests/unit/test_csv_enricher_source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py index e26313156cb6c6..7bb9c406aa96f9 100644 --- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py +++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py @@ -186,6 +186,7 @@ def test_get_resource_description_work_unit_produced(): ) assert maybe_description_wu + def test_get_resource_description_no_description(): source = create_mocked_csv_enricher_source() new_domain = None @@ -201,4 +202,4 @@ def test_get_resource_domain_work_unit_produced(): maybe_domain_wu = source.get_resource_domain_work_unit( DATASET_URN, DATASET_ENTITY_TYPE, new_domain ) - assert maybe_domain_wu \ No newline at end of file + assert maybe_domain_wu From 48b319647db82c82385f4d4294bf33072c6feac3 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 11 Jul 2022 21:21:33 +0100 Subject: [PATCH 3/8] fix lint issue --- metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index fa9c6b842eb31c..fec54119caacab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -120,7 +120,7 @@ def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext): "Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe." ) - def get_resource_glossary_terms_work_unit( + def get_resource_glossary_terms_work_unit( self, entity_urn: str, entity_type: str, From 1b30898077f7b9064266fc25d51ba9387466fae8 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 11 Jul 2022 21:52:13 +0100 Subject: [PATCH 4/8] Fixed metadata-ingestion lint --- .../src/datahub/ingestion/graph/client.py | 2 +- .../src/datahub/ingestion/source/csv_enricher.py | 16 ++++++---------- .../tests/unit/test_csv_enricher_source.py | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 5354963a924b2e..f381fabbce324b 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -197,7 +197,7 @@ def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]: aspect_type=GlossaryTermsClass, ) - def get_domains(self, entity_urn: str) -> Optional[DomainsClass]: + def get_domain(self, entity_urn: str) -> Optional[DomainsClass]: return self.get_aspect_v2( entity_urn=entity_urn, aspect="domains", diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index fec54119caacab..5dd78777690680 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -42,6 +42,7 @@ ACTOR = "urn:li:corpuser:ingestion" DOMAIN_ASPECT_NAME = "domains" + def get_audit_stamp() -> AuditStampClass: now = int(time.time() * 1000) return AuditStampClass(now, ACTOR) @@ -163,7 +164,7 @@ def get_resource_glossary_terms_work_unit( ) terms_wu: MetadataWorkUnit = MetadataWorkUnit( id=f"{entity_urn}-{GLOSSARY_TERMS_ASPECT_NAME}", - mcp=terms_mcpw,glossaryTerm + mcp=terms_mcpw, ) return terms_wu @@ -261,7 +262,7 @@ def get_resource_domain_work_unit( self, entity_urn: str, entity_type: str, - domain: Optional[DomainsClass], + domain: Optional[str], ) -> Optional[MetadataWorkUnit]: # Check if there is a domain to add. If not, return None. if not domain: @@ -274,7 +275,7 @@ def get_resource_domain_work_unit( if not current_domain: # If we want to overwrite or there is no existing domain, create a new object - current_domain = DomainsClass(domain, get_audit_stamp()) + current_domain = DomainsClass([domain]) domain_mcpw: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( entityType=entity_type, @@ -415,7 +416,6 @@ def process_sub_resource_row( GlossaryTermAssociationClass ] = sub_resource_row.term_associations tag_associations: List[TagAssociationClass] = sub_resource_row.tag_associations - domain: Optional[str] = sub_resource_row.domain description: Optional[str] = sub_resource_row.description has_terms: bool = len(term_associations) > 0 has_tags: bool = len(tag_associations) > 0 @@ -636,12 +636,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: row, is_resource_row ) - domains: List[DomainsClass] = self.maybe_extract_domain( - row, is_resource_row - ) - # Does this make sense? - domain: Optional[DomainsClass] = ( + domain: Optional[str] = ( row["domain"] if row["domain"] and entity_type == DATASET_ENTITY_TYPE else None @@ -683,7 +679,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: term_associations=term_associations, tag_associations=tag_associations, description=description, - domain=domain + domain=domain, ) ) diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py index 7bb9c406aa96f9..c294605f72de49 100644 --- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py +++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py @@ -187,7 +187,7 @@ def test_get_resource_description_work_unit_produced(): assert maybe_description_wu -def test_get_resource_description_no_description(): +def test_get_resource_domain_no_domain(): source = create_mocked_csv_enricher_source() new_domain = None maybe_domain_wu = source.get_resource_domain_work_unit( From a6085dd236726018b191c9056cdcb91c923b3877 Mon Sep 17 00:00:00 2001 From: argo-app-delete pod Date: Thu, 14 Jul 2022 18:08:38 +0100 Subject: [PATCH 5/8] Apply review comments --- .../examples/demo_data/csv_enricher_demo_data.csv | 2 +- .../recipes/example_to_datahub_rest.dhub.yaml | 1 - .../src/datahub/ingestion/source/csv_enricher.py | 14 +++----------- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv index 9023699f38e858..d53f68f21858d8 100644 --- a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv +++ b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv @@ -1,4 +1,4 @@ resource,subresource,glossary_terms,tags,owners,ownership_type,description -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,Engineering "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? \ No newline at end of file diff --git a/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml b/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml index e2a2f59055aa82..38cf31a61d5eaf 100644 --- a/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml +++ b/metadata-ingestion/examples/recipes/example_to_datahub_rest.dhub.yaml @@ -9,4 +9,3 @@ sink: type: "datahub-rest" config: server: "http://localhost:8080" - token: eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImV4cCI6MTY1MzUxMTYzOSwianRpIjoiNDg2ZTU2MTMtM2FhNi00Y2MyLTk2YzItZDYzMzY0MTc5MmQxIiwic3ViIjoiZGF0YWh1YiIsImlzcyI6ImRhdGFodWItbWV0YWRhdGEtc2VydmljZSJ9.gt4mWqTPB2EsgxnZc6GB_AKNm9LxS4p7v5QJgQn6kWo \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 5dd78777690680..ca4a18fff6b77e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -561,6 +561,7 @@ def maybe_extract_glossary_terms( # Sanitizing the terms string to just get the list of term urns terms_array_string = sanitize_array_string(row["glossary_terms"]) term_urns: List[str] = terms_array_string.split(self.config.array_delimiter) + term_associations: List[GlossaryTermAssociationClass] = [ GlossaryTermAssociationClass(term) for term in term_urns ] @@ -573,6 +574,7 @@ def maybe_extract_tags(self, row: Dict[str, str]) -> List[TagAssociationClass]: # Sanitizing the tags string to just get the list of tag urns tags_array_string = sanitize_array_string(row["tags"]) tag_urns: List[str] = tags_array_string.split(self.config.array_delimiter) + tag_associations: List[TagAssociationClass] = [ TagAssociationClass(tag) for tag in tag_urns ] @@ -595,22 +597,12 @@ def maybe_extract_owners( # Sanitizing the owners string to just get the list of owner urns owners_array_string: str = sanitize_array_string(row["owners"]) owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter) + owners: List[OwnerClass] = [ OwnerClass(owner_urn, type=ownership_type) for owner_urn in owner_urns ] return owners - def maybe_extract_domain( - self, row: Dict[str, str], is_resource_row: bool - ) -> List[DomainsClass]: - if not row["domain"]: - return [] - - # Get domain - domain_name = row["domain"] - domain: List[DomainsClass] = [DomainsClass([domain_name])] - return domain - def get_workunits(self) -> Iterable[MetadataWorkUnit]: with open(self.config.filename, "r") as f: rows = csv.DictReader(f, delimiter=self.config.delimiter) From 1df4416de84f6efadadb6916aac73922009671df Mon Sep 17 00:00:00 2001 From: argo-app-delete pod Date: Thu, 14 Jul 2022 18:11:06 +0100 Subject: [PATCH 6/8] remove comment --- metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index ca4a18fff6b77e..93c48afa714896 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -628,7 +628,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: row, is_resource_row ) - # Does this make sense? domain: Optional[str] = ( row["domain"] if row["domain"] and entity_type == DATASET_ENTITY_TYPE From 0bdb5bb29cff8bdfb277023a6b4434d651152857 Mon Sep 17 00:00:00 2001 From: argo-app-delete pod Date: Thu, 14 Jul 2022 19:02:40 +0100 Subject: [PATCH 7/8] Update docs --- metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 93c48afa714896..3543cd6009da44 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -89,7 +89,7 @@ class CSVEnricherReport(SourceReport): @support_status(SupportStatus.INCUBATING) class CSVEnricherSource(Source): """ - This plugin is used to apply glossary terms, tags and owners at the entity level. It can also be used to apply tags + This plugin is used to apply glossary terms, tags, owners and domain at the entity level. It can also be used to apply tags and glossary terms at the column level. These values are read from a CSV file and can be used to either overwrite or append the above aspects to entities. From 0fc41741d44164c24ea89590d25fc4821e06737d Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Thu, 14 Jul 2022 22:04:21 +0100 Subject: [PATCH 8/8] Fix test --- .../src/datahub/ingestion/source/csv_enricher.py | 10 +++++----- .../csv-enricher/csv_enricher_test_data.csv | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 3543cd6009da44..1ac80968739f16 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -95,11 +95,11 @@ class CSVEnricherSource(Source): The format of the CSV must be like so, with a few example rows. - |resource |subresource|glossary_terms |tags |owners |ownership_type |description |domain | - |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|-------------| - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|Engineering | - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | | - |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | | + |resource |subresource|glossary_terms |tags |owners |ownership_type |description |domain | + |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|---------------------------| + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|urn:li:domain:Engineering | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | | Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv index 61626dbd571db2..d92dcbf7172fca 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv @@ -1,4 +1,4 @@ resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,Engineering +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? \ No newline at end of file