Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): update CSV source to support description and ownership type #5346

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next

### Breaking Changes

- The `should_overwrite` flag in `csv-enricher` has been replaced with `write_semantics` to match the format used for other sources. See the [documentation](https://datahubproject.io/docs/generated/ingestion/sources/csv/) for more details
### Potential Downtime

### Deprecations
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource,subresource,glossary_terms,tags,owners
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:SavingAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],
resource,subresource,glossary_terms,tags,owners,ownership_type,description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
source:
type: "csv-enricher"
config:
filename: "/Users/adityaradhakrishnan/code/datahub-fork/metadata-ingestion/examples/demo_data/csv_enricher_demo_data.csv"
should_overwrite: false
filename: "./examples/demo_data/csv_enricher_demo_data.csv"
write_semantics: "PATCH"
delimiter: ","
array_delimiter: "|"

Expand Down
240 changes: 160 additions & 80 deletions metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
from typing import Any, Dict

import pydantic

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, ConfigurationError


class CSVEnricherConfig(ConfigModel):
filename: str = pydantic.Field(description="Path to ingestion CSV file")
should_overwrite: bool = pydantic.Field(
default=False,
description="Whether the ingestion should overwrite. Otherwise, we will append data.",
filename: str = pydantic.Field(description="Path to CSV file to ingest")
write_semantics: str = pydantic.Field(
default="PATCH",
description='Whether the new tags, terms and owners to be added will override the existing ones added only by this source or not. Value for this config can be "PATCH" or "OVERRIDE"',
)
delimiter: str = pydantic.Field(
default=",", description="Delimiter to use when parsing CSV"
)
array_delimiter: str = pydantic.Field(
default="|",
description="Delimiter to use when parsing array fields (tags, terms, owners)",
description="Delimiter to use when parsing array fields (tags, terms and owners)",
)

@pydantic.validator("write_semantics")
def validate_write_semantics(cls, write_semantics: str) -> str:
if write_semantics.lower() not in {"patch", "override"}:
raise ConfigurationError(
"write_semantics cannot be any other value than PATCH or OVERRIDE. Default value is PATCH. "
"For PATCH semantics consider using the datahub-rest sink or "
"provide a datahub_api: configuration on your ingestion recipe"
)
return write_semantics

@pydantic.validator("array_delimiter")
def validator_diff(cls, array_delimiter: str, values: Dict[str, Any]) -> str:
if array_delimiter == values["delimiter"]:
raise ConfigurationError(
"array_delimiter and delimiter are the same. Please choose different delimiters."
)
return array_delimiter
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"changeType": "UPSERT",
"aspectName": "glossaryTerms",
"aspect": {
"value": "{\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}",
"value": "{\"terms\": [{\"urn\": \"urn:li:glossaryTerm:CustomerAccount\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -45,7 +45,26 @@
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"value": "{\"owners\": [{\"owner\": \"urn:li:corpuser:datahub\", \"type\": \"NONE\"}, {\"owner\": \"urn:li:corpuser:jdoe\", \"type\": \"NONE\"}], \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}",
"value": "{\"owners\": [{\"owner\": \"urn:li:corpuser:datahub\", \"type\": \"TECHNICAL_OWNER\"}, {\"owner\": \"urn:li:corpuser:jdoe\", \"type\": \"TECHNICAL_OWNER\"}], \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "test-csv-enricher",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "editableDatasetProperties",
"aspect": {
"value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"description\": \"new description\"}",
"contentType": "application/json"
},
"systemMetadata": {
Expand All @@ -64,7 +83,7 @@
"changeType": "UPSERT",
"aspectName": "editableSchemaMetadata",
"aspect": {
"value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"editableSchemaFieldInfo\": [{\"fieldPath\": \"field_foo\", \"glossaryTerms\": {\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}}, {\"fieldPath\": \"field_bar\", \"globalTags\": {\"tags\": [{\"tag\": \"urn:li:tag:Legacy\"}]}}]}",
"value": "{\"created\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"editableSchemaFieldInfo\": [{\"fieldPath\": \"field_foo\", \"description\": \"field_foo!\", \"glossaryTerms\": {\"terms\": [{\"urn\": \"urn:li:glossaryTerm:AccountBalance\"}], \"auditStamp\": {\"time\": 1643871600000, \"actor\": \"urn:li:corpuser:ingestion\"}}}, {\"fieldPath\": \"field_bar\", \"description\": \"field_bar?\", \"globalTags\": {\"tags\": [{\"tag\": \"urn:li:tag:Legacy\"}]}}]}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource,subresource,glossary_terms,tags,owners
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:AccountBalance],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],
resource,subresource,glossary_terms,tags,owners,ownership_type,description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def test_csv_enricher_config():
config = CSVEnricherConfig.parse_obj(
dict(
filename="../integration/csv_enricher/csv_enricher_test_data.csv",
should_overwrite=True,
write_semantics="OVERRIDE",
delimiter=",",
array_delimiter="|",
)
Expand All @@ -34,7 +34,7 @@ def test_csv_enricher_source(pytestconfig, tmp_path):
"type": "csv-enricher",
"config": {
"filename": f"{test_resources_dir}/csv_enricher_test_data.csv",
"should_overwrite": True,
"write_semantics": "OVERRIDE",
"delimiter": ",",
"array_delimiter": "|",
},
Expand Down
20 changes: 19 additions & 1 deletion metadata-ingestion/tests/unit/test_csv_enricher_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def create_base_csv_enricher_config() -> Dict:
return dict(
{
"filename": "../integration/csv_enricher/csv_enricher_test_data.csv",
"should_overwrite": False,
"write_semantics": "PATCH",
"delimiter": ",",
"array_delimiter": "|",
},
Expand Down Expand Up @@ -167,3 +167,21 @@ def test_get_resource_owners_work_unit_produced():
DATASET_URN, DATASET_ENTITY_TYPE, owners
)
assert maybe_owners_wu


def test_get_resource_description_no_description():
source = create_mocked_csv_enricher_source()
new_description = None
maybe_description_wu = source.get_resource_description_work_unit(
DATASET_URN, DATASET_ENTITY_TYPE, new_description
)
assert not maybe_description_wu


def test_get_resource_description_work_unit_produced():
source = create_mocked_csv_enricher_source()
new_description = "description"
maybe_description_wu = source.get_resource_description_work_unit(
DATASET_URN, DATASET_ENTITY_TYPE, new_description
)
assert maybe_description_wu