-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Compatibility changes to the gremlin integration #260
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,9 @@ | |
import os | ||
from typing import Dict, List, Optional, Set # noqa: F401 | ||
|
||
import boto3 | ||
from amundsen_gremlin.config import LocalGremlinConfig | ||
from amundsen_gremlin.test_and_development_shard import shard_set_explicitly | ||
|
||
from metadata_service.entity.badge import Badge | ||
|
||
|
@@ -114,3 +116,39 @@ class LocalConfig(LocalGremlinConfig, Config): | |
'title': 'Metadata Service', | ||
'uiversion': 3 | ||
} | ||
|
||
|
||
# The databuilder expects this to be False currently. We are defaulting to true because the testing expects this | ||
if bool(distutils.util.strtobool(os.environ.get('IGNORE_NEPTUNE_SHARD', 'False'))): | ||
shard_set_explicitly('') | ||
|
||
|
||
class NeptuneConfig(LocalGremlinConfig, Config): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can move this to documentation if you all wish. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No strong feelings either way on my end, but certainly we could inherit the things unrelated to neptune/gremlin setup at least. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could leave it here to give people who use neptune for reference. |
||
DEBUG = False | ||
TESTING = False | ||
LOG_LEVEL = 'INFO' | ||
LOCAL_HOST = '0.0.0.0' | ||
|
||
# FORMAT: wss://<NEPTUNE_URL>:<NEPTUNE_PORT>/gremlin | ||
PROXY_HOST = os.environ.get('PROXY_HOST') | ||
PROXY_PORT = None | ||
PROXY_CLIENT = PROXY_CLIENTS['NEPTUNE'] | ||
PROXY_ENCRYPTED = bool(distutils.util.strtobool(os.environ.get(PROXY_ENCRYPTED, 'True'))) | ||
PROXY_VALIDATE_SSL = bool(distutils.util.strtobool(os.environ.get(PROXY_VALIDATE_SSL, 'False'))) | ||
PROXY_PASSWORD = boto3.session.Session(region_name=os.environ.get('AWS_REGION', 'us-east-1')) | ||
|
||
PROXY_CLIENT_KWARGS = { | ||
'neptune_bulk_loader_s3_bucket_name': os.environ.get('S3_BUCKET_NAME'), | ||
} | ||
|
||
JANUS_GRAPH_URL = None | ||
|
||
IS_STATSD_ON = bool(distutils.util.strtobool(os.environ.get(IS_STATSD_ON, 'False'))) | ||
|
||
SWAGGER_ENABLED = True | ||
SWAGGER_TEMPLATE_PATH = os.path.join('api', 'swagger_doc', 'template.yml') | ||
SWAGGER = { | ||
'openapi': '3.0.2', | ||
'title': 'Metadata Service', | ||
'uiversion': 3 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -456,11 +456,13 @@ def _V(label: Union[str, VertexTypes, VertexType], key: Optional[Union[str, Text | |
if key_property_name is None: | ||
raise AssertionError('expected key_property_name') | ||
g = g.V().has(get_label_from(label), key_property_name, key) | ||
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) | ||
if get_shard(): | ||
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) | ||
else: | ||
# let's support hasLabel, but need to limit it to either the test_shard (or unsharded perhaps) | ||
g = g.V().hasLabel(get_label_from(label)) | ||
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) | ||
if get_shard(): | ||
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) | ||
|
||
# should we do this when using the V(id)? there are a couple or one case where we use it to filter so seems handy | ||
if properties is not None: | ||
|
@@ -524,7 +526,8 @@ def _upsert(*, executor: ExecuteQuery, execute: Callable[[ResultSet], TYPE] = Fr | |
id = label.id(key=key, **properties) | ||
else: | ||
raise AssertionError('wat') # appease mypy | ||
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) | ||
if get_shard(): | ||
properties.setdefault(WellKnownProperties.TestShard.value.name, get_shard()) | ||
|
||
existing_node = executor(query=g.V(id).valueMap(True), get=FromResultSet.getOptional) | ||
_label = get_label_from(label) | ||
|
@@ -1025,8 +1028,7 @@ def get_users(self) -> List[User]: | |
|
||
@timer_with_counter | ||
@overrides | ||
def get_table(self, *, table_uri: str, | ||
is_reviewer: bool = False) -> Table: | ||
def get_table(self, *, table_uri: str, is_reviewer: bool = False) -> Table: | ||
""" | ||
:param table_uri: Table URI | ||
:return: A Table object | ||
|
@@ -1040,9 +1042,7 @@ def get_table(self, *, table_uri: str, | |
readers = self._get_table_readers(table_uri=table_uri) | ||
|
||
users_by_type: Dict[str, List[User]] = {} | ||
users_by_type['owner'] = sorted( | ||
_safe_get_list(result, f'all_owners', transform=self._convert_to_user) or [], | ||
key=attrgetter('user_id')) | ||
users_by_type['owner'] = _safe_get_list(result, f'all_owners', transform=self._convert_to_user) or [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. User_id was not always defined. I can switch the sort to be on another attribute if you all prefer. |
||
|
||
stats = _safe_get_list(result, 'stats', transform=self._convert_to_statistics) or [] | ||
|
||
|
@@ -1087,18 +1087,18 @@ def _get_table_itself(self, *, table_uri: str) -> Mapping[str, Any]: | |
hasLabel(VertexTypes.Application.value.label).fold()).as_('application') | ||
g = g.coalesce(select('table').outE(EdgeTypes.LastUpdatedAt.value.label).inV(). | ||
hasLabel(VertexTypes.Updatedtimestamp.value.label). | ||
values('latest_timestamp').fold()).as_('timestamp') | ||
values('timestamp').fold()).as_('timestamp') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
g = g.coalesce(select('table').inE(EdgeTypes.Tag.value.label).outV(). | ||
hasLabel(VertexTypes.Tag.value.label).fold()).as_('tags') | ||
g = g.coalesce(select('table').outE(EdgeTypes.Source.value.label).inV(). | ||
hasLabel(VertexTypes.Source.value.label).fold()).as_('source') | ||
g = g.coalesce(select('table').outE(EdgeTypes.Stat.value.label).inV(). | ||
hasLabel(VertexTypes.Stat.value.label).fold()).as_('stats') | ||
g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label). | ||
inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description') | ||
g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label). | ||
inV().has(VertexTypes.Description.value.label, 'source', without('user')).fold()). \ | ||
as_('programmatic_descriptions') | ||
inV().hasLabel(VertexTypes.Description.value.label).fold()).as_('description') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was filtering on source originally but for some reason it didn't work as expected. Made it work on the descriptions labels instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, it looks like the databuilder integration is creating things a bit differently? In our Neptune, all Descriptions are I think it's alright to do it this way if you prefer, but we should definitely add |
||
g = g.coalesce( | ||
select('table').out(EdgeTypes.Description.value.label).hasLabel('Programmatic_Description').fold() | ||
).as_('programmatic_descriptions') | ||
g = g.coalesce(select('table').inE(EdgeTypes.Read.value.label). | ||
has('date', gte(date.today() - timedelta(days=5))). | ||
where(outV().hasLabel(VertexTypes.User.value.label)). | ||
|
@@ -1136,8 +1136,9 @@ def _get_table_columns(self, *, table_uri: str) -> List[Column]: | |
g = _V(g=self.g, label=VertexTypes.Table.value.label, key=table_uri). \ | ||
outE(EdgeTypes.Column.value.label). \ | ||
inV().hasLabel(VertexTypes.Column.value.label).as_('column') | ||
g = g.coalesce(select('column').outE(EdgeTypes.Description.value.label). | ||
inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description') | ||
g = g.coalesce( | ||
select('column').out(EdgeTypes.Description.value.label).hasLabel(VertexTypes.Description.value.label).fold() | ||
).as_('description') | ||
g = g.coalesce(select('column').outE(EdgeTypes.Stat.value.label).inV(). | ||
hasLabel(VertexTypes.Stat.value.label).fold()).as_('stats') | ||
g = g.select('column', 'description', 'stats'). \ | ||
|
@@ -1397,8 +1398,9 @@ def get_popular_tables(self, *, | |
outV().hasLabel(VertexTypes.Cluster.value.label).as_('cluster') | ||
g = g.inE(EdgeTypes.Cluster.value.label). \ | ||
outV().hasLabel(VertexTypes.Database.value.label).as_('database') | ||
g = g.coalesce(select('table').outE(EdgeTypes.Description.value.label). | ||
inV().has(VertexTypes.Description.value.label, 'source', 'user').fold()).as_('description') | ||
g = g.coalesce( | ||
select('table').out(EdgeTypes.Description.value.label).hasLabel(VertexTypes.Description.value.label).fold() | ||
).as_('description') | ||
g = g.select('database', 'cluster', 'schema', 'table', 'description'). \ | ||
by('name').by('name').by('name').by('name').by(unfold().values('description').fold()) | ||
results = self.query_executor()(query=g, get=FromResultSet.toList) | ||
|
@@ -1662,11 +1664,12 @@ def _convert_to_application(self, result: Mapping[str, Any]) -> Application: | |
application_url=_safe_get(result, 'application_url'), | ||
description=_safe_get(result, 'description'), | ||
name=_safe_get(result, 'name'), | ||
id=_safe_get(result, 'id', default='')) | ||
id=_safe_get(result, 'id', default='') | ||
) | ||
|
||
def _convert_to_description(self, result: Mapping[str, Any]) -> ProgrammaticDescription: | ||
return ProgrammaticDescription(text=_safe_get(result, 'description'), | ||
source=_safe_get(result, 'source')) | ||
source=_safe_get(result, 'description_source')) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm I'd claim this is a bug with the databuilder impl again: https://github.com/amundsen-io/amundsencommon/blob/master/amundsen_common/models/table.py#L139 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we have a follow up todo to change the common repo to make it consistent? |
||
|
||
def _convert_to_user(self, result: Mapping[str, Any]) -> User: | ||
return User(email=_safe_get(result, 'email'), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ class NeptuneGremlinProxy(AbstractGremlinProxy): | |
def __init__(self, *, host: str, port: Optional[int] = None, user: str = None, | ||
password: Optional[Union[str, boto3.session.Session]] = None, | ||
driver_remote_connection_options: Mapping[str, Any] = {}, | ||
neptune_bulk_loader_s3_bucket_name: Optional[str] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was no way to define this thru configuration. Decided to reuse the existing client_kwargs field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, it's a little magical, but if you do like follows:
python should pick up any keys from the PROXY_CLIENT_KWARGS dict and use them as keyword args. So that's how There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you suggesting I make those changes here: https://github.com/amundsen-io/amundsenmetadatalibrary/blob/master/metadata_service/proxy/__init__.py#L41? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see, the existing proxies don't use this (we've diverged a bit). It's probably least disruptive to just conform to the existing interface then, sure! |
||
client_kwargs: Dict = dict(), | ||
**kwargs: dict) -> None: | ||
|
||
driver_remote_connection_options = dict(driver_remote_connection_options) | ||
|
@@ -101,11 +101,10 @@ def __init__(self, *, host: str, port: Optional[int] = None, user: str = None, | |
# always g for Neptune | ||
driver_remote_connection_options.update(traversal_source='g') | ||
|
||
s3_bucket_name: str = '' | ||
if neptune_bulk_loader_s3_bucket_name is None: | ||
try: | ||
s3_bucket_name = client_kwargs['neptune_bulk_loader_s3_bucket_name'] # noqa: E731 | ||
except Exception: | ||
raise NotImplementedError(f'Cannot find s3 bucket name!') | ||
else: | ||
s3_bucket_name = neptune_bulk_loader_s3_bucket_name | ||
|
||
# Instantiate bulk loader and graph traversal factory | ||
bulk_loader_config: Dict[str, Any] = dict(NEPTUNE_SESSION=password, NEPTUNE_URL=host, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be at the base layer instead of the config because once get_shard is called the shard_set_explicitly no longer works.