From b9478953fdb61e5c55677b36e6b3f7c0e0a15e6f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 1 Dec 2022 19:24:53 -0500 Subject: [PATCH 1/3] docs(ingest): add airflow docs that use the `PythonVirtualenvOperator` --- metadata-ingestion/schedule_docs/airflow.md | 42 +++++++-- .../example_dags/generic_recipe_sample_dag.py | 1 + .../example_dags/mysql_sample_dag.py | 34 ++++---- .../example_dags/snowflake_sample_dag.py | 86 +++++++++++++++++++ 4 files changed, 139 insertions(+), 24 deletions(-) create mode 100644 metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py diff --git a/metadata-ingestion/schedule_docs/airflow.md b/metadata-ingestion/schedule_docs/airflow.md index 03a5930fea1368..e48710964b01c7 100644 --- a/metadata-ingestion/schedule_docs/airflow.md +++ b/metadata-ingestion/schedule_docs/airflow.md @@ -2,11 +2,41 @@ If you are using Apache Airflow for your scheduling then you might want to also use it for scheduling your ingestion recipes. For any Airflow specific questions you can go through [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/) for more details. -To schedule your recipe through Airflow you can follow these steps -- Create a recipe file e.g. `recipe.yml` -- Ensure the receipe file is in a folder accessible to your airflow workers. You can either specify absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`. -- Ensure [DataHub CLI](../../docs/cli.md) is installed in your airflow environment -- Create a sample DAG file like [`generic_recipe_sample_dag.py`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py). This will read your DataHub ingestion recipe file and run it. +We've provided a few examples of how to configure your DAG: + +- [`mysql_sample_dag`](../src/datahub_provider/example_dags/mysql_sample_dag.py) embeds the full MySQL ingestion configuration inside the DAG. + +- [`snowflake_sample_dag`](../src/datahub_provider/example_dags/snowflake_sample_dag.py) avoids embedding credentials inside the recipe, and instead fetches them from Airflow's [Connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/index.html) feature. You must configure your connections in Airflow to use this approach. + +:::tip + +These example DAGs use the `PythonVirtualenvOperator` to run the ingestion. This is the recommended approach, since it guarantees that there will not be any conflicts between DataHub and the rest of your Airflow environment. + +When configuring the task, it's important to specify the requirements with your source and set the `system_site_packages` option to false. + +```py +ingestion_task = PythonVirtualenvOperator( + task_id="ingestion_task", + requirements=[ + "acryl-datahub[]", + ], + system_site_packages=False, + python_callable=your_callable, +) +``` + +::: + +
+Advanced: loading a recipe file + +In more advanced cases, you might want to store your ingestion recipe in a file and load it from your task. + +- Ensure the recipe file is in a folder accessible to your airflow workers. You can either specify absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`. +- Ensure [DataHub CLI](../../docs/cli.md) is installed in your airflow environment. +- Create a DAG task to read your DataHub ingestion recipe file and run it. See the example below for reference. - Deploy the DAG file into airflow for scheduling. Typically this involves checking in the DAG file into your dags folder which is accessible to your Airflow instance. -Alternatively you can have an inline recipe as given in [`mysql_sample_dag.py`](../src/datahub_provider/example_dags/mysql_sample_dag.py). This runs a MySQL metadata ingestion pipeline using an inlined configuration. +Example: [`generic_recipe_sample_dag`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py) + +
diff --git a/metadata-ingestion/src/datahub_provider/example_dags/generic_recipe_sample_dag.py b/metadata-ingestion/src/datahub_provider/example_dags/generic_recipe_sample_dag.py index 289155a877b55e..d0e4aa944e8401 100644 --- a/metadata-ingestion/src/datahub_provider/example_dags/generic_recipe_sample_dag.py +++ b/metadata-ingestion/src/datahub_provider/example_dags/generic_recipe_sample_dag.py @@ -26,6 +26,7 @@ def datahub_recipe(): + # Note that this will also resolve environment variables in the recipe. config = load_config_file("path/to/recipe.yml") pipeline = Pipeline.create(config) diff --git a/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py b/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py index 35744ea367d353..7dc252595391df 100644 --- a/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py +++ b/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py @@ -8,30 +8,19 @@ from datetime import timedelta from airflow import DAG -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago - -from datahub.ingestion.run.pipeline import Pipeline - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "email": ["jdoe@example.com"], - "email_on_failure": False, - "email_on_retry": False, - "retries": 1, - "retry_delay": timedelta(minutes=5), - "execution_timeout": timedelta(minutes=120), -} +from airflow.operators.python import PythonVirtualenvOperator def ingest_from_mysql(): + from datahub.ingestion.run.pipeline import Pipeline + pipeline = Pipeline.create( # This configuration is analogous to a recipe configuration. { "source": { "type": "mysql", "config": { + # If you want to use Airflow connections, take a look at the snowflake_sample_dag.py example. "username": "user", "password": "pass", "database": "db_name", @@ -45,18 +34,27 @@ def ingest_from_mysql(): } ) pipeline.run() + pipeline.pretty_print_summary() pipeline.raise_from_status() with DAG( "datahub_mysql_ingest", - default_args=default_args, + default_args={ + "owner": "airflow", + }, description="An example DAG which ingests metadata from MySQL to DataHub", schedule_interval=timedelta(days=1), - start_date=days_ago(2), catchup=False, ) as dag: - ingest_task = PythonOperator( + # While it is also possible to use the PythonOperator, we recommend using + # the PythonVirtualenvOperator to ensure that there are no dependency + # conflicts between DataHub and the rest of your Airflow environment. + ingest_task = PythonVirtualenvOperator( task_id="ingest_from_mysql", + requirements=[ + "acryl-datahub[mysql]", + ], + system_site_packages=False, python_callable=ingest_from_mysql, ) diff --git a/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py b/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py new file mode 100644 index 00000000000000..d90f2199de2675 --- /dev/null +++ b/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py @@ -0,0 +1,86 @@ +"""Snowflake DataHub Ingest DAG + +This example demonstrates how to ingest metadata from Snowflake into DataHub +from within an Airflow DAG. In contrast to the MySQL example, this DAG +pulls the DB connection configuration from Airflow's connection store. +""" + +from datetime import timedelta + +from airflow import DAG +from airflow.hooks.base import BaseHook +from airflow.operators.python import PythonVirtualenvOperator + + +def ingest_from_snowflake(snowflake_credentials, datahub_gms_server): + from datahub.ingestion.run.pipeline import Pipeline + + pipeline = Pipeline.create( + # This configuration is analogous to a recipe configuration. + { + "source": { + "type": "snowflake", + "config": { + **snowflake_credentials, + # Other Snowflake config can be added here. + "profiling": {"enabled": False}, + }, + }, + # Other ingestion features, like transformers, are also supported. + # "transformers": [ + # { + # "type": "simple_add_dataset_ownership", + # "config": { + # "owner_urns": [ + # "urn:li:corpuser:example", + # ] + # }, + # } + # ], + "sink": { + "type": "datahub-rest", + "config": {"server": datahub_gms_server}, + }, + } + ) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status() + + +with DAG( + "datahub_snowflake_ingest", + default_args={ + "owner": "airflow", + }, + description="An example DAG which ingests metadata from Snowflake to DataHub", + schedule_interval=timedelta(days=1), + catchup=False, +) as dag: + # This example pulls credentials from Airflow's connection store. + # For this to work, you must have previously configured these connections in Airflow. + # See the Airflow docs for details: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html + snowflake_conn = BaseHook.get_connection("snowflake_admin_default") + datahub_conn = BaseHook.get_connection("datahub_rest_default") + + # While it is also possible to use the PythonOperator, we recommend using + # the PythonVirtualenvOperator to ensure that there are no dependency + # conflicts between DataHub and the rest of your Airflow environment. + ingest_task = PythonVirtualenvOperator( + task_id="ingest_from_snowflake", + requirements=[ + "acryl-datahub[snowflake]", + ], + system_site_packages=False, + python_callable=ingest_from_snowflake, + op_kwargs={ + "snowflake_credentials": { + "username": snowflake_conn.login, + "password": snowflake_conn.password, + "account_id": snowflake_conn.extra_dejson["account"], + "warehouse": snowflake_conn.extra_dejson.get("warehouse"), + "role": snowflake_conn.extra_dejson.get("role"), + }, + "datahub_gms_server": datahub_conn.host, + }, + ) From f1a2621f62ab3a63732425a7ccd6fd4b943ae830 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 1 Dec 2022 23:30:03 -0500 Subject: [PATCH 2/3] add start date param --- .../example_dags/mysql_sample_dag.py | 3 ++- .../example_dags/snowflake_sample_dag.py | 3 ++- metadata-ingestion/tests/unit/test_airflow.py | 12 +++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py b/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py index 7dc252595391df..2c833e14256342 100644 --- a/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py +++ b/metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py @@ -5,7 +5,7 @@ embedded within the code. """ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonVirtualenvOperator @@ -44,6 +44,7 @@ def ingest_from_mysql(): "owner": "airflow", }, description="An example DAG which ingests metadata from MySQL to DataHub", + start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, ) as dag: diff --git a/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py b/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py index d90f2199de2675..c107bb479262cd 100644 --- a/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py +++ b/metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py @@ -5,7 +5,7 @@ pulls the DB connection configuration from Airflow's connection store. """ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.hooks.base import BaseHook @@ -54,6 +54,7 @@ def ingest_from_snowflake(snowflake_credentials, datahub_gms_server): "owner": "airflow", }, description="An example DAG which ingests metadata from Snowflake to DataHub", + start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, ) as dag: diff --git a/metadata-ingestion/tests/unit/test_airflow.py b/metadata-ingestion/tests/unit/test_airflow.py index 82e76f43b00c72..2248eba3fe0297 100644 --- a/metadata-ingestion/tests/unit/test_airflow.py +++ b/metadata-ingestion/tests/unit/test_airflow.py @@ -75,11 +75,7 @@ def test_airflow_provider_info(): assert get_provider_info() -@pytest.mark.skipif( - AIRFLOW_VERSION < packaging.version.parse("2.0.0"), - reason="the examples use list-style lineage, which is only supported on Airflow 2.x", -) -def test_dags_load_with_no_errors(pytestconfig): +def test_dags_load_with_no_errors(pytestconfig: pytest.Config) -> None: airflow_examples_folder = ( pytestconfig.rootpath / "src/datahub_provider/example_dags" ) @@ -88,8 +84,10 @@ def test_dags_load_with_no_errors(pytestconfig): import_errors = dag_bag.import_errors - assert import_errors == {} - assert len(dag_bag.dag_ids) > 0 + assert len(import_errors) == 1 + assert "snowflake_sample_dag" in list(import_errors.keys())[0] + + assert dag_bag.size() > 0 @contextmanager From a68fb008fc49feee005d1bd000ced840ce636afc Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 2 Dec 2022 00:10:11 -0500 Subject: [PATCH 3/3] fix tests? --- metadata-ingestion/setup.py | 3 ++- .../src/datahub_provider/example_dags/.airflowignore | 2 ++ metadata-ingestion/tests/unit/test_airflow.py | 5 ++--- 3 files changed, 6 insertions(+), 4 deletions(-) create mode 100644 metadata-ingestion/src/datahub_provider/example_dags/.airflowignore diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 1a744a6fe328a6..c6a0f7401ffb88 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -465,8 +465,9 @@ def get_long_description(): dev_requirements = { *base_dev_requirements, + # Extra requirements for Airflow. "apache-airflow[snowflake]>=2.0.2", # snowflake is used in example dags - "snowflake-sqlalchemy<=1.2.4", # make constraint consistent with extras + "virtualenv", # needed by PythonVirtualenvOperator } full_test_dev_requirements = { diff --git a/metadata-ingestion/src/datahub_provider/example_dags/.airflowignore b/metadata-ingestion/src/datahub_provider/example_dags/.airflowignore new file mode 100644 index 00000000000000..decb9b559aea89 --- /dev/null +++ b/metadata-ingestion/src/datahub_provider/example_dags/.airflowignore @@ -0,0 +1,2 @@ +# This file uses a connection hook, which fails to load unless configured. +snowflake_sample_dag.py diff --git a/metadata-ingestion/tests/unit/test_airflow.py b/metadata-ingestion/tests/unit/test_airflow.py index 2248eba3fe0297..4d1b737e985fbe 100644 --- a/metadata-ingestion/tests/unit/test_airflow.py +++ b/metadata-ingestion/tests/unit/test_airflow.py @@ -80,13 +80,12 @@ def test_dags_load_with_no_errors(pytestconfig: pytest.Config) -> None: pytestconfig.rootpath / "src/datahub_provider/example_dags" ) + # Note: the .airflowignore file skips the snowflake DAG. dag_bag = DagBag(dag_folder=str(airflow_examples_folder), include_examples=False) import_errors = dag_bag.import_errors - assert len(import_errors) == 1 - assert "snowflake_sample_dag" in list(import_errors.keys())[0] - + assert len(import_errors) == 0 assert dag_bag.size() > 0