From 19983b4b940330ef04db4606d7e9e8cac9ba890b Mon Sep 17 00:00:00 2001 From: Andrew Ciambrone Date: Mon, 22 Feb 2021 14:59:39 -0500 Subject: [PATCH 1/3] Some fixes to neptune integration Signed-off-by: Andrew Ciambrone --- metadata_service/config.py | 38 +++++++++++++++ metadata_service/proxy/gremlin_proxy.py | 47 ++++++++++--------- metadata_service/proxy/neptune_proxy.py | 9 ++-- .../proxy/roundtrip/test_neptune_proxy.py | 10 +++- 4 files changed, 75 insertions(+), 29 deletions(-) diff --git a/metadata_service/config.py b/metadata_service/config.py index 413a8753..92f6e80e 100644 --- a/metadata_service/config.py +++ b/metadata_service/config.py @@ -5,7 +5,9 @@ import os from typing import Dict, List, Optional, Set # noqa: F401 +import boto3 from amundsen_gremlin.config import LocalGremlinConfig +from amundsen_gremlin.test_and_development_shard import shard_set_explicitly from metadata_service.entity.badge import Badge @@ -114,3 +116,39 @@ class LocalConfig(LocalGremlinConfig, Config): 'title': 'Metadata Service', 'uiversion': 3 } + + +# The databuilder expects this to be False currently. 
We are defaulting to true because the testing expects this +if bool(distutils.util.strtobool(os.environ.get('IGNORE_NEPTUNE_SHARD', 'False'))): + shard_set_explicitly('') + + +class NeptuneConfig(LocalGremlinConfig, Config): + DEBUG = False + TESTING = False + LOG_LEVEL = 'INFO' + LOCAL_HOST = '0.0.0.0' + + # FORMAT: wss://:/gremlin + PROXY_HOST = os.environ.get('PROXY_HOST') + PROXY_PORT = None + PROXY_CLIENT = PROXY_CLIENTS['NEPTUNE'] + PROXY_ENCRYPTED = bool(distutils.util.strtobool(os.environ.get('PROXY_ENCRYPTED', 'True'))) + PROXY_VALIDATE_SSL = bool(distutils.util.strtobool(os.environ.get('PROXY_VALIDATE_SSL', 'False'))) + PROXY_PASSWORD = boto3.session.Session(region_name=os.environ.get('AWS_REGION', 'us-east-1')) + + PROXY_CLIENT_KWARGS = { + 'neptune_bulk_loader_s3_bucket_name': os.environ.get('S3_BUCKET_NAME'), + } + + JANUS_GRAPH_URL = None + + IS_STATSD_ON = bool(distutils.util.strtobool(os.environ.get('IS_STATSD_ON', 'False'))) + + SWAGGER_ENABLED = True + SWAGGER_TEMPLATE_PATH = os.path.join('api', 'swagger_doc', 'template.yml') + SWAGGER = { + 'openapi': '3.0.2', + 'title': 'Metadata Service', + 'uiversion': 3 + } diff --git a/metadata_service/proxy/gremlin_proxy.py b/metadata_service/proxy/gremlin_proxy.py index 7284253f..619ad15d 100644 --- a/metadata_service/proxy/gremlin_proxy.py +++ b/metadata_service/proxy/gremlin_proxy.py @@ -456,11 +456,13 @@ def _V(label: Union[str, VertexTypes, VertexType], key: Optional[Union[str, Text if key_property_name is None: raise AssertionError('expected key_property_name') g = g.V().has(get_label_from(label), key_property_name, key) - properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) + if get_shard(): + properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) else: # let's support hasLabel, but need to limit it to either the test_shard (or unsharded perhaps) g = g.V().hasLabel(get_label_from(label)) - properties.setdefault(WellKnownProperties.TestShard.value.name, 
get_shard()) + if get_shard(): + properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) # should we do this when using the V(id)? there are a couple or one case where we use it to filter so seems handy if properties is not None: @@ -524,7 +526,8 @@ def _upsert(*, executor: ExecuteQuery, execute: Callable[[ResultSet], TYPE] = Fr id = label.id(key=key, **properties) else: raise AssertionError('wat') # appease mypy - properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) + if get_shard(): + properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) existing_node = executor(query=g.V(id).valueMap(True), get=FromResultSet.getOptional) _label = get_label_from(label) @@ -1025,8 +1028,7 @@ def get_users(self) -> List[User]: @timer_with_counter @overrides - def get_table(self, *, table_uri: str, - is_reviewer: bool = False) -> Table: + def get_table(self, *, table_uri: str, is_reviewer: bool = False) -> Table: """ :param table_uri: Table URI :return: A Table object @@ -1040,9 +1042,7 @@ def get_table(self, *, table_uri: str, readers = self._get_table_readers(table_uri=table_uri) users_by_type: Dict[str, List[User]] = {} - users_by_type['owner'] = sorted( - _safe_get_list(result, f'all_owners', transform=self._convert_to_user) or [], - key=attrgetter('user_id')) + users_by_type['owner'] = _safe_get_list(result, f'all_owners', transform=self._convert_to_user) or [] stats = _safe_get_list(result, 'stats', transform=self._convert_to_statistics) or [] @@ -1087,7 +1087,7 @@ def _get_table_itself(self, *, table_uri: str) -> Mapping[str, Any]: hasLabel(VertexTypes.Application.value.label).fold()).as_('application') g = g.coalesce(select('table').outE(EdgeTypes.LastUpdatedAt.value.label).inV(). hasLabel(VertexTypes.Updatedtimestamp.value.label). - values('latest_timestamp').fold()).as_('timestamp') + values('timestamp').fold()).as_('timestamp') g = g.coalesce(select('table').inE(EdgeTypes.Tag.value.label).outV(). 
hasLabel(VertexTypes.Tag.value.label).fold()).as_('tags') g = g.coalesce(select('table').outE(EdgeTypes.Source.value.label).inV(). @@ -1095,10 +1095,10 @@ def _get_table_itself(self, *, table_uri: str) -> Mapping[str, Any]: g = g.coalesce(select('table').outE(EdgeTypes.Stat.value.label).inV(). hasLabel(VertexTypes.Stat.value.label).fold()).as_('stats') g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label). - inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description') - g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label). - inV().has(VertexTypes.Description.value.label, 'source', without('user')).fold()). \ - as_('programmatic_descriptions') + inV().hasLabel(VertexTypes.Description.value.label).fold()).as_('description') + g = g.coalesce( + select('table').out(EdgeTypes.Description.value.label).hasLabel('Programmatic_Description').fold() + ).as_('programmatic_descriptions') g = g.coalesce(select('table').inE(EdgeTypes.Read.value.label). has('date', gte(date.today() - timedelta(days=5))). where(outV().hasLabel(VertexTypes.User.value.label)). @@ -1136,8 +1136,9 @@ def _get_table_columns(self, *, table_uri: str) -> List[Column]: g = _V(g=self.g, label=VertexTypes.Table.value.label, key=table_uri). \ outE(EdgeTypes.Column.value.label). \ inV().hasLabel(VertexTypes.Column.value.label).as_('column') - g = g.coalesce(select('column').outE(EdgeTypes.Description.value.label). - inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description') + g = g.coalesce( + select('column').out(EdgeTypes.Description.value.label).hasLabel(VertexTypes.Description.value.label).fold() + ).as_('description') g = g.coalesce(select('column').outE(EdgeTypes.Stat.value.label).inV(). hasLabel(VertexTypes.Stat.value.label).fold()).as_('stats') g = g.select('column', 'description', 'stats'). 
\ @@ -1151,7 +1152,7 @@ def _get_table_columns(self, *, table_uri: str) -> List[Column]: col = Column(name=_safe_get(result, 'column', 'name'), key=_safe_get(result, 'column', self.key_property_name), description=_safe_get(result, 'description', 'description'), - col_type=_safe_get(result, 'column', 'col_type'), + col_type=_safe_get(result, 'column', 'type'), sort_order=_safe_get(result, 'column', 'sort_order', transform=int), stats=_safe_get_list(result, 'stats', transform=self._convert_to_statistics) or []) cols.append(col) @@ -1397,8 +1398,9 @@ def get_popular_tables(self, *, outV().hasLabel(VertexTypes.Cluster.value.label).as_('cluster') g = g.inE(EdgeTypes.Cluster.value.label). \ outV().hasLabel(VertexTypes.Database.value.label).as_('database') - g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label). - inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description') + g = g.coalesce( + select('table').out(EdgeTypes.Description.value.label).hasLabel(VertexTypes.Description.value.label).fold() + ).as_('description') g = g.select('database', 'cluster', 'schema', 'table', 'description'). 
\ by('name').by('name').by('name').by('name').by(unfold().values('description').fold()) results = self.query_executor()(query=g, get=FromResultSet.toList) @@ -1449,7 +1451,7 @@ def get_latest_updated_ts(self) -> int: """ results = _V(g=self.g, label=VertexTypes.Updatedtimestamp, - key=AMUNDSEN_TIMESTAMP_KEY).values('latest_timestamp').toList() + key=AMUNDSEN_TIMESTAMP_KEY).values('latest_timestmap').toList() return _safe_get(results, transform=int) @timer_with_counter @@ -1662,11 +1664,12 @@ def _convert_to_application(self, result: Mapping[str, Any]) -> Application: application_url=_safe_get(result, 'application_url'), description=_safe_get(result, 'description'), name=_safe_get(result, 'name'), - id=_safe_get(result, 'id', default='')) + id=_safe_get(result, 'id', default='') + ) def _convert_to_description(self, result: Mapping[str, Any]) -> ProgrammaticDescription: return ProgrammaticDescription(text=_safe_get(result, 'description'), - source=_safe_get(result, 'source')) + source=_safe_get(result, 'description_source')) def _convert_to_user(self, result: Mapping[str, Any]) -> User: return User(email=_safe_get(result, 'email'), @@ -1689,7 +1692,7 @@ def _convert_to_source(self, result: Mapping[str, Any]) -> Source: def _convert_to_statistics(self, result: Mapping[str, Any]) -> Stat: return Stat( - stat_type=_safe_get(result, 'stat_type'), + stat_type=_safe_get(result, 'stat_name'), stat_val=_safe_get(result, 'stat_val'), start_epoch=_safe_get(result, 'start_epoch'), end_epoch=_safe_get(result, 'end_epoch')) diff --git a/metadata_service/proxy/neptune_proxy.py b/metadata_service/proxy/neptune_proxy.py index 3959abb9..82d5e443 100644 --- a/metadata_service/proxy/neptune_proxy.py +++ b/metadata_service/proxy/neptune_proxy.py @@ -57,7 +57,7 @@ class NeptuneGremlinProxy(AbstractGremlinProxy): def __init__(self, *, host: str, port: Optional[int] = None, user: str = None, password: Optional[Union[str, boto3.session.Session]] = None, driver_remote_connection_options: 
Mapping[str, Any] = {}, - neptune_bulk_loader_s3_bucket_name: Optional[str] = None, + client_kwargs: Dict = dict(), **kwargs: dict) -> None: driver_remote_connection_options = dict(driver_remote_connection_options) @@ -101,11 +101,10 @@ def __init__(self, *, host: str, port: Optional[int] = None, user: str = None, # always g for Neptune driver_remote_connection_options.update(traversal_source='g') - s3_bucket_name: str = '' - if neptune_bulk_loader_s3_bucket_name is None: + try: + s3_bucket_name = client_kwargs['neptune_bulk_loader_s3_bucket_name'] # noqa: E731 + except Exception: raise NotImplementedError(f'Cannot find s3 bucket name!') - else: - s3_bucket_name = neptune_bulk_loader_s3_bucket_name # Instantiate bulk loader and graph traversal factory bulk_loader_config: Dict[str, Any] = dict(NEPTUNE_SESSION=password, NEPTUNE_URL=host, diff --git a/tests/unit/proxy/roundtrip/test_neptune_proxy.py b/tests/unit/proxy/roundtrip/test_neptune_proxy.py index 2299bd1b..1d8bc9eb 100644 --- a/tests/unit/proxy/roundtrip/test_neptune_proxy.py +++ b/tests/unit/proxy/roundtrip/test_neptune_proxy.py @@ -21,8 +21,14 @@ class NeptuneGremlinProxyTest( abstract_gremlin_proxy_test_class(), unittest.TestCase): # type: ignore def _create_gremlin_proxy(self, config: Mapping[str, Any]) -> RoundtripNeptuneGremlinProxy: # Don't use PROXY_HOST, PROXY_PORT, PROXY_PASSWORD. 
They might not be neptune - return RoundtripNeptuneGremlinProxy(host=config['NEPTUNE_URL'], password=config['NEPTUNE_SESSION'], - neptune_bulk_loader_s3_bucket_name=config['NEPTUNE_BULK_LOADER_S3_BUCKET_NAME']) # noqa E501 + client_kwargs = { + 'neptune_bulk_loader_s3_bucket_name': config['NEPTUNE_BULK_LOADER_S3_BUCKET_NAME'] + } + return RoundtripNeptuneGremlinProxy( + host=config['NEPTUNE_URL'], + password=config['NEPTUNE_SESSION'], + client_kwargs=client_kwargs + ) # noqa E501 def test_is_retryable(self) -> None: exception = gremlin_python.driver.protocol.GremlinServerError(dict( From e334e66d781d0f20389d3a26653bab2442e76f23 Mon Sep 17 00:00:00 2001 From: Andrew Ciambrone Date: Tue, 23 Feb 2021 09:53:30 -0500 Subject: [PATCH 2/3] correct a typo in timestamp Signed-off-by: Andrew Ciambrone --- metadata_service/proxy/gremlin_proxy.py | 2 +- metadata_service/proxy/neo4j_proxy.py | 2 +- tests/unit/proxy/test_neo4j_proxy.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata_service/proxy/gremlin_proxy.py b/metadata_service/proxy/gremlin_proxy.py index 619ad15d..34998d1f 100644 --- a/metadata_service/proxy/gremlin_proxy.py +++ b/metadata_service/proxy/gremlin_proxy.py @@ -1451,7 +1451,7 @@ def get_latest_updated_ts(self) -> int: """ results = _V(g=self.g, label=VertexTypes.Updatedtimestamp, - key=AMUNDSEN_TIMESTAMP_KEY).values('latest_timestmap').toList() + key=AMUNDSEN_TIMESTAMP_KEY).values('latest_timestamp').toList() return _safe_get(results, transform=int) @timer_with_counter diff --git a/metadata_service/proxy/neo4j_proxy.py b/metadata_service/proxy/neo4j_proxy.py index 6976c9f8..9afe6b88 100644 --- a/metadata_service/proxy/neo4j_proxy.py +++ b/metadata_service/proxy/neo4j_proxy.py @@ -818,7 +818,7 @@ def get_latest_updated_ts(self) -> Optional[int]: # None means we don't have record for neo4j, es last updated / index ts record = record.single() if record: - return record.get('ts', {}).get('latest_timestmap', 0) + return 
record.get('ts', {}).get('latest_timestamp', 0) else: return None diff --git a/tests/unit/proxy/test_neo4j_proxy.py b/tests/unit/proxy/test_neo4j_proxy.py index 952c42e1..5181abab 100644 --- a/tests/unit/proxy/test_neo4j_proxy.py +++ b/tests/unit/proxy/test_neo4j_proxy.py @@ -493,7 +493,7 @@ def test_get_neo4j_latest_updated_ts(self) -> None: with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute: mock_execute.return_value.single.return_value = { 'ts': { - 'latest_timestmap': '1000' + 'latest_timestamp': '1000' } } neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000) From 458268102713c6b3e297ae13075821914d16afe1 Mon Sep 17 00:00:00 2001 From: Andrew Ciambrone Date: Wed, 24 Feb 2021 16:50:24 -0500 Subject: [PATCH 3/3] rename stat_name to stat_type and type to col_type Signed-off-by: Andrew Ciambrone --- metadata_service/proxy/gremlin_proxy.py | 4 ++-- metadata_service/proxy/neo4j_proxy.py | 4 ++-- tests/unit/proxy/test_neo4j_proxy.py | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/metadata_service/proxy/gremlin_proxy.py b/metadata_service/proxy/gremlin_proxy.py index 34998d1f..6c887050 100644 --- a/metadata_service/proxy/gremlin_proxy.py +++ b/metadata_service/proxy/gremlin_proxy.py @@ -1152,7 +1152,7 @@ def _get_table_columns(self, *, table_uri: str) -> List[Column]: col = Column(name=_safe_get(result, 'column', 'name'), key=_safe_get(result, 'column', self.key_property_name), description=_safe_get(result, 'description', 'description'), - col_type=_safe_get(result, 'column', 'type'), + col_type=_safe_get(result, 'column', 'col_type'), sort_order=_safe_get(result, 'column', 'sort_order', transform=int), stats=_safe_get_list(result, 'stats', transform=self._convert_to_statistics) or []) cols.append(col) @@ -1692,7 +1692,7 @@ def _convert_to_source(self, result: Mapping[str, Any]) -> Source: def _convert_to_statistics(self, result: Mapping[str, Any]) -> Stat: return Stat( - 
stat_type=_safe_get(result, 'stat_name'), + stat_type=_safe_get(result, 'stat_type'), stat_val=_safe_get(result, 'stat_val'), start_epoch=_safe_get(result, 'start_epoch'), end_epoch=_safe_get(result, 'end_epoch')) diff --git a/metadata_service/proxy/neo4j_proxy.py b/metadata_service/proxy/neo4j_proxy.py index 9afe6b88..22f26a94 100644 --- a/metadata_service/proxy/neo4j_proxy.py +++ b/metadata_service/proxy/neo4j_proxy.py @@ -145,7 +145,7 @@ def _exec_col_query(self, table_uri: str) -> Tuple: col_stats = [] for stat in tbl_col_neo4j_record['col_stats']: col_stat = Stat( - stat_type=stat['stat_name'], + stat_type=stat['stat_type'], stat_val=stat['stat_val'], start_epoch=int(float(stat['start_epoch'])), end_epoch=int(float(stat['end_epoch'])) @@ -159,7 +159,7 @@ def _exec_col_query(self, table_uri: str) -> Tuple: last_neo4j_record = tbl_col_neo4j_record col = Column(name=tbl_col_neo4j_record['col']['name'], description=self._safe_get(tbl_col_neo4j_record, 'col_dscrpt', 'description'), - col_type=tbl_col_neo4j_record['col']['type'], + col_type=tbl_col_neo4j_record['col']['col_type'], sort_order=int(tbl_col_neo4j_record['col']['sort_order']), stats=col_stats, badges=column_badges) diff --git a/tests/unit/proxy/test_neo4j_proxy.py b/tests/unit/proxy/test_neo4j_proxy.py index 5181abab..448ab208 100644 --- a/tests/unit/proxy/test_neo4j_proxy.py +++ b/tests/unit/proxy/test_neo4j_proxy.py @@ -45,18 +45,18 @@ def setUp(self) -> None: col1 = copy.deepcopy(table_entry) # type: Dict[Any, Any] col1['col'] = {'name': 'bar_id_1', - 'type': 'varchar', + 'col_type': 'varchar', 'sort_order': 0} col1['col_dscrpt'] = {'description': 'bar col description'} - col1['col_stats'] = [{'stat_name': 'avg', 'start_epoch': 1, 'end_epoch': 1, 'stat_val': '1'}] + col1['col_stats'] = [{'stat_type': 'avg', 'start_epoch': 1, 'end_epoch': 1, 'stat_val': '1'}] col1['col_badges'] = [] col2 = copy.deepcopy(table_entry) # type: Dict[Any, Any] col2['col'] = {'name': 'bar_id_2', - 'type': 'bigint', + 
'col_type': 'bigint', 'sort_order': 1} col2['col_dscrpt'] = {'description': 'bar col2 description'} - col2['col_stats'] = [{'stat_name': 'avg', 'start_epoch': 2, 'end_epoch': 2, 'stat_val': '2'}] + col2['col_stats'] = [{'stat_type': 'avg', 'start_epoch': 2, 'end_epoch': 2, 'stat_val': '2'}] col2['col_badges'] = [{'key': 'primary key', 'category': 'column'}] table_level_results = MagicMock()