Skip to content

Commit

Permalink
more lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro93 committed May 27, 2024
1 parent 8861ffe commit 5e9c81b
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 80 deletions.
37 changes: 1 addition & 36 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
import logging
import os
import os.path
import sys
import typing
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import click
import requests
from deprecated import deprecated
from requests.models import Response
from requests.sessions import Session

from datahub.cli import config_utils
Expand All @@ -22,46 +19,14 @@

log = logging.getLogger(__name__)

ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL"
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
ENV_METADATA_PORT = "DATAHUB_GMS_PORT"
ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL"
ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN"
ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID"
ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET"

config_override: Dict = {}

# TODO: Many of the methods in this file duplicate logic that already lives
# in the DataHubGraph client. We should refactor this to use the client instead.
# For the methods that aren't duplicates, that logic should be moved to the client.


def set_env_variables_override_config(url: str, token: Optional[str]) -> None:
"""Should be used to override the config when using rest emitter"""
config_override[ENV_METADATA_HOST_URL] = url
if token is not None:
config_override[ENV_METADATA_TOKEN] = token


def get_details_from_env() -> Tuple[Optional[str], Optional[str]]:
host = os.environ.get(ENV_METADATA_HOST)
port = os.environ.get(ENV_METADATA_PORT)
token = os.environ.get(ENV_METADATA_TOKEN)
protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http")
url = os.environ.get(ENV_METADATA_HOST_URL)
if port is not None:
url = f"{protocol}://{host}:{port}"
return url, token
# The reason for using host as URL is backward compatibility
# If port is not being used we assume someone is using host env var as URL
if url is None and host is not None:
log.warning(
f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead"
)
return url or host, token


def first_non_null(ls: List[Optional[str]]) -> Optional[str]:
return next((el for el in ls if el is not None and el.strip() != ""), None)

Expand Down
7 changes: 2 additions & 5 deletions metadata-ingestion/src/datahub/cli/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

import logging
import os
import sys
from typing import Optional, Union
from typing import Optional

import click
import yaml
Expand All @@ -31,15 +30,13 @@ def should_skip_config() -> bool:
return get_boolean_env_variable(ENV_SKIP_CONFIG, False)


def get_client_config(as_dict: bool = True) -> dict:
def get_client_config() -> Optional[dict]:
with open(DATAHUB_CONFIG_PATH, "r") as stream:

def get_client_config(as_dict: bool = False) -> Union[Optional[DatahubClientConfig], dict]:
with open(DATAHUB_CONFIG_PATH) as stream:
try:
return yaml.safe_load(stream)
if as_dict:
return config_json
except yaml.YAMLError as exc:
click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True)
return None
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/cli/lite_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

from datahub.cli.config_utils import (
DATAHUB_ROOT_FOLDER,
DatahubConfig,
get_client_config,
persist_datahub_config,
)
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.graph.client import DatahubConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.lite.duckdb_lite_config import DuckDBLiteConfig
Expand Down Expand Up @@ -45,7 +45,7 @@ class LiteCliConfig(DatahubConfig):


def get_lite_config() -> LiteLocalConfig:
client_config_dict = get_client_config(as_dict=True)
client_config_dict = get_client_config()
lite_config = LiteCliConfig.parse_obj(client_config_dict)
return lite_config.lite

Expand Down Expand Up @@ -309,7 +309,7 @@ def search(


def write_lite_config(lite_config: LiteLocalConfig) -> None:
cli_config = get_client_config(as_dict=True)
cli_config = get_client_config()
assert isinstance(cli_config, dict)
cli_config["lite"] = lite_config.dict()
persist_datahub_config(cli_config)
Expand Down
36 changes: 25 additions & 11 deletions metadata-ingestion/src/datahub/cli/migrate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import random
import uuid
Expand All @@ -7,7 +8,7 @@
import progressbar
from avrogen.dict_wrapper import DictWrapper

from datahub.cli import delete_cli, migration_utils
from datahub.cli import cli_utils, delete_cli, migration_utils
from datahub.emitter.mce_builder import (
DEFAULT_ENV,
dataset_urn_to_key,
Expand All @@ -23,14 +24,19 @@
SchemaKey,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.client import (
DataHubGraph,
RelatedEntity,
get_default_graph,
)
from datahub.metadata.schema_classes import (
ContainerKeyClass,
ContainerPropertiesClass,
DataPlatformInstanceClass,
SystemMetadataClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.urns.urn import Urn

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -143,15 +149,17 @@ def dataplatform2instance_func(

graph = get_default_graph()

urns_to_migrate = []
urns_to_migrate: List[str] = []

# we first calculate all the urns we will be migrating
for src_entity_urn in graph.get_urns_by_filter(platform=platform, env=env):
key = dataset_urn_to_key(src_entity_urn)
assert key
# Does this urn already have a platform instance associated with it?
response = graph.get_aspects_for_entity(
entity_urn=src_entity_urn, aspects=["dataPlatformInstance"], typed=True
entity_urn=src_entity_urn,
aspects=["dataPlatformInstance"],
aspect_types=[DataPlatformInstanceClass],
)
if "dataPlatformInstance" in response:
assert isinstance(
Expand Down Expand Up @@ -235,8 +243,8 @@ def dataplatform2instance_func(
aspect_name = migration_utils.get_aspect_name_from_relationship(
relationshipType, entity_type
)
aspect_map = graph.get_aspects_for_entity(
target_urn, aspects=[aspect_name], typed=True
aspect_map = cli_utils.get_aspects_for_entity(
graph._session, graph.config.server, target_urn, aspects=[aspect_name]
)
if aspect_name in aspect_map:
aspect = aspect_map[aspect_name]
Expand Down Expand Up @@ -434,23 +442,29 @@ def process_container_relationships(
migration_report: MigrationReport,
rest_emitter: DatahubRestEmitter,
) -> None:
relationships = migration_utils.get_incoming_relationships(urn=src_urn)
relationships: Iterable[RelatedEntity] = migration_utils.get_incoming_relationships(
urn=src_urn
)
client = get_default_graph()
for relationship in relationships:
log.debug(f"Incoming Relationship: {relationship}")
target_urn = relationship.urn
target_urn: str = relationship.urn

# We should use the new id if we already migrated it
if target_urn in container_id_map:
target_urn = container_id_map.get(target_urn)
target_urn = container_id_map[target_urn]

entity_type = _get_type_from_urn(target_urn)
relationshipType = relationship.relationship_type
aspect_name = migration_utils.get_aspect_name_from_relationship(
relationshipType, entity_type
)
aspect_map = client.get_aspects_for_entity(
target_urn, aspects=[aspect_name], typed=True
aspect_map = cli_utils.get_aspects_for_entity(
client._session,
client.config.server,
target_urn,
aspects=[aspect_name],
typed=True,
)
if aspect_name in aspect_map:
aspect = aspect_map[aspect_name]
Expand Down
9 changes: 7 additions & 2 deletions metadata-ingestion/src/datahub/cli/migration_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import uuid
from typing import Dict, Iterable, List
from typing import Iterable, List

from avrogen.dict_wrapper import DictWrapper

Expand Down Expand Up @@ -243,8 +243,13 @@ def clone_aspect(
run_id: str = str(uuid.uuid4()),
dry_run: bool = False,
) -> Iterable[MetadataChangeProposalWrapper]:
client = get_default_graph()
aspect_map = cli_utils.get_aspects_for_entity(
entity_urn=src_urn, aspects=aspect_names, typed=True
client._session,
client.config.server,
entity_urn=src_urn,
aspects=aspect_names,
typed=True,
)

if aspect_names is not None:
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/timeline_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def get_timeline(
) -> Any:
client = get_default_graph()
session = client._session
gms_host = client.config.server
host = client.config.server
if urn.startswith("urn%3A"):
# we assume the urn is already encoded
encoded_urn: str = urn
Expand Down
45 changes: 39 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import logging
import os
import sys
import textwrap
import time
from dataclasses import dataclass
Expand All @@ -27,7 +28,6 @@
from deprecated import deprecated
from pydantic import BaseModel, ValidationError
from requests.models import HTTPError
from requests.sessions import Session

from datahub.cli import config_utils
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
Expand Down Expand Up @@ -86,7 +86,10 @@
_GRAPH_DUMMY_RUN_ID = "__datahub-graph-client"

ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL"
ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN"
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
ENV_METADATA_PORT = "DATAHUB_GMS_PORT"
ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL"


class DatahubClientConfig(ConfigModel):
Expand Down Expand Up @@ -572,8 +575,8 @@ def _relationships_endpoint(self):
def _aspect_count_endpoint(self):
return f"{self.config.server}/aspects?action=getCount"

def _session(self) -> Session:
return super()._session
# def _session(self) -> Session:
# return super()._session

def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]:
"""Retrieve a domain urn based on its name. Returns None if there is no match found"""
Expand Down Expand Up @@ -1225,14 +1228,44 @@ class DatahubConfig(BaseModel):
gms: DatahubClientConfig


config_override: Dict = {}


def set_env_variables_override_config(url: str, token: Optional[str]) -> None:
"""Should be used to override the config when using rest emitter"""
config_override[ENV_METADATA_HOST_URL] = url
if token is not None:
config_override[ENV_METADATA_TOKEN] = token


def get_details_from_env() -> Tuple[Optional[str], Optional[str]]:
host = os.environ.get(ENV_METADATA_HOST)
port = os.environ.get(ENV_METADATA_PORT)
token = os.environ.get(ENV_METADATA_TOKEN)
protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http")
url = os.environ.get(ENV_METADATA_HOST_URL)
if port is not None:
url = f"{protocol}://{host}:{port}"
return url, token
# The reason for using host as URL is backward compatibility
# If port is not being used we assume someone is using host env var as URL
if url is None and host is not None:
logger.warning(
f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead"
)
return url or host, token


def load_client_config() -> DatahubClientConfig:
try:
ensure_datahub_config()
client_config_dict = config_utils.get_client_config(as_dict=True)
client_config_dict = config_utils.get_client_config()
datahub_config = DatahubConfig.parse_obj(client_config_dict).gms
return datahub_config
except ValidationError as e:
click.echo(f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}")
click.echo(
f"Received error, please check your {config_utils.CONDENSED_DATAHUB_CONFIG_PATH}"
)
click.echo(e, err=True)
sys.exit(1)

Expand Down Expand Up @@ -1267,7 +1300,7 @@ def write_gms_config(
config = DatahubConfig(gms=DatahubClientConfig(server=host, token=token))
if merge_with_previous:
try:
previous_config = config_utils.get_client_config(as_dict=True)
previous_config = config_utils.get_client_config()
assert isinstance(previous_config, dict)
except Exception as e:
# ok to fail on this
Expand Down
6 changes: 4 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from enum import auto
from typing import Optional, Union

from datahub.cli.cli_utils import set_env_variables_override_config
from datahub.configuration.common import (
ConfigEnum,
ConfigurationError,
Expand All @@ -23,7 +22,10 @@
WriteCallback,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.graph.client import (
DatahubClientConfig,
set_env_variables_override_config,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
auto_workunit_reporter,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Expand Down Expand Up @@ -209,7 +210,12 @@ def _get_lineage_mcp(

# extract the old lineage and save it for the new mcp
if preserve_upstream:

client = get_default_graph()

old_upstream_lineage = get_aspects_for_entity(
client._session,
client.config.server,
entity_urn=entity_urn,
aspects=["upstreamLineage"],
typed=True,
Expand Down
Loading

0 comments on commit 5e9c81b

Please sign in to comment.