From 3080b9e3ee433f3e92fc0921f4a5c96a4e82d405 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Thu, 3 Nov 2022 20:16:07 -0700
Subject: [PATCH] fix(ingest): only log vars if requested (#6362)

---
 metadata-ingestion/README.md                  |  7 --
 .../src/datahub/cli/check_cli.py              |  2 +-
 metadata-ingestion/src/datahub/cli/get_cli.py |  3 +-
 .../src/datahub/cli/ingest_cli.py             | 42 +++---------
 .../src/datahub/cli/timeline_cli.py           |  3 +-
 .../src/datahub/configuration/__init__.py     |  1 -
 .../src/datahub/configuration/common.py       | 22 +-----
 metadata-ingestion/src/datahub/entrypoints.py | 68 +++++++++++--------
 .../datahub_ingestion_run_summary_provider.py |  4 +-
 .../src/datahub/ingestion/run/pipeline.py     | 32 ++++-----
 10 files changed, 72 insertions(+), 112 deletions(-)

diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 206684ad8966d..f57d82ebe9d92 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -126,13 +126,6 @@ By default `--preview` creates 10 workunits. But if you wish to try producing mo
 datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yml -n --preview --preview-workunits=20
 ```
 
-Sometimes, while running the ingestion pipeline, unexpected exceptions may occur. This can cause `stackprinter` to print all variables the logs. This may lead to credentials being written to logfiles. To prevent this behavior, in case of unexpected errors, a `--suppress-error-logs` option can be added to ingest cli command. By default, this option is set to false. However, if enabled, prevents printing all variables to logs, mitigating the risk of writing credentials to logs. The `--suppress-error-logs` option is applied when the ingestion pipeline is actually running.
-
-```shell
-# Running ingestion with --suppress-error-logs option
-datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yml --suppress-error-logs
-```
-
 #### Reporting
 
 By default, the cli sends an ingestion report to DataHub, which allows you to see the result of all cli-based ingestion in the UI. This can be turned off with the `--no-default-report` flag.
diff --git a/metadata-ingestion/src/datahub/cli/check_cli.py b/metadata-ingestion/src/datahub/cli/check_cli.py
index b6484549a7de1..51f8564d70311 100644
--- a/metadata-ingestion/src/datahub/cli/check_cli.py
+++ b/metadata-ingestion/src/datahub/cli/check_cli.py
@@ -91,7 +91,7 @@ def plugins(verbose: bool) -> None:
     )
 
 
-@check.command
+@check.command()
 def graph_consistency() -> None:
     gms_endpoint, gms_token = get_url_and_token()
     check_data_platform.check(gms_endpoint, gms_token)
diff --git a/metadata-ingestion/src/datahub/cli/get_cli.py b/metadata-ingestion/src/datahub/cli/get_cli.py
index ee5b9d64621e0..9788ebc1111ed 100644
--- a/metadata-ingestion/src/datahub/cli/get_cli.py
+++ b/metadata-ingestion/src/datahub/cli/get_cli.py
@@ -3,7 +3,6 @@
 from typing import Any, List, Optional
 
 import click
-from click.exceptions import UsageError
 
 from datahub.cli.cli_utils import get_aspects_for_entity
 from datahub.telemetry import telemetry
@@ -32,7 +31,7 @@ def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
 
     if urn is None:
         if not ctx.args:
-            raise UsageError("Nothing for me to get. Maybe provide an urn?")
+            raise click.UsageError("Nothing for me to get. Maybe provide an urn?")
         urn = ctx.args[0]
         logger.debug(f"Using urn from args {urn}")
     click.echo(
diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py
index 2cae9826cff59..24ca706e7d5f0 100644
--- a/metadata-ingestion/src/datahub/cli/ingest_cli.py
+++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -21,7 +21,6 @@
     get_session_and_host,
     post_rollback_endpoint,
 )
-from datahub.configuration import SensitiveError
 from datahub.configuration.config_loader import load_config_file
 from datahub.ingestion.run.connection import ConnectionManager
 from datahub.ingestion.run.pipeline import Pipeline
@@ -75,13 +74,6 @@ def ingest() -> None:
     default=False,
     help="If enabled, ingestion runs with warnings will yield a non-zero error code",
 )
-@click.option(
-    "--suppress-error-logs",
-    type=bool,
-    is_flag=True,
-    default=False,
-    help="Suppress display of variable values in logs by suppressing elaborate stacktrace (stackprinter) during ingestion failures",
-)
 @click.option(
     "--test-source-connection",
     type=bool,
@@ -115,7 +107,6 @@ def run(
     preview: bool,
     strict_warnings: bool,
     preview_workunits: int,
-    suppress_error_logs: bool,
     test_source_connection: bool,
     report_to: str,
     no_default_report: bool,
@@ -139,13 +130,7 @@ def run_pipeline_to_completion(
             logger.info(
                 f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
            )
-            # We dont want to log sensitive information in variables if the pipeline fails due to
-            # an unexpected error. Disable printing sensitive info to logs if ingestion is running
-            # with `--suppress-error-logs` flag.
-            if suppress_error_logs:
-                raise SensitiveError() from e
-            else:
-                raise e
+            raise e
         else:
             logger.info("Finished metadata ingestion")
             pipeline.log_ingestion_stats()
@@ -192,21 +177,16 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None:
     if test_source_connection:
         _test_source_connection(report_to, pipeline_config)
 
-    try:
-        logger.debug(f"Using config: {pipeline_config}")
-        pipeline = Pipeline.create(
-            pipeline_config,
-            dry_run,
-            preview,
-            preview_workunits,
-            report_to,
-            no_default_report,
-            raw_pipeline_config,
-        )
-    except Exception as e:
-        # The pipeline_config may contain sensitive information, so we wrap the exception
-        # in a SensitiveError to prevent detailed variable-level information from being logged.
-        raise SensitiveError() from e
+    # logger.debug(f"Using config: {pipeline_config}")
+    pipeline = Pipeline.create(
+        pipeline_config,
+        dry_run,
+        preview,
+        preview_workunits,
+        report_to,
+        no_default_report,
+        raw_pipeline_config,
+    )
 
     loop = asyncio.get_event_loop()
     loop.run_until_complete(run_func_check_upgrade(pipeline))
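Reviewer note: the `--suppress-error-logs` flag removed above was an opt-in guard; after this patch the same protection becomes the default, driven by stackprinter's `show_vals` argument (see the `entrypoints.py` hunk further down). A minimal sketch of that mechanism, assuming only the `stackprinter` package — the `connect` helper and its credential are made up for illustration:

```python
import stackprinter

def connect(password: str) -> None:
    raise RuntimeError("connection failed")

try:
    connect(password="hunter2")
except Exception as exc:
    # Default after this patch: frame info only, no local variable values,
    # so the password never reaches the logs.
    print(stackprinter.format(exc, show_vals=None))
    # What `datahub --debug-vars ...` opts into: locals rendered inline.
    print(stackprinter.format(exc, show_vals="like_source"))
```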
diff --git a/metadata-ingestion/src/datahub/cli/timeline_cli.py b/metadata-ingestion/src/datahub/cli/timeline_cli.py
index 78988c6f165cf..6980249aa8e6f 100644
--- a/metadata-ingestion/src/datahub/cli/timeline_cli.py
+++ b/metadata-ingestion/src/datahub/cli/timeline_cli.py
@@ -5,7 +5,6 @@
 from typing import Any, List, Optional
 
 import click
-from click.exceptions import UsageError
 from requests import Response
 from termcolor import colored
 
@@ -155,7 +154,7 @@ def timeline(
 
     if urn is None:
         if not ctx.args:
-            raise UsageError("Nothing for me to get. Maybe provide an urn?")
+            raise click.UsageError("Nothing for me to get. Maybe provide an urn?")
         urn = ctx.args[0]
         logger.debug(f"Using urn from args {urn}")
 
diff --git a/metadata-ingestion/src/datahub/configuration/__init__.py b/metadata-ingestion/src/datahub/configuration/__init__.py
index 1abb9792d1e7b..008d788072d0a 100644
--- a/metadata-ingestion/src/datahub/configuration/__init__.py
+++ b/metadata-ingestion/src/datahub/configuration/__init__.py
@@ -2,5 +2,4 @@
     ConfigModel,
     ConfigurationMechanism,
     DynamicTypedConfig,
-    SensitiveError,
 )
diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py
index 42d0e856dc96d..7ee4df10e0469 100644
--- a/metadata-ingestion/src/datahub/configuration/common.py
+++ b/metadata-ingestion/src/datahub/configuration/common.py
@@ -1,7 +1,7 @@
 import re
 from abc import ABC, abstractmethod
 from enum import auto
-from typing import IO, Any, ClassVar, Dict, List, Optional, Type, cast
+from typing import IO, Any, ClassVar, Dict, List, Optional, Type
 
 from cached_property import cached_property
 from pydantic import BaseModel, Extra
@@ -89,24 +89,8 @@ class ConfigurationError(MetaError):
     """A configuration error has happened"""
 
 
-class SensitiveError(Exception):
-    """Wraps an exception that should not be logged with variable information."""
-
-    @classmethod
-    def get_sensitive_cause(cls, exc: Exception) -> Optional[Exception]:
-        """
-        Returns the underlying exception if the exception is sensitive, and None otherwise.
-        """
-
-        e: Optional[Exception] = exc
-        while e:
-            # This cast converts BaseException to Exception.
-            inner = cast(Optional[Exception], e.__cause__)
-
-            if isinstance(e, cls):
-                return inner
-            e = inner
-        return None
+class IgnorableError(MetaError):
+    """An error that can be ignored"""
 
 
 class ConfigurationMechanism(ABC):
diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py
index 8e917d3f394f4..12c1101daa36c 100644
--- a/metadata-ingestion/src/datahub/entrypoints.py
+++ b/metadata-ingestion/src/datahub/entrypoints.py
@@ -18,8 +18,6 @@
 from datahub.cli.put_cli import put
 from datahub.cli.telemetry import telemetry as telemetry_cli
 from datahub.cli.timeline_cli import timeline
-from datahub.configuration import SensitiveError
-from datahub.configuration.common import ConfigurationError
 from datahub.telemetry import telemetry
 from datahub.utilities.server_config_util import get_gms_config
 
@@ -45,9 +43,22 @@
         # Avoid truncation of help text.
         # See https://github.com/pallets/click/issues/486.
         max_content_width=MAX_CONTENT_WIDTH,
-    )
+    ),
+)
+@click.option(
+    "--debug/--no-debug",
+    type=bool,
+    is_flag=True,
+    default=False,
+    help="Enable debug logging.",
+)
+@click.option(
+    "--debug-vars/--no-debug-vars",
+    type=bool,
+    is_flag=True,
+    default=False,
+    help="Show variable values in stack traces. Implies --debug. While we try to avoid printing sensitive information like passwords, this may still happen.",
 )
-@click.option("--debug/--no-debug", default=False)
 @click.version_option(
     version=datahub_package.nice_version_name(),
     prog_name=datahub_package.__package_name__,
@@ -61,7 +72,12 @@
     help="Run memory leak detection.",
 )
 @click.pass_context
-def datahub(ctx: click.Context, debug: bool, detect_memory_leaks: bool) -> None:
+def datahub(
+    ctx: click.Context, debug: bool, debug_vars: bool, detect_memory_leaks: bool
+) -> None:
+    if debug_vars:
+        debug = True
+
     # Insulate 'datahub' and all child loggers from inadvertent changes to the
     # root logger by the external site packages that we import.
     # (Eg: https://github.com/reata/sqllineage/commit/2df027c77ea0a8ea4909e471dcd1ecbf4b8aeb2f#diff-30685ea717322cd1e79c33ed8d37903eea388e1750aa00833c33c0c5b89448b3R11
@@ -85,8 +101,7 @@ def datahub(ctx: click.Context, debug: bool, detect_memory_leaks: bool) -> None:
     else:
         logging.getLogger().setLevel(logging.WARNING)
         datahub_logger.setLevel(logging.INFO)
-    # loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
-    # print(loggers)
+
     # Setup the context for the memory_leak_detector decorator.
     ctx.ensure_object(dict)
     ctx.obj["detect_memory_leaks"] = detect_memory_leaks
@@ -147,25 +162,27 @@ def main(**kwargs):
     # This wrapper prevents click from suppressing errors.
     try:
         sys.exit(datahub(standalone_mode=False, **kwargs))
-    except click.exceptions.Abort:
+    except click.Abort:
         # Click already automatically prints an abort message, so we can just exit.
         sys.exit(1)
     except click.ClickException as error:
         error.show()
         sys.exit(1)
     except Exception as exc:
-        kwargs = {}
-        sensitive_cause = SensitiveError.get_sensitive_cause(exc)
-        if sensitive_cause:
-            kwargs = {"show_vals": None}
-            exc = sensitive_cause
-
-        # suppress stack printing for common configuration errors
-        if isinstance(exc, (ConfigurationError, ValidationError)):
-            logger.error(exc)
+        if "--debug-vars" in sys.argv:
+            show_vals = "like_source"
         else:
-            # only print stacktraces during debug
-            logger.debug(
+            # Unless --debug-vars is passed, we don't want to print the values of variables.
+            show_vals = None
+
+        if isinstance(exc, ValidationError) or isinstance(
+            exc.__cause__, ValidationError
+        ):
+            # Don't print the full stack trace for simple config errors.
+            logger.error(exc)
+        elif logger.isEnabledFor(logging.DEBUG):
+            # We only print rich stacktraces during debug.
+            logger.error(
                 stackprinter.format(
                     exc,
                     line_wrap=MAX_CONTENT_WIDTH,
@@ -182,19 +199,12 @@ def main(**kwargs):
                         r".*cparams.*",
                     ],
                     suppressed_paths=[r"lib/python.*/site-packages/click/"],
-                    **kwargs,
+                    show_vals=show_vals,
                 )
             )
+        else:
+            logger.exception(f"Command failed: {exc}")
 
-        if "--debug" not in sys.argv:
-            pretty_msg = _get_pretty_chained_message(exc)
-            # log the exception with a basic traceback
-            logger.exception(msg="", exc_info=exc)
-            debug_variant_command = " ".join(["datahub", "--debug"] + sys.argv[1:])
-            # log a more human readable message at the very end
-            logger.error(
-                f"Command failed: \n\t{pretty_msg}.\n\tRun with --debug to get full stacktrace.\n\te.g. '{debug_variant_command}'"
-            )
         logger.debug(
             f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}"
         )
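Reviewer note: the new group callback derives `--debug` from `--debug-vars`, since the rich traces only render on the DEBUG path of `main` and the flag would otherwise silently do nothing. A standalone sketch of that implies-pattern using stock Click — the `cli` group and `boom` command here are hypothetical:

```python
import logging

import click

@click.group()
@click.option("--debug/--no-debug", default=False, help="Enable debug logging.")
@click.option(
    "--debug-vars/--no-debug-vars",
    default=False,
    help="Show variable values in stack traces. Implies --debug.",
)
def cli(debug: bool, debug_vars: bool) -> None:
    # Deriving one flag from the other in the group callback keeps every
    # subcommand unaware of the logging configuration.
    if debug_vars:
        debug = True
    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

@cli.command()
def boom() -> None:
    raise RuntimeError("simulated failure")

if __name__ == "__main__":
    cli()
```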
diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
index 13fcde65d85d5..8afe34b93749a 100644
--- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
+++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
@@ -4,7 +4,7 @@
 from typing import Any, Dict, Optional
 
 from datahub import nice_version_name
-from datahub.configuration.common import ConfigModel, DynamicTypedConfig
+from datahub.configuration.common import ConfigModel, DynamicTypedConfig, IgnorableError
 from datahub.emitter.mce_builder import datahub_guid
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import make_data_platform_urn
@@ -89,7 +89,7 @@ def create(
         # Global instances are safe to use only if the types are datahub-rest and datahub-kafka
         # Re-using a shared file sink will result in clobbering the events
         if sink_config_holder.type not in ["datahub-rest", "datahub-kafka"]:
-            raise ValueError(
+            raise IgnorableError(
                 f"Datahub ingestion reporter will be disabled because sink type {sink_config_holder.type} is not supported"
             )
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 2949280400831..30ff09e65b8b9 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -5,14 +5,14 @@
 import sys
 import time
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, cast
+from typing import Any, Dict, Iterable, List, NoReturn, Optional, cast
 
 import click
 import humanfriendly
 import psutil
 
 import datahub
-from datahub.configuration.common import PipelineExecutionError
+from datahub.configuration.common import IgnorableError, PipelineExecutionError
 from datahub.ingestion.api.committable import CommitPolicy
 from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
 from datahub.ingestion.api.pipeline_run_listener import PipelineRunListener
@@ -125,8 +125,8 @@ class Pipeline:
     sink: Sink
     transformers: List[Transformer]
 
-    def _record_initialization_failure(self, e: Exception, msg: str) -> None:
-        raise PipelineInitError(msg) from e
+    def _raise_initialization_error(self, e: Exception, msg: str) -> NoReturn:
+        raise PipelineInitError(f"{msg}: {e}") from e
 
     def __init__(
         self,
@@ -157,16 +157,15 @@ def __init__(
                 pipeline_config=self.config,
             )
         except Exception as e:
-            self._record_initialization_failure(e, "Failed to set up framework context")
+            self._raise_initialization_error(e, "Failed to set up framework context")
 
         sink_type = self.config.sink.type
         try:
             sink_class = sink_registry.get(sink_type)
         except Exception as e:
-            self._record_initialization_failure(
+            self._raise_initialization_error(
                 e, f"Failed to find a registered sink for type {sink_type}"
             )
-            return
 
         try:
             sink_config = self.config.sink.dict().get("config") or {}
@@ -174,7 +173,7 @@ def __init__(
             logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
             logger.info(f"Sink configured successfully. {self.sink.configured()}")
         except Exception as e:
-            self._record_initialization_failure(
+            self._raise_initialization_error(
                 e, f"Failed to configure sink ({sink_type})"
             )
 
@@ -182,15 +181,13 @@ def __init__(
         try:
             self._configure_reporting(report_to, no_default_report)
         except Exception as e:
-            self._record_initialization_failure(e, "Failed to configure reporters")
-            return
+            self._raise_initialization_error(e, "Failed to configure reporters")
 
         try:
             source_type = self.config.source.type
             source_class = source_registry.get(source_type)
         except Exception as e:
-            self._record_initialization_failure(e, "Failed to create source")
-            return
+            self._raise_initialization_error(e, "Failed to create source")
 
         try:
             self.source: Source = source_class.create(
@@ -199,10 +196,9 @@ def __init__(
             logger.debug(f"Source type:{source_type},{source_class} configured")
             logger.info("Source configured successfully.")
         except Exception as e:
-            self._record_initialization_failure(
+            self._raise_initialization_error(
                 e, f"Failed to configure source ({source_type})"
             )
-            return
 
         try:
             extractor_class = extractor_registry.get(self.config.source.extractor)
@@ -210,16 +206,14 @@ def __init__(
                 self.config.source.extractor_config, self.ctx
             )
         except Exception as e:
-            self._record_initialization_failure(
+            self._raise_initialization_error(
                 e, f"Failed to configure extractor ({self.config.source.extractor})"
             )
-            return
 
         try:
             self._configure_transforms()
         except ValueError as e:
-            self._record_initialization_failure(e, "Failed to configure transformers")
-            return
+            self._raise_initialization_error(e, "Failed to configure transformers")
 
     def _configure_transforms(self) -> None:
         self.transformers = []
@@ -272,6 +266,8 @@ def _configure_reporting(
         except Exception as e:
             if reporter.required:
                 raise
+            elif isinstance(e, IgnorableError):
+                logger.debug(f"Reporter type {reporter_type} is disabled: {e}")
             else:
                 logger.warning(
                     f"Failed to configure reporter: {reporter_type}", exc_info=e
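Reviewer note: the two hunks above cooperate — `DatahubIngestionRunSummaryProvider.create` now raises `IgnorableError` when the sink type cannot be shared, and `Pipeline._configure_reporting` downgrades exactly that failure to a debug message instead of a warning. A standalone sketch of the policy, assuming only the standard library; `configure_reporter` and its arguments are hypothetical stand-ins for the `Pipeline` internals:

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)

class IgnorableError(Exception):
    """Stands in for the MetaError subclass introduced in common.py."""

def configure_reporter(
    reporter_type: str, required: bool, create: Callable[[], None]
) -> None:
    try:
        create()
    except Exception as e:
        if required:
            # A reporter the run cannot proceed without: fail the pipeline.
            raise
        elif isinstance(e, IgnorableError):
            # Expected, self-diagnosed opt-out: log quietly at debug level.
            logger.debug(f"Reporter type {reporter_type} is disabled: {e}")
        else:
            # Unexpected, but non-fatal for optional reporters.
            logger.warning(f"Failed to configure reporter: {reporter_type}", exc_info=e)
```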