Skip to content

Commit

Permalink
improvement_readers_atlas | 🎉 Initial commit. (#218)
Browse files Browse the repository at this point in the history
Signed-off-by: mgorsk1 <[email protected]>
  • Loading branch information
mgorsk1 authored Nov 4, 2020
1 parent cc3768f commit 62c65fe
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 51 deletions.
2 changes: 1 addition & 1 deletion docs/proxy/atlas_proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ In order to make the Atlas-Amundsen integration smooth, we've developed a python
Usage and Installation of `amundsenatlastypes` can be found [here](https://github.com/dwarszawski/amundsen-atlas-types/blob/master/README.md)

Minimum Requirements:
-- amundsenatlastypes==1.1.3
+- amundsenatlastypes==1.1.4
- pyatlasclient==1.0.4

### Configurations
Expand Down
60 changes: 22 additions & 38 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datetime
import logging
import re
from operator import attrgetter
from random import randint
from typing import Any, Dict, List, Union, Optional, Tuple

Expand Down Expand Up @@ -454,6 +455,8 @@ def get_table(self, *, table_uri: str) -> Table:
table_type = attrs.get('tableType') or 'table'
is_view = 'view' in table_type.lower()

readers = self._get_readers(table_details)

table = Table(
database=table_details.get('typeName'),
cluster=table_qn.get('cluster_name', ''),
Expand All @@ -466,7 +469,7 @@ def get_table(self, *, table_uri: str) -> Table:
resource_reports=self._get_reports(guids=reports_guids),
columns=columns,
is_view=is_view,
table_readers=self._get_readers(attrs.get(self.QN_KEY)),
table_readers=readers,
last_updated_timestamp=self._parse_date(table_details.get('updateTime')),
programmatic_descriptions=programmatic_descriptions,
watermarks=self._get_table_watermarks(table_details))
Expand Down Expand Up @@ -993,52 +996,33 @@ def _parse_date(self, date: int) -> Optional[int]:
except Exception:
return None

def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Reader]:
params = {
'typeName': self.READER_TYPE,
'offset': '0',
'limit': top,
'excludeDeletedEntities': True,
'entityFilters': {
'condition': 'AND',
'criterion': [
{
'attributeName': self.QN_KEY,
'operator': 'STARTSWITH',
'attributeValue': qualified_name.split('@')[0] + '.'
},
{
'attributeName': 'count',
'operator': 'gte',
'attributeValue': f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}'
}
]
},
'attributes': ['count', self.QN_KEY],
'sortBy': 'count',
'sortOrder': 'DESCENDING'
}
def _get_readers(self, entity: EntityUniqueAttribute, top: Optional[int] = 15) -> List[Reader]:
_readers = entity.get('relationshipAttributes', dict()).get('readers', list())

search_results = self._driver.search_basic.create(data=params, ignoreRelationships=False)
guids = [_reader.get('guid') for _reader in _readers
if _reader.get('entityStatus', 'INACTIVE') == Status.ACTIVE
and _reader.get('relationshipStatus', 'INACTIVE') == Status.ACTIVE]

readers = []
if not guids:
return []

for record in search_results.entities:
readers.append(record.guid)
readers = extract_entities(self._driver.entity_bulk(guid=guids, ignoreRelationships=False))

results = []
_result = []

if readers:
read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False))
for _reader in readers:
read_count = _reader.attributes['count']

for read_entity in read_entities:
reader_qn = read_entity.relationshipAttributes['user']['displayText']
if read_count >= int(app.config['POPULAR_TABLE_MINIMUM_READER_COUNT']):
reader_qn = _reader.relationshipAttributes['user']['displayText']
reader_details = self._get_user_details(reader_qn)
reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count'])
reader = Reader(user=User(**reader_details), read_count=read_count)

results.append(reader)
_result.append(reader)

return results
result = sorted(_result, key=attrgetter('read_count'), reverse=True)[:top]

return result

def _get_programmatic_descriptions(self, parameters: dict) -> List[ProgrammaticDescription]:
programmatic_descriptions: Dict[str, ProgrammaticDescription] = {}
Expand Down
22 changes: 10 additions & 12 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

import copy
import unittest
from typing import Any, Dict, Optional, cast, List
from typing import Any, Dict, Optional, cast

from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Stat, Table, Tag, User, Reader,\
ProgrammaticDescription, ResourceReport
ProgrammaticDescription
from atlasclient.exceptions import BadRequest
from unittest.mock import MagicMock, patch
from tests.unit.proxy.fixtures.atlas_test_data import Data, DottedDict
Expand All @@ -24,6 +24,7 @@ def setUp(self) -> None:
self.app = create_app(config_module_class='metadata_service.config.LocalConfig')
self.app.config['PROGRAMMATIC_DESCRIPTIONS_EXCLUDE_FILTERS'] = ['spark.*']
self.app.config['WATERMARK_DATE_FORMATS'] = ''
self.app.config['POPULAR_TABLE_MINIMUM_READER_COUNT'] = 0
self.app_context = self.app.app_context()
self.app_context.push()

Expand Down Expand Up @@ -113,8 +114,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
ent_attrs = cast(dict, self.entity1['attributes'])
self._mock_get_table_entity()
self._create_mocked_report_entities_collection()
self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore
self.proxy._driver.entity_bulk = MagicMock(return_value=self.report_entity_collection)
self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore
response = self.proxy.get_table(table_uri=self.table_uri)

classif_name = self.classification_entity['classifications'][0]['typeName']
Expand Down Expand Up @@ -145,8 +145,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
tags=[Tag(tag_name=classif_name, tag_type="default")],
description=ent_attrs['description'],
owners=[User(email=ent_attrs['owner'])],
resource_reports=[ResourceReport(name='test_report', url='http://test'),
ResourceReport(name='test_report3', url='http://test3')],
resource_reports=[],
last_updated_timestamp=int(str(self.entity1['updateTime'])[:10]),
columns=[exp_col] * self.active_columns,
watermarks=[],
Expand Down Expand Up @@ -425,14 +424,13 @@ def test_get_readers(self) -> None:
entity_bulk_result.entities = self.reader_entities
self.proxy._driver.entity_bulk = MagicMock(return_value=[entity_bulk_result])

res = self.proxy._get_readers('dummy', 1)
res = self.proxy._get_readers(dict(relationshipAttributes=dict(readers=[dict(guid=1, entityStatus='ACTIVE',
relationshipStatus='ACTIVE')])),
1)

expected: List[Reader] = []
expected = [Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150)]

expected += [Reader(user=User(email='test_user_1', user_id='test_user_1'), read_count=5)]
expected += [Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150)]

self.assertEqual(res, expected)
self.assertEqual(expected, res)

def test_get_frequently_used_tables(self) -> None:
entity_unique_attribute_result = MagicMock()
Expand Down

0 comments on commit 62c65fe

Please sign in to comment.