Add sample Airflow DAG for Snowflake integration #343

Merged · 1 commit · Aug 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions example/dags/snowflake_sample_dag.py
@@ -0,0 +1,197 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a sample Airflow DAG that demos extracting metadata from Snowflake and loading it into
Neo4j and Elasticsearch
"""

import textwrap
from datetime import datetime, timedelta
import uuid

from airflow import DAG  # noqa
from airflow.operators.python_operator import PythonOperator  # noqa
from pyhocon import ConfigFactory

from elasticsearch import Elasticsearch
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

dag_args = {
    'concurrency': 10,
    # one DAG run at a time
    'max_active_runs': 1,
    # runs daily at 11:00 UTC (3AM PST / 4AM PDT)
    'schedule_interval': '0 11 * * *',
    'catchup': False
}

default_args = {
    'owner': 'amundsen',
    'start_date': datetime(2020, 8, 19),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'priority_weight': 10,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(minutes=120)
}

# Neo4j endpoint (bolt protocol)
NEO4J_ENDPOINT = 'bolt://neo4j:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

es = Elasticsearch([
    {'host': 'elasticsearch'},
])
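
# The 'neo4j' and 'elasticsearch' hostnames assume this DAG runs on the same
# Docker network as the Amundsen quickstart (an assumption about your setup);
# otherwise, point neo4j_endpoint and the Elasticsearch host at your own services.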


# TODO: user provides a list of schemas for indexing
SUPPORTED_SCHEMAS = ['public']
SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS))
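# e.g. SUPPORTED_SCHEMAS = ['public', 'finance'] renders as "('public', 'finance')"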

# Snowflake configs
# note: this is the name of the database to extract, not a config key
SNOWFLAKE_DATABASE = 'YOUR_SNOWFLAKE_DB_NAME'


# TODO: replace these placeholder credentials with your own before running
def connection_string():
    user = 'username'
    password = 'password'
    account = 'YourSnowflakeAccountHere'
    return "snowflake://%s:%s@%s" % (user, password, account)


def create_snowflake_table_metadata_job():
    """
    Launches a databuilder job that extracts table and column metadata from the Snowflake
    database and publishes it to Neo4j.
    """

    where_clause_suffix = textwrap.dedent("""
        WHERE c.TABLE_SCHEMA IN {schemas}
        AND lower(c.COLUMN_NAME) not like 'dw_%';
    """).format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

    job_config = ConfigFactory.from_dict({
        'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
            connection_string(),
        'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.SNOWFLAKE_DATABASE_KEY):
            SNOWFLAKE_DATABASE,
        'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
            where_clause_suffix,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            'some_unique_tag'  # TODO: replace with a tag that is unique per run
    })
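    # Neo4jCsvPublisher stamps every node and relationship it writes with
    # JOB_PUBLISH_TAG, which is how stale records can be identified later. One
    # option (an assumption, not part of this sample) is a timestamp-based tag,
    # e.g. 'snowflake_{}'.format(datetime.now().strftime('%Y%m%d%H%M%S')).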

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=SnowflakeMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    job.launch()


def create_snowflake_es_publisher_job():
    """
    Launches a databuilder job that extracts data from the Neo4j backend and publishes it as
    search documents to an Elasticsearch index.
    """

    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # Elasticsearch client instance
    elasticsearch_client = es
    # unique name of the new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    # document mapping type, from /databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias that the Amundsen search service (amundsensearchlibrary/search_service/config.py) queries
    elasticsearch_index_alias = 'table_search_index'
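    # on each run, ElasticsearchPublisher loads the documents into the new
    # uniquely named index and then repoints the alias at it, so the search
    # index is rebuilt without disrupting queries against the alias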

    job_config = ConfigFactory.from_dict({
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY):
            neo4j_endpoint,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER):
            neo4j_user,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW):
            neo4j_password,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
            'w',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
            'r',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
            elasticsearch_client,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
            elasticsearch_new_index_key,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
            elasticsearch_new_index_key_type,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
            elasticsearch_index_alias
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    job.launch()


with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:

    snowflake_table_metadata_job = PythonOperator(
        task_id='snowflake_table_metadata_extract_job',
        python_callable=create_snowflake_table_metadata_job
    )

    snowflake_es_publisher_job = PythonOperator(
        task_id='snowflake_es_publisher_job',
        python_callable=create_snowflake_es_publisher_job
    )

    # the Elasticsearch update runs after the table metadata has been published to Neo4j
    snowflake_table_metadata_job >> snowflake_es_publisher_job
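
# A quick local smoke test (assuming Airflow and the databuilder dependencies
# are installed) is to import this module directly, which surfaces DAG
# definition errors without a running scheduler:
#   python example/dags/snowflake_sample_dag.py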