Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for Domains in CSV source #5372

Merged
merged 9 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
token: eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImV4cCI6MTY1MzUxMTYzOSwianRpIjoiNDg2ZTU2MTMtM2FhNi00Y2MyLTk2YzItZDYzMzY0MTc5MmQxIiwic3ViIjoiZGF0YWh1YiIsImlzcyI6ImRhdGFodWItbWV0YWRhdGEtc2VydmljZSJ9.gt4mWqTPB2EsgxnZc6GB_AKNm9LxS4p7v5QJgQn6kWo
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import (
DatasetUsageStatisticsClass,
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
OwnershipClass,
Expand Down Expand Up @@ -196,6 +197,13 @@ def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
aspect_type=GlossaryTermsClass,
)

def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="domains",
aspect_type=DomainsClass,
)

def get_usage_aspects_from_urn(
self, entity_urn: str, start_timestamp: int, end_timestamp: int
) -> Optional[List[DatasetUsageStatisticsClass]]:
Expand Down
79 changes: 74 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
DomainsClass,
EditableDatasetPropertiesClass,
EditableSchemaFieldInfoClass,
EditableSchemaMetadataClass,
Expand All @@ -39,6 +40,7 @@
OWNERSHIP_ASPECT_NAME = "ownership"
EDITABLE_DATASET_PROPERTIES_ASPECT_NAME = "editableDatasetProperties"
ACTOR = "urn:li:corpuser:ingestion"
DOMAIN_ASPECT_NAME = "domains"


def get_audit_stamp() -> AuditStampClass:
Expand Down Expand Up @@ -69,6 +71,7 @@ class SubResourceRow:
term_associations: List[GlossaryTermAssociationClass]
tag_associations: List[TagAssociationClass]
description: Optional[str]
domain: Optional[str]


@dataclass
Expand All @@ -78,6 +81,7 @@ class CSVEnricherReport(SourceReport):
num_owners_workunits_produced: int = 0
num_description_workunits_produced: int = 0
num_editable_schema_metadata_workunits_produced: int = 0
num_domain_workunits_produced: int = 0


@platform_name("CSV")
Expand All @@ -91,11 +95,11 @@ class CSVEnricherSource(Source):

The format of the CSV must be like so, with a few example rows.

|resource |subresource|glossary_terms |tags |owners |ownership_type |description |
|----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! |
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? |
|resource |subresource|glossary_terms |tags |owners |ownership_type |description |domain |
|----------------------------------------------------------------|-----------|------------------------------------|-------------------|---------------------------------------------------|---------------|---------------|-------------|
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)| |[urn:li:glossaryTerm:AccountBalance]|[urn:li:tag:Legacy]|[urn:li:corpuser:datahub|urn:li:corpuser:jdoe]|TECHNICAL_OWNER|new description|Engineering |
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_foo |[urn:li:glossaryTerm:AccountBalance]| | | |field_foo! | |
|urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)|field_bar | |[urn:li:tag:Legacy]| | |field_bar? | |

Note that the first row does not have a subresource populated. That means any glossary terms, tags, and owners will
be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary
Expand Down Expand Up @@ -254,6 +258,38 @@ def get_resource_owners_work_unit(
)
return owners_wu

def get_resource_domain_work_unit(
self,
entity_urn: str,
entity_type: str,
domain: Optional[str],
) -> Optional[MetadataWorkUnit]:
# Check if there is a domain to add. If not, return None.
if not domain:
return None

current_domain: Optional[DomainsClass] = None
if self.ctx.graph and not self.should_overwrite:
# Get the existing domain for the entity from the DataHub graph
current_domain = self.ctx.graph.get_domain(entity_urn=entity_urn)

if not current_domain:
# If we want to overwrite or there is no existing domain, create a new object
current_domain = DomainsClass([domain])

domain_mcpw: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType=entity_type,
entityUrn=entity_urn,
changeType=ChangeTypeClass.UPSERT,
aspectName=DOMAIN_ASPECT_NAME,
aspect=current_domain,
)
domain_wu: MetadataWorkUnit = MetadataWorkUnit(
id=f"{entity_urn}-{DOMAIN_ASPECT_NAME}",
mcp=domain_mcpw,
)
return domain_wu

def get_resource_description_work_unit(
self,
entity_urn: str,
Expand Down Expand Up @@ -308,6 +344,7 @@ def get_resource_workunits(
term_associations: List[GlossaryTermAssociationClass],
tag_associations: List[TagAssociationClass],
owners: List[OwnerClass],
domain: Optional[str],
description: Optional[str],
) -> Iterable[MetadataWorkUnit]:
maybe_terms_wu: Optional[
Expand Down Expand Up @@ -344,6 +381,18 @@ def get_resource_workunits(
self.report.report_workunit(maybe_owners_wu)
yield maybe_owners_wu

maybe_domain_wu: Optional[
MetadataWorkUnit
] = self.get_resource_domain_work_unit(
entity_urn=entity_urn,
entity_type=entity_type,
domain=domain,
)
if maybe_domain_wu:
self.report.num_domain_workunits_produced += 1
self.report.report_workunit(maybe_domain_wu)
yield maybe_domain_wu

maybe_description_wu: Optional[
MetadataWorkUnit
] = self.get_resource_description_work_unit(
Expand Down Expand Up @@ -551,6 +600,17 @@ def maybe_extract_owners(
]
return owners

def maybe_extract_domain(
self, row: Dict[str, str], is_resource_row: bool
) -> List[DomainsClass]:
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
if not row["domain"]:
return []

# Get domain
domain_name = row["domain"]
domain: List[DomainsClass] = [DomainsClass([domain_name])]
return domain

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
with open(self.config.filename, "r") as f:
rows = csv.DictReader(f, delimiter=self.config.delimiter)
Expand All @@ -576,6 +636,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
row, is_resource_row
)

# Does this make sense?
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
domain: Optional[str] = (
row["domain"]
if row["domain"] and entity_type == DATASET_ENTITY_TYPE
else None
)

description: Optional[str] = (
row["description"]
if row["description"] and entity_type == DATASET_ENTITY_TYPE
Expand All @@ -589,6 +656,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
term_associations=term_associations,
tag_associations=tag_associations,
owners=owners,
domain=domain,
description=description,
):
yield wu
Expand All @@ -611,6 +679,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
term_associations=term_associations,
tag_associations=tag_associations,
description=description,
domain=domain,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\":[\"urn:li:domain:Engineering\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "test-csv-enricher",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource,subresource,glossary_terms,tags,owners,ownership_type,description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description
resource,subresource,glossary_terms,tags,owners,ownership_type,description,domain
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,Engineering
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_foo,[urn:li:glossaryTerm:AccountBalance],,,,field_foo!
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
18 changes: 18 additions & 0 deletions metadata-ingestion/tests/unit/test_csv_enricher_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,21 @@ def test_get_resource_description_work_unit_produced():
DATASET_URN, DATASET_ENTITY_TYPE, new_description
)
assert maybe_description_wu


def test_get_resource_domain_no_domain():
source = create_mocked_csv_enricher_source()
new_domain = None
maybe_domain_wu = source.get_resource_domain_work_unit(
DATASET_URN, DATASET_ENTITY_TYPE, new_domain
)
assert not maybe_domain_wu


def test_get_resource_domain_work_unit_produced():
source = create_mocked_csv_enricher_source()
new_domain = "domain"
maybe_domain_wu = source.get_resource_domain_work_unit(
DATASET_URN, DATASET_ENTITY_TYPE, new_domain
)
assert maybe_domain_wu