From e1415a5c8198f0f7a60dc9f94ea1fd364001c8a0 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 20 Dec 2022 22:13:11 -0500 Subject: [PATCH] feat(ingest): add failure/warning counts to ingest_stats (#6823) --- .../src/datahub/ingestion/run/pipeline.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 89aed12e991c19..7e2fad01655657 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -463,6 +463,11 @@ def raise_from_status(self, raise_warnings: bool = False) -> None: ) def log_ingestion_stats(self) -> None: + source_failures = self._approx_all_vals(self.source.get_report().failures) + source_warnings = self._approx_all_vals(self.source.get_report().warnings) + sink_failures = len(self.sink.get_report().failures) + sink_warnings = len(self.sink.get_report().warnings) + telemetry.telemetry_instance.ping( "ingest_stats", { @@ -471,6 +476,12 @@ def log_ingestion_stats(self) -> None: "records_written": stats.discretize( self.sink.get_report().total_records_written ), + "source_failures": stats.discretize(source_failures), + "source_warnings": stats.discretize(source_warnings), + "sink_failures": stats.discretize(sink_failures), + "sink_warnings": stats.discretize(sink_warnings), + "failures": stats.discretize(source_failures + sink_failures), + "warnings": stats.discretize(source_warnings + sink_warnings), }, self.ctx.graph, )