From 7b6c8933e16496fbceadffce9104febafdb3bf76 Mon Sep 17 00:00:00 2001 From: verdan Date: Thu, 30 Jul 2020 10:58:13 +0200 Subject: [PATCH] Implements the Data Owner functionality for the Table Signed-off-by: verdan Updates Signed-off-by: verdan Refactoring - Code Reviews Signed-off-by: verdan Refactoring Signed-off-by: verdan Updates the test cases, based on owner changes Signed-off-by: verdan Updates the lambda function definition Signed-off-by: verdan --- docs/configurations.md | 6 +- metadata_service/api/user.py | 3 +- metadata_service/proxy/atlas_proxy.py | 140 +++++++++++++++++++------- requirements.txt | 2 +- tests/unit/proxy/test_atlas_proxy.py | 18 +++- 5 files changed, 119 insertions(+), 50 deletions(-) diff --git a/docs/configurations.md b/docs/configurations.md index f7cedb23..17efd5cd 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -2,14 +2,12 @@ Most of the configurations are set through Flask [Config Class](https://github.c #### USER_DETAIL_METHOD `OPTIONAL` This is a method that can be used to get the user details from any third-party or custom system. -This custom function takes user_id as a parameter, and returns a tuple consisting user details defined in [UserSchema](https://github.com/lyft/amundsencommon/blob/master/amundsen_common/models/user.py) along with the status code. +This custom function takes user_id as a parameter, and returns a dictionary containing the user detail fields defined in [UserSchema](https://github.com/lyft/amundsencommon/blob/master/amundsen_common/models/user.py). 
Example: ```python def get_user_details(user_id): - from amundsen_common.models.user import UserSchema - from http import HTTPStatus user_info = { 'email': 'test@email.com', 'user_id': user_id, @@ -17,7 +15,7 @@ def get_user_details(user_id): 'last_name': 'Lastname', 'full_name': 'Firstname Lastname', } - return UserSchema().dump(user_info).data, HTTPStatus.OK + return user_info USER_DETAIL_METHOD = get_user_details ``` diff --git a/metadata_service/api/user.py b/metadata_service/api/user.py index ebb33978..d1348d5e 100644 --- a/metadata_service/api/user.py +++ b/metadata_service/api/user.py @@ -34,7 +34,8 @@ def __init__(self) -> None: def get(self, *, id: Optional[str] = None) -> Iterable[Union[Mapping, int, None]]: if app.config['USER_DETAIL_METHOD']: try: - return app.config['USER_DETAIL_METHOD'](id) + user_data = app.config['USER_DETAIL_METHOD'](id) + return UserSchema().dump(user_data).data, HTTPStatus.OK except Exception: LOGGER.exception('UserDetailAPI GET Failed - Using "USER_DETAIL_METHOD" config variable') return {'message': 'user_id {} fetch failed'.format(id)}, HTTPStatus.NOT_FOUND diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index 1d91d27e..f0b65b57 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -6,13 +6,13 @@ from random import randint from typing import Any, Dict, List, Union, Optional +from amundsen_common.models.dashboard import DashboardSummary from amundsen_common.models.popular_table import PopularTable -from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader,\ +from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader, \ ProgrammaticDescription, ResourceReport from amundsen_common.models.user import User as UserEntity -from amundsen_common.models.dashboard import DashboardSummary from atlasclient.client import Atlas -from atlasclient.exceptions import BadRequest +from atlasclient.exceptions 
import BadRequest, Conflict, NotFound from atlasclient.models import EntityUniqueAttribute from atlasclient.utils import (make_table_qualified_name, parse_table_qualified_name, @@ -23,8 +23,8 @@ from metadata_service.entity.dashboard_detail import DashboardDetail as DashboardDetailEntity from metadata_service.entity.description import Description -from metadata_service.entity.tag_detail import TagDetail from metadata_service.entity.resource_type import ResourceType +from metadata_service.entity.tag_detail import TagDetail from metadata_service.exception import NotFoundException from metadata_service.proxy import BaseProxy from metadata_service.util import UserResourceRel @@ -35,6 +35,11 @@ _ATLAS_PROXY_CACHE_EXPIRY_SEC = 11 * 60 * 60 + randint(0, 3600) +class Status: + ACTIVE = "ACTIVE" + DELETED = "DELETED" + + # noinspection PyMethodMayBeStatic class AtlasProxy(BaseProxy): """ @@ -49,11 +54,11 @@ class AtlasProxy(BaseProxy): READER_TYPE = 'Reader' QN_KEY = 'qualifiedName' BOOKMARK_ACTIVE_KEY = 'active' - ENTITY_ACTIVE_STATUS = 'ACTIVE' GUID_KEY = 'guid' ATTRS_KEY = 'attributes' REL_ATTRS_KEY = 'relationshipAttributes' ENTITY_URI_KEY = 'entityUri' + user_detail_method = app.config.get('USER_DETAIL_METHOD') or (lambda *args: None) _CACHE = CacheManager(**parse_cache_config_options({'cache.regions': 'atlas_proxy', 'cache.atlas_proxy.type': 'memory', 'cache.atlas_proxy.expire': _ATLAS_PROXY_CACHE_EXPIRY_SEC})) @@ -355,7 +360,7 @@ def _get_reports(self, guids: List[str]) -> List[ResourceReport]: report_entities_collection = self._driver.entity_bulk(guid=guids) for report_entity in extract_entities(report_entities_collection): try: - if report_entity.status == self.ENTITY_ACTIVE_STATUS: + if report_entity.status == Status.ACTIVE: report_attrs = report_entity.attributes reports.append( ResourceReport( @@ -372,6 +377,24 @@ def _get_reports(self, guids: List[str]) -> List[ResourceReport]: return parsed_reports + def _get_owners(self, data_owner: list, fallback_owner: str) 
-> List[User]: + data_owners = list() + active_owners = filter(lambda item: + item['entityStatus'] == Status.ACTIVE and + item['relationshipStatus'] == Status.ACTIVE, + data_owner) + + for owner in active_owners: + owner_qn = owner['displayText'] + # noinspection PyArgumentList + owner_data = self.user_detail_method(owner_qn) or { + 'email': owner_qn, + 'user_id': owner_qn + } + data_owners.append(User(**owner_data)) + + return data_owners or [User(email=fallback_owner, user_id=fallback_owner)] + def get_user(self, *, id: str) -> Union[UserEntity, None]: pass @@ -391,7 +414,7 @@ def get_table(self, *, table_uri: str) -> Table: try: attrs = table_details[self.ATTRS_KEY] - programmatic_descriptions = self._get_programmatic_descriptions(attrs.get('parameters')) + programmatic_descriptions = self._get_programmatic_descriptions(attrs.get('parameters', dict())) table_qn = parse_table_qualified_name( qualified_name=attrs.get(self.QN_KEY) @@ -399,7 +422,7 @@ def get_table(self, *, table_uri: str) -> Table: tags = [] # Using or in case, if the key 'classifications' is there with a None - for classification in table_details.get("classifications") or list(): + for classification in table_details.get('classifications') or list(): tags.append( Tag( tag_name=classification.get('typeName'), @@ -418,7 +441,7 @@ def get_table(self, *, table_uri: str) -> Table: name=attrs.get('name') or table_qn.get("table_name", ''), tags=tags, description=attrs.get('description') or attrs.get('comment'), - owners=[User(email=attrs.get('owner'))], + owners=self._get_owners(table_details[self.REL_ATTRS_KEY].get('ownedBy'), attrs.get('owner')), resource_reports=self._get_reports(guids=reports_guids), columns=columns, table_readers=self._get_readers(attrs.get(self.QN_KEY)), @@ -434,20 +457,72 @@ def get_table(self, *, table_uri: str) -> Table: .format(table_uri=table_uri)) def delete_owner(self, *, table_uri: str, owner: str) -> None: - pass + """ + + :param table_uri: + :param owner: + :return: + """ 
+ table = self._get_table_entity(table_uri=table_uri) + table_entity = table.entity + + if table_entity[self.REL_ATTRS_KEY].get("ownedBy"): + try: + active_owners = filter(lambda item: + item['relationshipStatus'] == Status.ACTIVE + and item['displayText'] == owner, + table_entity[self.REL_ATTRS_KEY]['ownedBy']) + if list(active_owners): + self._driver.relationship_guid(next(active_owners) + .get('relationshipGuid')).delete() + else: + raise BadRequest('You can not delete this owner.') + except NotFound as ex: + LOGGER.exception('Error while removing table data owner. {}' + .format(str(ex))) def add_owner(self, *, table_uri: str, owner: str) -> None: """ - It simply replaces the owner field in atlas with the new string. - FixMe (Verdan): Implement multiple data owners and - atlas changes in the documentation if needed to make owner field a list + Query on Atlas User entity to find if the entity exist for the + owner string in parameter, if not create one. And then use that User + entity's GUID and add a relationship between Table and User, on ownedBy field. :param table_uri: :param owner: Email address of the owner :return: None, as it simply adds the owner. 
""" - entity = self._get_table_entity(table_uri=table_uri) - entity.entity[self.ATTRS_KEY]['owner'] = owner - entity.update() + # noinspection PyArgumentList + if not (self.user_detail_method(owner) or owner): + raise NotFoundException(f'User "{owner}" does not exist.') + + user_dict = { + "entity": { + "typeName": "User", + "attributes": {"qualifiedName": owner}, + } + } + + # Get or Create a User + user_entity = self._driver.entity_post.create(data=user_dict) + user_guid = next(iter(user_entity.get("guidAssignments").values())) + + table = self._get_table_entity(table_uri=table_uri) + + entity_def = { + "typeName": "DataSet_Users_Owner", + "end1": { + "guid": table.entity.get("guid"), "typeName": "Table", + }, + "end2": { + "guid": user_guid, "typeName": "User", + }, + } + try: + self._driver.relationship.create(data=entity_def) + except Conflict as ex: + LOGGER.exception('Error while adding the owner information. {}' + .format(str(ex))) + raise BadRequest(f'User {owner} is already added as a data owner for ' + f'table {table_uri}.') def get_table_description(self, *, table_uri: str) -> Union[str, None]: @@ -641,7 +716,7 @@ def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[Popul entity_status = user_reads['entityStatus'] relationship_status = user_reads['relationshipStatus'] - if entity_status == 'ACTIVE' and relationship_status == 'ACTIVE': + if entity_status == Status.ACTIVE and relationship_status == Status.ACTIVE: readers_guids.append(user_reads['guid']) readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True)) @@ -679,16 +754,7 @@ def add_resource_relation_by_user(self, *, if resource_type is not ResourceType.Table: raise NotImplemented('resource type {} is not supported'.format(resource_type)) - self._add_table_relation_by_user(table_uri=id, - user_email=user_id, - relation_type=relation_type) - - def _add_table_relation_by_user(self, *, - table_uri: str, - user_email: str, - relation_type: 
UserResourceRel) -> None: - - entity = self._get_bookmark_entity(entity_uri=table_uri, user_id=user_email) + entity = self._get_bookmark_entity(entity_uri=id, user_id=user_id) entity.entity[self.ATTRS_KEY][self.BOOKMARK_ACTIVE_KEY] = True entity.update() @@ -700,15 +766,7 @@ def delete_resource_relation_by_user(self, *, if resource_type is not ResourceType.Table: raise NotImplemented('resource type {} is not supported'.format(resource_type)) - self._delete_table_relation_by_user(table_uri=id, - user_email=user_id, - relation_type=relation_type) - - def _delete_table_relation_by_user(self, *, - table_uri: str, - user_email: str, - relation_type: UserResourceRel) -> None: - entity = self._get_bookmark_entity(entity_uri=table_uri, user_id=user_email) + entity = self._get_bookmark_entity(entity_uri=id, user_id=user_id) entity.entity[self.ATTRS_KEY][self.BOOKMARK_ACTIVE_KEY] = False entity.update() @@ -762,9 +820,13 @@ def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Rea read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False)) for read_entity in read_entities: - reader = Reader(user=User(email=read_entity.relationshipAttributes['user']['displayText'], - user_id=read_entity.relationshipAttributes['user']['displayText']), - read_count=read_entity.attributes['count']) + reader_qn = read_entity.relationshipAttributes['user']['displayText'] + # noinspection PyArgumentList + reader_details = self.user_detail_method(reader_qn) or { + 'email': reader_qn, + 'user_id': reader_qn + } + reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count']) results.append(reader) diff --git a/requirements.txt b/requirements.txt index 9303a357..b61321ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -67,7 +67,7 @@ neotime==1.7.1 pytz==2018.4 requests-aws4auth==0.9 statsd==3.2.1 -pyatlasclient==1.0.4 +pyatlasclient==1.0.5 beaker>=1.10.0 mocket==3.7.3 overrides==2.5 diff --git 
a/tests/unit/proxy/test_atlas_proxy.py b/tests/unit/proxy/test_atlas_proxy.py index 14b1e824..c939f1be 100644 --- a/tests/unit/proxy/test_atlas_proxy.py +++ b/tests/unit/proxy/test_atlas_proxy.py @@ -109,14 +109,14 @@ def _get_table(self, custom_stats_format: bool = False) -> None: test_exp_col = self.test_exp_col_stats_formatted else: test_exp_col = self.test_exp_col_stats_raw - + ent_attrs = cast(dict, self.entity1['attributes']) self._mock_get_table_entity() self._create_mocked_report_entities_collection() + self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore self.proxy._driver.entity_bulk = MagicMock(return_value=self.report_entity_collection) response = self.proxy.get_table(table_uri=self.table_uri) classif_name = self.classification_entity['classifications'][0]['typeName'] - ent_attrs = cast(dict, self.entity1['attributes']) col_attrs = cast(dict, self.test_column['attributes']) exp_col_stats = list() @@ -256,10 +256,18 @@ def test_delete_tag(self) -> None: def test_add_owner(self) -> None: owner = "OWNER" - entity = self._mock_get_table_entity() - with patch.object(entity, 'update') as mock_execute: + user_guid = 123 + self._mock_get_table_entity() + self.proxy._driver.entity_post = MagicMock() + self.proxy._driver.entity_post.create = MagicMock(return_value={"guidAssignments": {user_guid: user_guid}}) + + with patch.object(self.proxy._driver.relationship, 'create') as mock_execute: self.proxy.add_owner(table_uri=self.table_uri, owner=owner) - mock_execute.assert_called_with() + mock_execute.assert_called_with( + data={'typeName': 'DataSet_Users_Owner', + 'end1': {'guid': self.entity1['guid'], 'typeName': 'Table'}, + 'end2': {'guid': user_guid, 'typeName': 'User'}} + ) def test_get_column(self) -> None: self._mock_get_table_entity()