Skip to content

Commit

Permalink
Updates the sample_snowflake_data_loader.py script to match the PG …
Browse files Browse the repository at this point in the history
…example (amundsen-io#266)
  • Loading branch information
dinedal authored May 15, 2020
1 parent 719a56a commit 944c71e
Showing 1 changed file with 100 additions and 4 deletions.
104 changes: 100 additions & 4 deletions databuilder/example/scripts/sample_snowflake_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import logging
import os
from pyhocon import ConfigFactory
from urllib import unquote_plus
from urllib.parse import unquote_plus
import uuid
import sys

from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
Expand All @@ -14,13 +16,19 @@
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from elasticsearch.client import Elasticsearch
from databuilder.transformer.base_transformer import NoopTransformer

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
# Disable snowflake logging
logging.getLogger("snowflake.connector.network").disabled = True

SNOWFLAKE_CONN_STRING = 'snowflake://username:%s@account' % unquote_plus('password')
SNOWFLAKE_DATABASE_KEY = 'YourSnowflakeDbName'

# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
Expand All @@ -31,6 +39,25 @@

IGNORED_SCHEMAS = ['\'DVCORE\'', '\'INFORMATION_SCHEMA\'', '\'STAGE_ORACLE\'']

es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]

es = Elasticsearch([
{'host': es_host if es_host else 'localhost'},
])


# todo: connection string needs to change
def connection_string():
user = 'username'
password = 'password'
account = 'YourSnowflakeAccountHere'
return "snowflake://%s:%s@%s" % (user, unquote_plus(password), account)


def create_sample_snowflake_job():

Expand All @@ -51,8 +78,8 @@ def create_sample_snowflake_job():
loader=csv_loader)

job_config = ConfigFactory.from_dict({
'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): SNOWFLAKE_CONN_STRING,
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.DATABASE_KEY): 'YourSnowflakeDbName',
'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(),
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.DATABASE_KEY): SNOWFLAKE_DATABASE_KEY,
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
Expand All @@ -71,6 +98,75 @@ def create_sample_snowflake_job():
return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=None):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` query baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())

# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_doc_type_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias,
})

# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
cypher_query)
if elasticsearch_mapping:
job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
elasticsearch_mapping)

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job


if __name__ == "__main__":
job = create_sample_snowflake_job()
job.launch()

job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()

0 comments on commit 944c71e

Please sign in to comment.