diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index dc7521f8f8c555..93cea62cb1a7ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -1,4 +1,5 @@ import contextlib +import enum import itertools import logging import os @@ -117,6 +118,13 @@ class PipelineInitError(Exception): pass +class PipelineStatus(enum.Enum): + UNKNOWN = enum.auto() + COMPLETED = enum.auto() + PIPELINE_ERROR = enum.auto() + CANCELLED = enum.auto() + + @contextlib.contextmanager def _add_init_error_context(step: str) -> Iterator[None]: """Enriches any exceptions raised with information about the step that failed.""" @@ -366,11 +374,11 @@ def _notify_reporters_on_ingestion_completion(self) -> None: try: reporter.on_completion( status="CANCELLED" - if self.final_status == "cancelled" + if self.final_status == PipelineStatus.CANCELLED else "FAILURE" if self.has_failures() else "SUCCESS" - if self.final_status == "completed" + if self.final_status == PipelineStatus.COMPLETED else "UNKNOWN", report=self._get_structured_report(), ctx=self.ctx, @@ -422,7 +430,7 @@ def run(self) -> None: ) ) - self.final_status = "unknown" + self.final_status = PipelineStatus.UNKNOWN self._notify_reporters_on_ingestion_start() callback = None try: @@ -459,9 +467,7 @@ def run(self) -> None: f"Failed to write record: {e}" ) - except RuntimeError: - raise - except SystemExit: + except (RuntimeError, SystemExit): raise except Exception as e: logger.error( @@ -490,11 +496,22 @@ def run(self) -> None: self.sink.write_record_async(record_envelope, callback) self.process_commits() - self.final_status = "completed" - except (SystemExit, RuntimeError, KeyboardInterrupt) as e: - self.final_status = "cancelled" + self.final_status = PipelineStatus.COMPLETED + except (SystemExit, KeyboardInterrupt) as e: + self.final_status = PipelineStatus.CANCELLED logger.error("Caught error", exc_info=e) raise + except Exception as exc: + self.final_status = PipelineStatus.PIPELINE_ERROR + logger.exception("Ingestion pipeline threw an uncaught exception") + + # HACK: We'll report this as a source error, since we don't have a great place to put it. + # It theoretically could've come from any part of the pipeline, but usually it's from the source. + # This ensures that it is included in the report, and that the run is marked as failed. + self.source.get_report().report_failure( + "pipeline_error", + f"Ingestion pipeline threw an uncaught exception: {exc}", + ) finally: clear_global_warnings() @@ -629,20 +646,30 @@ def has_failures(self) -> bool: def pretty_print_summary( self, warnings_as_failure: bool = False, currently_running: bool = False ) -> int: - click.echo() - click.secho("Cli report:", bold=True) - click.secho(self.cli_report.as_string()) - click.secho(f"Source ({self.source_type}) report:", bold=True) - click.echo(self.source.get_report().as_string()) - click.secho(f"Sink ({self.sink_type}) report:", bold=True) - click.echo(self.sink.get_report().as_string()) - global_warnings = get_global_warnings() - if len(global_warnings) > 0: - click.secho("Global Warnings:", bold=True) - click.echo(global_warnings) - click.echo() workunits_produced = self.sink.get_report().total_records_written + if ( + not workunits_produced + and not currently_running + and self.final_status == PipelineStatus.PIPELINE_ERROR + ): + # If the pipeline threw an uncaught exception before doing anything, printing + # out the report would just be annoying. + pass + else: + click.echo() + click.secho("Cli report:", bold=True) + click.secho(self.cli_report.as_string()) + click.secho(f"Source ({self.source_type}) report:", bold=True) + click.echo(self.source.get_report().as_string()) + click.secho(f"Sink ({self.sink_type}) report:", bold=True) + click.echo(self.sink.get_report().as_string()) + global_warnings = get_global_warnings() + if len(global_warnings) > 0: + click.secho("Global Warnings:", bold=True) + click.echo(global_warnings) + click.echo() + duration_message = f"in {humanfriendly.format_timespan(self.source.get_report().running_time)}." if currently_running: message_template = f"⏳ Pipeline running {{status}} so far; produced {workunits_produced} events {duration_message}"