Skip to content

Commit

Permalink
Implements the Data Owner functionality for the Table
Browse files Browse the repository at this point in the history
Signed-off-by: verdan <[email protected]>

Updates

Signed-off-by: verdan <[email protected]>

Refactoring - Code Reviews

Signed-off-by: verdan <[email protected]>

Refactoring

Signed-off-by: verdan <[email protected]>

Updates the test cases, based on owner changes

Signed-off-by: verdan <[email protected]>

Updates the lambda function definition

Signed-off-by: verdan <[email protected]>
  • Loading branch information
verdan committed Aug 17, 2020
1 parent 6178a03 commit 7b6c893
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 50 deletions.
6 changes: 2 additions & 4 deletions docs/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ Most of the configurations are set through Flask [Config Class](https://github.c

#### USER_DETAIL_METHOD `OPTIONAL`
This is a method that can be used to get the user details from any third-party or custom system.
This custom function takes user_id as a parameter, and returns a tuple consisting user details defined in [UserSchema](https://github.com/lyft/amundsencommon/blob/master/amundsen_common/models/user.py) along with the status code.
This custom function takes user_id as a parameter, and returns a dictionary consisting user details' fields defined in [UserSchema](https://github.com/lyft/amundsencommon/blob/master/amundsen_common/models/user.py).

Example:
```python

def get_user_details(user_id):
from amundsen_common.models.user import UserSchema
from http import HTTPStatus
user_info = {
'email': '[email protected]',
'user_id': user_id,
'first_name': 'Firstname',
'last_name': 'Lastname',
'full_name': 'Firstname Lastname',
}
return UserSchema().dump(user_info).data, HTTPStatus.OK
return user_info

USER_DETAIL_METHOD = get_user_details
```
Expand Down
3 changes: 2 additions & 1 deletion metadata_service/api/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def __init__(self) -> None:
def get(self, *, id: Optional[str] = None) -> Iterable[Union[Mapping, int, None]]:
if app.config['USER_DETAIL_METHOD']:
try:
return app.config['USER_DETAIL_METHOD'](id)
user_data = app.config['USER_DETAIL_METHOD'](id)
return UserSchema().dump(user_data).data, HTTPStatus.OK
except Exception:
LOGGER.exception('UserDetailAPI GET Failed - Using "USER_DETAIL_METHOD" config variable')
return {'message': 'user_id {} fetch failed'.format(id)}, HTTPStatus.NOT_FOUND
Expand Down
140 changes: 101 additions & 39 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from random import randint
from typing import Any, Dict, List, Union, Optional

from amundsen_common.models.dashboard import DashboardSummary
from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader,\
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader, \
ProgrammaticDescription, ResourceReport
from amundsen_common.models.user import User as UserEntity
from amundsen_common.models.dashboard import DashboardSummary
from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest
from atlasclient.exceptions import BadRequest, Conflict, NotFound
from atlasclient.models import EntityUniqueAttribute
from atlasclient.utils import (make_table_qualified_name,
parse_table_qualified_name,
Expand All @@ -23,8 +23,8 @@

from metadata_service.entity.dashboard_detail import DashboardDetail as DashboardDetailEntity
from metadata_service.entity.description import Description
from metadata_service.entity.tag_detail import TagDetail
from metadata_service.entity.resource_type import ResourceType
from metadata_service.entity.tag_detail import TagDetail
from metadata_service.exception import NotFoundException
from metadata_service.proxy import BaseProxy
from metadata_service.util import UserResourceRel
Expand All @@ -35,6 +35,11 @@
_ATLAS_PROXY_CACHE_EXPIRY_SEC = 11 * 60 * 60 + randint(0, 3600)


class Status:
ACTIVE = "ACTIVE"
DELETED = "DELETED"


# noinspection PyMethodMayBeStatic
class AtlasProxy(BaseProxy):
"""
Expand All @@ -49,11 +54,11 @@ class AtlasProxy(BaseProxy):
READER_TYPE = 'Reader'
QN_KEY = 'qualifiedName'
BOOKMARK_ACTIVE_KEY = 'active'
ENTITY_ACTIVE_STATUS = 'ACTIVE'
GUID_KEY = 'guid'
ATTRS_KEY = 'attributes'
REL_ATTRS_KEY = 'relationshipAttributes'
ENTITY_URI_KEY = 'entityUri'
user_detail_method = app.config.get('USER_DETAIL_METHOD') or (lambda *args: None)
_CACHE = CacheManager(**parse_cache_config_options({'cache.regions': 'atlas_proxy',
'cache.atlas_proxy.type': 'memory',
'cache.atlas_proxy.expire': _ATLAS_PROXY_CACHE_EXPIRY_SEC}))
Expand Down Expand Up @@ -355,7 +360,7 @@ def _get_reports(self, guids: List[str]) -> List[ResourceReport]:
report_entities_collection = self._driver.entity_bulk(guid=guids)
for report_entity in extract_entities(report_entities_collection):
try:
if report_entity.status == self.ENTITY_ACTIVE_STATUS:
if report_entity.status == Status.ACTIVE:
report_attrs = report_entity.attributes
reports.append(
ResourceReport(
Expand All @@ -372,6 +377,24 @@ def _get_reports(self, guids: List[str]) -> List[ResourceReport]:

return parsed_reports

def _get_owners(self, data_owner: list, fallback_owner: str) -> List[User]:
data_owners = list()
active_owners = filter(lambda item:
item['entityStatus'] == Status.ACTIVE and
item['relationshipStatus'] == Status.ACTIVE,
data_owner)

for owner in active_owners:
owner_qn = owner['displayText']
# noinspection PyArgumentList
owner_data = self.user_detail_method(owner_qn) or {
'email': owner_qn,
'user_id': owner_qn
}
data_owners.append(User(**owner_data))

return data_owners or [User(email=fallback_owner, user_id=fallback_owner)]

def get_user(self, *, id: str) -> Union[UserEntity, None]:
pass

Expand All @@ -391,15 +414,15 @@ def get_table(self, *, table_uri: str) -> Table:
try:
attrs = table_details[self.ATTRS_KEY]

programmatic_descriptions = self._get_programmatic_descriptions(attrs.get('parameters'))
programmatic_descriptions = self._get_programmatic_descriptions(attrs.get('parameters', dict()))

table_qn = parse_table_qualified_name(
qualified_name=attrs.get(self.QN_KEY)
)

tags = []
# Using or in case, if the key 'classifications' is there with a None
for classification in table_details.get("classifications") or list():
for classification in table_details.get('classifications') or list():
tags.append(
Tag(
tag_name=classification.get('typeName'),
Expand All @@ -418,7 +441,7 @@ def get_table(self, *, table_uri: str) -> Table:
name=attrs.get('name') or table_qn.get("table_name", ''),
tags=tags,
description=attrs.get('description') or attrs.get('comment'),
owners=[User(email=attrs.get('owner'))],
owners=self._get_owners(table_details[self.REL_ATTRS_KEY].get('ownedBy'), attrs.get('owner')),
resource_reports=self._get_reports(guids=reports_guids),
columns=columns,
table_readers=self._get_readers(attrs.get(self.QN_KEY)),
Expand All @@ -434,20 +457,72 @@ def get_table(self, *, table_uri: str) -> Table:
.format(table_uri=table_uri))

def delete_owner(self, *, table_uri: str, owner: str) -> None:
pass
"""
:param table_uri:
:param owner:
:return:
"""
table = self._get_table_entity(table_uri=table_uri)
table_entity = table.entity

if table_entity[self.REL_ATTRS_KEY].get("ownedBy"):
try:
active_owners = filter(lambda item:
item['relationshipStatus'] == Status.ACTIVE
and item['displayText'] == owner,
table_entity[self.REL_ATTRS_KEY]['ownedBy'])
if list(active_owners):
self._driver.relationship_guid(next(active_owners)
.get('relationshipGuid')).delete()
else:
raise BadRequest('You can not delete this owner.')
except NotFound as ex:
LOGGER.exception('Error while removing table data owner. {}'
.format(str(ex)))

def add_owner(self, *, table_uri: str, owner: str) -> None:
"""
It simply replaces the owner field in atlas with the new string.
FixMe (Verdan): Implement multiple data owners and
atlas changes in the documentation if needed to make owner field a list
Query on Atlas User entity to find if the entity exist for the
owner string in parameter, if not create one. And then use that User
entity's GUID and add a relationship between Table and User, on ownedBy field.
:param table_uri:
:param owner: Email address of the owner
:return: None, as it simply adds the owner.
"""
entity = self._get_table_entity(table_uri=table_uri)
entity.entity[self.ATTRS_KEY]['owner'] = owner
entity.update()
# noinspection PyArgumentList
if not (self.user_detail_method(owner) or owner):
raise NotFoundException(f'User "{owner}" does not exist.')

user_dict = {
"entity": {
"typeName": "User",
"attributes": {"qualifiedName": owner},
}
}

# Get or Create a User
user_entity = self._driver.entity_post.create(data=user_dict)
user_guid = next(iter(user_entity.get("guidAssignments").values()))

table = self._get_table_entity(table_uri=table_uri)

entity_def = {
"typeName": "DataSet_Users_Owner",
"end1": {
"guid": table.entity.get("guid"), "typeName": "Table",
},
"end2": {
"guid": user_guid, "typeName": "User",
},
}
try:
self._driver.relationship.create(data=entity_def)
except Conflict as ex:
LOGGER.exception('Error while adding the owner information. {}'
.format(str(ex)))
raise BadRequest(f'User {owner} is already added as a data owner for '
f'table {table_uri}.')

def get_table_description(self, *,
table_uri: str) -> Union[str, None]:
Expand Down Expand Up @@ -641,7 +716,7 @@ def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[Popul
entity_status = user_reads['entityStatus']
relationship_status = user_reads['relationshipStatus']

if entity_status == 'ACTIVE' and relationship_status == 'ACTIVE':
if entity_status == Status.ACTIVE and relationship_status == Status.ACTIVE:
readers_guids.append(user_reads['guid'])

readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True))
Expand Down Expand Up @@ -679,16 +754,7 @@ def add_resource_relation_by_user(self, *,
if resource_type is not ResourceType.Table:
raise NotImplemented('resource type {} is not supported'.format(resource_type))

self._add_table_relation_by_user(table_uri=id,
user_email=user_id,
relation_type=relation_type)

def _add_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:

entity = self._get_bookmark_entity(entity_uri=table_uri, user_id=user_email)
entity = self._get_bookmark_entity(entity_uri=id, user_id=user_id)
entity.entity[self.ATTRS_KEY][self.BOOKMARK_ACTIVE_KEY] = True
entity.update()

Expand All @@ -700,15 +766,7 @@ def delete_resource_relation_by_user(self, *,
if resource_type is not ResourceType.Table:
raise NotImplemented('resource type {} is not supported'.format(resource_type))

self._delete_table_relation_by_user(table_uri=id,
user_email=user_id,
relation_type=relation_type)

def _delete_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:
entity = self._get_bookmark_entity(entity_uri=table_uri, user_id=user_email)
entity = self._get_bookmark_entity(entity_uri=id, user_id=user_id)
entity.entity[self.ATTRS_KEY][self.BOOKMARK_ACTIVE_KEY] = False
entity.update()

Expand Down Expand Up @@ -762,9 +820,13 @@ def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Rea
read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False))

for read_entity in read_entities:
reader = Reader(user=User(email=read_entity.relationshipAttributes['user']['displayText'],
user_id=read_entity.relationshipAttributes['user']['displayText']),
read_count=read_entity.attributes['count'])
reader_qn = read_entity.relationshipAttributes['user']['displayText']
# noinspection PyArgumentList
reader_details = self.user_detail_method(reader_qn) or {
'email': reader_qn,
'user_id': reader_qn
}
reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count'])

results.append(reader)

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ neotime==1.7.1
pytz==2018.4
requests-aws4auth==0.9
statsd==3.2.1
pyatlasclient==1.0.4
pyatlasclient==1.0.5
beaker>=1.10.0
mocket==3.7.3
overrides==2.5
Expand Down
18 changes: 13 additions & 5 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
test_exp_col = self.test_exp_col_stats_formatted
else:
test_exp_col = self.test_exp_col_stats_raw

ent_attrs = cast(dict, self.entity1['attributes'])
self._mock_get_table_entity()
self._create_mocked_report_entities_collection()
self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore
self.proxy._driver.entity_bulk = MagicMock(return_value=self.report_entity_collection)
response = self.proxy.get_table(table_uri=self.table_uri)

classif_name = self.classification_entity['classifications'][0]['typeName']
ent_attrs = cast(dict, self.entity1['attributes'])

col_attrs = cast(dict, self.test_column['attributes'])
exp_col_stats = list()
Expand Down Expand Up @@ -256,10 +256,18 @@ def test_delete_tag(self) -> None:

def test_add_owner(self) -> None:
owner = "OWNER"
entity = self._mock_get_table_entity()
with patch.object(entity, 'update') as mock_execute:
user_guid = 123
self._mock_get_table_entity()
self.proxy._driver.entity_post = MagicMock()
self.proxy._driver.entity_post.create = MagicMock(return_value={"guidAssignments": {user_guid: user_guid}})

with patch.object(self.proxy._driver.relationship, 'create') as mock_execute:
self.proxy.add_owner(table_uri=self.table_uri, owner=owner)
mock_execute.assert_called_with()
mock_execute.assert_called_with(
data={'typeName': 'DataSet_Users_Owner',
'end1': {'guid': self.entity1['guid'], 'typeName': 'Table'},
'end2': {'guid': user_guid, 'typeName': 'User'}}
)

def test_get_column(self) -> None:
self._mock_get_table_entity()
Expand Down

0 comments on commit 7b6c893

Please sign in to comment.