Fix for Airflow 1 support
treff7es committed May 25, 2022 · 1 parent c131e13 · commit c463aaa
Showing 1 changed file with 17 additions and 8 deletions.
@@ -10,7 +10,6 @@
 from datahub.metadata.schema_classes import DataProcessTypeClass
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
 from datahub.utilities.urns.data_job_urn import DataJobUrn
-from datahub_provider.hooks.datahub import AIRFLOW_1

 if TYPE_CHECKING:
     from airflow import DAG
@@ -159,6 +158,20 @@ def generate_dataflow(

         return data_flow

+    @staticmethod
+    def _get_description(task: "BaseOperator") -> Optional[str]:
+        if hasattr(task, "doc") and task.doc:
+            return task.doc
+        elif hasattr(task, "doc_md") and task.doc_md:
+            return task.doc_md
+        elif hasattr(task, "doc_json") and task.doc_json:
+            return task.doc_json
+        elif hasattr(task, "doc_yaml") and task.doc_yaml:
+            return task.doc_yaml
+        elif hasattr(task, "doc_rst") and task.doc_rst:
+            return task.doc_rst
+        return None
+
     @staticmethod
     def generate_datajob(
         cluster: str,
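As context for the next hunk, which swaps the old inline or-chain for this helper: the hasattr() guard on every branch is what restores Airflow 1 support, since the doc_* attributes the old code touched directly are absent on Airflow 1 operators (which is why the old code was gated on AIRFLOW_1). A minimal, self-contained sketch of the same fallback logic; the stub classes and the get_description name are hypothetical, not part of this commit:

```python
from typing import Any, Optional


class Airflow2TaskStub:
    """Hypothetical stand-in for an Airflow 2 operator: doc_* fields exist."""

    doc: Optional[str] = None
    doc_md: Optional[str] = "### task documented in markdown"


class Airflow1TaskStub:
    """Hypothetical stand-in for an Airflow 1 operator: no doc_* fields at all."""


def get_description(task: Any) -> Optional[str]:
    # Condensed equivalent of the new helper: each guarded lookup stays safe
    # on operators that predate the doc_* attributes.
    for attr in ("doc", "doc_md", "doc_json", "doc_yaml", "doc_rst"):
        value = getattr(task, attr, None)
        if value:
            return value
    return None


assert get_description(Airflow2TaskStub()) == "### task documented in markdown"
assert get_description(Airflow1TaskStub()) is None  # no AttributeError raised
```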
@@ -184,11 +197,7 @@ def generate_datajob(
             orchestrator="airflow", env=cluster, flow_id=dag.dag_id
         )
         datajob = DataJob(id=task.task_id, flow_urn=dataflow_urn)
-        datajob.description = (
-            (task.doc or task.doc_md or task.doc_json or task.doc_yaml or task.doc_rst)
-            if not AIRFLOW_1
-            else None
-        )
+        datajob.description = AirflowGenerator._get_description(task)

         job_property_bag: Dict[str, str] = {
             key: repr(value)
@@ -375,7 +384,7 @@ def run_datajob(
         job_property_bag["hostname"] = str(ti.hostname)
         job_property_bag["max_tries"] = str(ti.max_tries)
         # Not compatible with Airflow 1
-        if not AIRFLOW_1:
+        if hasattr(ti, "external_executor_id"):
             job_property_bag["external_executor_id"] = str(ti.external_executor_id)
         job_property_bag["pid"] = str(ti.pid)
         job_property_bag["state"] = str(ti.state)
@@ -387,7 +396,7 @@
         dpi.url = ti.log_url

         # This property only exists in Airflow2
-        if hasattr(ti.dag_run, "run_type"):
+        if hasattr(ti, "dag_run") and hasattr(ti.dag_run, "run_type"):
             from airflow.utils.types import DagRunType

             if ti.dag_run.run_type == DagRunType.SCHEDULED:
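The last hunk tightens the existing Airflow 2 check: on Airflow 1 the TaskInstance has no dag_run attribute, so evaluating ti.dag_run would itself raise AttributeError before run_type could ever be probed. A short sketch of the two-level guard; the stubs are illustrative only:

```python
from typing import Any, Optional


class DagRunStub:
    run_type = "scheduled"


class Airflow2TaskInstanceStub:
    dag_run = DagRunStub()


class Airflow1TaskInstanceStub:
    """No dag_run attribute at all, as on Airflow 1."""


def safe_run_type(ti: Any) -> Optional[str]:
    # Guard both levels: the attribute on the TaskInstance, then the
    # attribute on the DagRun, mirroring the committed condition.
    if hasattr(ti, "dag_run") and hasattr(ti.dag_run, "run_type"):
        return ti.dag_run.run_type
    return None


assert safe_run_type(Airflow2TaskInstanceStub()) == "scheduled"
assert safe_run_type(Airflow1TaskInstanceStub()) is None
```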
