Skip to content

Commit

Permalink
fix(ingest/airflow): reorder imports to avoid cyclical dependencies (d…
Browse files Browse the repository at this point in the history
…atahub-project#6719)

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
2 people authored and cccs-Dustin committed Feb 1, 2023
1 parent 344dd66 commit 8f39b07
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
3 changes: 3 additions & 0 deletions metadata-ingestion/src/datahub_provider/_airflow_shims.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
except ModuleNotFoundError:
# Operator isn't a real class, but rather a type alias defined
# as the union of BaseOperator and MappedOperator.
# Since older versions of Airflow don't have MappedOperator, we can just use BaseOperator.
Operator = BaseOperator # type: ignore
MappedOperator = None # type: ignore

Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub_provider/_lineage_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub.configuration.common import ConfigModel
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_provider._airflow_shims import Operator
from datahub_provider.client.airflow_generator import AirflowGenerator
from datahub_provider.entities import _Entity

Expand All @@ -14,6 +13,7 @@
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance

from datahub_provider._airflow_shims import Operator
from datahub_provider.hooks.datahub import DatahubGenericHook


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union, cast

from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
Expand All @@ -13,7 +12,6 @@
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub_provider._airflow_shims import ExternalTaskSensor, Operator

assert AIRFLOW_PATCHED

Expand All @@ -23,6 +21,7 @@

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub_provider._airflow_shims import Operator


def _task_downstream_task_ids(operator: "Operator") -> Set[str]:
Expand All @@ -36,6 +35,8 @@ class AirflowGenerator:
def _get_dependencies(
task: "Operator", dag: "DAG", flow_urn: DataFlowUrn
) -> List[DataJobUrn]:
from datahub_provider._airflow_shims import ExternalTaskSensor

# resolve URNs for upstream nodes in subdags upstream of the current task.
upstream_subdag_task_urns: List[DataJobUrn] = []

Expand Down Expand Up @@ -182,6 +183,8 @@ def generate_dataflow(

@staticmethod
def _get_description(task: "Operator") -> Optional[str]:
from airflow.models.baseoperator import BaseOperator

if not isinstance(task, BaseOperator):
# TODO: Get docs for mapped operators.
return None
Expand Down

0 comments on commit 8f39b07

Please sign in to comment.