fix: Compatibility changes to the gremlin integration #260

Merged · 3 commits · Feb 26, 2021
38 changes: 38 additions & 0 deletions metadata_service/config.py
@@ -5,7 +5,9 @@
import os
from typing import Dict, List, Optional, Set # noqa: F401

import boto3
from amundsen_gremlin.config import LocalGremlinConfig
from amundsen_gremlin.test_and_development_shard import shard_set_explicitly

from metadata_service.entity.badge import Badge

@@ -114,3 +116,39 @@ class LocalConfig(LocalGremlinConfig, Config):
'title': 'Metadata Service',
'uiversion': 3
}


# The databuilder expects this to be False currently. We are defaulting to True because the tests expect it
if bool(distutils.util.strtobool(os.environ.get('IGNORE_NEPTUNE_SHARD', 'False'))):
@AndrewCiambrone (Contributor, Author) commented on Feb 22, 2021:
This needs to be at the base layer instead of the config because once get_shard is called the shard_set_explicitly no longer works.
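
A minimal sketch of that ordering constraint (assuming amundsen_gremlin resolves and caches the shard on first read, and that get_shard lives in the same module as shard_set_explicitly):

    from amundsen_gremlin.test_and_development_shard import (
        get_shard, shard_set_explicitly)

    shard_set_explicitly('')   # must run before anything calls get_shard()
    assert get_shard() == ''   # once read, later shard_set_explicitly calls no longer take effect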

shard_set_explicitly('')


class NeptuneConfig(LocalGremlinConfig, Config):
@AndrewCiambrone (Author):
I can move this to documentation if you all wish.

A contributor replied:
No strong feelings either way on my end, but certainly we could inherit the things unrelated to neptune/gremlin setup at least.

A member replied:
We could leave it here as a reference for people who use Neptune.

DEBUG = False
TESTING = False
LOG_LEVEL = 'INFO'
LOCAL_HOST = '0.0.0.0'

# FORMAT: wss://<NEPTUNE_URL>:<NEPTUNE_PORT>/gremlin
PROXY_HOST = os.environ.get('PROXY_HOST')
PROXY_PORT = None
PROXY_CLIENT = PROXY_CLIENTS['NEPTUNE']
PROXY_ENCRYPTED = bool(distutils.util.strtobool(os.environ.get('PROXY_ENCRYPTED', 'True')))
PROXY_VALIDATE_SSL = bool(distutils.util.strtobool(os.environ.get('PROXY_VALIDATE_SSL', 'False')))
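# Note: for Neptune the 'password' below is a boto3 session, not a string; NeptuneGremlinProxy
# passes it through as NEPTUNE_SESSION so bulk-loader requests can be IAM-signed.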
PROXY_PASSWORD = boto3.session.Session(region_name=os.environ.get('AWS_REGION', 'us-east-1'))

PROXY_CLIENT_KWARGS = {
'neptune_bulk_loader_s3_bucket_name': os.environ.get('S3_BUCKET_NAME'),
}

JANUS_GRAPH_URL = None

IS_STATSD_ON = bool(distutils.util.strtobool(os.environ.get('IS_STATSD_ON', 'False')))

SWAGGER_ENABLED = True
SWAGGER_TEMPLATE_PATH = os.path.join('api', 'swagger_doc', 'template.yml')
SWAGGER = {
'openapi': '3.0.2',
'title': 'Metadata Service',
'uiversion': 3
}
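
For reference, a hypothetical way to activate this config (the config-class environment variable name is an assumption based on the service's standard config loading; the host and bucket values are placeholders):

    import os

    os.environ['METADATA_SVC_CONFIG_MODULE_CLASS'] = 'metadata_service.config.NeptuneConfig'
    os.environ['PROXY_HOST'] = 'wss://<NEPTUNE_URL>:<NEPTUNE_PORT>/gremlin'
    os.environ['S3_BUCKET_NAME'] = 'my-neptune-bulk-loader-bucket'
    os.environ['AWS_REGION'] = 'us-east-1'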
41 changes: 22 additions & 19 deletions metadata_service/proxy/gremlin_proxy.py
@@ -456,11 +456,13 @@ def _V(label: Union[str, VertexTypes, VertexType], key: Optional[Union[str, Text
if key_property_name is None:
raise AssertionError('expected key_property_name')
g = g.V().has(get_label_from(label), key_property_name, key)
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard())
if get_shard():
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard())
else:
# let's support hasLabel, but need to limit it to either the test_shard (or unsharded perhaps)
g = g.V().hasLabel(get_label_from(label))
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard())
if get_shard():
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard())

# should we do this when using V(id)? there are one or two cases where we use it to filter, so it seems handy
if properties is not None:
@@ -524,7 +526,8 @@ def _upsert(*, executor: ExecuteQuery, execute: Callable[[ResultSet], TYPE] = Fr
id = label.id(key=key, **properties)
else:
raise AssertionError('wat') # appease mypy
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard())
if get_shard():
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard())

existing_node = executor(query=g.V(id).valueMap(True), get=FromResultSet.getOptional)
_label = get_label_from(label)
@@ -1025,8 +1028,7 @@ def get_users(self) -> List[User]:

@timer_with_counter
@overrides
def get_table(self, *, table_uri: str,
is_reviewer: bool = False) -> Table:
def get_table(self, *, table_uri: str, is_reviewer: bool = False) -> Table:
"""
:param table_uri: Table URI
:return: A Table object
@@ -1040,9 +1042,7 @@ def get_table(self, *, table_uri: str,
readers = self._get_table_readers(table_uri=table_uri)

users_by_type: Dict[str, List[User]] = {}
users_by_type['owner'] = sorted(
_safe_get_list(result, f'all_owners', transform=self._convert_to_user) or [],
key=attrgetter('user_id'))
users_by_type['owner'] = _safe_get_list(result, f'all_owners', transform=self._convert_to_user) or []
@AndrewCiambrone (Author):
User_id was not always defined. I can switch the sort to be on another attribute if you all prefer.


stats = _safe_get_list(result, 'stats', transform=self._convert_to_statistics) or []

@@ -1087,18 +1087,18 @@ def _get_table_itself(self, *, table_uri: str) -> Mapping[str, Any]:
hasLabel(VertexTypes.Application.value.label).fold()).as_('application')
g = g.coalesce(select('table').outE(EdgeTypes.LastUpdatedAt.value.label).inV().
hasLabel(VertexTypes.Updatedtimestamp.value.label).
values('latest_timestamp').fold()).as_('timestamp')
values('timestamp').fold()).as_('timestamp')
g = g.coalesce(select('table').inE(EdgeTypes.Tag.value.label).outV().
hasLabel(VertexTypes.Tag.value.label).fold()).as_('tags')
g = g.coalesce(select('table').outE(EdgeTypes.Source.value.label).inV().
hasLabel(VertexTypes.Source.value.label).fold()).as_('source')
g = g.coalesce(select('table').outE(EdgeTypes.Stat.value.label).inV().
hasLabel(VertexTypes.Stat.value.label).fold()).as_('stats')
g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label).
inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description')
g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label).
inV().has(VertexTypes.Description.value.label, 'source', without('user')).fold()). \
as_('programmatic_descriptions')
inV().hasLabel(VertexTypes.Description.value.label).fold()).as_('description')
@AndrewCiambrone (Author):
This was filtering on source originally, but for some reason it didn't work as expected. Made it work on the description labels instead.

A contributor replied:
Oh, it looks like the databuilder integration is creating things a bit differently? In our Neptune, all Descriptions are Description type nodes and we just distinguish user descriptions from programmatic descriptions by source attribute.

I think it's alright to do it this way if you prefer, but we should definitely add Programmatic_Description to VertexTypes so it's not a magic string.
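
A sketch of one interim way to avoid the magic string (constant name hypothetical; the full fix would be a new Programmatic_Description member in amundsen_gremlin's VertexTypes):

    # Named constant until VertexTypes grows a Programmatic_Description member.
    PROGRAMMATIC_DESCRIPTION_LABEL = 'Programmatic_Description'

    g = g.coalesce(
        select('table').out(EdgeTypes.Description.value.label)
        .hasLabel(PROGRAMMATIC_DESCRIPTION_LABEL).fold()
    ).as_('programmatic_descriptions')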

g = g.coalesce(
select('table').out(EdgeTypes.Description.value.label).hasLabel('Programmatic_Description').fold()
).as_('programmatic_descriptions')
g = g.coalesce(select('table').inE(EdgeTypes.Read.value.label).
has('date', gte(date.today() - timedelta(days=5))).
where(outV().hasLabel(VertexTypes.User.value.label)).
@@ -1136,8 +1136,9 @@ def _get_table_columns(self, *, table_uri: str) -> List[Column]:
g = _V(g=self.g, label=VertexTypes.Table.value.label, key=table_uri). \
outE(EdgeTypes.Column.value.label). \
inV().hasLabel(VertexTypes.Column.value.label).as_('column')
g = g.coalesce(select('column').outE(EdgeTypes.Description.value.label).
inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description')
g = g.coalesce(
select('column').out(EdgeTypes.Description.value.label).hasLabel(VertexTypes.Description.value.label).fold()
).as_('description')
g = g.coalesce(select('column').outE(EdgeTypes.Stat.value.label).inV().
hasLabel(VertexTypes.Stat.value.label).fold()).as_('stats')
g = g.select('column', 'description', 'stats'). \
@@ -1397,8 +1398,9 @@ def get_popular_tables(self, *,
outV().hasLabel(VertexTypes.Cluster.value.label).as_('cluster')
g = g.inE(EdgeTypes.Cluster.value.label). \
outV().hasLabel(VertexTypes.Database.value.label).as_('database')
g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label).
inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description')
g = g.coalesce(
select('table').out(EdgeTypes.Description.value.label).hasLabel(VertexTypes.Description.value.label).fold()
).as_('description')
g = g.select('database', 'cluster', 'schema', 'table', 'description'). \
by('name').by('name').by('name').by('name').by(unfold().values('description').fold())
results = self.query_executor()(query=g, get=FromResultSet.toList)
@@ -1662,11 +1664,12 @@ def _convert_to_application(self, result: Mapping[str, Any]) -> Application:
application_url=_safe_get(result, 'application_url'),
description=_safe_get(result, 'description'),
name=_safe_get(result, 'name'),
id=_safe_get(result, 'id', default=''))
id=_safe_get(result, 'id', default='')
)

def _convert_to_description(self, result: Mapping[str, Any]) -> ProgrammaticDescription:
return ProgrammaticDescription(text=_safe_get(result, 'description'),
source=_safe_get(result, 'source'))
source=_safe_get(result, 'description_source'))
A member commented:
I think description_source has been defined in databuilder for programmatic description. I would prefer to change it here to match the databuilder implementation.

A contributor replied:
I think source is more concise so I'd prefer that in a model with description already in the name, but at the very least common and databuilder should be in agreement about the field names!

A member replied:
Could we have a follow-up TODO to change the common repo to make it consistent?
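
Until the repos agree, a hypothetical tolerant shim could accept either key:

    # Interim sketch: databuilder writes 'description_source'; fall back to 'source'.
    def _convert_to_description(self, result: Mapping[str, Any]) -> ProgrammaticDescription:
        source = _safe_get(result, 'description_source') or _safe_get(result, 'source')
        return ProgrammaticDescription(text=_safe_get(result, 'description'),
                                       source=source)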


def _convert_to_user(self, result: Mapping[str, Any]) -> User:
return User(email=_safe_get(result, 'email'),
6 changes: 3 additions & 3 deletions metadata_service/proxy/neo4j_proxy.py
@@ -145,7 +145,7 @@ def _exec_col_query(self, table_uri: str) -> Tuple:
col_stats = []
for stat in tbl_col_neo4j_record['col_stats']:
col_stat = Stat(
stat_type=stat['stat_name'],
stat_type=stat['stat_type'],
stat_val=stat['stat_val'],
start_epoch=int(float(stat['start_epoch'])),
end_epoch=int(float(stat['end_epoch']))
@@ -159,7 +159,7 @@ def _exec_col_query(self, table_uri: str) -> Tuple:
last_neo4j_record = tbl_col_neo4j_record
col = Column(name=tbl_col_neo4j_record['col']['name'],
description=self._safe_get(tbl_col_neo4j_record, 'col_dscrpt', 'description'),
col_type=tbl_col_neo4j_record['col']['type'],
col_type=tbl_col_neo4j_record['col']['col_type'],
sort_order=int(tbl_col_neo4j_record['col']['sort_order']),
stats=col_stats,
badges=column_badges)
@@ -818,7 +818,7 @@ def get_latest_updated_ts(self) -> Optional[int]:
# None means we don't have record for neo4j, es last updated / index ts
record = record.single()
if record:
return record.get('ts', {}).get('latest_timestmap', 0)
return record.get('ts', {}).get('latest_timestamp', 0)
else:
return None

9 changes: 4 additions & 5 deletions metadata_service/proxy/neptune_proxy.py
@@ -57,7 +57,7 @@ class NeptuneGremlinProxy(AbstractGremlinProxy):
def __init__(self, *, host: str, port: Optional[int] = None, user: str = None,
password: Optional[Union[str, boto3.session.Session]] = None,
driver_remote_connection_options: Mapping[str, Any] = {},
neptune_bulk_loader_s3_bucket_name: Optional[str] = None,
@AndrewCiambrone (Author):
There was no way to define this through configuration. Decided to reuse the existing client_kwargs field.

A contributor replied:
Oh, it's a little magical, but if you do as follows:

options = current_app.config[config.PROXY_CLIENT_KWARGS] or {}
_proxy_client = NeptuneGremlinProxy(host=host, port=port, user=user, password=password, **options)

Python should pick up any keys from the PROXY_CLIENT_KWARGS dict and use them as keyword args. So that's how neptune_bulk_loader_s3_bucket_name would be populated here.
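
A self-contained illustration of that keyword-argument pickup (names hypothetical):

    def make_proxy(*, host: str, client_kwargs: dict = {},
                   neptune_bulk_loader_s3_bucket_name: str = '', **kwargs: dict) -> None:
        print(host, neptune_bulk_loader_s3_bucket_name)

    options = {'neptune_bulk_loader_s3_bucket_name': 'my-bucket'}
    make_proxy(host='wss://example/gremlin', **options)  # prints: wss://example/gremlin my-bucket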

The contributor replied:
Oh, I see, the existing proxies don't use this (we've diverged a bit). It's probably least disruptive to just conform to the existing interface then, sure!

client_kwargs: Dict = dict(),
**kwargs: dict) -> None:

driver_remote_connection_options = dict(driver_remote_connection_options)
@@ -101,11 +101,10 @@ def __init__(self, *, host: str, port: Optional[int] = None, user: str = None,
# always g for Neptune
driver_remote_connection_options.update(traversal_source='g')

s3_bucket_name: str = ''
if neptune_bulk_loader_s3_bucket_name is None:
try:
s3_bucket_name = client_kwargs['neptune_bulk_loader_s3_bucket_name'] # noqa: E731
except Exception:
raise NotImplementedError(f'Cannot find s3 bucket name!')
else:
s3_bucket_name = neptune_bulk_loader_s3_bucket_name

# Instantiate bulk loader and graph traversal factory
bulk_loader_config: Dict[str, Any] = dict(NEPTUNE_SESSION=password, NEPTUNE_URL=host,
10 changes: 8 additions & 2 deletions tests/unit/proxy/roundtrip/test_neptune_proxy.py
@@ -21,8 +21,14 @@ class NeptuneGremlinProxyTest(
abstract_gremlin_proxy_test_class(), unittest.TestCase): # type: ignore
def _create_gremlin_proxy(self, config: Mapping[str, Any]) -> RoundtripNeptuneGremlinProxy:
# Don't use PROXY_HOST, PROXY_PORT, PROXY_PASSWORD. They might not point at Neptune
return RoundtripNeptuneGremlinProxy(host=config['NEPTUNE_URL'], password=config['NEPTUNE_SESSION'],
neptune_bulk_loader_s3_bucket_name=config['NEPTUNE_BULK_LOADER_S3_BUCKET_NAME']) # noqa E501
client_kwargs = {
'neptune_bulk_loader_s3_bucket_name': config['NEPTUNE_BULK_LOADER_S3_BUCKET_NAME']
}
return RoundtripNeptuneGremlinProxy(
host=config['NEPTUNE_URL'],
password=config['NEPTUNE_SESSION'],
client_kwargs=client_kwargs
) # noqa E501

def test_is_retryable(self) -> None:
exception = gremlin_python.driver.protocol.GremlinServerError(dict(
10 changes: 5 additions & 5 deletions tests/unit/proxy/test_neo4j_proxy.py
@@ -45,18 +45,18 @@ def setUp(self) -> None:

col1 = copy.deepcopy(table_entry) # type: Dict[Any, Any]
col1['col'] = {'name': 'bar_id_1',
'type': 'varchar',
'col_type': 'varchar',
'sort_order': 0}
col1['col_dscrpt'] = {'description': 'bar col description'}
col1['col_stats'] = [{'stat_name': 'avg', 'start_epoch': 1, 'end_epoch': 1, 'stat_val': '1'}]
col1['col_stats'] = [{'stat_type': 'avg', 'start_epoch': 1, 'end_epoch': 1, 'stat_val': '1'}]
col1['col_badges'] = []

col2 = copy.deepcopy(table_entry) # type: Dict[Any, Any]
col2['col'] = {'name': 'bar_id_2',
'type': 'bigint',
'col_type': 'bigint',
'sort_order': 1}
col2['col_dscrpt'] = {'description': 'bar col2 description'}
col2['col_stats'] = [{'stat_name': 'avg', 'start_epoch': 2, 'end_epoch': 2, 'stat_val': '2'}]
col2['col_stats'] = [{'stat_type': 'avg', 'start_epoch': 2, 'end_epoch': 2, 'stat_val': '2'}]
col2['col_badges'] = [{'key': 'primary key', 'category': 'column'}]

table_level_results = MagicMock()
@@ -493,7 +493,7 @@ def test_get_neo4j_latest_updated_ts(self) -> None:
with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
mock_execute.return_value.single.return_value = {
'ts': {
'latest_timestmap': '1000'
'latest_timestamp': '1000'
}
}
neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000)