diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 832f9eb2d7368b..b00205fe62c332 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -2,15 +2,12 @@ import logging import os import os.path -import sys import typing from datetime import datetime -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Type, Union import click import requests -from deprecated import deprecated -from requests.models import Response from requests.sessions import Session from datahub.cli import config_utils @@ -22,46 +19,14 @@ log = logging.getLogger(__name__) -ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL" -ENV_METADATA_HOST = "DATAHUB_GMS_HOST" -ENV_METADATA_PORT = "DATAHUB_GMS_PORT" -ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL" -ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN" ENV_DATAHUB_SYSTEM_CLIENT_ID = "DATAHUB_SYSTEM_CLIENT_ID" ENV_DATAHUB_SYSTEM_CLIENT_SECRET = "DATAHUB_SYSTEM_CLIENT_SECRET" -config_override: Dict = {} - # TODO: Many of the methods in this file duplicate logic that already lives # in the DataHubGraph client. We should refactor this to use the client instead. # For the methods that aren't duplicates, that logic should be moved to the client. -def set_env_variables_override_config(url: str, token: Optional[str]) -> None: - """Should be used to override the config when using rest emitter""" - config_override[ENV_METADATA_HOST_URL] = url - if token is not None: - config_override[ENV_METADATA_TOKEN] = token - - -def get_details_from_env() -> Tuple[Optional[str], Optional[str]]: - host = os.environ.get(ENV_METADATA_HOST) - port = os.environ.get(ENV_METADATA_PORT) - token = os.environ.get(ENV_METADATA_TOKEN) - protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http") - url = os.environ.get(ENV_METADATA_HOST_URL) - if port is not None: - url = f"{protocol}://{host}:{port}" - return url, token - # The reason for using host as URL is backward compatibility - # If port is not being used we assume someone is using host env var as URL - if url is None and host is not None: - log.warning( - f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead" - ) - return url or host, token - - def first_non_null(ls: List[Optional[str]]) -> Optional[str]: return next((el for el in ls if el is not None and el.strip() != ""), None) diff --git a/metadata-ingestion/src/datahub/cli/config_utils.py b/metadata-ingestion/src/datahub/cli/config_utils.py index cdf702b358b14b..a397742a2996c4 100644 --- a/metadata-ingestion/src/datahub/cli/config_utils.py +++ b/metadata-ingestion/src/datahub/cli/config_utils.py @@ -4,8 +4,7 @@ import logging import os -import sys -from typing import Optional, Union +from typing import Optional import click import yaml @@ -31,15 +30,13 @@ def should_skip_config() -> bool: return get_boolean_env_variable(ENV_SKIP_CONFIG, False) -def get_client_config(as_dict: bool = True) -> dict: +def get_client_config() -> Optional[dict]: with open(DATAHUB_CONFIG_PATH, "r") as stream: def get_client_config(as_dict: bool = False) -> Union[Optional[DatahubClientConfig], dict]: with open(DATAHUB_CONFIG_PATH) as stream: try: return yaml.safe_load(stream) - if as_dict: - return config_json except yaml.YAMLError as exc: click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True) return None diff --git a/metadata-ingestion/src/datahub/cli/lite_cli.py b/metadata-ingestion/src/datahub/cli/lite_cli.py index 7e2ad23a7753f4..7000cdbd730947 100644 --- a/metadata-ingestion/src/datahub/cli/lite_cli.py +++ b/metadata-ingestion/src/datahub/cli/lite_cli.py @@ -11,12 +11,12 @@ from datahub.cli.config_utils import ( DATAHUB_ROOT_FOLDER, - DatahubConfig, get_client_config, persist_datahub_config, ) from datahub.ingestion.api.common import PipelineContext, RecordEnvelope from datahub.ingestion.api.sink import NoopWriteCallback +from datahub.ingestion.graph.client import DatahubConfig from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.sink.file import FileSink, FileSinkConfig from datahub.lite.duckdb_lite_config import DuckDBLiteConfig @@ -45,7 +45,7 @@ class LiteCliConfig(DatahubConfig): def get_lite_config() -> LiteLocalConfig: - client_config_dict = get_client_config(as_dict=True) + client_config_dict = get_client_config() lite_config = LiteCliConfig.parse_obj(client_config_dict) return lite_config.lite @@ -309,7 +309,7 @@ def search( def write_lite_config(lite_config: LiteLocalConfig) -> None: - cli_config = get_client_config(as_dict=True) + cli_config = get_client_config() assert isinstance(cli_config, dict) cli_config["lite"] = lite_config.dict() persist_datahub_config(cli_config) diff --git a/metadata-ingestion/src/datahub/cli/migrate.py b/metadata-ingestion/src/datahub/cli/migrate.py index 5e1024de817293..ea5375c9471283 100644 --- a/metadata-ingestion/src/datahub/cli/migrate.py +++ b/metadata-ingestion/src/datahub/cli/migrate.py @@ -1,3 +1,4 @@ +import json import logging import random import uuid @@ -7,7 +8,7 @@ import progressbar from avrogen.dict_wrapper import DictWrapper -from datahub.cli import delete_cli, migration_utils +from datahub.cli import cli_utils, delete_cli, migration_utils from datahub.emitter.mce_builder import ( DEFAULT_ENV, dataset_urn_to_key, @@ -23,7 +24,11 @@ SchemaKey, ) from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.ingestion.graph.client import DataHubGraph, get_default_graph +from datahub.ingestion.graph.client import ( + DataHubGraph, + RelatedEntity, + get_default_graph, +) from datahub.metadata.schema_classes import ( ContainerKeyClass, ContainerPropertiesClass, @@ -31,6 +36,7 @@ SystemMetadataClass, ) from datahub.telemetry import telemetry +from datahub.utilities.urns.urn import Urn log = logging.getLogger(__name__) @@ -143,7 +149,7 @@ def dataplatform2instance_func( graph = get_default_graph() - urns_to_migrate = [] + urns_to_migrate: List[str] = [] # we first calculate all the urns we will be migrating for src_entity_urn in graph.get_urns_by_filter(platform=platform, env=env): @@ -151,7 +157,9 @@ def dataplatform2instance_func( assert key # Does this urn already have a platform instance associated with it? response = graph.get_aspects_for_entity( - entity_urn=src_entity_urn, aspects=["dataPlatformInstance"], typed=True + entity_urn=src_entity_urn, + aspects=["dataPlatformInstance"], + aspect_types=[DataPlatformInstanceClass], ) if "dataPlatformInstance" in response: assert isinstance( @@ -235,8 +243,8 @@ def dataplatform2instance_func( aspect_name = migration_utils.get_aspect_name_from_relationship( relationshipType, entity_type ) - aspect_map = graph.get_aspects_for_entity( - target_urn, aspects=[aspect_name], typed=True + aspect_map = cli_utils.get_aspects_for_entity( + graph._session, graph.config.server, target_urn, aspects=[aspect_name] ) if aspect_name in aspect_map: aspect = aspect_map[aspect_name] @@ -434,23 +442,29 @@ def process_container_relationships( migration_report: MigrationReport, rest_emitter: DatahubRestEmitter, ) -> None: - relationships = migration_utils.get_incoming_relationships(urn=src_urn) + relationships: Iterable[RelatedEntity] = migration_utils.get_incoming_relationships( + urn=src_urn + ) client = get_default_graph() for relationship in relationships: log.debug(f"Incoming Relationship: {relationship}") - target_urn = relationship.urn + target_urn: str = relationship.urn # We should use the new id if we already migrated it if target_urn in container_id_map: - target_urn = container_id_map.get(target_urn) + target_urn = container_id_map[target_urn] entity_type = _get_type_from_urn(target_urn) relationshipType = relationship.relationship_type aspect_name = migration_utils.get_aspect_name_from_relationship( relationshipType, entity_type ) - aspect_map = client.get_aspects_for_entity( - target_urn, aspects=[aspect_name], typed=True + aspect_map = cli_utils.get_aspects_for_entity( + client._session, + client.config.server, + target_urn, + aspects=[aspect_name], + typed=True, ) if aspect_name in aspect_map: aspect = aspect_map[aspect_name] diff --git a/metadata-ingestion/src/datahub/cli/migration_utils.py b/metadata-ingestion/src/datahub/cli/migration_utils.py index 0e4032172a788c..a3dfcfe2ac4034 100644 --- a/metadata-ingestion/src/datahub/cli/migration_utils.py +++ b/metadata-ingestion/src/datahub/cli/migration_utils.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import Dict, Iterable, List +from typing import Iterable, List from avrogen.dict_wrapper import DictWrapper @@ -243,8 +243,13 @@ def clone_aspect( run_id: str = str(uuid.uuid4()), dry_run: bool = False, ) -> Iterable[MetadataChangeProposalWrapper]: + client = get_default_graph() aspect_map = cli_utils.get_aspects_for_entity( - entity_urn=src_urn, aspects=aspect_names, typed=True + client._session, + client.config.server, + entity_urn=src_urn, + aspects=aspect_names, + typed=True, ) if aspect_names is not None: diff --git a/metadata-ingestion/src/datahub/cli/timeline_cli.py b/metadata-ingestion/src/datahub/cli/timeline_cli.py index 83e5f1cbcf8946..63e05aa65d9a5b 100644 --- a/metadata-ingestion/src/datahub/cli/timeline_cli.py +++ b/metadata-ingestion/src/datahub/cli/timeline_cli.py @@ -65,7 +65,7 @@ def get_timeline( ) -> Any: client = get_default_graph() session = client._session - gms_host = client.config.server + host = client.config.server if urn.startswith("urn%3A"): # we assume the urn is already encoded encoded_urn: str = urn diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index e4663d8351f09b..704481d36d9bda 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -4,6 +4,7 @@ import json import logging import os +import sys import textwrap import time from dataclasses import dataclass @@ -27,7 +28,6 @@ from deprecated import deprecated from pydantic import BaseModel, ValidationError from requests.models import HTTPError -from requests.sessions import Session from datahub.cli import config_utils from datahub.configuration.common import ConfigModel, GraphError, OperationalError @@ -86,7 +86,10 @@ _GRAPH_DUMMY_RUN_ID = "__datahub-graph-client" ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL" +ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN" ENV_METADATA_HOST = "DATAHUB_GMS_HOST" +ENV_METADATA_PORT = "DATAHUB_GMS_PORT" +ENV_METADATA_PROTOCOL = "DATAHUB_GMS_PROTOCOL" class DatahubClientConfig(ConfigModel): @@ -572,8 +575,8 @@ def _relationships_endpoint(self): def _aspect_count_endpoint(self): return f"{self.config.server}/aspects?action=getCount" - def _session(self) -> Session: - return super()._session + # def _session(self) -> Session: + # return super()._session def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]: """Retrieve a domain urn based on its name. Returns None if there is no match found""" @@ -1225,14 +1228,44 @@ class DatahubConfig(BaseModel): gms: DatahubClientConfig +config_override: Dict = {} + + +def set_env_variables_override_config(url: str, token: Optional[str]) -> None: + """Should be used to override the config when using rest emitter""" + config_override[ENV_METADATA_HOST_URL] = url + if token is not None: + config_override[ENV_METADATA_TOKEN] = token + + +def get_details_from_env() -> Tuple[Optional[str], Optional[str]]: + host = os.environ.get(ENV_METADATA_HOST) + port = os.environ.get(ENV_METADATA_PORT) + token = os.environ.get(ENV_METADATA_TOKEN) + protocol = os.environ.get(ENV_METADATA_PROTOCOL, "http") + url = os.environ.get(ENV_METADATA_HOST_URL) + if port is not None: + url = f"{protocol}://{host}:{port}" + return url, token + # The reason for using host as URL is backward compatibility + # If port is not being used we assume someone is using host env var as URL + if url is None and host is not None: + logger.warning( + f"Do not use {ENV_METADATA_HOST} as URL. Use {ENV_METADATA_HOST_URL} instead" + ) + return url or host, token + + def load_client_config() -> DatahubClientConfig: try: ensure_datahub_config() - client_config_dict = config_utils.get_client_config(as_dict=True) + client_config_dict = config_utils.get_client_config() datahub_config = DatahubConfig.parse_obj(client_config_dict).gms return datahub_config except ValidationError as e: - click.echo(f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}") + click.echo( + f"Received error, please check your {config_utils.CONDENSED_DATAHUB_CONFIG_PATH}" + ) click.echo(e, err=True) sys.exit(1) @@ -1267,7 +1300,7 @@ def write_gms_config( config = DatahubConfig(gms=DatahubClientConfig(server=host, token=token)) if merge_with_previous: try: - previous_config = config_utils.get_client_config(as_dict=True) + previous_config = config_utils.get_client_config() assert isinstance(previous_config, dict) except Exception as e: # ok to fail on this diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 007b7487cb6a4e..d995c4f1f1a5e1 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -7,7 +7,6 @@ from enum import auto from typing import Optional, Union -from datahub.cli.cli_utils import set_env_variables_override_config from datahub.configuration.common import ( ConfigEnum, ConfigurationError, @@ -23,7 +22,10 @@ WriteCallback, ) from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.graph.client import DatahubClientConfig +from datahub.ingestion.graph.client import ( + DatahubClientConfig, + set_env_variables_override_config, +) from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, MetadataChangeProposal, diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py index 8bd2e70b2d478c..08ed7677c7ab4c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py @@ -35,6 +35,7 @@ auto_workunit_reporter, ) from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.graph.client import get_default_graph from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( FineGrainedLineageDownstreamType, FineGrainedLineageUpstreamType, @@ -209,7 +210,12 @@ def _get_lineage_mcp( # extract the old lineage and save it for the new mcp if preserve_upstream: + + client = get_default_graph() + old_upstream_lineage = get_aspects_for_entity( + client._session, + client.config.server, entity_urn=entity_urn, aspects=["upstreamLineage"], typed=True, diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index 6e1ab8ff4ad7ad..cdb7df57efe798 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -12,7 +12,6 @@ from termcolor import colored from datahub import __version__ -from datahub.cli import cli_utils from datahub.ingestion.graph.client import DataHubGraph, load_client_config log = logging.getLogger(__name__) @@ -101,16 +100,18 @@ async def get_github_stats(): return (latest_server_version, latest_server_date) -async def get_server_config(gms_url: str, token: str) -> dict: +async def get_server_config(gms_url: str, token: Optional[str]) -> dict: import aiohttp - async with aiohttp.ClientSession( - headers={ - "X-RestLi-Protocol-Version": "2.0.0", - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - } - ) as session: + headers = { + "X-RestLi-Protocol-Version": "2.0.0", + "Content-Type": "application/json", + } + + if token: + headers["Authorization"] = f"Bearer {token}" + + async with aiohttp.ClientSession() as session: config_endpoint = f"{gms_url}/config" async with session.get(config_endpoint) as dh_response: dh_response_json = await dh_response.json() diff --git a/metadata-ingestion/tests/unit/test_cli_utils.py b/metadata-ingestion/tests/unit/test_cli_utils.py index bc1826d422e38a..68cb985af47340 100644 --- a/metadata-ingestion/tests/unit/test_cli_utils.py +++ b/metadata-ingestion/tests/unit/test_cli_utils.py @@ -2,6 +2,7 @@ from unittest import mock from datahub.cli import cli_utils +from datahub.ingestion.graph.client import get_details_from_env def test_first_non_null(): @@ -16,14 +17,14 @@ def test_first_non_null(): @mock.patch.dict(os.environ, {"DATAHUB_GMS_HOST": "http://localhost:9092"}) def test_correct_url_when_gms_host_in_old_format(): - assert cli_utils.get_details_from_env() == ("http://localhost:9092", None) + assert get_details_from_env() == ("http://localhost:9092", None) @mock.patch.dict( os.environ, {"DATAHUB_GMS_HOST": "localhost", "DATAHUB_GMS_PORT": "8080"} ) def test_correct_url_when_gms_host_and_port_set(): - assert cli_utils.get_details_from_env() == ("http://localhost:8080", None) + assert get_details_from_env() == ("http://localhost:8080", None) @mock.patch.dict( @@ -35,7 +36,7 @@ def test_correct_url_when_gms_host_and_port_set(): }, ) def test_correct_url_when_gms_host_port_url_set(): - assert cli_utils.get_details_from_env() == ("http://localhost:8080", None) + assert get_details_from_env() == ("http://localhost:8080", None) @mock.patch.dict( @@ -48,7 +49,7 @@ def test_correct_url_when_gms_host_port_url_set(): }, ) def test_correct_url_when_gms_host_port_url_protocol_set(): - assert cli_utils.get_details_from_env() == ("https://localhost:8080", None) + assert get_details_from_env() == ("https://localhost:8080", None) @mock.patch.dict( @@ -58,7 +59,7 @@ def test_correct_url_when_gms_host_port_url_protocol_set(): }, ) def test_correct_url_when_url_set(): - assert cli_utils.get_details_from_env() == ("https://example.com", None) + assert get_details_from_env() == ("https://example.com", None) def test_fixup_gms_url():