diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index f5c9fec6efd555..7d63fedd8a7037 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -5,7 +5,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ## Next ### Breaking Changes - +- The `should_overwrite` flag in `csv-enricher` has been replaced with `write_semantics` to match the format used for other sources. See the [documentation](https://datahubproject.io/docs/generated/ingestion/sources/csv/) for more details ### Potential Downtime ### Deprecations diff --git a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv index 0719ae1bfeda97..9023699f38e858 100644 --- a/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv +++ b/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv @@ -1,4 +1,4 @@ -resource,subresource,glossary_terms,tags,owners -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:SavingAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe] -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],, -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy], \ No newline at end of file +resource,subresource,glossary_terms,tags,owners,ownership_type,description +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? 
\ No newline at end of file diff --git a/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml b/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml index 1138cc44459260..b1cfb6311ab17d 100644 --- a/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml +++ b/metadata-ingestion/examples/recipes/csv_enricher_to_datahub_rest.dhub.yml @@ -3,8 +3,8 @@ source: type: "csv-enricher" config: - filename: "/Users/adityaradhakrishnan/code/datahub-fork/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv" - should_overwrite: false + filename: "./examples/demo_data/csv_enricher_demo_data.csv" + write_semantics: "PATCH" delimiter: "," array_delimiter: "|" diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 1cbe446d4159cf..8a1827926018fb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -1,8 +1,9 @@ import csv import time from dataclasses import dataclass -from typing import Dict, Iterable, List, Optional, Set, Tuple +from typing import Dict, Iterable, List, Optional, Set, Tuple, Union +from datahub.configuration.common import ConfigurationError from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -17,6 +18,7 @@ from datahub.metadata.schema_classes import ( AuditStampClass, ChangeTypeClass, + EditableDatasetPropertiesClass, EditableSchemaFieldInfoClass, EditableSchemaMetadataClass, GlobalTagsClass, @@ -35,6 +37,7 @@ GLOSSARY_TERMS_ASPECT_NAME = "glossaryTerms" TAGS_ASPECT_NAME = "globalTags" OWNERSHIP_ASPECT_NAME = "ownership" +EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties" ACTOR = "urn:li:corpuser:ingestion" @@ -65,6 +68,7 @@ class SubResourceRow: field_path: str term_associations: List[GlossaryTermAssociationClass] tag_associations: List[TagAssociationClass] + description: Optional[str] @dataclass @@ -72,6 +76,7 @@ class CSVEnricherReport(SourceReport): num_glossary_term_workunits_produced: int = 0 num_tag_workunits_produced: int = 0 num_owners_workunits_produced: int = 0 + num_description_workunits_produced: int = 0 num_editable_schema_metadata_workunits_produced: int = 0 @@ -86,11 +91,11 @@ class CSVEnricherSource(Source): The format of the CSV must be like so, with a few example rows. 
- | resource | subresource | glossary_terms | tags | owners | - |--------------------------------------------------------------------------|-------------|---------------------------------------|---------------------|-------------------------------------------------| - | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | | [urn:li:glossaryTerm:AccountBalance] | [urn:li:tag:Legacy] | [urn:li:corpuser:datahub] | - | urn:li:dataset:(urn:li:dataPlatform:bigquery,SampleBigqueryDataset,PROD) | field_foo | [urn:li:glossaryTerm:CustomerAccount] | | | - | urn:li:dataset:(urn:li:dataPlatform:redshift,SampleRedshiftDataset,PROD) | field_bar | | [urn:li:tag:Legacy] | | + |resource |subresource|glossary_terms |tags |owners |ownership_type |description | + |----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------| + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description| + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | + |urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary @@ -98,11 +103,6 @@ class CSVEnricherSource(Source): be applied at the resource level and will be ignored if populated for a row with a subresource. """ - # @classmethod - # def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - # config = CSVEnricherConfig.parse_obj(config_dict) - # return cls(config, ctx) - def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext): super().__init__(ctx) self.config: CSVEnricherConfig = config @@ -110,6 +110,12 @@ def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext): self.report: CSVEnricherReport = CSVEnricherReport() # Map from entity urn to a list of SubResourceRow. self.editable_schema_metadata_map: Dict[str, List[SubResourceRow]] = {} + self.should_overwrite: bool = self.config.write_semantics == "OVERRIDE" + if not self.should_overwrite and not self.ctx.graph: + raise ConfigurationError( + "With PATCH semantics, the csv-enricher source requires a datahub_api to connect to. " + "Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe." 
+ ) def get_resource_glossary_terms_work_unit( self, @@ -122,11 +128,7 @@ def get_resource_glossary_terms_work_unit( return None current_terms: Optional[GlossaryTermsClass] = None - if not self.config.should_overwrite: - # Cannot append if the DataHub graph is None - if not self.ctx.graph: - return None - + if self.ctx.graph and not self.should_overwrite: # Get the existing terms for the entity from the DataHub graph current_terms = self.ctx.graph.get_glossary_terms(entity_urn=entity_urn) @@ -173,11 +175,7 @@ def get_resource_tags_work_unit( return None current_tags: Optional[GlobalTagsClass] = None - if not self.config.should_overwrite: - # Cannot append if the DataHub graph is None - if not self.ctx.graph: - return None - + if self.ctx.graph and not self.should_overwrite: # Get the existing tags for the entity from the DataHub graph current_tags = self.ctx.graph.get_tags(entity_urn=entity_urn) @@ -222,11 +220,7 @@ def get_resource_owners_work_unit( return None current_ownership: Optional[OwnershipClass] = None - if not self.config.should_overwrite: - # Cannot append if the DataHub graph is None - if not self.ctx.graph: - return None - + if self.ctx.graph and not self.should_overwrite: # Get the existing owner for the entity from the DataHub graph current_ownership = self.ctx.graph.get_ownership(entity_urn=entity_urn) @@ -260,6 +254,108 @@ def get_resource_owners_work_unit( ) return owners_wu + def get_resource_description_work_unit( + self, + entity_urn: str, + entity_type: str, + description: Optional[str], + ) -> Optional[MetadataWorkUnit]: + # Check if there is a description to add. If not, return None. + if not description: + return None + + # If the description is empty, return None. + if len(description) <= 0: + return None + + current_editable_properties: Optional[EditableDatasetPropertiesClass] = None + if self.ctx.graph and not self.should_overwrite: + # Get the existing editable properties for the entity from the DataHub graph + current_editable_properties = self.ctx.graph.get_aspect_v2( + entity_urn=entity_urn, + aspect=EDITABLE_DATASET_PROPERTIES_ASPECT_NAME, + aspect_type=EditableDatasetPropertiesClass, + ) + + if not current_editable_properties: + # If we want to overwrite or there are no existing editable dataset properties, create a new object + current_editable_properties = EditableDatasetPropertiesClass( + created=get_audit_stamp(), + lastModified=get_audit_stamp(), + description=description, + ) + else: + current_editable_properties.description = description + current_editable_properties.lastModified = get_audit_stamp() + + description_mcpw: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityType=entity_type, + entityUrn=entity_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName=EDITABLE_DATASET_PROPERTIES_ASPECT_NAME, + aspect=current_editable_properties, + ) + description_wu: MetadataWorkUnit = MetadataWorkUnit( + id=f"{entity_urn}-{EDITABLE_DATASET_PROPERTIES_ASPECT_NAME}", + mcp=description_mcpw, + ) + return description_wu + + def get_resource_workunits( + self, + entity_urn: str, + entity_type: str, + term_associations: List[GlossaryTermAssociationClass], + tag_associations: List[TagAssociationClass], + owners: List[OwnerClass], + description: Optional[str], + ) -> Iterable[MetadataWorkUnit]: + maybe_terms_wu: Optional[ + MetadataWorkUnit + ] = self.get_resource_glossary_terms_work_unit( + entity_urn=entity_urn, + entity_type=entity_type, + term_associations=term_associations, + ) + if maybe_terms_wu: + 
self.report.num_glossary_term_workunits_produced += 1 + self.report.report_workunit(maybe_terms_wu) + yield maybe_terms_wu + + maybe_tags_wu: Optional[MetadataWorkUnit] = self.get_resource_tags_work_unit( + entity_urn=entity_urn, + entity_type=entity_type, + tag_associations=tag_associations, + ) + if maybe_tags_wu: + self.report.num_tag_workunits_produced += 1 + self.report.report_workunit(maybe_tags_wu) + yield maybe_tags_wu + + maybe_owners_wu: Optional[ + MetadataWorkUnit + ] = self.get_resource_owners_work_unit( + entity_urn=entity_urn, + entity_type=entity_type, + owners=owners, + ) + if maybe_owners_wu: + self.report.num_owners_workunits_produced += 1 + self.report.report_workunit(maybe_owners_wu) + yield maybe_owners_wu + + maybe_description_wu: Optional[ + MetadataWorkUnit + ] = self.get_resource_description_work_unit( + entity_urn=entity_urn, + entity_type=entity_type, + description=description, + ) + if maybe_description_wu: + self.report.num_description_workunits_produced += 1 + self.report.report_workunit(maybe_description_wu) + yield maybe_description_wu + def process_sub_resource_row( self, sub_resource_row: SubResourceRow, @@ -271,11 +367,13 @@ def process_sub_resource_row( GlossaryTermAssociationClass ] = sub_resource_row.term_associations tag_associations: List[TagAssociationClass] = sub_resource_row.tag_associations + description: Optional[str] = sub_resource_row.description has_terms: bool = len(term_associations) > 0 has_tags: bool = len(tag_associations) > 0 + has_description: bool = description is not None and len(description) > 0 - # We can skip this row if there are no tags or terms to add. - if not has_tags and not has_terms: + # We can skip this row if there are no tags, terms or description to edit. + if not has_tags and not has_terms and not has_description: return current_editable_schema_metadata, needs_write # Objects that may or not be written depending on which conditions get triggered. @@ -290,6 +388,8 @@ def process_sub_resource_row( tags_aspect = GlobalTagsClass(tag_associations) if has_tags else None if tags_aspect: field_info_to_set.globalTags = tags_aspect + if has_description: + field_info_to_set.description = description # Boolean field to tell whether we have found a field match. 
field_match = False @@ -303,7 +403,7 @@ def process_sub_resource_row( # we have some editable schema metadata for this field field_match = True if has_terms: - if field_info.glossaryTerms and not self.config.should_overwrite: + if field_info.glossaryTerms and not self.should_overwrite: current_term_urns = set( [term.urn for term in field_info.glossaryTerms.terms] ) @@ -322,7 +422,7 @@ def process_sub_resource_row( needs_write = True if has_tags: - if field_info.globalTags and not self.config.should_overwrite: + if field_info.globalTags and not self.should_overwrite: current_tag_urns = set( [tag.tag for tag in field_info.globalTags.tags] ) @@ -338,6 +438,10 @@ def process_sub_resource_row( field_info.globalTags = tags_aspect needs_write = True + if has_description: + field_info.description = description + needs_write = True + if not field_match: # this field isn't present in the editable schema metadata aspect, add it current_editable_schema_metadata.editableSchemaFieldInfo.append( @@ -355,11 +459,7 @@ def get_sub_resource_work_units(self) -> Iterable[MetadataWorkUnit]: current_editable_schema_metadata: Optional[ EditableSchemaMetadataClass ] = None - if not self.config.should_overwrite: - # Cannot append if the DataHub graph is None - if not self.ctx.graph: - continue - + if self.ctx.graph and not self.should_overwrite: # Fetch the current editable schema metadata current_editable_schema_metadata = self.ctx.graph.get_aspect_v2( entity_urn=entity_urn, @@ -438,12 +538,16 @@ def maybe_extract_owners( if not row["owners"]: return [] + # Getting the ownership type + ownership_type: Union[str, OwnershipTypeClass] = ( + row["ownership_type"] if row["ownership_type"] else OwnershipTypeClass.NONE + ) + # Sanitizing the owners string to just get the list of owner urns - owners_array_string = sanitize_array_string(row["owners"]) + owners_array_string: str = sanitize_array_string(row["owners"]) owner_urns: List[str] = owners_array_string.split(self.config.array_delimiter) owners: List[OwnerClass] = [ - OwnerClass(owner_urn, type=OwnershipTypeClass.NONE) - for owner_urn in owner_urns + OwnerClass(owner_urn, type=ownership_type) for owner_urn in owner_urns ] return owners @@ -464,57 +568,33 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: GlossaryTermAssociationClass ] = self.maybe_extract_glossary_terms(row) - # If this is a resource row, try to emit an MCP - if len(term_associations) > 0 and is_resource_row: - maybe_terms_wu: Optional[ - MetadataWorkUnit - ] = self.get_resource_glossary_terms_work_unit( - entity_urn=entity_urn, - entity_type=entity_type, - term_associations=term_associations, - ) - if maybe_terms_wu: - self.report.num_glossary_term_workunits_produced += 1 - self.report.report_workunit(maybe_terms_wu) - yield maybe_terms_wu - tag_associations: List[TagAssociationClass] = self.maybe_extract_tags( row ) - # If this a resource row, try to emit an MCP - if len(tag_associations) > 0 and is_resource_row: - maybe_tags_wu: Optional[ - MetadataWorkUnit - ] = self.get_resource_tags_work_unit( - entity_urn=entity_urn, - entity_type=entity_type, - tag_associations=tag_associations, - ) - if maybe_tags_wu: - self.report.num_tag_workunits_produced += 1 - self.report.report_workunit(maybe_tags_wu) - yield maybe_tags_wu - owners: List[OwnerClass] = self.maybe_extract_owners( row, is_resource_row ) - if len(owners) > 0: - maybe_owners_wu: Optional[ - MetadataWorkUnit - ] = self.get_resource_owners_work_unit( + + description: Optional[str] = ( + row["description"] + if row["description"] and 
entity_type == DATASET_ENTITY_TYPE + else None + ) + + if is_resource_row: + for wu in self.get_resource_workunits( entity_urn=entity_urn, entity_type=entity_type, + term_associations=term_associations, + tag_associations=tag_associations, owners=owners, - ) - if maybe_owners_wu: - self.report.num_owners_workunits_produced += 1 - self.report.report_workunit(maybe_owners_wu) - yield maybe_owners_wu - - # Check if this row is applying aspects at the subresource level. Note that this only corresponds - # to EditableSchemaMetadata for now. - if not is_resource_row: + description=description, + ): + yield wu + + # If this row is not applying changes at the resource level, modify the EditableSchemaMetadata map. + else: # Only dataset sub-resources are currently supported. if entity_type != DATASET_ENTITY_TYPE: continue @@ -530,12 +610,12 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: field_path=field_path, term_associations=term_associations, tag_associations=tag_associations, + description=description, ) ) # Yield sub resource work units once the map has been fully populated. for wu in self.get_sub_resource_work_units(): - self.report.workunits_produced += 1 self.report.num_editable_schema_metadata_workunits_produced += 1 self.report.report_workunit(wu) yield wu diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source_config/csv_enricher.py index faf8fd9a7359c9..eb1ec323885695 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/csv_enricher.py @@ -1,18 +1,38 @@ +from typing import Any, Dict + import pydantic -from datahub.configuration.common import ConfigModel +from datahub.configuration.common import ConfigModel, ConfigurationError class CSVEnricherConfig(ConfigModel): - filename: str = pydantic.Field(description="Path to ingestion CSV file") - should_overwrite: bool = pydantic.Field( - default=False, - description="Whether the ingestion should overwrite. Otherwise, we will append data.", + filename: str = pydantic.Field(description="Path to CSV file to ingest") + write_semantics: str = pydantic.Field( + default="PATCH", + description='Whether the new tags, terms and owners to be added will override the existing ones added only by this source or not. Value for this config can be "PATCH" or "OVERRIDE"', ) delimiter: str = pydantic.Field( default=",", description="Delimiter to use when parsing CSV" ) array_delimiter: str = pydantic.Field( default="|", - description="Delimiter to use when parsing array fields (tags, terms, owners)", + description="Delimiter to use when parsing array fields (tags, terms and owners)", ) + + @pydantic.validator("write_semantics") + def validate_write_semantics(cls, write_semantics: str) -> str: + if write_semantics.lower() not in {"patch", "override"}: + raise ConfigurationError( + "write_semantics cannot be any other value than PATCH or OVERRIDE. Default value is PATCH. " + "For PATCH semantics consider using the datahub-rest sink or " + "provide a datahub_api: configuration on your ingestion recipe" + ) + return write_semantics + + @pydantic.validator("array_delimiter") + def validator_diff(cls, array_delimiter: str, values: Dict[str, Any]) -> str: + if array_delimiter == values["delimiter"]: + raise ConfigurationError( + "array_delimiter and delimiter are the same. Please choose different delimiters." 
+ ) + return array_delimiter diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json index 28ae94e59565d8..610e083e6510aa 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_golden.json @@ -7,7 +7,7 @@ "changeType": "UPSERT", "aspectName": "glossaryTerms", "aspect": { - "value": "{\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}", + "value": "{\"terms\": [{\"urn\": \"urn:li:glossaryTerm:CustomerAccount\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}", "contentType": "application/json" }, "systemMetadata": { @@ -45,7 +45,26 @@ "changeType": "UPSERT", "aspectName": "ownership", "aspect": { - "value": "{\"owners\": [{\"owner\": \"urn:li:corpuser:datahub\", \"type\": \"NONE\"}, {\"owner\": \"urn:li:corpuser:jdoe\", \"type\": \"NONE\"}], \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}", + "value": "{\"owners\": [{\"owner\": \"urn:li:corpuser:datahub\", \"type\": \"TECHNICAL_OWNER\"}, {\"owner\": \"urn:li:corpuser:jdoe\", \"type\": \"TECHNICAL_OWNER\"}], \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "test-csv-enricher", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "editableDatasetProperties", + "aspect": { + "value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"description\": \"new description\"}", "contentType": "application/json" }, "systemMetadata": { @@ -64,7 +83,7 @@ "changeType": "UPSERT", "aspectName": "editableSchemaMetadata", "aspect": { - "value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"editableSchemaFieldInfo\": [{\"fieldPath\": \"field_foo\", \"glossaryTerms\": {\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}}, {\"fieldPath\": \"field_bar\", \"globalTags\": {\"tags\": [{\"tag\": \"urn:li:tag:Legacy\"}]}}]}", + "value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"editableSchemaFieldInfo\": [{\"fieldPath\": \"field_foo\", \"description\": \"field_foo!\", \"glossaryTerms\": {\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}}, {\"fieldPath\": \"field_bar\", \"description\": \"field_bar?\", \"globalTags\": {\"tags\": [{\"tag\": \"urn:li:tag:Legacy\"}]}}]}", "contentType": "application/json" }, "systemMetadata": { diff --git a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv 
b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv index 10e3e653d77b8d..9023699f38e858 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv +++ b/metadata-ingestion/tests/integration/csv-enricher/csv_enricher_test_data.csv @@ -1,4 +1,4 @@ -resource,subresource,glossary_terms,tags,owners -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:AccountBalance],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe] -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],, -"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy], \ No newline at end of file +resource,subresource,glossary_terms,tags,owners,ownership_type,description +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo! +"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar? \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py b/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py index dea2e2284d4fcd..4a447037d1dda2 100644 --- a/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py +++ b/metadata-ingestion/tests/integration/csv-enricher/test_csv_enricher.py @@ -13,7 +13,7 @@ def test_csv_enricher_config(): config = CSVEnricherConfig.parse_obj( dict( filename="../integration/csv_enricher/csv_enricher_test_data.csv", - should_overwrite=True, + write_semantics="OVERRIDE", delimiter=",", array_delimiter="|", ) @@ -34,7 +34,7 @@ def test_csv_enricher_source(pytestconfig, tmp_path): "type": "csv-enricher", "config": { "filename": f"{test_resources_dir}/csv_enricher_test_data.csv", - "should_overwrite": True, + "write_semantics": "OVERRIDE", "delimiter": ",", "array_delimiter": "|", }, diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py index 747bfa62bdeea6..d331f127c0ed2f 100644 --- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py +++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py @@ -60,7 +60,7 @@ def create_base_csv_enricher_config() -> Dict: return dict( { "filename": "../integration/csv_enricher/csv_enricher_test_data.csv", - "should_overwrite": False, + "write_semantics": "PATCH", "delimiter": ",", "array_delimiter": "|", }, @@ -167,3 +167,21 @@ def test_get_resource_owners_work_unit_produced(): DATASET_URN, DATASET_ENTITY_TYPE, owners ) assert maybe_owners_wu + + +def test_get_resource_description_no_description(): + source = create_mocked_csv_enricher_source() + new_description = None + maybe_description_wu = source.get_resource_description_work_unit( + DATASET_URN, DATASET_ENTITY_TYPE, new_description + ) + assert not maybe_description_wu + + +def test_get_resource_description_work_unit_produced(): + source = create_mocked_csv_enricher_source() + new_description = "description" + maybe_description_wu = source.get_resource_description_work_unit( + DATASET_URN, DATASET_ENTITY_TYPE, new_description + ) + assert maybe_description_wu
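
Reviewer note on the PATCH/OVERRIDE distinction this patch introduces: with OVERRIDE the CSV values replace whatever is already on the entity, while with PATCH the source first fetches the existing aspect from the DataHub graph and appends the CSV values to it. The sketch below is illustrative only (plain Python, not the real aspect classes; `merge_term_urns` is a made-up helper), but it mirrors the merge rule used in `get_resource_glossary_terms_work_unit`:

```python
from typing import List, Optional, Set


def merge_term_urns(
    csv_term_urns: List[str],
    existing_term_urns: Optional[List[str]],
    write_semantics: str,
) -> List[str]:
    """Illustrative only: how the csv-enricher combines glossary term URNs.

    With OVERRIDE, the CSV values win outright; with PATCH, terms already on
    the entity are kept and the CSV terms are appended (deduplicated).
    """
    if write_semantics == "OVERRIDE" or not existing_term_urns:
        return list(csv_term_urns)

    merged: List[str] = list(existing_term_urns)
    seen: Set[str] = set(existing_term_urns)
    for urn in csv_term_urns:
        if urn not in seen:
            merged.append(urn)
            seen.add(urn)
    return merged


if __name__ == "__main__":
    existing = ["urn:li:glossaryTerm:SavingAccount"]
    from_csv = ["urn:li:glossaryTerm:CustomerAccount"]

    # PATCH keeps what is already on the entity and appends the CSV terms.
    print(merge_term_urns(from_csv, existing, "PATCH"))
    # OVERRIDE discards the existing terms in favour of the CSV ones.
    print(merge_term_urns(from_csv, existing, "OVERRIDE"))
```

The same append-style merge applies to tags and owners; for the new description, PATCH instead fetches the existing editableDatasetProperties aspect and overwrites only its description field, preserving the created stamp.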
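
The two new CSV columns are row-scoped: a single `ownership_type` is applied to every owner URN in the row (falling back to `NONE` when blank, the pre-change behaviour), and `description` lands either on the dataset's editableDatasetProperties or on the sub-resource's field info, depending on whether `subresource` is set. A minimal stand-alone illustration of the owner-column handling (`parse_owner_column` and the bare-tuple output are inventions for this sketch, not the source's API):

```python
from typing import Dict, List, Tuple


def parse_owner_column(
    row: Dict[str, str], array_delimiter: str = "|"
) -> List[Tuple[str, str]]:
    # No value in the owners column -> nothing to emit for ownership.
    if not row.get("owners"):
        return []
    # One ownership_type per row; blank falls back to NONE.
    ownership_type = row.get("ownership_type") or "NONE"
    # Strip the surrounding brackets and split on the configured array delimiter.
    owner_urns = row["owners"].strip("[]").split(array_delimiter)
    return [(urn, ownership_type) for urn in owner_urns]


if __name__ == "__main__":
    row = {
        "owners": "[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]",
        "ownership_type": "TECHNICAL_OWNER",
        "description": "new description",
    }
    # Both owners get TECHNICAL_OWNER, matching the updated golden file.
    print(parse_owner_column(row))
```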
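
Finally, a quick way to exercise the new config validators locally, assuming this branch is installed; `CSVEnricherConfig.parse_obj` is inherited from the pydantic-based ConfigModel, and pydantic surfaces the ConfigurationError raised inside a validator as a ValidationError:

```python
from datahub.ingestion.source_config.csv_enricher import CSVEnricherConfig

# A valid config: PATCH is also the default for write_semantics.
config = CSVEnricherConfig.parse_obj(
    {
        "filename": "./examples/demo_data/csv_enricher_demo_data.csv",
        "write_semantics": "PATCH",
        "delimiter": ",",
        "array_delimiter": "|",
    }
)
assert config.write_semantics == "PATCH"

# Reusing the row delimiter as the array delimiter is now rejected.
try:
    CSVEnricherConfig.parse_obj(
        {
            "filename": "./examples/demo_data/csv_enricher_demo_data.csv",
            "delimiter": ",",
            "array_delimiter": ",",
        }
    )
except Exception as exc:
    print(f"rejected as expected: {exc}")
```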