diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 115a9b57b1d5d9..2cae9826cff595 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -4,7 +4,6 @@ import json import logging import os -import pathlib import sys from datetime import datetime from typing import Optional @@ -46,7 +45,7 @@ def ingest() -> None: @click.option( "-c", "--config", - type=click.Path(exists=True, dir_okay=False), + type=click.Path(dir_okay=False), help="Config file in .toml or .yaml format.", required=True, ) @@ -182,12 +181,14 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None: # main function begins logger.info("DataHub CLI version: %s", datahub_package.nice_version_name()) - config_file = pathlib.Path(config) pipeline_config = load_config_file( - config_file, squirrel_original_config=True, squirrel_field="__raw_config" + config, + squirrel_original_config=True, + squirrel_field="__raw_config", + allow_stdin=True, ) - raw_pipeline_config = pipeline_config["__raw_config"] - pipeline_config = {k: v for k, v in pipeline_config.items() if k != "__raw_config"} + raw_pipeline_config = pipeline_config.pop("__raw_config") + if test_source_connection: _test_source_connection(report_to, pipeline_config) diff --git a/metadata-ingestion/src/datahub/configuration/config_loader.py b/metadata-ingestion/src/datahub/configuration/config_loader.py index d94aead3273680..513c15ee444268 100644 --- a/metadata-ingestion/src/datahub/configuration/config_loader.py +++ b/metadata-ingestion/src/datahub/configuration/config_loader.py @@ -1,6 +1,7 @@ import io import pathlib import re +import sys from typing import Any, Dict, Union from expandvars import UnboundVariable, expandvars @@ -54,26 +55,32 @@ def load_config_file( config_file: Union[pathlib.Path, str], squirrel_original_config: bool = False, squirrel_field: str = "__orig_config", + allow_stdin: bool = False, ) -> dict: - if isinstance(config_file, str): - config_file = pathlib.Path(config_file) - if not config_file.is_file(): - raise ConfigurationError(f"Cannot open config file {config_file}") - config_mech: ConfigurationMechanism - if config_file.suffix in [".yaml", ".yml"]: + if allow_stdin and config_file == "-": + # If we're reading from stdin, we assume that the input is a YAML file. config_mech = YamlConfigurationMechanism() - elif config_file.suffix == ".toml": - config_mech = TomlConfigurationMechanism() + raw_config_file = sys.stdin.read() else: - raise ConfigurationError( - "Only .toml and .yml are supported. Cannot process file type {}".format( - config_file.suffix + if isinstance(config_file, str): + config_file = pathlib.Path(config_file) + if not config_file.is_file(): + raise ConfigurationError(f"Cannot open config file {config_file}") + + if config_file.suffix in {".yaml", ".yml"}: + config_mech = YamlConfigurationMechanism() + elif config_file.suffix == ".toml": + config_mech = TomlConfigurationMechanism() + else: + raise ConfigurationError( + "Only .toml and .yml are supported. Cannot process file type {}".format( + config_file.suffix + ) ) - ) - with config_file.open() as raw_config_fp: - raw_config_file = raw_config_fp.read() + raw_config_file = config_file.read_text() + config_fp = io.StringIO(raw_config_file) raw_config = config_mech.load_config(config_fp) config = resolve_env_variables(raw_config)