Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/airflow): support raw dataset urns in airflow lineage #6854

Merged
merged 2 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions metadata-ingestion/src/datahub_provider/_airflow_compat.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED

from airflow.models.baseoperator import BaseOperator

try:
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
except ModuleNotFoundError:
Operator = BaseOperator # type: ignore
MappedOperator = None # type: ignore
# This module must be imported before any Airflow imports in any of our files.
# The AIRFLOW_PATCHED just helps avoid flake8 errors.

try:
from airflow.sensors.external_task import ExternalTaskSensor
except ImportError:
from airflow.sensors.external_task_sensor import ExternalTaskSensor # type: ignore
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED

assert MARKUPSAFE_PATCHED

AIRFLOW_PATCHED = True

__all__ = [
"MARKUPSAFE_PATCHED",
"Operator",
"BaseOperator",
"MappedOperator",
"ExternalTaskSensor",
"AIRFLOW_PATCHED",
]
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub_provider/_airflow_shims.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Compatibility shims that expose a stable set of Airflow names (Operator,
# MappedOperator, ExternalTaskSensor) regardless of which import path the
# installed Airflow provides.

# Imported ahead of the Airflow imports below; the name is asserted further
# down so that the import is actually used.
from datahub_provider._airflow_compat import AIRFLOW_PATCHED

from airflow.models.baseoperator import BaseOperator

# Prefer the dedicated Operator / MappedOperator modules when they exist.
try:
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
except ModuleNotFoundError:
# Either module is missing (presumably an older Airflow release - confirm):
# alias Operator to BaseOperator and mark MappedOperator as unavailable.
Operator = BaseOperator # type: ignore
MappedOperator = None # type: ignore

# ExternalTaskSensor is looked up under airflow.sensors.external_task first,
# falling back to airflow.sensors.external_task_sensor if that import fails.
try:
from airflow.sensors.external_task import ExternalTaskSensor
except ImportError:
from airflow.sensors.external_task_sensor import ExternalTaskSensor # type: ignore

# References the flag imported from _airflow_compat at the top of the module.
assert AIRFLOW_PATCHED

# Names re-exported by this module. BaseOperator is not listed here.
__all__ = [
"Operator",
"MappedOperator",
"ExternalTaskSensor",
]
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub_provider/_lineage_core.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from datahub_provider._airflow_compat import Operator

from datetime import datetime
from typing import TYPE_CHECKING, Dict, List

import datahub.emitter.mce_builder as builder
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub.configuration.common import ConfigModel
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_provider._airflow_shims import Operator
from datahub_provider.client.airflow_generator import AirflowGenerator
from datahub_provider.entities import _Entity

Expand Down
5 changes: 4 additions & 1 deletion metadata-ingestion/src/datahub_provider/_plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datahub_provider._airflow_compat import BaseOperator, MappedOperator, Operator
from datahub_provider._airflow_compat import AIRFLOW_PATCHED

import contextlib
import logging
Expand All @@ -7,15 +7,18 @@

from airflow.configuration import conf
from airflow.lineage import PIPELINE_OUTLETS
from airflow.models.baseoperator import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.module_loading import import_string
from cattr import structure

from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub_provider._airflow_shims import MappedOperator, Operator
from datahub_provider.client.airflow_generator import AirflowGenerator
from datahub_provider.hooks.datahub import DatahubGenericHook
from datahub_provider.lineage.datahub import DatahubLineageConfig

assert AIRFLOW_PATCHED
logger = logging.getLogger(__name__)

TASK_ON_FAILURE_CALLBACK = "on_failure_callback"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from datahub_provider._airflow_compat import BaseOperator, ExternalTaskSensor, Operator
from datahub_provider._airflow_compat import AIRFLOW_PATCHED

from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union, cast

from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
Expand All @@ -12,6 +13,9 @@
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub_provider._airflow_shims import ExternalTaskSensor, Operator

assert AIRFLOW_PATCHED

if TYPE_CHECKING:
from airflow import DAG
Expand Down
18 changes: 18 additions & 0 deletions metadata-ingestion/src/datahub_provider/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import attr

import datahub.emitter.mce_builder as builder
from datahub.utilities.urns.urn import guess_entity_type


class _Entity:
Expand All @@ -21,3 +22,20 @@ class Dataset(_Entity):
@property
def urn(self):
return builder.make_dataset_urn(self.platform, self.name, self.env)


@attr.s(str=True)
class Urn(_Entity):
    """Lineage entity that wraps a raw URN string.

    The wrapped string is validated at construction time: it must begin
    with ``urn:`` and its guessed entity type must be ``dataset``.
    Otherwise a ``ValueError`` is raised.
    """

    # The raw URN string, returned unchanged by the ``urn`` property.
    _urn: str = attr.ib()

    @_urn.validator
    def _validate_urn(self, attribute, value):
        """Reject values that are not dataset URNs."""
        has_urn_prefix = value.startswith("urn:")
        if not has_urn_prefix:
            raise ValueError("invalid urn provided: urns must start with 'urn:'")

        entity_type = guess_entity_type(value)
        if entity_type != "dataset":
            # DataJobs only support Dataset lineage, so other entity
            # types cannot be accepted here.
            raise ValueError("Airflow lineage currently only supports datasets")

    @property
    def urn(self):
        """Return the URN string exactly as it was provided."""
        return self._urn
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

from datahub_provider.entities import Dataset
from datahub_provider.entities import Dataset, Urn

default_args = {
"owner": "airflow",
Expand All @@ -35,7 +35,11 @@
bash_command="echo 'This is where you might run your data tooling.'",
inlets=[
Dataset("snowflake", "mydb.schema.tableA"),
Dataset("snowflake", "mydb.schema.tableB"),
Dataset("snowflake", "mydb.schema.tableB", "DEV"),
# You can also put dataset URNs in the inlets/outlets lists.
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
],
outlets=[Dataset("snowflake", "mydb.schema.tableC")],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

from datahub_provider.entities import Dataset
from datahub_provider.entities import Dataset, Urn

default_args = {
"owner": "airflow",
Expand All @@ -31,9 +31,13 @@ def datahub_lineage_backend_taskflow_demo():
@task(
inlets=[
Dataset("snowflake", "mydb.schema.tableA"),
Dataset("snowflake", "mydb.schema.tableB"),
Dataset("snowflake", "mydb.schema.tableB", "DEV"),
# You can also put dataset URNs in the inlets/outlets lists.
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
],
outlets=[Dataset("snowflake", "mydb.schema.tableC")],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)
def run_data_task():
# This is where you might run your data tooling.
Expand Down
Loading