From d193865f5cc164e3b0fa8e3d9d0468bfcfec62c9 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 20 Dec 2022 12:09:57 -0500 Subject: [PATCH 1/2] feat(ingest): add failure/warning counts to ingest_stats --- .../src/datahub/ingestion/run/pipeline.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 89aed12e991c19..dc6321a65fe796 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -463,6 +463,11 @@ def raise_from_status(self, raise_warnings: bool = False) -> None: ) def log_ingestion_stats(self) -> None: + source_failures = self._approx_all_vals(self.source.get_report().failures) + source_warnings = self._approx_all_vals(self.source.get_report().warnings) + sink_failures = self._approx_all_vals(self.sink.get_report().failures) + sink_warnings = self._approx_all_vals(self.sink.get_report().warnings) + telemetry.telemetry_instance.ping( "ingest_stats", { @@ -471,6 +476,12 @@ def log_ingestion_stats(self) -> None: "records_written": stats.discretize( self.sink.get_report().total_records_written ), + "source_failures": stats.discretize(source_failures), + "source_warnings": stats.discretize(source_warnings), + "sink_failures": stats.discretize(sink_failures), + "sink_warnings": stats.discretize(sink_warnings), + "failures": stats.discretize(source_failures + sink_failures), + "warnings": stats.discretize(source_warnings + sink_warnings), }, self.ctx.graph, ) From e28245979aee94f843ac54c09589e18d7c18d4f3 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 20 Dec 2022 13:56:32 -0500 Subject: [PATCH 2/2] fix bug --- metadata-ingestion/src/datahub/ingestion/run/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index dc6321a65fe796..7e2fad01655657 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -465,8 +465,8 @@ def raise_from_status(self, raise_warnings: bool = False) -> None: def log_ingestion_stats(self) -> None: source_failures = self._approx_all_vals(self.source.get_report().failures) source_warnings = self._approx_all_vals(self.source.get_report().warnings) - sink_failures = self._approx_all_vals(self.sink.get_report().failures) - sink_warnings = self._approx_all_vals(self.sink.get_report().warnings) + sink_failures = len(self.sink.get_report().failures) + sink_warnings = len(self.sink.get_report().warnings) telemetry.telemetry_instance.ping( "ingest_stats",