Skip to content

Commit

Permalink
fix: Improvements to the Owned By feature (#178)
Browse files Browse the repository at this point in the history
* Improvements to the Owned By feature

Signed-off-by: verdan <[email protected]>

* Fixes the mypy errors and code refactoring

Signed-off-by: verdan <[email protected]>

* Code Review changes

Signed-off-by: verdan <[email protected]>

* Code Review Changes

Signed-off-by: verdan <[email protected]>

* Adds an extra test to cover the owner of feature

Signed-off-by: verdan <[email protected]>
  • Loading branch information
verdan authored Aug 26, 2020
1 parent a314c89 commit 0558d69
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 31 deletions.
124 changes: 95 additions & 29 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class AtlasProxy(BaseProxy):
ATTRS_KEY = 'attributes'
REL_ATTRS_KEY = 'relationshipAttributes'
ENTITY_URI_KEY = 'entityUri'
user_detail_method = app.config.get('USER_DETAIL_METHOD') or (lambda *args: None)
_CACHE = CacheManager(**parse_cache_config_options({'cache.regions': 'atlas_proxy',
'cache.atlas_proxy.type': 'memory',
'cache.atlas_proxy.expire': _ATLAS_PROXY_CACHE_EXPIRY_SEC}))
Expand Down Expand Up @@ -177,6 +176,22 @@ def _parse_bookmark_qn(self, bookmark_qn: str) -> Dict:
result = pattern.match(bookmark_qn)
return result.groupdict() if result else dict()

def _get_user_details(self, user_id: str, fallback: str = None) -> Union[Dict, str]:
"""
Helper function to help get the user details if the `USER_DETAIL_METHOD` is configured,
else uses the user_id for both email and user_id properties.
:param user_id: The Unique user id of a user entity
:return: a dictionary of user details
"""
if app.config.get('USER_DETAIL_METHOD'):
user_details = app.config.get('USER_DETAIL_METHOD')(user_id) # type: ignore
elif fallback:
user_details = fallback
else:
user_details = {'email': user_id, 'user_id': user_id}

return user_details

def _get_table_entity(self, *, table_uri: str) -> EntityUniqueAttribute:
"""
Fetch information from table_uri and then find the appropriate entity
Expand Down Expand Up @@ -386,10 +401,7 @@ def _get_owners(self, data_owners: list, fallback_owner: str) -> List[User]:

for owner in active_owners:
owner_qn = owner['displayText']
owner_data = self.user_detail_method(owner_qn) or {
'email': owner_qn,
'user_id': owner_qn
}
owner_data = self._get_user_details(owner_qn)
owners_detail.append(User(**owner_data))

return owners_detail or [User(email=fallback_owner, user_id=fallback_owner)]
Expand Down Expand Up @@ -492,7 +504,10 @@ def add_owner(self, *, table_uri: str, owner: str) -> None:
:param owner: Email address of the owner
:return: None, as it simply adds the owner.
"""
if not (self.user_detail_method(owner) or owner):
# Generating owner_info to validate if the user exists
owner_info = self._get_user_details(owner, fallback=owner)

if not owner_info:
raise NotFoundException(f'User "{owner}" does not exist.')

user_dict = {
Expand Down Expand Up @@ -613,19 +628,14 @@ def get_column_description(self, *,
column_name=column_name)
return column_detail[self.ATTRS_KEY].get('description')

def get_popular_tables(self, *, num_entries: int) -> List[PopularTable]:
def _serialize_popular_tables(self, entities: list) -> List[PopularTable]:
"""
:param num_entries: Number of popular tables to fetch
:return: A List of popular tables instances
Gets a list of entities and serialize the popular tables.
:param entities: List of entities from atlas client
:return: a list of PopularTable objects
"""
popular_tables = list()
popular_query_params = {'typeName': 'Table',
'sortBy': 'popularityScore',
'sortOrder': 'DESCENDING',
'excludeDeletedEntities': True,
'limit': num_entries}
search_results = self._driver.search_basic.create(data=popular_query_params)
for table in search_results.entities:
for table in entities:
table_attrs = table.attributes

table_qn = parse_table_qualified_name(
Expand All @@ -646,6 +656,20 @@ def get_popular_tables(self, *, num_entries: int) -> List[PopularTable]:

return popular_tables

def get_popular_tables(self, *, num_entries: int) -> List[PopularTable]:
"""
Generates a list of Popular tables to be shown on the home page of Amundsen.
:param num_entries: Number of popular tables to fetch
:return: A List of popular tables instances
"""
popular_query_params = {'typeName': 'Table',
'sortBy': 'popularityScore',
'sortOrder': 'DESCENDING',
'excludeDeletedEntities': True,
'limit': num_entries}
search_results = self._driver.search_basic.create(data=popular_query_params)
return self._serialize_popular_tables(search_results.entities)

def get_latest_updated_ts(self) -> int:
date = None

Expand Down Expand Up @@ -677,11 +701,15 @@ def get_tags(self) -> List:
)
return tags

def get_dashboard_by_user_relation(self, *, user_email: str, relation_type: UserResourceRel) \
-> Dict[str, List[DashboardSummary]]:
pass

def get_table_by_user_relation(self, *, user_email: str, relation_type: UserResourceRel) -> Dict[str, Any]:
def _get_resources_followed_by_user(self, user_id: str, resource_type: str) \
-> List[Union[PopularTable, DashboardSummary]]:
"""
ToDo (Verdan): Dashboard still needs to be implemented.
Helper function to get the resource, table, dashboard etc followed by a user.
:param user_id: User ID of a user
:param resource_type: Type of a resource that returns, could be table, dashboard etc.
:return: A list of PopularTable, DashboardSummary or any other resource.
"""
params = {
'typeName': self.BOOKMARK_TYPE,
'offset': '0',
Expand All @@ -693,7 +721,7 @@ def get_table_by_user_relation(self, *, user_email: str, relation_type: UserReso
{
'attributeName': self.QN_KEY,
'operator': 'contains',
'attributeValue': f'.{user_email}.bookmark'
'attributeValue': f'.{user_id}.bookmark'
},
{
'attributeName': self.BOOKMARK_ACTIVE_KEY,
Expand All @@ -707,17 +735,58 @@ def get_table_by_user_relation(self, *, user_email: str, relation_type: UserReso
# Fetches the bookmark entities based on filters
search_results = self._driver.search_basic.create(data=params)

results = []
resources = []
for record in search_results.entities:
table_info = self._extract_info_from_uri(table_uri=record.attributes[self.ENTITY_URI_KEY])
res = self._parse_bookmark_qn(record.attributes[self.QN_KEY])
results.append(PopularTable(
resources.append(PopularTable(
database=table_info['entity'],
cluster=res['cluster'],
schema=res['db'],
name=res['table']))
return resources

return {'table': results}
def _get_resources_owned_by_user(self, user_id: str, resource_type: str) \
-> List[Union[PopularTable, DashboardSummary, Any]]:
"""
ToDo (Verdan): Dashboard still needs to be implemented.
Helper function to get the resource, table, dashboard etc owned by a user.
:param user_id: User ID of a user
:param resource_type: Type of a resource that returns, could be table, dashboard etc.
:return: A list of PopularTable, DashboardSummary or any other resource.
"""
resources = list()
user_entity = self._driver.entity_unique_attribute(self.USER_TYPE, qualifiedName=user_id).entity

if not user_entity:
LOGGER.exception(f'User ({user_id}) not found in Atlas')
raise NotFoundException(f'User {user_id} not found.')

resource_guids = list()
for item in user_entity[self.REL_ATTRS_KEY].get('ownerOf') or list():
if (item['entityStatus'] == Status.ACTIVE and
item['relationshipStatus'] == Status.ACTIVE and
item['typeName'] == resource_type):
resource_guids.append(item[self.GUID_KEY])

entities = extract_entities(self._driver.entity_bulk(guid=resource_guids, ignoreRelationships=True))
if resource_type == self.TABLE_ENTITY:
resources = self._serialize_popular_tables(entities)

return resources

def get_dashboard_by_user_relation(self, *, user_email: str, relation_type: UserResourceRel) \
-> Dict[str, List[DashboardSummary]]:
pass

def get_table_by_user_relation(self, *, user_email: str, relation_type: UserResourceRel) -> Dict[str, Any]:
tables = list()
if relation_type == UserResourceRel.follow:
tables = self._get_resources_followed_by_user(user_id=user_email, resource_type=self.TABLE_ENTITY)
elif relation_type == UserResourceRel.own:
tables = self._get_resources_owned_by_user(user_id=user_email, resource_type=self.TABLE_ENTITY)

return {'table': tables}

def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[PopularTable]]:
user = self._driver.entity_unique_attribute(self.USER_TYPE, qualifiedName=user_email).entity
Expand Down Expand Up @@ -832,10 +901,7 @@ def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Rea

for read_entity in read_entities:
reader_qn = read_entity.relationshipAttributes['user']['displayText']
reader_details = self.user_detail_method(reader_qn) or {
'email': reader_qn,
'user_id': reader_qn
}
reader_details = self._get_user_details(reader_qn)
reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count'])

results.append(reader)
Expand Down
13 changes: 12 additions & 1 deletion tests/unit/proxy/fixtures/atlas_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,18 @@ class Data:
"relationshipStatus": "INACTIVE",
"guid": "3"
}
]
],
"""
if (item['entityStatus'] == Status.ACTIVE and
item['relationshipStatus'] == Status.ACTIVE and
item['typeName'] == resource_type):
"""
"ownerOf": [{
"entityStatus": "ACTIVE",
"relationshipStatus": "ACTIVE",
"typeName": "Table",
"guid": entity1["guid"]
}]
}
}

Expand Down
21 changes: 20 additions & 1 deletion tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def test_put_column_description(self) -> None:
column_name=attributes['name'],
description='DOESNT_MATTER')

def test_get_table_by_user_relation(self) -> None:
def test_get_table_by_user_relation_follow(self) -> None:
bookmark1 = copy.deepcopy(self.bookmark_entity1)
bookmark1 = self.to_class(bookmark1)
bookmark_collection = MagicMock()
Expand All @@ -319,6 +319,25 @@ def test_get_table_by_user_relation(self) -> None:

self.assertEqual(res, {'table': expected})

def test_get_table_by_user_relation_own(self) -> None:
unique_attr_response = MagicMock()
unique_attr_response.entity = Data.user_entity_2
self.proxy._driver.entity_unique_attribute = MagicMock(return_value=unique_attr_response)

entity_bulk_result = MagicMock()
entity_bulk_result.entities = [DottedDict(self.entity1)]
self.proxy._driver.entity_bulk = MagicMock(return_value=[entity_bulk_result])

res = self.proxy.get_table_by_user_relation(user_email='test_user_id',
relation_type=UserResourceRel.own)

ent1_attrs = cast(dict, self.entity1['attributes'])

expected = [PopularTable(database=self.entity_type, cluster=self.cluster, schema=self.db,
name=ent1_attrs['name'], description=ent1_attrs['description'])]

self.assertEqual({'table': expected}, res)

def test_add_resource_relation_by_user(self) -> None:
bookmark_entity = self._mock_get_bookmark_entity()
with patch.object(bookmark_entity, 'update') as mock_execute:
Expand Down

0 comments on commit 0558d69

Please sign in to comment.