From 53e18be2dd28f55deae03cf827e5a6cc1fe9ad0b Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Wed, 4 Nov 2020 08:44:37 +0100 Subject: [PATCH] improvement_readers_atlas | :tada: Initial commit. Signed-off-by: mgorsk1 --- docs/proxy/atlas_proxy.md | 2 +- metadata_service/proxy/atlas_proxy.py | 60 ++++++++++----------------- tests/unit/proxy/test_atlas_proxy.py | 22 +++++----- 3 files changed, 33 insertions(+), 51 deletions(-) diff --git a/docs/proxy/atlas_proxy.md b/docs/proxy/atlas_proxy.md index a54b7832..48d7b5d3 100644 --- a/docs/proxy/atlas_proxy.md +++ b/docs/proxy/atlas_proxy.md @@ -6,7 +6,7 @@ In order to make the Atlas-Amundsen integration smooth, we've developed a python Usage and Installation of `amundsenatlastypes` can be found [here](https://github.com/dwarszawski/amundsen-atlas-types/blob/master/README.md) Minimum Requirements: -- amundsenatlastypes==1.1.3 +- amundsenatlastypes==1.1.4 - pyatlasclient==1.0.4 ### Configurations diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index f571cfe5..20849460 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -4,6 +4,7 @@ import datetime import logging import re +from operator import attrgetter from random import randint from typing import Any, Dict, List, Union, Optional, Tuple @@ -454,6 +455,8 @@ def get_table(self, *, table_uri: str) -> Table: table_type = attrs.get('tableType') or 'table' is_view = 'view' in table_type.lower() + readers = self._get_readers(table_details) + table = Table( database=table_details.get('typeName'), cluster=table_qn.get('cluster_name', ''), @@ -466,7 +469,7 @@ def get_table(self, *, table_uri: str) -> Table: resource_reports=self._get_reports(guids=reports_guids), columns=columns, is_view=is_view, - table_readers=self._get_readers(attrs.get(self.QN_KEY)), + table_readers=readers, last_updated_timestamp=self._parse_date(table_details.get('updateTime')), programmatic_descriptions=programmatic_descriptions, watermarks=self._get_table_watermarks(table_details)) @@ -993,52 +996,33 @@ def _parse_date(self, date: int) -> Optional[int]: except Exception: return None - def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Reader]: - params = { - 'typeName': self.READER_TYPE, - 'offset': '0', - 'limit': top, - 'excludeDeletedEntities': True, - 'entityFilters': { - 'condition': 'AND', - 'criterion': [ - { - 'attributeName': self.QN_KEY, - 'operator': 'STARTSWITH', - 'attributeValue': qualified_name.split('@')[0] + '.' - }, - { - 'attributeName': 'count', - 'operator': 'gte', - 'attributeValue': f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}' - } - ] - }, - 'attributes': ['count', self.QN_KEY], - 'sortBy': 'count', - 'sortOrder': 'DESCENDING' - } + def _get_readers(self, entity: EntityUniqueAttribute, top: Optional[int] = 15) -> List[Reader]: + _readers = entity.get('relationshipAttributes', dict()).get('readers', list()) - search_results = self._driver.search_basic.create(data=params, ignoreRelationships=False) + guids = [_reader.get('guid') for _reader in _readers + if _reader.get('entityStatus', 'INACTIVE') == Status.ACTIVE + and _reader.get('relationshipStatus', 'INACTIVE') == Status.ACTIVE] - readers = [] + if not guids: + return [] - for record in search_results.entities: - readers.append(record.guid) + readers = extract_entities(self._driver.entity_bulk(guid=guids, ignoreRelationships=False)) - results = [] + _result = [] - if readers: - read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False)) + for _reader in readers: + read_count = _reader.attributes['count'] - for read_entity in read_entities: - reader_qn = read_entity.relationshipAttributes['user']['displayText'] + if read_count >= int(app.config['POPULAR_TABLE_MINIMUM_READER_COUNT']): + reader_qn = _reader.relationshipAttributes['user']['displayText'] reader_details = self._get_user_details(reader_qn) - reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count']) + reader = Reader(user=User(**reader_details), read_count=read_count) - results.append(reader) + _result.append(reader) - return results + result = sorted(_result, key=attrgetter('read_count'), reverse=True)[:top] + + return result def _get_programmatic_descriptions(self, parameters: dict) -> List[ProgrammaticDescription]: programmatic_descriptions: Dict[str, ProgrammaticDescription] = {} diff --git a/tests/unit/proxy/test_atlas_proxy.py b/tests/unit/proxy/test_atlas_proxy.py index c386d33c..96f1d980 100644 --- a/tests/unit/proxy/test_atlas_proxy.py +++ b/tests/unit/proxy/test_atlas_proxy.py @@ -3,11 +3,11 @@ import copy import unittest -from typing import Any, Dict, Optional, cast, List +from typing import Any, Dict, Optional, cast from amundsen_common.models.popular_table import PopularTable from amundsen_common.models.table import Column, Stat, Table, Tag, User, Reader,\ - ProgrammaticDescription, ResourceReport + ProgrammaticDescription from atlasclient.exceptions import BadRequest from unittest.mock import MagicMock, patch from tests.unit.proxy.fixtures.atlas_test_data import Data, DottedDict @@ -24,6 +24,7 @@ def setUp(self) -> None: self.app = create_app(config_module_class='metadata_service.config.LocalConfig') self.app.config['PROGRAMMATIC_DESCRIPTIONS_EXCLUDE_FILTERS'] = ['spark.*'] self.app.config['WATERMARK_DATE_FORMATS'] = '' + self.app.config['POPULAR_TABLE_MINIMUM_READER_COUNT'] = 0 self.app_context = self.app.app_context() self.app_context.push() @@ -113,8 +114,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None: ent_attrs = cast(dict, self.entity1['attributes']) self._mock_get_table_entity() self._create_mocked_report_entities_collection() - self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore - self.proxy._driver.entity_bulk = MagicMock(return_value=self.report_entity_collection) + self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore response = self.proxy.get_table(table_uri=self.table_uri) classif_name = self.classification_entity['classifications'][0]['typeName'] @@ -145,8 +145,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None: tags=[Tag(tag_name=classif_name, tag_type="default")], description=ent_attrs['description'], owners=[User(email=ent_attrs['owner'])], - resource_reports=[ResourceReport(name='test_report', url='http://test'), - ResourceReport(name='test_report3', url='http://test3')], + resource_reports=[], last_updated_timestamp=int(str(self.entity1['updateTime'])[:10]), columns=[exp_col] * self.active_columns, watermarks=[], @@ -425,14 +424,13 @@ def test_get_readers(self) -> None: entity_bulk_result.entities = self.reader_entities self.proxy._driver.entity_bulk = MagicMock(return_value=[entity_bulk_result]) - res = self.proxy._get_readers('dummy', 1) + res = self.proxy._get_readers(dict(relationshipAttributes=dict(readers=[dict(guid=1, entityStatus='ACTIVE', + relationshipStatus='ACTIVE')])), + 1) - expected: List[Reader] = [] + expected = [Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150)] - expected += [Reader(user=User(email='test_user_1', user_id='test_user_1'), read_count=5)] - expected += [Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150)] - - self.assertEqual(res, expected) + self.assertEqual(expected, res) def test_get_frequently_used_tables(self) -> None: entity_unique_attribute_result = MagicMock()