From 43bbee9075ce21ec171b89bcfb9ac17a9c15bf8f Mon Sep 17 00:00:00 2001
From: Akash Verma <138790903+akashverma0786@users.noreply.github.com>
Date: Tue, 19 Nov 2024 18:47:55 +0530
Subject: [PATCH] fix #18278 "Pipeline ingested from Airflow are now not limited to 100" (#18689)

---
 .../source/pipeline/airflow/metadata.py | 60 +++++++++++--------
 1 file changed, 35 insertions(+), 25 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
index 5f5aa16a6e0a..1aae7862c601 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
@@ -357,32 +357,42 @@ def get_pipelines_list(self) -> Iterable[AirflowDagDetails]:
         ).filter(
             DagModel.is_paused == False  # pylint: disable=singleton-comparison
         )
-        for serialized_dag in session_query.yield_per(100):
-            try:
-                data = serialized_dag[1]["dag"]
-                dag = AirflowDagDetails(
-                    dag_id=serialized_dag[0],
-                    fileloc=serialized_dag[2],
-                    data=AirflowDag.model_validate(serialized_dag[1]),
-                    max_active_runs=data.get("max_active_runs", None),
-                    description=data.get("_description", None),
-                    start_date=data.get("start_date", None),
-                    tasks=list(
-                        map(self._extract_serialized_task, data.get("tasks", []))
-                    ),
-                    schedule_interval=get_schedule_interval(data),
-                    owner=self.fetch_dag_owners(data),
-                )
+        limit = 100  # Number of records per batch
+        offset = 0  # Starting offset for pagination
+
+        while True:
+            paginated_query = session_query.limit(limit).offset(offset)
+            results = paginated_query.all()
+            if not results:
+                break
+            for serialized_dag in results:
+                try:
+                    data = serialized_dag[1]["dag"]
+                    dag = AirflowDagDetails(
+                        dag_id=serialized_dag[0],
+                        fileloc=serialized_dag[2],
+                        data=AirflowDag.model_validate(serialized_dag[1]),
+                        max_active_runs=data.get("max_active_runs", None),
+                        description=data.get("_description", None),
+                        start_date=data.get("start_date", None),
+                        tasks=list(
+                            map(self._extract_serialized_task, data.get("tasks", []))
+                        ),
+                        schedule_interval=get_schedule_interval(data),
+                        owner=self.fetch_dag_owners(data),
+                    )
 
-                yield dag
-            except ValidationError as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Error building pydantic model for {serialized_dag} - {err}"
-                )
-            except Exception as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(f"Wild error yielding dag {serialized_dag} - {err}")
+                    yield dag
+                except ValidationError as err:
+                    logger.debug(traceback.format_exc())
+                    logger.warning(
+                        f"Error building pydantic model for {serialized_dag} - {err}"
+                    )
+                except Exception as err:
+                    logger.debug(traceback.format_exc())
+                    logger.warning(f"Wild error yielding dag {serialized_dag} - {err}")
+
+            offset += limit
 
     def fetch_dag_owners(self, data) -> Optional[str]:
         """
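
For context, the patch replaces session_query.yield_per(100) with an explicit limit/offset loop, so ingestion walks the Airflow metadata database in batches of 100 until the result set is exhausted. Below is a minimal, self-contained sketch of that pagination pattern, assuming SQLAlchemy 1.4+; the DagRecord model, the paginate helper, and the in-memory SQLite engine are hypothetical stand-ins for Airflow's SerializedDagModel and metadata database and are not part of this patch.

# Minimal sketch of the limit/offset pagination pattern introduced by the
# patch, assuming SQLAlchemy 1.4+. DagRecord, paginate, and the in-memory
# SQLite engine are hypothetical stand-ins for Airflow's SerializedDagModel
# and metadata database.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagRecord(Base):
    """Hypothetical table standing in for Airflow's serialized_dag table."""

    __tablename__ = "dag_record"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)


def paginate(query, limit=100):
    """Yield every row of the query in batches of `limit` via limit/offset."""
    offset = 0
    while True:
        results = query.limit(limit).offset(offset).all()
        if not results:
            break
        yield from results
        offset += limit


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add_all([DagRecord(dag_id=f"dag_{i}") for i in range(250)])
        session.commit()
        # A stable ORDER BY keeps pages consistent across offsets.
        query = session.query(DagRecord.dag_id).order_by(DagRecord.id)
        print(sum(1 for _ in paginate(query)))  # prints 250, not capped at 100

The order_by on the primary key is not part of the patch itself; it is included in the sketch because limit/offset pagination without a deterministic ordering can skip or repeat rows if the underlying table changes between pages.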