From f5595a0d3f1b032d8be17b153a8fbbd10f2ffa1c Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 23 Jan 2023 20:47:09 -0800 Subject: [PATCH 1/5] start log-file handler --- metadata-ingestion/setup.py | 18 +- .../src/datahub/cli/cli_utils.py | 4 +- .../src/datahub/cli/delete_cli.py | 2 +- .../src/datahub/cli/ingest_cli.py | 2 +- metadata-ingestion/src/datahub/cli/migrate.py | 4 +- metadata-ingestion/src/datahub/entrypoints.py | 80 +++----- .../datahub_ingestion_run_summary_provider.py | 26 ++- .../src/datahub/utilities/logging_manager.py | 190 ++++++++++++++++++ .../src/datahub/utilities/tee_io.py | 23 +++ .../tests/unit/test_cli_logging.py | 58 ++++++ 10 files changed, 333 insertions(+), 74 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/logging_manager.py create mode 100644 metadata-ingestion/src/datahub/utilities/tee_io.py create mode 100644 metadata-ingestion/tests/unit/test_cli_logging.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 954f77168d55d6..6b4ff31be980db 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -122,6 +122,8 @@ def get_long_description(): "greenlet", } +sqllineage_lib = "sqllineage==1.3.6" + aws_common = { # AWS Python SDK "boto3", @@ -143,7 +145,7 @@ def get_long_description(): # See https://github.com/joshtemple/lkml/issues/73. "lkml>=1.3.0b5", "sql-metadata==2.2.2", - "sqllineage==1.3.6", + sqllineage_lib, "GitPython>2", } @@ -165,7 +167,7 @@ def get_long_description(): "sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", - "sqllineage==1.3.6", + sqllineage_lib, *path_spec_common, } @@ -255,18 +257,18 @@ def get_long_description(): "gql>=3.3.0", "gql[requests]>=3.3.0", }, - "great-expectations": sql_common | {"sqllineage==1.3.6"}, + "great-expectations": sql_common | {sqllineage_lib}, # Source plugins # PyAthena is pinned with exact version because we use private method in PyAthena "athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"}, "azure-ad": set(), "bigquery": sql_common | bigquery_common - | {"sqllineage==1.3.6", "sql_metadata", "sqlalchemy-bigquery>=1.4.1"}, + | {sqllineage_lib, "sql_metadata", "sqlalchemy-bigquery>=1.4.1"}, "bigquery-beta": sql_common | bigquery_common | { - "sqllineage==1.3.6", + sqllineage_lib, "sql_metadata", "sqlalchemy-bigquery>=1.4.1", }, # deprecated, but keeping the extra for backwards compatibility @@ -310,8 +312,8 @@ def get_long_description(): "ldap": {"python-ldap>=2.4"}, "looker": looker_common, "lookml": looker_common, - "metabase": {"requests", "sqllineage==1.3.6"}, - "mode": {"requests", "sqllineage==1.3.6", "tenacity>=8.0.1"}, + "metabase": {"requests", sqllineage_lib}, + "mode": {"requests", sqllineage_lib, "tenacity>=8.0.1"}, "mongodb": {"pymongo[srv]>=3.11", "packaging"}, "mssql": sql_common | {"sqlalchemy-pytds>=0.3"}, "mssql-odbc": sql_common | {"pyodbc"}, @@ -325,7 +327,7 @@ def get_long_description(): "presto-on-hive": sql_common | {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"}, "pulsar": {"requests"}, - "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.6"}, + "redash": {"redash-toolbelt", "sql-metadata", sqllineage_lib}, "redshift": sql_common | redshift_common, "redshift-usage": sql_common | usage_common | redshift_common, "s3": {*s3_base, *data_lake_profiling}, diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 604d53b28214e7..0c2d3db3ed22c6 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ 
b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -687,8 +687,6 @@ def get_aspects_for_entity( aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name( aspect_name ) - if aspect_name == "unknown": - print(f"Failed to find aspect_name for class {aspect_name}") aspect_dict = a["value"] if not typed: @@ -699,6 +697,8 @@ def get_aspects_for_entity( aspect_map[aspect_name] = aspect_py_class.from_obj(post_json_obj) except Exception as e: log.error(f"Error on {json.dumps(aspect_dict)}", e) + else: + log.debug(f"Failed to find class for aspect {aspect_name}") if aspects: return {k: v for (k, v) in aspect_map.items() if k in aspects} diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 51037c684ed856..bb39d1ac8e5b02 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -187,7 +187,7 @@ def delete( remove_references: bool = False if references_count > 0: - print( + click.echo( f"This urn was referenced in {references_count} other aspects across your metadata graph:" ) click.echo( diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 80ed0555993142..1a3156cb37ab66 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -393,5 +393,5 @@ def rollback( writer.writerow([row.get("urn")]) except IOError as e: - print(e) + logger.exception(f"Unable to save rollback failure report: {e}") sys.exit(f"Unable to write reports to {report_dir}") diff --git a/metadata-ingestion/src/datahub/cli/migrate.py b/metadata-ingestion/src/datahub/cli/migrate.py index 17c2fdf9278494..735b6154b328b0 100644 --- a/metadata-ingestion/src/datahub/cli/migrate.py +++ b/metadata-ingestion/src/datahub/cli/migrate.py @@ -262,7 +262,7 @@ def dataplatform2instance_func( delete_cli._delete_one_urn(src_entity_urn, soft=not hard, run_id=run_id) migration_report.on_entity_migrated(src_entity_urn, "status") # type: ignore - print(f"{migration_report}") + click.echo(f"{migration_report}") migrate_containers( dry_run=dry_run, env=env, @@ -372,7 +372,7 @@ def migrate_containers( delete_cli._delete_one_urn(src_urn, soft=not hard, run_id=run_id) migration_report.on_entity_migrated(src_urn, "status") # type: ignore - print(f"{migration_report}") + click.echo(f"{migration_report}") def get_containers_for_migration(env: str) -> List[Any]: diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 0cb19b6fe42374..4ca319a23d3144 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -2,6 +2,7 @@ import os import platform import sys +from typing import Optional import click import stackprinter @@ -25,21 +26,11 @@ from datahub.cli.telemetry import telemetry as telemetry_cli from datahub.cli.timeline_cli import timeline from datahub.telemetry import telemetry +from datahub.utilities.logging_manager import configure_logging from datahub.utilities.server_config_util import get_gms_config logger = logging.getLogger(__name__) - -# Configure some loggers. -logging.getLogger("urllib3").setLevel(logging.ERROR) -logging.getLogger("snowflake").setLevel(level=logging.WARNING) -# logging.getLogger("botocore").setLevel(logging.INFO) -# logging.getLogger("google").setLevel(logging.INFO) - -# Configure logger. 
-BASE_LOGGING_FORMAT = (
-    "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
-)
-logging.basicConfig(format=BASE_LOGGING_FORMAT)
+_logging_configured = None
 
 MAX_CONTENT_WIDTH = 120
 
@@ -58,6 +49,12 @@
     default=False,
     help="Enable debug logging.",
 )
+@click.option(
+    "--log-file",
+    type=click.Path(dir_okay=False),
+    default=None,
+    help="Write debug-level logs to the specified file.",
+)
 @click.option(
     "--debug-vars/--no-debug-vars",
     type=bool,
@@ -79,35 +76,31 @@
 )
 @click.pass_context
 def datahub(
-    ctx: click.Context, debug: bool, debug_vars: bool, detect_memory_leaks: bool
+    ctx: click.Context,
+    debug: bool,
+    log_file: Optional[str],
+    debug_vars: bool,
+    detect_memory_leaks: bool,
 ) -> None:
     if debug_vars:
+        # debug_vars implies debug. This option isn't actually used here, but instead
+        # read directly from the command line arguments in the main entrypoint.
         debug = True
 
-    # Insulate 'datahub' and all child loggers from inadvertent changes to the
-    # root logger by the external site packages that we import.
-    # (Eg: https://github.com/reata/sqllineage/commit/2df027c77ea0a8ea4909e471dcd1ecbf4b8aeb2f#diff-30685ea717322cd1e79c33ed8d37903eea388e1750aa00833c33c0c5b89448b3R11
-    # changes the root logger's handler level to WARNING, causing any message below
-    # WARNING level to be dropped after this module is imported, irrespective
-    # of the logger's logging level! The lookml source was affected by this).
-
-    # 1. Create 'datahub' parent logger.
-    datahub_logger = logging.getLogger("datahub")
-    # 2. Setup the stream handler with formatter.
-    stream_handler = logging.StreamHandler()
-    formatter = logging.Formatter(BASE_LOGGING_FORMAT)
-    stream_handler.setFormatter(formatter)
-    datahub_logger.addHandler(stream_handler)
-    # 3. Turn off propagation to the root handler.
-    datahub_logger.propagate = False
-    # 4. Adjust log-levels.
-    if debug or get_boolean_env_variable("DATAHUB_DEBUG", False):
-        logging.getLogger().setLevel(logging.INFO)
-        datahub_logger.setLevel(logging.DEBUG)
-        logging.getLogger("datahub_classify").setLevel(logging.DEBUG)
-    else:
-        logging.getLogger().setLevel(logging.WARNING)
-        datahub_logger.setLevel(logging.INFO)
+    debug = debug or get_boolean_env_variable("DATAHUB_DEBUG", False)
+
+    # Note that we're purposely leaking the context manager here.
+    # Technically we should wrap this with ctx.with_resource(). However, we have
+    # some important error logging in the main() wrapper function that we don't
+    # want to miss. If we wrap this with ctx.with_resource(), then click would
+    # clean it up before those error handlers are processed.
+    # So why is this ok? Because we're leaking a context manager, this will
+    # still get cleaned up automatically when the memory is reclaimed, which is
+    # worst-case at program exit.
+    global _logging_configured
+    _logging_configured = None  # see if we can force python to GC this
+    _logging_configured = configure_logging(debug=debug, log_file=log_file)
+    _logging_configured.__enter__()
 
     # Setup the context for the memory_leak_detector decorator.
ctx.ensure_object(dict) @@ -232,16 +225,3 @@ def main(**kwargs): ) logger.debug(f"GMS config {get_gms_config()}") sys.exit(1) - - -def _get_pretty_chained_message(exc: Exception) -> str: - pretty_msg = f"{exc.__class__.__name__} {exc}" - tmp_exc = exc.__cause__ - indent = "\n\t\t" - while tmp_exc: - pretty_msg = ( - f"{pretty_msg} due to {indent}{tmp_exc.__class__.__name__}{tmp_exc}" - ) - tmp_exc = tmp_exc.__cause__ - indent += "\t" - return pretty_msg diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py index 3f0a966a95dc68..c78e72ac52c597 100644 --- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py +++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py @@ -24,8 +24,10 @@ ExecutionRequestInputClass, ExecutionRequestResultClass, ExecutionRequestSourceClass, + StructuredExecutionReportClass, _Aspect, ) +from datahub.utilities.logging_manager import get_log_buffer from datahub.utilities.urns.urn import Urn logger = logging.getLogger(__name__) @@ -143,7 +145,6 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non # Emit the dataHubIngestionSourceInfo aspect self._emit_aspect( entity_urn=self.ingestion_source_urn, - aspect_name="dataHubIngestionSourceInfo", aspect_value=source_info_aspect, ) @@ -154,17 +155,12 @@ def _get_recipe_to_report(self, ctx: PipelineContext) -> str: else: return json.dumps(redact_raw_config(ctx.pipeline_config._raw_dict)) - def _emit_aspect( - self, entity_urn: Urn, aspect_name: str, aspect_value: _Aspect - ) -> None: + def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None: self.sink.write_record_async( RecordEnvelope( record=MetadataChangeProposalWrapper( - entityType=entity_urn.get_type(), entityUrn=str(entity_urn), - aspectName=aspect_name, aspect=aspect_value, - changeType="UPSERT", ), metadata={}, ), @@ -190,7 +186,6 @@ def on_start(self, ctx: PipelineContext) -> None: # Emit the dataHubExecutionRequestInput aspect self._emit_aspect( entity_urn=self.execution_request_input_urn, - aspect_name="dataHubExecutionRequestInput", aspect_value=execution_input_aspect, ) @@ -200,18 +195,29 @@ def on_completion( report: Dict[str, Any], ctx: PipelineContext, ) -> None: + # Prepare a nicely formatted summary + structured_report_str = json.dumps(report, indent=2) + summary = f"~~~~ Ingestion Report ~~~~\n{structured_report_str}\n\n" + summary += "~~~~ Ingestion Logs ~~~~\n" + summary += get_log_buffer().format_lines() + # Construct the dataHubExecutionRequestResult aspect + structured_report = StructuredExecutionReportClass( + type="CLI_INGEST", + serializedValue=structured_report_str, + contentType="application/json", + ) execution_result_aspect = ExecutionRequestResultClass( status=status, startTimeMs=self.start_time_ms, durationMs=self.get_cur_time_in_ms() - self.start_time_ms, - report=json.dumps(report, indent=2), + report=summary, + structuredReport=structured_report, ) # Emit the dataHubExecutionRequestResult aspect self._emit_aspect( entity_urn=self.execution_request_input_urn, - aspect_name="dataHubExecutionRequestResult", aspect_value=execution_result_aspect, ) self.sink.close() diff --git a/metadata-ingestion/src/datahub/utilities/logging_manager.py b/metadata-ingestion/src/datahub/utilities/logging_manager.py new file mode 100644 index 00000000000000..2d7d900c7b8a3b --- /dev/null +++ 
b/metadata-ingestion/src/datahub/utilities/logging_manager.py
@@ -0,0 +1,190 @@
+"""
+Configure logging and stdout for the CLI. Our goal is to have the following behavior:
+
+1. By default, show INFO level logs from datahub and WARNINGs from everything else.
+2. If the user passes --debug, show DEBUG level logs from datahub and INFOs from everything else.
+3. If the user passes --log-file, write all logs and stdout to the specified file.
+   This should contain debug logs regardless of the user's CLI args.
+4. When outputting to a TTY, colorize the logs.
+
+This code path should not be executed if we're being used as a library.
+"""
+
+import collections
+import contextlib
+import logging
+import sys
+from typing import Deque, Iterator, Optional
+
+import click
+
+from datahub.utilities.tee_io import TeeIO
+
+BASE_LOGGING_FORMAT = (
+    "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
+)
+DATAHUB_PACKAGES = [
+    "datahub",
+    "datahub_provider",
+    "datahub_classify",
+    "datahub_actions",
+]
+IN_MEMORY_LOG_BUFFER_SIZE = 2000  # lines
+
+
+class _ColorLogFormatter(logging.Formatter):
+    # Adapted from https://stackoverflow.com/a/56944256/3638629.
+
+    MESSAGE_COLORS = {
+        "DEBUG": "blue",
+        "INFO": None,  # print with default color
+        "WARNING": "yellow",
+        "ERROR": "red",
+        "CRITICAL": "red",
+    }
+
+    def __init__(self) -> None:
+        super().__init__(BASE_LOGGING_FORMAT)
+
+    def formatMessage(self, record: logging.LogRecord) -> str:
+        if sys.stderr.isatty():
+            return self._formatMessageColor(record)
+        else:
+            return super().formatMessage(record)
+
+    def _formatMessageColor(self, record: logging.LogRecord) -> str:
+        # Mimic our default format, but with color.
+        message_fg = self.MESSAGE_COLORS.get(record.levelname)
+        return (
+            f'{click.style(f"[{self.formatTime(record, self.datefmt)}]", fg="green", dim=True)} '
+            f"{click.style(f'{record.levelname:8}', fg=message_fg)} "
+            f'{click.style(f"{{{record.name}:{record.lineno}}}", fg="blue", dim=True)} - '
+            f"{click.style(record.getMessage(), fg=message_fg)}"
+        )
+
+
+class _DatahubLogFilter(logging.Filter):
+    def __init__(self, debug: bool) -> None:
+        self.debug = debug
+
+    def filter(self, record: logging.LogRecord) -> bool:
+        top_module = record.name.split(".")[0]
+
+        if top_module in DATAHUB_PACKAGES:
+            if self.debug:
+                return record.levelno >= logging.DEBUG
+            else:
+                return record.levelno >= logging.INFO
+        else:
+            if self.debug:
+                return record.levelno >= logging.INFO
+            else:
+                return record.levelno >= logging.WARNING
+
+
+class _LogBuffer:
+    def __init__(self, maxlen: Optional[int] = None) -> None:
+        self._buffer: Deque[str] = collections.deque(maxlen=maxlen)
+
+    def write(self, line: str) -> None:
+        self._buffer.append(line)
+
+    def clear(self) -> None:
+        self._buffer.clear()
+
+    def format_lines(self) -> str:
+        text = "\n".join(self._buffer)
+        if len(self._buffer) == 0:
+            text = "[no logs]"
+        elif len(self._buffer) == self._buffer.maxlen:
+            text = f"[earlier logs truncated...]\n{text}"
+        return text
+
+
+class _BufferLogHandler(logging.Handler):
+    def __init__(self, storage: _LogBuffer) -> None:
+        super().__init__()
+        self._storage = storage
+
+    def emit(self, record: logging.LogRecord) -> None:
+        self._storage.write(self.format(record))
+
+
+def _remove_all_handlers(logger: logging.Logger) -> None:
+    for handler in logger.handlers[:]:
+        logger.removeHandler(handler)
+        handler.close()
+
+
+_log_buffer = _LogBuffer(maxlen=IN_MEMORY_LOG_BUFFER_SIZE)
+
+
+def get_log_buffer() -> _LogBuffer:
+    return _log_buffer
+
+
+_stream_formatter = 
_ColorLogFormatter() +_default_formatter = logging.Formatter(BASE_LOGGING_FORMAT) + + +@contextlib.contextmanager +def configure_logging(debug: bool, log_file: Optional[str] = None) -> Iterator[None]: + _log_buffer.clear() + + with contextlib.ExitStack() as stack: + # Create stdout handler. + stream_handler = logging.StreamHandler() + stream_handler.addFilter(_DatahubLogFilter(debug=debug)) + stream_handler.setFormatter(_stream_formatter) + + # Create file handler. + file_handler: logging.Handler + if log_file: + file = stack.enter_context(open(log_file, "w")) + tee = TeeIO(sys.stdout, file) + stack.enter_context(contextlib.redirect_stdout(tee)) # type: ignore + + file_handler = logging.StreamHandler(file) + file_handler.addFilter(_DatahubLogFilter(debug=True)) + file_handler.setFormatter(_default_formatter) + else: + file_handler = logging.NullHandler() + + # Create the in-memory buffer handler. + buffer_handler = _BufferLogHandler(_log_buffer) + buffer_handler.addFilter(_DatahubLogFilter(debug=True)) + buffer_handler.setFormatter(_default_formatter) + + handlers = [ + stream_handler, + file_handler, + buffer_handler, + ] + + # Configure the loggers. + root_logger = logging.getLogger() + _remove_all_handlers(root_logger) + root_logger.setLevel(logging.INFO) + for handler in handlers: + root_logger.addHandler(handler) + + for lib in DATAHUB_PACKAGES: + # Using a separate handler from the root logger allows us to control the log level + # of the datahub libs independently of the root logger. + # It also insulates us from rogue libraries that might call `logging.basicConfig` + # or otherwise mess with the logging configuration. + lib_logger = logging.getLogger(lib) + _remove_all_handlers(lib_logger) + lib_logger.setLevel(logging.DEBUG) + lib_logger.propagate = False + for handler in handlers: + lib_logger.addHandler(handler) + + yield + + +# Reduce logging from some particularly chatty libraries. +logging.getLogger("urllib3").setLevel(logging.ERROR) +logging.getLogger("snowflake").setLevel(level=logging.WARNING) +# logging.getLogger("botocore").setLevel(logging.INFO) +# logging.getLogger("google").setLevel(logging.INFO) diff --git a/metadata-ingestion/src/datahub/utilities/tee_io.py b/metadata-ingestion/src/datahub/utilities/tee_io.py new file mode 100644 index 00000000000000..4c0de26b570668 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/tee_io.py @@ -0,0 +1,23 @@ +from typing import Any, TextIO + + +class TeeIO: + """ + A file-like object that writes to multiple streams, similar to `tee`. + It mirrors the attributes of the first stream for encoding/tty/etc. 
+ """ + + def __init__(self, *args: TextIO) -> None: + assert args + self._streams = args + + def write(self, line: str) -> None: + for stream in self._streams: + stream.write(line) + + def flush(self) -> None: + for stream in self._streams: + stream.flush() + + def __getattr__(self, attr: str) -> Any: + return getattr(self._streams[0], attr) diff --git a/metadata-ingestion/tests/unit/test_cli_logging.py b/metadata-ingestion/tests/unit/test_cli_logging.py new file mode 100644 index 00000000000000..50f2b8e70d9c98 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_cli_logging.py @@ -0,0 +1,58 @@ +import logging +import pathlib + +import click +import regex +from click.testing import CliRunner + +from datahub.entrypoints import datahub + + +@datahub.command() +def my_logging_fn(): + logger = logging.getLogger("datahub.my_cli_module") + + print("this is a print statement") + + click.echo("this is a click.echo statement") + + logger.debug("Example debug line") + logger.info("This is an %s message", "info") # test string interpolation + logger.warning("This is a warning message") + logger.error("this is an error with no stack trace") + try: + 1 / 0 + except ZeroDivisionError: + logger.exception("failed to divide by zero") + + +def test_cli_logging(tmp_path): + log_file: pathlib.Path = tmp_path / "datahub.log" + + runner = CliRunner() + result = runner.invoke( + datahub, ["--debug", "--log-file", str(log_file), "my-logging-fn"] + ) + assert result.exit_code == 0 + + # The output should include the stdout and stderr, formatted as expected. + regex.match( + r"""\ +this is a print statement +this is a click.echo statement +[.+] DEBUG .datahub.my_cli_module:\d+. - Example debug line +[.+] INFO .datahub.my_cli_module:\d+. - This is an info message +[.+] WARNING .datahub.my_cli_module:\d+. - This is a warning message +[.+] ERROR .datahub.my_cli_module:\d+. - this is an error with no stack trace +[.+] ERROR .datahub.my_cli_module:\d+. - failed to divide by zero +Traceback (most recent call last): + File .+, in my_logging_fn + 1 / 0 +ZeroDivisionError: division by zero +""", + result.output, + ) + + # The log file should match the output exactly. + log_file_output = log_file.read_text() + assert log_file_output == result.output From 308428fb1e9e97f182f4c9917fa4853eddb17fca Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 23 Jan 2023 23:47:39 -0800 Subject: [PATCH 2/5] update docs --- metadata-ingestion/src/datahub/utilities/logging_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/utilities/logging_manager.py b/metadata-ingestion/src/datahub/utilities/logging_manager.py index 2d7d900c7b8a3b..6a2379e0c447cf 100644 --- a/metadata-ingestion/src/datahub/utilities/logging_manager.py +++ b/metadata-ingestion/src/datahub/utilities/logging_manager.py @@ -5,7 +5,8 @@ 2. If the user passes --debug, show DEBUG level logs from datahub and INFOs from everything else. 3. If the user passes --log-file, write all logs and stdout to the specified file. This should contain debug logs regardless of the user's CLI args. -4. When outputting to a TTY, colorize the logs. +4. Maintain an in-memory buffer of the latest logs for reporting purposes. +5. When outputting to a TTY, colorize the logs. This code path should not be executed if we're being used as a library. 
""" From 4e36436912ea677cfd235cc02ec4a245e73a8a1d Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 24 Jan 2023 13:26:30 -0800 Subject: [PATCH 3/5] test in-memory buffer --- metadata-ingestion/tests/unit/test_cli_logging.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/metadata-ingestion/tests/unit/test_cli_logging.py b/metadata-ingestion/tests/unit/test_cli_logging.py index 50f2b8e70d9c98..8cc239e3f18e18 100644 --- a/metadata-ingestion/tests/unit/test_cli_logging.py +++ b/metadata-ingestion/tests/unit/test_cli_logging.py @@ -6,6 +6,7 @@ from click.testing import CliRunner from datahub.entrypoints import datahub +from datahub.utilities.logging_manager import get_log_buffer @datahub.command() @@ -56,3 +57,8 @@ def test_cli_logging(tmp_path): # The log file should match the output exactly. log_file_output = log_file.read_text() assert log_file_output == result.output + + # The in-memory log buffer should contain the log messages. + # The first two lines are stdout, so we skip them. + expected_log_output = "\n".join(result.output.splitlines()[2:]) + assert get_log_buffer().format_lines() == expected_log_output From 3e22c5eb1d5a6fe0adee7735236ac6031cb7035f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 24 Jan 2023 17:16:54 -0800 Subject: [PATCH 4/5] fix test --- metadata-ingestion/src/datahub/cli/cli_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 0c2d3db3ed22c6..f04881c4d115ab 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -88,8 +88,7 @@ def write_gms_config( # ok to fail on this previous_config = {} log.debug( - f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}. This isn't fatal.", - e, + f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal." 
) config_dict = {**previous_config, **config.dict()} else: From 62a890d803eb7900ab92b97288ef4f40daea1929 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 25 Jan 2023 15:59:29 -0800 Subject: [PATCH 5/5] support NO_COLOR --- .../src/datahub/ingestion/source/s3/source.py | 2 -- metadata-ingestion/src/datahub/telemetry/telemetry.py | 2 +- metadata-ingestion/src/datahub/upgrade/upgrade.py | 3 +-- .../src/datahub/utilities/logging_manager.py | 7 +++++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index eb9fd27367a12c..142c449f404d02 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -160,8 +160,6 @@ def get_column_type( "include_field_sample_values", ] -S3_PREFIXES = ("s3://", "s3n://", "s3a://") - # LOCAL_BROWSE_PATH_TRANSFORMER_CONFIG = AddDatasetBrowsePathConfig( # path_templates=["/ENV/PLATFORMDATASET_PARTS"], replace_existing=True diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index f60371cb3f5acf..5395794dc00dde 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -242,7 +242,7 @@ def ping( # send event try: - logger.debug("Sending Telemetry") + logger.debug(f"Sending telemetry for {event_name}") properties.update(self._server_props(server)) self.mp.track(self.client_id, event_name, properties) diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index 4d7166df120640..9806fb0cd033b4 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -124,8 +124,7 @@ async def get_server_version_stats( server_config = await get_server_config(host, token) log.debug(f"server_config:{server_config}") except Exception as e: - log.debug("Failed to get a valid server", e) - pass + log.debug(f"Failed to get a valid server: {e}") else: server_config = server.server_config diff --git a/metadata-ingestion/src/datahub/utilities/logging_manager.py b/metadata-ingestion/src/datahub/utilities/logging_manager.py index 6a2379e0c447cf..ea86c1c7a475df 100644 --- a/metadata-ingestion/src/datahub/utilities/logging_manager.py +++ b/metadata-ingestion/src/datahub/utilities/logging_manager.py @@ -14,6 +14,7 @@ import collections import contextlib import logging +import os import sys from typing import Deque, Iterator, Optional @@ -32,6 +33,8 @@ ] IN_MEMORY_LOG_BUFFER_SIZE = 2000 # lines +NO_COLOR = os.environ.get("NO_COLOR", False) + class _ColorLogFormatter(logging.Formatter): # Adapted from https://stackoverflow.com/a/56944256/3638629. @@ -48,7 +51,7 @@ def __init__(self) -> None: super().__init__(BASE_LOGGING_FORMAT) def formatMessage(self, record: logging.LogRecord) -> str: - if sys.stderr.isatty(): + if not NO_COLOR and sys.stderr.isatty(): return self._formatMessageColor(record) else: return super().formatMessage(record) @@ -153,7 +156,7 @@ def configure_logging(debug: bool, log_file: Optional[str] = None) -> Iterator[N # Create the in-memory buffer handler. buffer_handler = _BufferLogHandler(_log_buffer) - buffer_handler.addFilter(_DatahubLogFilter(debug=True)) + buffer_handler.addFilter(_DatahubLogFilter(debug=debug)) buffer_handler.setFormatter(_default_formatter) handlers = [
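A minimal usage sketch of the logging pieces introduced above (not part of the patch series). It assumes only the APIs added in this PR (datahub.utilities.logging_manager); the logger name and the "ingest.log" path are illustrative placeholders.

import logging

from datahub.utilities.logging_manager import configure_logging, get_log_buffer

logger = logging.getLogger("datahub.example")  # placeholder logger under the "datahub" package

# configure_logging() is a context manager: on entry it installs the console handler,
# the optional file handler (with stdout tee'd into the same file via TeeIO), and the
# in-memory buffer handler. The CLI enters it once in the `datahub` group callback.
with configure_logging(debug=True, log_file="ingest.log"):  # "ingest.log" is a placeholder
    print("plain stdout, mirrored into ingest.log")
    logger.debug("visible on the console because debug=True")
    logger.info("also captured in the in-memory log buffer")

# The buffered lines are what datahub_ingestion_run_summary_provider.py appends to the
# ingestion report.
print(get_log_buffer().format_lines())

The equivalent CLI invocation would be "datahub --debug --log-file ingest.log <command>", where the new --log-file flag drives the same configure_logging() call.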