Skip to content

Commit

Permalink
chore: Add sample airflow dag for snowflake integration (#343)
Browse files Browse the repository at this point in the history
Signed-off-by: Tao Feng <[email protected]>
  • Loading branch information
Alagappan authored Aug 23, 2020
1 parent 030ef49 commit 60c837d
Showing 1 changed file with 197 additions and 0 deletions.
197 changes: 197 additions & 0 deletions example/dags/snowflake_sample_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a sample Airflow DAG that demos extracting metadata from Snowflake and loading it into
Neo4j and Elasticsearch
"""

import textwrap
from datetime import datetime, timedelta
import uuid

from airflow import DAG # noqa
from airflow import macros # noqa
from airflow.operators.python_operator import PythonOperator # noqa
from pyhocon import ConfigFactory

from elasticsearch import Elasticsearch
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

dag_args = {
'concurrency': 10,
# One dagrun at a time
'max_active_runs': 1,
# 4AM, 4PM PST
'schedule_interval': '0 11 * * *',
'catchup': False
}

default_args = {
'owner': 'amundsen',
'start_date': datetime(2020, 8, 19),
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'priority_weight': 10,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(minutes=120)
}

# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://neo4j:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

es = Elasticsearch([
{'host': 'elasticsearch'},
])


# TODO: user provides a list of schema for indexing
SUPPORTED_SCHEMAS = ['public']
SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS))

# SNOWFLAKE CONFIGs
SNOWFLAKE_DATABASE_KEY = 'YOUR_SNOWFLAKE_DB_NAME'


# todo: connection string needs to change
def connection_string():
user = 'username'
password = 'password'
account = 'YourSnowflakeAccountHere'
return "snowflake://%s:%s@%s" % (user, password, account)


def create_snowflake_table_metadata_job():
"""
Launches databuilder job that extracts table and column metadata from Snowflake database and publishes
to Neo4j.
"""

where_clause_suffix = textwrap.dedent("""
WHERE c.TABLE_SCHEMA IN {schemas}
AND lower(c.COLUMN_NAME) not like 'dw_%';
""").format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE)

tmp_folder = '/var/tmp/amundsen/table_metadata'
node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

job_config = ConfigFactory.from_dict({
'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
connection_string(),
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.SNOWFLAKE_DATABASE_KEY):
SNOWFLAKE_DATABASE_KEY,
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
where_clause_suffix,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
'some_unique_tag' # TO-DO unique tag must be added
})

job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=SnowflakeMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
job.launch()


def create_snowflake_es_publisher_job():
"""
Launches databuilder job that extracts data from Neo4J backend and pushes them as search documents
to Elasticsearch index
"""

# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())

# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
# related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
elasticsearch_new_index_key_type = 'table'
# alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
elasticsearch_index_alias = 'table_search_index'

job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY):
neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
'databuilder.models.table_elasticsearch_document.TableESDocument',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER):
neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW):
neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias
})

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
job.launch()


with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:

snowflake_table_metadata_job = PythonOperator(
task_id='snowflake_table_metadata_extract_job',
python_callable=create_snowflake_table_metadata_job
)

snowflake_es_publisher_job = PythonOperator(
task_id='snowflake_es_publisher_job',
python_callable=create_snowflake_es_publisher_job
)

# elastic search update run after table metadata has been updated
snowflake_table_metadata_job >> snowflake_es_publisher_job

0 comments on commit 60c837d

Please sign in to comment.