Skip to content

Commit

Permalink
atlas_search_extractor | 🎉 Initial commit.
Browse files Browse the repository at this point in the history
Signed-off-by: mgorsk1 <[email protected]>
  • Loading branch information
mgorsk1 committed Dec 9, 2020
1 parent 2343a90 commit e7e3552
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 0 deletions.
271 changes: 271 additions & 0 deletions databuilder/extractor/atlas_search_data_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import importlib
import logging
import multiprocessing.pool
from copy import deepcopy
from functools import reduce
from typing import Iterator, Optional, List, Tuple, Any, Dict, Generator

from atlasclient.client import Atlas
from atlasclient.utils import extract_entities
from pyhocon import ConfigTree, ConfigFactory

from databuilder.extractor.base_extractor import Extractor

LOGGER = logging.getLogger(__name__)

# custom types
type_fields_mapping_spec = Dict[str, List[Tuple[str, str, Any, Any]]]
type_fields_mapping = List[Tuple[str, str, Any, Any]]


# @todo document classes/methods
# @todo write tests


class AtlasSearchDataExtractor(Extractor):
ATLAS_URL_CONFIG_KEY = 'atlas_url'
ATLAS_PORT_CONFIG_KEY = 'atlas_port'
ATLAS_PROTOCOL_CONFIG_KEY = 'atlas_protocol'
ATLAS_VALIDATE_SSL_CONFIG_KEY = 'atlas_validate_ssl'
ATLAS_USERNAME_CONFIG_KEY = 'atlas_auth_user'
ATLAS_PASSWORD_CONFIG_KEY = 'atlas_auth_pw'
ATLAS_SEARCH_CHUNK_SIZE_KEY = 'atlas_search_chunk_size'
ATLAS_DETAILS_CHUNK_SIZE_KEY = 'atlas_details_chunk_size'
ATLAS_TIMEOUT_SECONDS_KEY = 'atlas_timeout_seconds'
ATLAS_MAX_RETRIES_KEY = 'atlas_max_retries'

PROCESS_POOL_SIZE_KEY = 'process_pool_size'

ENTITY_TYPE_KEY = 'entity_type'

DEFAULT_QUERY_PARAMS_BY_ENTITY = {
'Table': {
'typeName': 'Table',
'excludeDeletedEntities': True,
'query': '*'
}
}

DEFAULT_CONFIG = ConfigFactory.from_dict({ATLAS_URL_CONFIG_KEY: "localhost",
ATLAS_PORT_CONFIG_KEY: 21000,
ATLAS_PROTOCOL_CONFIG_KEY: 'http',
ATLAS_VALIDATE_SSL_CONFIG_KEY: False,
ATLAS_SEARCH_CHUNK_SIZE_KEY: 250,
ATLAS_DETAILS_CHUNK_SIZE_KEY: 25,
ATLAS_TIMEOUT_SECONDS_KEY: 120,
ATLAS_MAX_RETRIES_KEY: 2,
PROCESS_POOL_SIZE_KEY: 10})

# @todo fill out below fields for TableESDocument
# tags: List[str],

# es_document field, atlas field path, modification function, default_value
FIELDS_MAPPING_SPEC: type_fields_mapping_spec = {
'Table': [
('database', 'typeName', None, None),
('cluster', 'attributes.qualifiedName', lambda x: x.split('@')[-1], None),
('schema', 'relationshipAttributes.db.displayText', None, None),
('name', 'attributes.name', None, None),
('key', 'attributes.qualifiedName', None, None),
('description', 'attributes.description', None, None),
('last_updated_timestamp', 'updateTime', lambda x: int(x) / 1000, 0),
('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
('unique_usage', 'attributes.uniqueUsage', lambda x: int(x), 1),
('column_names', 'relationshipAttributes.columns', lambda x: AtlasSearchDataExtractor._filter_none(
[c.get('attributes').get('name') for c in x if c.get('status').lower() == 'active']), []),
('column_descriptions', 'relationshipAttributes.columns',
lambda x: AtlasSearchDataExtractor._filter_none(
[c.get('attributes').get('description') for c in x if c.get('status').lower() == 'active']), []),
('tags', 'tags', None, []),
('badges', 'classifications',
lambda x: AtlasSearchDataExtractor._filter_none(
[c.get('typeName') for c in x if c.get('entityStatus', '').lower() == 'active']), []),
('display_name', 'attributes.qualifiedName', lambda x: x.split('@')[0], None),
('schema_description', 'attributes.parameters.sourceDescription', None, None),
(
'programmatic_descriptions', 'attributes.parameters', lambda x: [str(s) for s in list(x.values())],
{})
]
}

ENTITY_MODEL_BY_TYPE = {
'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument'
}

REQUIRED_RELATIONSHIPS_BY_TYPE = {
'Table': ['columns']
}

def init(self, conf: ConfigTree) -> None:
self.conf = conf.with_fallback(AtlasSearchDataExtractor.DEFAULT_CONFIG)
self.driver = self._get_driver()

self._extract_iter: Optional[Iterator[Any]] = None

@property
def entity_type(self) -> str:
return self.conf.get(AtlasSearchDataExtractor.ENTITY_TYPE_KEY)

@property
def search_query(self) -> Dict:
return AtlasSearchDataExtractor.DEFAULT_QUERY_PARAMS_BY_ENTITY.get(self.entity_type) or {}

@property
def model_class(self) -> Any:
model_class = AtlasSearchDataExtractor.ENTITY_MODEL_BY_TYPE.get(self.entity_type)

if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)

return getattr(mod, class_name)

@property
def field_mappings(self) -> type_fields_mapping:
return AtlasSearchDataExtractor.FIELDS_MAPPING_SPEC.get(self.entity_type) or []

def extract(self) -> Any:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()

try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self) -> str:
return 'extractor.atlas_search_data'

@staticmethod
def _filter_none(input_list: List) -> List:
return list(filter(None, input_list))

def _get_driver(self) -> Any:
return Atlas(host=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY),
port=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY),
username=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY),
password=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY),
protocol=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY),
validate_ssl=self.conf.get_bool(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY),
timeout=self.conf.get_int(AtlasSearchDataExtractor.ATLAS_TIMEOUT_SECONDS_KEY),
max_retries=self.conf.get_int(AtlasSearchDataExtractor.ATLAS_MAX_RETRIES_KEY))

@staticmethod
def split_list_to_chunks(input_list: List[Any], n: int) -> Generator:
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(input_list), n):
yield input_list[i:i + n]

def _execute_query(self, params: Dict, relationships: Optional[List[str]] = None) -> Any:
def get_details(guid_list: List[str]) -> List:
result = []

LOGGER.info(f'Processing guids chunk of size: {len(guid_list)}')

try:
bulk_collection = self.driver.entity_bulk(guid=guid_list)

for collection in bulk_collection:
search_chunk = list(collection.entities_with_relationships(attributes=relationships))

result += search_chunk

return result
except Exception:
return []

def get_guids(start_offset: int) -> List[str]:
result = []

batch_start = start_offset
batch_end = start_offset + search_chunk_size

LOGGER.info(f'Collecting guids for batch: {batch_start}-{batch_end}')

_params = {'offset': str(batch_start), 'limit': str(search_chunk_size)}

full_params = deepcopy(params)
full_params.update(**_params)

try:
results = self.driver.search_basic(**full_params)

search_chunk = extract_entities(results)

_guids = [table.guid for table in search_chunk]

result += _guids

return result
except Exception:
LOGGER.warning(f'Error processing batch: {batch_start}-{batch_end}', exc_info=True)

return []

search_chunk_size = self.conf.get_int(AtlasSearchDataExtractor.ATLAS_SEARCH_CHUNK_SIZE_KEY)
details_chunk_size = self.conf.get_int(AtlasSearchDataExtractor.ATLAS_DETAILS_CHUNK_SIZE_KEY)
process_pool_size = self.conf.get_int(AtlasSearchDataExtractor.PROCESS_POOL_SIZE_KEY)

guids = []

try:
# Fetch the table entities based on query terms
search_results = self.driver.search_basic.create(data=params)

count = search_results._data.get("approximateCount")
except Exception as e:
count = 0

LOGGER.info(f'Received count: {count}')

if count > 0:
offsets = [i * search_chunk_size for i in range(int(count / search_chunk_size) + 1)]
else:
offsets = []

with multiprocessing.pool.ThreadPool(processes=process_pool_size) as pool:
guid_list = pool.map(get_guids, offsets, chunksize=1)

for sub_list in guid_list:
guids += sub_list

LOGGER.info(f'Received: {len(guids)} guids')

if guids:
guids_chunks = AtlasSearchDataExtractor.split_list_to_chunks(guids, details_chunk_size)

with multiprocessing.pool.ThreadPool(processes=process_pool_size) as pool:
return_list = pool.map(get_details, guids_chunks)

for sub_list in return_list:
for entry in sub_list:
yield entry

def _get_extract_iter(self) -> Iterator[Any]:
relationships = AtlasSearchDataExtractor.REQUIRED_RELATIONSHIPS_BY_TYPE.get(self.entity_type)

for atlas_entity in self._execute_query(self.search_query, relationships=relationships):
model_dict = dict()

try:
data = atlas_entity.__dict__['_data']

for spec in self.field_mappings:
model_field, atlas_field_path, _transform_spec, default_value = spec

atlas_value = reduce(lambda x, y: x.get(y, dict()), atlas_field_path.split('.'),
data) or default_value

transform_spec = _transform_spec or (lambda x: x)

es_entity_value = transform_spec(atlas_value)
model_dict[model_field] = es_entity_value

result = self.model_class(**model_dict)

yield result
except Exception:
LOGGER.warning(f'Error building model object.', exc_info=True)
88 changes: 88 additions & 0 deletions example/scripts/sample_atlas_search_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.atlas_search_data_extractor import AtlasSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

entity_type = 'Table'
extracted_search_data_path = '/tmp/search_data.json'
process_pool_size = 5

# atlas config
atlas_url = 'localhost'
atlas_port = 21000
atlas_protocol = 'http'
atlas_verify_ssl = False
atlas_username = 'admin'
atlas_password = 'admin'
atlas_search_chunk_size = 200
atlas_details_chunk_size = 10

# elastic config
es = Elasticsearch([
{'host': 'localhost'},
])

elasticsearch_client = es
elasticsearch_new_index_key = 'tables-' + str(uuid.uuid4())
elasticsearch_new_index_key_type = 'table'
elasticsearch_index_alias = 'table_search_index'

job_config = ConfigFactory.from_dict({
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY):
atlas_url,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY):
atlas_port,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY):
atlas_protocol,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY):
atlas_verify_ssl,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY):
atlas_username,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY):
atlas_password,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_SEARCH_CHUNK_SIZE_KEY):
atlas_search_chunk_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_DETAILS_CHUNK_SIZE_KEY):
atlas_details_chunk_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.PROCESS_POOL_SIZE_KEY):
process_pool_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ENTITY_TYPE_KEY):
entity_type,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias
})

if __name__ == "__main__":
task = DefaultTask(extractor=AtlasSearchDataExtractor(),
transformer=NoopTransformer(),
loader=FSElasticsearchJSONLoader())

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())

job.launch()
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ pandas>=0.21.0,<1.2.0

requests==2.23.0,<3.0
responses==0.10.6

pyatlasclient==1.1.2
5 changes: 5 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
'feast==0.8.0'
]

atlas = [
'pyatlasclient==1.1.2'
]

all_deps = requirements + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark + feast

Expand Down Expand Up @@ -93,6 +97,7 @@
'druid': druid,
'delta-lake': spark,
'feast': feast,
'atlas': atlas
},
classifiers=[
'Programming Language :: Python :: 3.6',
Expand Down

0 comments on commit e7e3552

Please sign in to comment.