docs(ingest): add airflow docs that use the PythonVirtualenvOperator #6604

Merged 3 commits on Dec 2, 2022
42 changes: 36 additions & 6 deletions metadata-ingestion/schedule_docs/airflow.md
@@ -2,11 +2,41 @@

If you are using Apache Airflow for your scheduling, you might want to also use it to schedule your ingestion recipes. For any Airflow-specific questions, see the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/) for more details.

To schedule your recipe through Airflow, you can follow these steps:
- Create a recipe file, e.g. `recipe.yml`.
- Ensure the recipe file is in a folder accessible to your Airflow workers. You can either specify an absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`.
- Ensure the [DataHub CLI](../../docs/cli.md) is installed in your Airflow environment.
- Create a sample DAG file like [`generic_recipe_sample_dag.py`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py). This will read your DataHub ingestion recipe file and run it.
We've provided a few examples of how to configure your DAG:

- [`mysql_sample_dag`](../src/datahub_provider/example_dags/mysql_sample_dag.py) embeds the full MySQL ingestion configuration inside the DAG.

- [`snowflake_sample_dag`](../src/datahub_provider/example_dags/snowflake_sample_dag.py) avoids embedding credentials inside the recipe, and instead fetches them from Airflow's [Connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/index.html) feature. You must configure your connections in Airflow to use this approach.
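
For reference, here is a minimal sketch of how a task callable can pull credentials out of a pre-configured Airflow connection, mirroring what the Snowflake example does (the connection id `snowflake_admin_default` is simply the name that example assumes):

```py
from airflow.hooks.base import BaseHook

# Assumes a Snowflake connection named "snowflake_admin_default" has already
# been created in Airflow (via the UI, the CLI, or environment variables).
snowflake_conn = BaseHook.get_connection("snowflake_admin_default")

# Map the connection's fields onto the ingestion source config.
snowflake_credentials = {
    "username": snowflake_conn.login,
    "password": snowflake_conn.password,
    "account_id": snowflake_conn.extra_dejson["account"],
    "warehouse": snowflake_conn.extra_dejson.get("warehouse"),
    "role": snowflake_conn.extra_dejson.get("role"),
}
```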

:::tip

These example DAGs use the `PythonVirtualenvOperator` to run the ingestion. This is the recommended approach, since it guarantees that there will not be any conflicts between DataHub and the rest of your Airflow environment.

When configuring the task, it's important to specify the requirements for your source and set the `system_site_packages` option to `False`.

```py
ingestion_task = PythonVirtualenvOperator(
task_id="ingestion_task",
requirements=[
"acryl-datahub[<your-source>]",
],
system_site_packages=False,
python_callable=your_callable,
)
```

:::

<details>
<summary>Advanced: loading a recipe file</summary>

In more advanced cases, you might want to store your ingestion recipe in a file and load it from your task.

- Ensure the recipe file is in a folder accessible to your Airflow workers. You can either specify an absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`.
- Ensure the [DataHub CLI](../../docs/cli.md) is installed in your Airflow environment.
- Create a DAG task to read your DataHub ingestion recipe file and run it. See the example below for reference.
- Deploy the DAG file into Airflow for scheduling. Typically this involves checking the DAG file into your dags folder, which is accessible to your Airflow instance.

Alternatively you can have an inline recipe as given in [`mysql_sample_dag.py`](../src/datahub_provider/example_dags/mysql_sample_dag.py). This runs a MySQL metadata ingestion pipeline using an inlined configuration.
Example: [`generic_recipe_sample_dag`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py)
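
For illustration, here is a minimal sketch of such a callable, along the lines of the `generic_recipe_sample_dag` example (the exact import path for `load_config_file` is an assumption here):

```py
def datahub_recipe():
    # Imports live inside the callable so that it also works with the
    # PythonVirtualenvOperator.
    from datahub.configuration.config_loader import load_config_file
    from datahub.ingestion.run.pipeline import Pipeline

    # Note that this also resolves environment variables referenced in the recipe.
    config = load_config_file("path/to/recipe.yml")

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()
```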

</details>
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
@@ -465,8 +465,9 @@ def get_long_description():

dev_requirements = {
*base_dev_requirements,
# Extra requirements for Airflow.
"apache-airflow[snowflake]>=2.0.2", # snowflake is used in example dags
"snowflake-sqlalchemy<=1.2.4", # make constraint consistent with extras
"virtualenv", # needed by PythonVirtualenvOperator
}

full_test_dev_requirements = {
@@ -0,0 +1,2 @@
# This file uses a connection hook, which fails to load unless configured.
snowflake_sample_dag.py
@@ -26,6 +26,7 @@


def datahub_recipe():
# Note that this will also resolve environment variables in the recipe.
config = load_config_file("path/to/recipe.yml")

pipeline = Pipeline.create(config)
@@ -5,33 +5,22 @@
embedded within the code.
"""

from datetime import timedelta
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from datahub.ingestion.run.pipeline import Pipeline

default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=120),
}
from airflow.operators.python import PythonVirtualenvOperator


def ingest_from_mysql():
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
# This configuration is analogous to a recipe configuration.
{
"source": {
"type": "mysql",
"config": {
# If you want to use Airflow connections, take a look at the snowflake_sample_dag.py example.
"username": "user",
"password": "pass",
"database": "db_name",
@@ -45,18 +34,28 @@ def ingest_from_mysql():
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status()


with DAG(
"datahub_mysql_ingest",
default_args=default_args,
default_args={
"owner": "airflow",
},
description="An example DAG which ingests metadata from MySQL to DataHub",
start_date=datetime(2022, 1, 1),
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
catchup=False,
) as dag:
ingest_task = PythonOperator(
# While it is also possible to use the PythonOperator, we recommend using
# the PythonVirtualenvOperator to ensure that there are no dependency
# conflicts between DataHub and the rest of your Airflow environment.
ingest_task = PythonVirtualenvOperator(
task_id="ingest_from_mysql",
requirements=[
"acryl-datahub[mysql]",
],
system_site_packages=False,
python_callable=ingest_from_mysql,
)
@@ -0,0 +1,87 @@
"""Snowflake DataHub Ingest DAG

This example demonstrates how to ingest metadata from Snowflake into DataHub
from within an Airflow DAG. In contrast to the MySQL example, this DAG
pulls the DB connection configuration from Airflow's connection store.
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonVirtualenvOperator


def ingest_from_snowflake(snowflake_credentials, datahub_gms_server):
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
# This configuration is analogous to a recipe configuration.
{
"source": {
"type": "snowflake",
"config": {
**snowflake_credentials,
# Other Snowflake config can be added here.
"profiling": {"enabled": False},
},
},
# Other ingestion features, like transformers, are also supported.
# "transformers": [
# {
# "type": "simple_add_dataset_ownership",
# "config": {
# "owner_urns": [
# "urn:li:corpuser:example",
# ]
# },
# }
# ],
"sink": {
"type": "datahub-rest",
"config": {"server": datahub_gms_server},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status()


with DAG(
"datahub_snowflake_ingest",
default_args={
"owner": "airflow",
},
description="An example DAG which ingests metadata from Snowflake to DataHub",
start_date=datetime(2022, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
# This example pulls credentials from Airflow's connection store.
# For this to work, you must have previously configured these connections in Airflow.
# See the Airflow docs for details: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
snowflake_conn = BaseHook.get_connection("snowflake_admin_default")
datahub_conn = BaseHook.get_connection("datahub_rest_default")

# While it is also possible to use the PythonOperator, we recommend using
# the PythonVirtualenvOperator to ensure that there are no dependency
# conflicts between DataHub and the rest of your Airflow environment.
ingest_task = PythonVirtualenvOperator(
task_id="ingest_from_snowflake",
requirements=[
"acryl-datahub[snowflake]",
],
system_site_packages=False,
python_callable=ingest_from_snowflake,
op_kwargs={
"snowflake_credentials": {
"username": snowflake_conn.login,
"password": snowflake_conn.password,
"account_id": snowflake_conn.extra_dejson["account"],
"warehouse": snowflake_conn.extra_dejson.get("warehouse"),
"role": snowflake_conn.extra_dejson.get("role"),
},
"datahub_gms_server": datahub_conn.host,
},
)
11 changes: 4 additions & 7 deletions metadata-ingestion/tests/unit/test_airflow.py
@@ -75,21 +75,18 @@ def test_airflow_provider_info():
assert get_provider_info()


@pytest.mark.skipif(
AIRFLOW_VERSION < packaging.version.parse("2.0.0"),
reason="the examples use list-style lineage, which is only supported on Airflow 2.x",
)
def test_dags_load_with_no_errors(pytestconfig):
def test_dags_load_with_no_errors(pytestconfig: pytest.Config) -> None:
airflow_examples_folder = (
pytestconfig.rootpath / "src/datahub_provider/example_dags"
)

# Note: the .airflowignore file skips the snowflake DAG.
dag_bag = DagBag(dag_folder=str(airflow_examples_folder), include_examples=False)

import_errors = dag_bag.import_errors

assert import_errors == {}
assert len(dag_bag.dag_ids) > 0
assert len(import_errors) == 0
assert dag_bag.size() > 0


@contextmanager