feat(transformers): Add semantics & transform_aspect support in transformers (#5514)

Co-authored-by: MohdSiddique Bagwan <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
3 people authored Sep 6, 2022
1 parent 9434afc commit 2f65e2f
Showing 22 changed files with 2,528 additions and 1,040 deletions.
2 changes: 1 addition & 1 deletion docs-website/genJsonSchema/gen_json_schema.py
@@ -159,7 +159,7 @@ def get_base() -> Any:
"type": "array",
"items": {
"type": "object",
"description": "Transformer configs see at https://datahubproject.io/docs/metadata-ingestion/transformers",
"description": "Transformer configs see at https://datahubproject.io/docs/metadata-ingestion/docs/transformer",
"properties": {
"type": {"type": "string", "description": "Transformer type"},
"config": {
7 changes: 6 additions & 1 deletion docs-website/sidebars.js
@@ -137,7 +137,12 @@ module.exports = {
{
Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"),
},
"metadata-ingestion/transformers",
{
Transformers: [
"metadata-ingestion/docs/transformer/intro",
"metadata-ingestion/docs/transformer/dataset_transformer",
],
},
{
"Advanced Guides": [
{
4 changes: 2 additions & 2 deletions metadata-ingestion/README.md
@@ -183,7 +183,7 @@ transformers: # an array of transformers applied sequentially
# default sink, no config needed
```

Check out the [transformers guide](./transformers.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!
Check out the [transformers guide](./docs/transformer/intro.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!

## Using as a library (SDK)

@@ -195,5 +195,5 @@ In some cases, you might want to configure and run a pipeline entirely from with

## Developing

See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md).
See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./docs/transformer/intro.md).

2 changes: 1 addition & 1 deletion metadata-ingestion/build.gradle
@@ -8,7 +8,7 @@ ext {
}

task checkPythonVersion(type: Exec) {
commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 7)'
commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 6)'
}

task environmentSetup(type: Exec, dependsOn: checkPythonVersion) {
1,203 changes: 1,203 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions metadata-ingestion/docs/transformer/intro.md
@@ -0,0 +1,33 @@
---
title: "Introduction"
---

# Transformers

## What’s a transformer?

Oftentimes we want to modify metadata before it reaches the ingestion sink – for instance, we might want to add custom tags, ownership, properties, or patch some fields. A transformer allows us to do exactly these things.

Moreover, a transformer gives you fine-grained control over the ingested metadata without having to modify the ingestion framework's code. Instead, you can write your own module that transforms metadata events however you like. To include a transformer in a recipe, all that's needed is the transformer's name along with any configuration it requires.
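As a sketch of that convention, a recipe references a transformer by its registered type name plus its config block. The transformer name and config fields below are illustrative assumptions, not taken from this commit:

```python
# Hypothetical recipe, expressed as the Python dict a pipeline would consume.
# The transformer "type" and its config fields are illustrative assumptions.
recipe = {
    "source": {"type": "mysql", "config": {"host_port": "localhost:3306"}},
    "transformers": [
        {
            "type": "simple_add_dataset_tags",
            "config": {"tag_urns": ["urn:li:tag:NeedsDocumentation"]},
        }
    ],
    "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
}
```

The same structure is usually written as YAML in a recipe file; the dict form is what you would pass when running a pipeline programmatically.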

## Provided transformers

Aside from the option of writing your own transformer (see below), we provide some simple transformers for common use cases: adding tags, glossary terms, properties, and ownership information.

The transformers DataHub provides for datasets are:
- [Simple Add Dataset ownership](./dataset_transformer.md#simple-add-dataset-ownership)
- [Pattern Add Dataset ownership](./dataset_transformer.md#pattern-add-dataset-ownership)
- [Simple Remove Dataset ownership](./dataset_transformer.md#simple-remove-dataset-ownership)
- [Mark Dataset Status](./dataset_transformer.md#mark-dataset-status)
- [Simple Add Dataset globalTags](./dataset_transformer.md#simple-add-dataset-globaltags)
- [Pattern Add Dataset globalTags](./dataset_transformer.md#pattern-add-dataset-globaltags)
- [Add Dataset globalTags](./dataset_transformer.md#add-dataset-globaltags)
- [Set Dataset browsePath](./dataset_transformer.md#set-dataset-browsepath)
- [Simple Add Dataset glossaryTerms](./dataset_transformer.md#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](./dataset_transformer.md#pattern-add-dataset-glossaryterms)
- [Pattern Add Dataset Schema Field glossaryTerms](./dataset_transformer.md#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](./dataset_transformer.md#pattern-add-dataset-schema-field-globaltags)
- [Simple Add Dataset datasetProperties](./dataset_transformer.md#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](./dataset_transformer.md#add-dataset-datasetproperties)
- [Simple Add Dataset domains](./dataset_transformer.md#simple-add-dataset-domains)
- [Pattern Add Dataset domains](./dataset_transformer.md#pattern-add-dataset-domains)
@@ -2,9 +2,8 @@
import json
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, TransformerSemantics
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.add_dataset_ownership import Semantics
from datahub.ingestion.transformer.base_transformer import (
BaseTransformer,
SingleAspectTransformer,
@@ -18,7 +17,7 @@

class AddCustomOwnershipConfig(ConfigModel):
owners_json: str
semantics: Semantics = Semantics.OVERWRITE
semantics: TransformerSemantics = TransformerSemantics.OVERWRITE


class AddCustomOwnership(BaseTransformer, SingleAspectTransformer):
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -26,6 +26,7 @@ class TransformerSemantics(Enum):

class TransformerSemanticsConfigModel(ConfigModel):
semantics: TransformerSemantics = TransformerSemantics.OVERWRITE
replace_existing: bool = False

@validator("semantics", pre=True)
def ensure_semantics_is_upper_case(cls, v: str) -> str:
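The pre-validator on `semantics` uppercases the configured value before enum coercion, so a recipe can say `semantics: patch` or `semantics: PATCH` interchangeably. A minimal standalone sketch of that behavior, using plain Python rather than the actual pydantic model:

```python
from enum import Enum


class TransformerSemantics(Enum):
    # Sketch mirroring datahub.configuration.common: OVERWRITE replaces the
    # server-side aspect, PATCH merges into it.
    OVERWRITE = "OVERWRITE"
    PATCH = "PATCH"


def ensure_semantics_is_upper_case(v: str) -> str:
    # Same idea as the pre=True validator: normalize case before coercion.
    return v.upper() if isinstance(v, str) else v


semantics = TransformerSemantics(ensure_semantics_is_upper_case("patch"))
```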
26 changes: 26 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -14,12 +14,15 @@
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
DatasetUsageStatisticsClass,
DomainPropertiesClass,
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
OwnershipClass,
SchemaMetadataClass,
TelemetryClientIdClass,
)
from datahub.utilities.urns.urn import Urn
@@ -185,13 +188,29 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
aspect_type=OwnershipClass,
)

def get_schema_metadata(self, entity_urn: str) -> Optional[SchemaMetadataClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="schemaMetadata",
aspect_type=SchemaMetadataClass,
)

def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="domainProperties",
aspect_type=DomainPropertiesClass,
)

def get_dataset_properties(
self, entity_urn: str
) -> Optional[DatasetPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="datasetProperties",
aspect_type=DatasetPropertiesClass,
)

def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
@@ -213,6 +232,13 @@ def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
aspect_type=DomainsClass,
)

def get_browse_path(self, entity_urn: str) -> Optional[BrowsePathsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="browsePaths",
aspect_type=BrowsePathsClass,
)

def get_usage_aspects_from_urn(
self, entity_urn: str, start_timestamp: int, end_timestamp: int
) -> Optional[List[DatasetUsageStatisticsClass]]:
@@ -1,17 +1,20 @@
from typing import List
from typing import List, Optional, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import (
DatasetBrowsePathsTransformer,
)
from datahub.metadata.schema_classes import BrowsePathsClass, MetadataChangeEventClass
from datahub.metadata.schema_classes import BrowsePathsClass


class AddDatasetBrowsePathConfig(ConfigModel):
class AddDatasetBrowsePathConfig(TransformerSemanticsConfigModel):
path_templates: List[str]
replace_existing: bool = False


class AddDatasetBrowsePathTransformer(DatasetBrowsePathsTransformer):
@@ -32,32 +35,59 @@ def create(
config = AddDatasetBrowsePathConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
@staticmethod
def get_browse_paths_to_set(
graph: DataHubGraph, urn: str, mce_browse_paths: Optional[BrowsePathsClass]
) -> Optional[BrowsePathsClass]:
if not mce_browse_paths or not mce_browse_paths.paths:
# nothing to add, no need to consult server
return None

server_browse_paths = graph.get_browse_path(entity_urn=urn)
if server_browse_paths:
# compute patch
# we only add paths that are not already present in the server's browse paths
paths_to_add: List[str] = []
for path in mce_browse_paths.paths:
if path not in server_browse_paths.paths:
paths_to_add.append(path)
# apply the patch: server-side paths first, then the new ones
mce_browse_paths.paths = []
mce_browse_paths.paths.extend(server_browse_paths.paths)
mce_browse_paths.paths.extend(paths_to_add)

return mce_browse_paths
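Distilled, the PATCH merge in `get_browse_paths_to_set` keeps the server's paths and appends only the locally produced paths the server does not already have. A standalone sketch over plain lists (hypothetical helper name):

```python
from typing import List


def compute_patched_paths(server_paths: List[str], local_paths: List[str]) -> List[str]:
    # Keep server-side paths first, then append only the new local ones,
    # preserving order and skipping duplicates.
    paths_to_add = [p for p in local_paths if p not in server_paths]
    return server_paths + paths_to_add
```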

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
platform_part, dataset_fqdn, env = (
mce.proposedSnapshot.urn.replace("urn:li:dataset:(", "")
.replace(")", "")
.split(",")
entity_urn.replace("urn:li:dataset:(", "").replace(")", "").split(",")
)

platform = platform_part.replace("urn:li:dataPlatform:", "")
dataset = dataset_fqdn.replace(".", "/")

browse_paths = builder.get_or_add_aspect(
mce,
BrowsePathsClass(
paths=[],
),
)

if self.config.replace_existing:
browse_paths.paths = []
browse_paths = BrowsePathsClass(paths=[])
if aspect is not None and self.config.replace_existing is False:
browse_paths.paths.extend(aspect.paths) # type: ignore[attr-defined]

for template in self.config.path_templates:
browse_path = (
template.replace("PLATFORM", platform)
.replace("DATASET_PARTS", dataset)
.replace("ENV", env.lower())
)

browse_paths.paths.append(browse_path)

return mce
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_browse_paths: Optional[
BrowsePathsClass
] = AddDatasetBrowsePathTransformer.get_browse_paths_to_set(
self.ctx.graph, entity_urn, browse_paths
)
if patch_browse_paths is not None:
browse_paths = patch_browse_paths

return cast(Optional[Aspect], browse_paths)
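The token substitution performed in `transform_aspect` can be sketched as a standalone helper (hypothetical function name; the PLATFORM, DATASET_PARTS, and ENV tokens come from the transformer above):

```python
def render_browse_path(template: str, platform: str, dataset_fqdn: str, env: str) -> str:
    # Replace the template tokens with values parsed from the dataset URN;
    # dots in the dataset name become path separators, and env is lowercased.
    return (
        template.replace("PLATFORM", platform)
        .replace("DATASET_PARTS", dataset_fqdn.replace(".", "/"))
        .replace("ENV", env.lower())
    )
```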