Skip to content

Commit

Permalink
feat(ingest): support reading config file from stdin (datahub-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Sep 8, 2022
1 parent 827ac88 commit dfa05fa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
13 changes: 7 additions & 6 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import json
import logging
import os
import pathlib
import sys
from datetime import datetime
from typing import Optional
Expand Down Expand Up @@ -46,7 +45,7 @@ def ingest() -> None:
@click.option(
"-c",
"--config",
type=click.Path(exists=True, dir_okay=False),
type=click.Path(dir_okay=False),
help="Config file in .toml or .yaml format.",
required=True,
)
Expand Down Expand Up @@ -182,12 +181,14 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None:
# main function begins
logger.info("DataHub CLI version: %s", datahub_package.nice_version_name())

config_file = pathlib.Path(config)
pipeline_config = load_config_file(
config_file, squirrel_original_config=True, squirrel_field="__raw_config"
config,
squirrel_original_config=True,
squirrel_field="__raw_config",
allow_stdin=True,
)
raw_pipeline_config = pipeline_config["__raw_config"]
pipeline_config = {k: v for k, v in pipeline_config.items() if k != "__raw_config"}
raw_pipeline_config = pipeline_config.pop("__raw_config")

if test_source_connection:
_test_source_connection(report_to, pipeline_config)

Expand Down
35 changes: 21 additions & 14 deletions metadata-ingestion/src/datahub/configuration/config_loader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import pathlib
import re
import sys
from typing import Any, Dict, Union

from expandvars import UnboundVariable, expandvars
Expand Down Expand Up @@ -54,26 +55,32 @@ def load_config_file(
config_file: Union[pathlib.Path, str],
squirrel_original_config: bool = False,
squirrel_field: str = "__orig_config",
allow_stdin: bool = False,
) -> dict:
if isinstance(config_file, str):
config_file = pathlib.Path(config_file)
if not config_file.is_file():
raise ConfigurationError(f"Cannot open config file {config_file}")

config_mech: ConfigurationMechanism
if config_file.suffix in [".yaml", ".yml"]:
if allow_stdin and config_file == "-":
# If we're reading from stdin, we assume that the input is a YAML file.
config_mech = YamlConfigurationMechanism()
elif config_file.suffix == ".toml":
config_mech = TomlConfigurationMechanism()
raw_config_file = sys.stdin.read()
else:
raise ConfigurationError(
"Only .toml and .yml are supported. Cannot process file type {}".format(
config_file.suffix
if isinstance(config_file, str):
config_file = pathlib.Path(config_file)
if not config_file.is_file():
raise ConfigurationError(f"Cannot open config file {config_file}")

if config_file.suffix in {".yaml", ".yml"}:
config_mech = YamlConfigurationMechanism()
elif config_file.suffix == ".toml":
config_mech = TomlConfigurationMechanism()
else:
raise ConfigurationError(
"Only .toml and .yml are supported. Cannot process file type {}".format(
config_file.suffix
)
)
)

with config_file.open() as raw_config_fp:
raw_config_file = raw_config_fp.read()
raw_config_file = config_file.read_text()

config_fp = io.StringIO(raw_config_file)
raw_config = config_mech.load_config(config_fp)
config = resolve_env_variables(raw_config)
Expand Down

0 comments on commit dfa05fa

Please sign in to comment.