Skip to content

Commit

Permalink
improvement_readers_atlas | 🎉 Initial commit.
Browse files Browse the repository at this point in the history
Signed-off-by: mgorsk1 <[email protected]>
  • Loading branch information
mgorsk1 committed Oct 28, 2020
1 parent 3c9a55e commit 7958e8a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 45 deletions.
2 changes: 1 addition & 1 deletion docs/proxy/atlas_proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ In order to make the Atlas-Amundsen integration smooth, we've developed a python
Usage and Installation of `amundsenatlastypes` can be found [here](https://github.com/dwarszawski/amundsen-atlas-types/blob/master/README.md)

Minimum Requirements:
- amundsenatlastypes==1.1.3
- amundsenatlastypes==1.1.4
- pyatlasclient==1.0.4

### Configurations
Expand Down
59 changes: 19 additions & 40 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datetime
import logging
import re
from operator import attrgetter
from random import randint
from typing import Any, Dict, List, Union, Optional, Tuple

Expand Down Expand Up @@ -454,6 +455,12 @@ def get_table(self, *, table_uri: str) -> Table:
table_type = attrs.get('tableType') or 'table'
is_view = 'view' in table_type.lower()

readers = self._get_readers([_reader.get('guid') for _reader in
table_details.get('relationshipAttributes', dict()).get('readers', list())
if _reader.get('entityStatus', 'INACTIVE') == Status.ACTIVE
and _reader.get('relationshipStatus', 'INACTIVE') == Status.ACTIVE
])

table = Table(
database=table_details.get('typeName'),
cluster=table_qn.get('cluster_name', ''),
Expand All @@ -466,7 +473,7 @@ def get_table(self, *, table_uri: str) -> Table:
resource_reports=self._get_reports(guids=reports_guids),
columns=columns,
is_view=is_view,
table_readers=self._get_readers(attrs.get(self.QN_KEY)),
table_readers=readers,
last_updated_timestamp=self._parse_date(table_details.get('updateTime')),
programmatic_descriptions=programmatic_descriptions,
watermarks=self._get_table_watermarks(table_details))
Expand Down Expand Up @@ -1001,50 +1008,22 @@ def _parse_date(self, date: int) -> Optional[int]:
except Exception:
return None

def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Reader]:
params = {
'typeName': self.READER_TYPE,
'offset': '0',
'limit': top,
'excludeDeletedEntities': True,
'entityFilters': {
'condition': 'AND',
'criterion': [
{
'attributeName': self.QN_KEY,
'operator': 'STARTSWITH',
'attributeValue': qualified_name.split('@')[0] + '.'
},
{
'attributeName': 'count',
'operator': 'gte',
'attributeValue': f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}'
}
]
},
'attributes': ['count', self.QN_KEY],
'sortBy': 'count',
'sortOrder': 'DESCENDING'
}

search_results = self._driver.search_basic.create(data=params, ignoreRelationships=False)
def _get_readers(self, guids: List[str], top: Optional[int] = 15) -> List[Reader]:
_results = []

readers = []
readers = extract_entities(self._driver.entity_bulk(guid=guids, ignoreRelationships=False))

for record in search_results.entities:
readers.append(record.guid)

results = []
for _reader in readers:
read_count = _reader.attributes['count']

if readers:
read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False))

for read_entity in read_entities:
reader_qn = read_entity.relationshipAttributes['user']['displayText']
if read_count >= int(app.config['POPULAR_TABLE_MINIMUM_READER_COUNT']):
reader_qn = _reader.relationshipAttributes['user']['displayText']
reader_details = self._get_user_details(reader_qn)
reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count'])
reader = Reader(user=User(**reader_details), read_count=read_count)

_results.append(reader)

results.append(reader)
results = sorted(_results, key=attrgetter('read_count'), reverse=True)[:top]

return results

Expand Down
6 changes: 2 additions & 4 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TestAtlasProxy(unittest.TestCase, Data):
def setUp(self) -> None:
self.app = create_app(config_module_class='metadata_service.config.LocalConfig')
self.app.config['PROGRAMMATIC_DESCRIPTIONS_EXCLUDE_FILTERS'] = ['spark.*']
self.app.config['POPULAR_TABLE_MINIMUM_READER_COUNT'] = 0
self.app_context = self.app.app_context()
self.app_context.push()

Expand Down Expand Up @@ -113,7 +114,6 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
self._mock_get_table_entity()
self._create_mocked_report_entities_collection()
self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore
self.proxy._driver.entity_bulk = MagicMock(return_value=self.report_entity_collection)
response = self.proxy.get_table(table_uri=self.table_uri)

classif_name = self.classification_entity['classifications'][0]['typeName']
Expand Down Expand Up @@ -143,8 +143,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
tags=[Tag(tag_name=classif_name, tag_type="default")],
description=ent_attrs['description'],
owners=[User(email=ent_attrs['owner'])],
resource_reports=[ResourceReport(name='test_report', url='http://test'),
ResourceReport(name='test_report3', url='http://test3')],
resource_reports=[],
last_updated_timestamp=int(str(self.entity1['updateTime'])[:10]),
columns=[exp_col] * self.active_columns,
programmatic_descriptions=[ProgrammaticDescription(source='test parameter key a',
Expand Down Expand Up @@ -426,7 +425,6 @@ def test_get_readers(self) -> None:

expected: List[Reader] = []

expected += [Reader(user=User(email='test_user_1', user_id='test_user_1'), read_count=5)]
expected += [Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150)]

self.assertEqual(res, expected)
Expand Down

0 comments on commit 7958e8a

Please sign in to comment.