fix #18278 "Pipelines ingested from Airflow are no longer limited to 100"
akashverma0786 authored Nov 19, 2024
1 parent 12945c6 · commit 43bbee9
Showing 1 changed file with 35 additions and 25 deletions.
@@ -357,32 +357,42 @@ def get_pipelines_list(self) -> Iterable[AirflowDagDetails]:
         ).filter(
             DagModel.is_paused == False  # pylint: disable=singleton-comparison
         )
-        for serialized_dag in session_query.yield_per(100):
-            try:
-                data = serialized_dag[1]["dag"]
-                dag = AirflowDagDetails(
-                    dag_id=serialized_dag[0],
-                    fileloc=serialized_dag[2],
-                    data=AirflowDag.model_validate(serialized_dag[1]),
-                    max_active_runs=data.get("max_active_runs", None),
-                    description=data.get("_description", None),
-                    start_date=data.get("start_date", None),
-                    tasks=list(
-                        map(self._extract_serialized_task, data.get("tasks", []))
-                    ),
-                    schedule_interval=get_schedule_interval(data),
-                    owner=self.fetch_dag_owners(data),
-                )
+        limit = 100  # Number of records per batch
+        offset = 0  # Start
+
+        while True:
+            paginated_query = session_query.limit(limit).offset(offset)
+            results = paginated_query.all()
+            if not results:
+                break
+            for serialized_dag in results:
+                try:
+                    data = serialized_dag[1]["dag"]
+                    dag = AirflowDagDetails(
+                        dag_id=serialized_dag[0],
+                        fileloc=serialized_dag[2],
+                        data=AirflowDag.model_validate(serialized_dag[1]),
+                        max_active_runs=data.get("max_active_runs", None),
+                        description=data.get("_description", None),
+                        start_date=data.get("start_date", None),
+                        tasks=list(
+                            map(self._extract_serialized_task, data.get("tasks", []))
+                        ),
+                        schedule_interval=get_schedule_interval(data),
+                        owner=self.fetch_dag_owners(data),
+                    )
 
-                yield dag
-            except ValidationError as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Error building pydantic model for {serialized_dag} - {err}"
-                )
-            except Exception as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(f"Wild error yielding dag {serialized_dag} - {err}")
+                    yield dag
+                except ValidationError as err:
+                    logger.debug(traceback.format_exc())
+                    logger.warning(
+                        f"Error building pydantic model for {serialized_dag} - {err}"
+                    )
+                except Exception as err:
+                    logger.debug(traceback.format_exc())
+                    logger.warning(f"Wild error yielding dag {serialized_dag} - {err}")
+
+            offset += limit
 
     def fetch_dag_owners(self, data) -> Optional[str]:
         """
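The committed change swaps the single `yield_per(100)` pass for classic limit/offset pagination: it keeps requesting fixed-size pages from the same SQLAlchemy query and stops when a page comes back empty, which is what removes the 100-DAG ceiling. The snippet below is a minimal, self-contained sketch of that pattern, not OpenMetadata code; the `DagRecord` model, the in-memory SQLite engine, and the `fetch_in_pages` helper are illustrative assumptions.

```python
# Minimal sketch of the limit/offset pagination pattern used in the fix.
# Assumptions (not from the repository): a toy `DagRecord` model, an
# in-memory SQLite engine, and a `fetch_in_pages` generator.
from typing import Iterator

from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagRecord(Base):
    """Stand-in for the serialized DAG rows queried in the real code."""

    __tablename__ = "dag_record"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    is_paused = Column(Boolean, default=False)


def fetch_in_pages(session: Session, limit: int = 100) -> Iterator[DagRecord]:
    """Yield every non-paused row, at most `limit` rows per round trip.

    Mirrors the committed logic: keep asking for the next page until an
    empty result set signals that the table is exhausted.
    """
    query = session.query(DagRecord).filter(
        DagRecord.is_paused == False  # pylint: disable=singleton-comparison
    )
    offset = 0
    while True:
        results = query.limit(limit).offset(offset).all()
        if not results:
            break
        yield from results
        offset += limit


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add_all(DagRecord(dag_id=f"dag_{i}") for i in range(250))
        session.commit()
        # All 250 rows come back even though each page holds at most 100.
        print(sum(1 for _ in fetch_in_pages(session, limit=100)))
```

A general caveat with offset pagination: without a deterministic `order_by` on a unique column, the database may return rows in a different order on each round trip, so pages can overlap or skip rows; an explicit ordering is a common companion to this pattern.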
