feat(ingest): drop plugin support for airflow 1.x (datahub-project#6331)
hsheth2 authored and cccs-tom committed Nov 18, 2022
1 parent cafd620 commit f61e8a9
Showing 17 changed files with 36 additions and 355 deletions.
36 changes: 6 additions & 30 deletions .github/workflows/metadata-ingestion.yml
@@ -20,44 +20,20 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  metadata-ingestion-general:
-    runs-on: ubuntu-latest
-    env:
-      SPARK_VERSION: 3.0.3
-      DATAHUB_TELEMETRY_ENABLED: false
-    strategy:
-      matrix:
-        python-version: ["3.7", "3.10.6"]
-    steps:
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v4
-        with:
-          python-version: ${{ matrix.python-version }}
-      - name: Install dependencies
-        run: ./metadata-ingestion/scripts/install_deps.sh
-      - name: Run metadata-ingestion tests
-        run: ./gradlew :metadata-ingestion:build :metadata-ingestion:testQuick :metadata-ingestion:check
-      - uses: actions/upload-artifact@v3
-        if: always()
-        with:
-          name: Test Results (metadata ingestion ${{ matrix.python-version }} testQuick)
-          path: |
-            **/build/reports/tests/test/**
-            **/build/test-results/test/**
-            **/junit.*.xml
   metadata-ingestion:
     runs-on: ubuntu-latest
     env:
       SPARK_VERSION: 3.0.3
       DATAHUB_TELEMETRY_ENABLED: false
-      DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
+      # TODO: Enable this once the test is fixed.
+      # DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
     strategy:
       matrix:
         python-version: ["3.7", "3.10"]
         command:
           [
-            "installAirflow1",
             "lint",
+            "testQuick",
             "testIntegration",
             "testIntegrationBatch1",
             "testSlowIntegration",
@@ -75,12 +51,12 @@ jobs:
       - name: Install dependencies
         run: ./metadata-ingestion/scripts/install_deps.sh
       - name: Run metadata-ingestion tests
-        run: ./gradlew :metadata-ingestion:build :metadata-ingestion:${{ matrix.command }} -x:metadata-ingestion:testQuick -x:metadata-ingestion:check
+        run: ./gradlew :metadata-ingestion:build :metadata-ingestion:${{ matrix.command }}
       - name: pip freeze show list installed
         if: always()
         run: source metadata-ingestion/venv/bin/activate && pip freeze
       - uses: actions/upload-artifact@v3
-        if: always()
+        if: ${{ always() && matrix.command != 'lint' }}
         with:
           name: Test Results (metadata ingestion ${{ matrix.python-version }})
           path: |
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
@@ -6,6 +6,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 - LookML source will only emit views that are reachable from explores while scanning your git repo. Previous behavior can be achieved by setting `emit_reachable_views_only` to False.
 - LookML source will always lowercase urns for lineage edges from views to upstream tables. There is no fallback provided to previous behavior because it was inconsistent in application of lower-casing earlier.
 - dbt config `node_type_pattern` which was previously deprecated has been removed. Use `entities_enabled` instead to control whether to emit metadata for sources, models, seeds, tests, etc.
+- The DataHub Airflow lineage backend and plugin no longer support Airflow 1.x. You can still run DataHub ingestion in Airflow 1.x using the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/operators/python_operator/index.html?highlight=pythonvirtualenvoperator#airflow.operators.python_operator.PythonVirtualenvOperator).
 
 ### Breaking Changes
 - Java version 11 or greater is required.
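
The workaround referenced in the new bullet above amounts to running the ingestion pipeline inside an isolated virtualenv task, so acryl-datahub never has to import alongside Airflow 1.x itself. Below is a minimal sketch; the DAG id, schedule, and MySQL recipe are illustrative stand-ins, not part of this commit.

    # Airflow 1.x DAG that runs DataHub ingestion in a throwaway virtualenv.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonVirtualenvOperator


    def _run_ingestion():
        # Runs inside the virtualenv, which carries its own acryl-datahub
        # install, so the import must live inside the function body.
        from datahub.ingestion.run.pipeline import Pipeline

        pipeline = Pipeline.create(
            {
                "source": {"type": "mysql", "config": {"host_port": "localhost:3306"}},
                "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
            }
        )
        pipeline.run()
        pipeline.raise_from_status()


    with DAG("datahub_ingest", start_date=datetime(2022, 1, 1), schedule_interval="@daily") as dag:
        ingest = PythonVirtualenvOperator(
            task_id="ingest",
            python_callable=_run_ingestion,
            requirements=["acryl-datahub[mysql]"],
            system_site_packages=False,
        )

Because the callable is serialized and re-executed inside the virtualenv, every datahub import has to stay inside the function body rather than at module level.
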
19 changes: 2 additions & 17 deletions metadata-ingestion-modules/airflow-plugin/setup.py
@@ -1,13 +1,9 @@
 import os
 import pathlib
-import sys
 from typing import Dict, Set
 
 import setuptools
 
-is_py37_or_newer = sys.version_info >= (3, 7)
-
-
 package_metadata: dict = {}
 with open("./src/datahub_airflow_plugin/__init__.py") as fp:
     exec(fp.read(), package_metadata)
@@ -26,7 +22,7 @@ def get_long_description():
     # Actual dependencies.
     "typing-inspect",
     "pydantic>=1.5.1",
-    "apache-airflow >= 1.10.2",
+    "apache-airflow >= 2.0.2",
     "acryl-datahub[airflow] >= 0.8.36",
     # Pinned dependencies to make dependency resolution faster.
     "sqlalchemy==1.3.24",
@@ -77,20 +73,10 @@ def get_long_description():
     "packaging",
 }
 
-base_dev_requirements_airflow_1 = base_dev_requirements.copy()
-
 dev_requirements = {
     *base_dev_requirements,
 }
 
-dev_requirements_airflow_1_base = {
-    "apache-airflow==1.10.15",
-    "apache-airflow-backport-providers-snowflake",
-}
-dev_requirements_airflow_1 = {
-    *base_dev_requirements_airflow_1,
-    *dev_requirements_airflow_1_base,
-}
 
 entry_points = {
     "airflow.plugins": "acryl-datahub-airflow-plugin = datahub_airflow_plugin.datahub_plugin:DatahubPlugin"
@@ -119,6 +105,7 @@ def get_long_description():
         "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.8",
         "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
         "Intended Audience :: Developers",
         "Intended Audience :: Information Technology",
         "Intended Audience :: System Administrators",
@@ -140,7 +127,5 @@ def get_long_description():
     install_requires=list(base_requirements),
     extras_require={
         "dev": list(dev_requirements),
-        "dev-airflow1-base": list(dev_requirements_airflow_1_base),
-        "dev-airflow1": list(dev_requirements_airflow_1),
     },
 )
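
The `airflow.plugins` entry point retained in `entry_points` above is what lets Airflow 2 discover the plugin automatically from the installed distribution. A minimal sketch of the shape of a class registered this way (illustrative, not the actual DatahubPlugin source):

    from airflow.plugins_manager import AirflowPlugin


    class DatahubPlugin(AirflowPlugin):
        # Airflow 2 imports every class exposed under the "airflow.plugins"
        # entry-point group at startup; `name` is how the plugin appears in
        # the `airflow plugins` CLI listing.
        name = "acryl-datahub-airflow-plugin"
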
@@ -11,7 +11,7 @@
 from cattr import structure
 from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
 from datahub_provider.client.airflow_generator import AirflowGenerator
-from datahub_provider.hooks.datahub import AIRFLOW_1, DatahubGenericHook
+from datahub_provider.hooks.datahub import DatahubGenericHook
 from datahub_provider.lineage.datahub import DatahubLineageConfig
 
 
@@ -40,38 +40,6 @@ def get_lineage_config() -> DatahubLineageConfig:
 
 def get_inlets_from_task(task: BaseOperator, context: Any) -> Iterable[Any]:
     inlets = []
-    needs_repeat_preparation = False
-    if (
-        not AIRFLOW_1
-        and isinstance(task._inlets, list)
-        and len(task._inlets) == 1
-        and isinstance(task._inlets[0], dict)
-    ):
-        # This is necessary to avoid issues with circular imports.
-        from airflow.lineage import AUTO, prepare_lineage
-
-        task._inlets = [
-            # See https://airflow.apache.org/docs/apache-airflow/1.10.15/lineage.html.
-            *task._inlets[0].get("datasets", []),  # assumes these are attr-annotated
-            *task._inlets[0].get("task_ids", []),
-            *([AUTO] if task._inlets[0].get("auto", False) else []),
-        ]
-        needs_repeat_preparation = True
-
-    if (
-        not AIRFLOW_1
-        and isinstance(task._outlets, list)
-        and len(task._outlets) == 1
-        and isinstance(task._outlets[0], dict)
-    ):
-        task._outlets = [*task._outlets[0].get("datasets", [])]
-        needs_repeat_preparation = True
-    if needs_repeat_preparation:
-        # Rerun the lineage preparation routine, now that the old format has been translated to the new one.
-        prepare_lineage(lambda self, ctx: None)(task, context)
-
-    context = context or {}  # ensure not None to satisfy mypy
-
     if isinstance(task._inlets, (str, BaseOperator)) or attr.has(task._inlets):  # type: ignore
         inlets = [
             task._inlets,
@@ -370,27 +338,13 @@ def custom_task_policy(task):
     return custom_task_policy
 
 
-def set_airflow2_policies(settings):
+def _patch_policy(settings):
+    print("Patching datahub policy")
     if hasattr(settings, "task_policy"):
         datahub_task_policy = _wrap_task_policy(settings.task_policy)
         settings.task_policy = datahub_task_policy
-
-
-def set_airflow1_policies(settings):
-    if hasattr(settings, "policy"):
-        datahub_task_policy = _wrap_task_policy(settings.policy)
-        settings.policy = datahub_task_policy
-
-
-def _patch_policy(settings):
-    if AIRFLOW_1:
-        print("Patching datahub policy for Airflow 1")
-        set_airflow1_policies(settings)
-    else:
-        print("Patching datahub policy for Airflow 2")
-        set_airflow2_policies(settings)
 
 
 def _patch_datahub_policy():
     with contextlib.suppress(ImportError):
         import airflow_local_settings
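
The surviving `_patch_policy` keeps only the Airflow 2 code path: Airflow 2 consults a single `task_policy(task)` hook for every task at DAG parse time, whereas Airflow 1 named the equivalent hook `policy`, which is what the removed branch handled. A self-contained sketch of the wrap-and-reassign pattern (the per-task mutation is an illustrative placeholder, and patching `airflow.settings` directly is a simplification; the real plugin applies the wrapper from `_patch_datahub_policy` and instruments tasks to emit lineage):

    import airflow.settings as settings


    def _wrap_task_policy(policy):
        def custom_task_policy(task):
            if policy is not None:
                policy(task)  # preserve any user-defined policy behavior
            # Illustrative placeholder for the plugin's per-task instrumentation.
            task.owner = task.owner or "datahub"

        return custom_task_policy


    # Compose with whatever task_policy the deployment already defines, if any.
    if hasattr(settings, "task_policy"):
        settings.task_policy = _wrap_task_policy(settings.task_policy)
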
16 changes: 4 additions & 12 deletions metadata-ingestion-modules/airflow-plugin/tox.ini
@@ -4,12 +4,12 @@
 # and then run "tox" from this directory.
 
 [tox]
-envlist = py3-quick,py3-full,py3-airflow1
+envlist = py3-quick,py3-full
 
 [gh-actions]
 python =
-    3.6: py3-full, py3-airflow1
-    3.9: py3-full, py3-airflow1
+    3.6: py3-full
+    3.9: py3-full
 
 # Providing optional features that add dependencies from setup.py as deps here
 # allows tox to recreate testenv when new dependencies are added to setup.py.
@@ -22,22 +22,14 @@ deps =
     -e ../../metadata-ingestion/[.dev]
 commands =
     pytest --cov={envsitepackagesdir}/datahub --cov={envsitepackagesdir}/datahub_provider \
-        py3-quick,py3-airflow1: -m 'not integration and not slow_integration' --junit-xml=junit.quick.xml \
+        py3-quick: -m 'not integration and not slow_integration' --junit-xml=junit.quick.xml \
         py3-full: --cov-fail-under 65 --junit-xml=junit.full.xml \
         --continue-on-collection-errors \
         -vv
 
 setenv =
     AIRFLOW_HOME = /tmp/airflow/thisshouldnotexist-{envname}
 
-[testenv:py3-airflow1]
-deps =
-    ../../metadata-ingestion/[.dev]
-    -c ../../metadata-ingestion/tests/airflow1-constraints.txt
-
-setenv =
-    AIRFLOW1_TEST = true
-
 [testenv:py3-full]
 deps =
     ../../metadata-ingestion/.[dev]
8 changes: 0 additions & 8 deletions metadata-ingestion/build.gradle
@@ -109,14 +109,6 @@ task testSingle(dependsOn: [installDevTest]) {
   }
 }
 
-task installAirflow1(type: Exec, dependsOn: [install]) {
-  inputs.file file('setup.py')
-  outputs.dir("${venv_name}")
-  outputs.file("${venv_name}/.build_install_airflow_sentinel")
-  commandLine 'bash', '-x', '-c',
-    "${venv_name}/bin/pip install -e .[dev-airflow1] -c tests/airflow1-constraints.txt && touch ${venv_name}/.build_install_airflow_sentinel"
-}
-
 task testIntegration(type: Exec, dependsOn: [installDevTest]) {
   commandLine 'bash', '-c',
     "source ${venv_name}/bin/activate && pytest --durations=50 -m 'integration' -vv --continue-on-collection-errors --junit-xml=junit.integration.xml"
38 changes: 0 additions & 38 deletions metadata-ingestion/scripts/airflow1-constraints.sh

This file was deleted.
