This repository has been archived by the owner on Jan 27, 2025. It is now read-only.
Forked from datahub-project/datahub
Commit
docs(ingest): add airflow docs that use the PythonVirtualenvOperator (
1 parent 3d0435a · commit ebd91f1
Showing 7 changed files with 150 additions and 33 deletions.
metadata-ingestion/src/datahub_provider/example_dags/.airflowignore (2 changes: 2 additions & 0 deletions)
@@ -0,0 +1,2 @@
# This file uses a connection hook, which fails to load unless configured.
snowflake_sample_dag.py
metadata-ingestion/src/datahub_provider/example_dags/mysql_sample_dag.py
@@ -5,33 +5,22 @@
 embedded within the code.
 """
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
-
-from datahub.ingestion.run.pipeline import Pipeline
-
-default_args = {
-    "owner": "airflow",
-    "depends_on_past": False,
-    "email": ["[email protected]"],
-    "email_on_failure": False,
-    "email_on_retry": False,
-    "retries": 1,
-    "retry_delay": timedelta(minutes=5),
-    "execution_timeout": timedelta(minutes=120),
-}
+from airflow.operators.python import PythonVirtualenvOperator
 
 
 def ingest_from_mysql():
+    from datahub.ingestion.run.pipeline import Pipeline
+
     pipeline = Pipeline.create(
         # This configuration is analogous to a recipe configuration.
         {
             "source": {
                 "type": "mysql",
                 "config": {
+                    # If you want to use Airflow connections, take a look at the snowflake_sample_dag.py example.
                     "username": "user",
                     "password": "pass",
                     "database": "db_name",
@@ -45,18 +34,28 @@ def ingest_from_mysql():
         }
     )
     pipeline.run()
     pipeline.pretty_print_summary()
     pipeline.raise_from_status()
 
 
 with DAG(
     "datahub_mysql_ingest",
-    default_args=default_args,
+    default_args={
+        "owner": "airflow",
+    },
     description="An example DAG which ingests metadata from MySQL to DataHub",
+    start_date=datetime(2022, 1, 1),
     schedule_interval=timedelta(days=1),
-    start_date=days_ago(2),
+    catchup=False,
 ) as dag:
-    ingest_task = PythonOperator(
+    # While it is also possible to use the PythonOperator, we recommend using
+    # the PythonVirtualenvOperator to ensure that there are no dependency
+    # conflicts between DataHub and the rest of your Airflow environment.
+    ingest_task = PythonVirtualenvOperator(
        task_id="ingest_from_mysql",
+        requirements=[
+            "acryl-datahub[mysql]",
+        ],
+        system_site_packages=False,
        python_callable=ingest_from_mysql,
    )
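A side note on the diff above (an editorial gloss, not part of the commit): the `from datahub.ingestion.run.pipeline import Pipeline` line moves inside `ingest_from_mysql` because PythonVirtualenvOperator runs the callable in a freshly built virtualenv containing only the packages listed in `requirements`; a module-level import would instead be evaluated by the scheduler's own interpreter, where those packages may not be installed. A minimal sketch of that pattern follows, using hypothetical package, DAG, and task names:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator


def callable_in_venv():
    # Packages listed in `requirements` are only importable here, inside the
    # callable, because it executes in a subprocess using the virtualenv's
    # interpreter rather than the scheduler's environment.
    import some_isolated_package  # hypothetical package installed via `requirements`

    some_isolated_package.do_work()  # hypothetical call


with DAG(
    "venv_isolation_sketch",  # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonVirtualenvOperator(
        task_id="run_in_isolated_venv",
        requirements=["some-isolated-package"],  # hypothetical requirement
        system_site_packages=False,
        python_callable=callable_in_venv,
    )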
metadata-ingestion/src/datahub_provider/example_dags/snowflake_sample_dag.py (87 changes: 87 additions & 0 deletions)
@@ -0,0 +1,87 @@
"""Snowflake DataHub Ingest DAG
This example demonstrates how to ingest metadata from Snowflake into DataHub
from within an Airflow DAG. In contrast to the MySQL example, this DAG
pulls the DB connection configuration from Airflow's connection store.
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonVirtualenvOperator


def ingest_from_snowflake(snowflake_credentials, datahub_gms_server):
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        # This configuration is analogous to a recipe configuration.
        {
            "source": {
                "type": "snowflake",
                "config": {
                    **snowflake_credentials,
                    # Other Snowflake config can be added here.
                    "profiling": {"enabled": False},
                },
            },
            # Other ingestion features, like transformers, are also supported.
            # "transformers": [
            #     {
            #         "type": "simple_add_dataset_ownership",
            #         "config": {
            #             "owner_urns": [
            #                 "urn:li:corpuser:example",
            #             ]
            #         },
            #     }
            # ],
            "sink": {
                "type": "datahub-rest",
                "config": {"server": datahub_gms_server},
            },
        }
    )
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status()


with DAG(
    "datahub_snowflake_ingest",
    default_args={
        "owner": "airflow",
    },
    description="An example DAG which ingests metadata from Snowflake to DataHub",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # This example pulls credentials from Airflow's connection store.
    # For this to work, you must have previously configured these connections in Airflow.
    # See the Airflow docs for details: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
    snowflake_conn = BaseHook.get_connection("snowflake_admin_default")
    datahub_conn = BaseHook.get_connection("datahub_rest_default")

    # While it is also possible to use the PythonOperator, we recommend using
    # the PythonVirtualenvOperator to ensure that there are no dependency
    # conflicts between DataHub and the rest of your Airflow environment.
    ingest_task = PythonVirtualenvOperator(
        task_id="ingest_from_snowflake",
        requirements=[
            "acryl-datahub[snowflake]",
        ],
        system_site_packages=False,
        python_callable=ingest_from_snowflake,
        op_kwargs={
            "snowflake_credentials": {
                "username": snowflake_conn.login,
                "password": snowflake_conn.password,
                "account_id": snowflake_conn.extra_dejson["account"],
                "warehouse": snowflake_conn.extra_dejson.get("warehouse"),
                "role": snowflake_conn.extra_dejson.get("role"),
            },
            "datahub_gms_server": datahub_conn.host,
        },
    )
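For completeness (again an illustrative sketch, not part of the commit): the Snowflake DAG above assumes that the snowflake_admin_default and datahub_rest_default connections already exist in Airflow's connection store when the DAG file is parsed. They can be created through the Airflow UI or CLI; one programmatic option using Airflow's Connection model is sketched below, with placeholder credentials, a placeholder GMS endpoint, and an assumed generic conn_type for the DataHub connection.

import json

from airflow import settings
from airflow.models import Connection


def register_connection(conn: Connection) -> None:
    # Inserts the connection into Airflow's metadata database if a connection
    # with the same conn_id is not already present.
    session = settings.Session()
    try:
        exists = (
            session.query(Connection).filter(Connection.conn_id == conn.conn_id).first()
        )
        if not exists:
            session.add(conn)
            session.commit()
    finally:
        session.close()


# Placeholder Snowflake credentials; the DAG reads login, password, and the
# "account" / "warehouse" / "role" keys from the connection's extra field.
register_connection(
    Connection(
        conn_id="snowflake_admin_default",
        conn_type="snowflake",
        login="example_user",
        password="example_password",
        extra=json.dumps(
            {"account": "example_account", "warehouse": "example_wh", "role": "example_role"}
        ),
    )
)

# Placeholder DataHub GMS endpoint; the DAG only reads this connection's host,
# so a generic conn_type suffices for this sketch.
register_connection(
    Connection(
        conn_id="datahub_rest_default",
        conn_type="http",
        host="http://localhost:8080",
    )
)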