From 5a4d2a1c45f26645b2e9dabef9228f0da2d88402 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 <43964496+vatsrahul1001@users.noreply.github.com> Date: Wed, 22 Jan 2025 19:17:46 +0530 Subject: [PATCH] AIP-84 Handle backfill for DAGs with when depends_on_past is True (#45731) * backfill handle depends on past * fixing backfill model tests * Implementing review comments * fix test * uncomment test commneted incorrectly * implement review comments * handle exceptions better * fixing exceptin handling and tests * remove 422 from router --- .../core_api/routes/public/backfills.py | 38 +++---- airflow/models/backfill.py | 28 ++++-- .../core_api/routes/public/test_backfills.py | 98 ++++++++++++++++++- tests/models/test_backfill.py | 12 ++- 4 files changed, 144 insertions(+), 32 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 80f05631fe99d..78f0391fec975 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -19,6 +19,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status +from fastapi.exceptions import RequestValidationError from sqlalchemy import select, update from airflow.api_fastapi.common.db.common import ( @@ -45,6 +46,8 @@ Backfill, BackfillDagRun, DagNoScheduleException, + InvalidBackfillDirection, + InvalidReprocessBehavior, _create_backfill, _do_dry_run, ) @@ -183,12 +186,7 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse: @backfills_router.post( path="", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - status.HTTP_409_CONFLICT, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), ) def create_backfill( backfill_request: BackfillPostBody, @@ -206,32 +204,29 @@ def create_backfill( reprocess_behavior=backfill_request.reprocess_behavior, ) return BackfillResponse.model_validate(backfill_obj) + except AlreadyRunningBackfill: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"There is already a running backfill for dag {backfill_request.dag_id}", ) - except DagNoScheduleException: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"{backfill_request.dag_id} has no schedule", - ) except DagNotFound: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Could not find dag {backfill_request.dag_id}", ) + except ( + InvalidReprocessBehavior, + InvalidBackfillDirection, + DagNoScheduleException, + ) as e: + raise RequestValidationError(str(e)) @backfills_router.post( path="/dry_run", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - status.HTTP_409_CONFLICT, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), ) def create_backfill_dry_run( body: BackfillPostBody, @@ -252,13 +247,12 @@ def create_backfill_dry_run( backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run] return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run)) + except DagNotFound: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Could not find dag {body.dag_id}", ) - except DagNoScheduleException: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"{body.dag_id} has no schedule", - ) + + except (InvalidReprocessBehavior, InvalidBackfillDirection, DagNoScheduleException) as e: + raise RequestValidationError(str(e)) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 136d6635310f7..1564c4feb59a5 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException): """ +class InvalidBackfillDirection(AirflowException): + """ + Raised when backfill is attempted in reverse order with tasks that depend on past runs. + + :meta private: + """ + + +class InvalidReprocessBehavior(AirflowException): + """ + Raised when a backfill cannot be completed because the reprocess behavior is not valid. + + :meta private: + """ + + class ReprocessBehavior(str, Enum): """ Internal enum for setting reprocess behavior in a backfill. @@ -188,18 +204,16 @@ def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None): - depends_on_past = None depends_on_past = any(x.depends_on_past for x in dag.tasks) if depends_on_past: if reverse is True: - raise ValueError( - "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True" + raise InvalidBackfillDirection( + "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True." ) if reprocess_behavior in (None, ReprocessBehavior.NONE): - raise ValueError( - "Dag has task for which depends_on_past is true. " - "You must set reprocess behavior to reprocess completed or " - "reprocess failed" + raise InvalidReprocessBehavior( + "DAG has tasks for which depends_on_past=True. " + "You must set reprocess behavior to reprocess completed or reprocess failed." ) diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 0aacf7b0e722e..0a71963c1007f 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -264,9 +264,57 @@ def test_no_schedule_dag(self, session, dag_maker, test_client): url="/public/backfills", json=data, ) - assert response.status_code == 409 + assert response.status_code == 422 assert response.json().get("detail") == f"{dag.dag_id} has no schedule" + @pytest.mark.parametrize( + "repro_act, repro_exp, run_backwards, status_code", + [ + ("none", ReprocessBehavior.NONE, False, 422), + ("completed", ReprocessBehavior.COMPLETED, False, 200), + ("completed", ReprocessBehavior.COMPLETED, True, 422), + ], + ) + def test_create_backfill_with_depends_on_past( + self, repro_act, repro_exp, run_backwards, status_code, session, dag_maker, test_client + ): + with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag: + EmptyOperator(task_id="mytask", depends_on_past=True) + session.query(DagModel).all() + session.commit() + from_date = pendulum.parse("2024-01-01") + from_date_iso = to_iso(from_date) + to_date = pendulum.parse("2024-02-01") + to_date_iso = to_iso(to_date) + max_active_runs = 5 + data = { + "dag_id": dag.dag_id, + "from_date": f"{from_date_iso}", + "to_date": f"{to_date_iso}", + "max_active_runs": max_active_runs, + "run_backwards": run_backwards, + "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": False, + "reprocess_behavior": repro_act, + } + response = test_client.post( + url="/public/backfills", + json=data, + ) + assert response.status_code == status_code + + if response.status_code != 200: + if run_backwards: + assert ( + response.json().get("detail") + == "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True." + ) + else: + assert ( + response.json().get("detail") + == "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed." + ) + class TestCreateBackfillDryRun(TestBackfillEndpoint): @pytest.mark.parametrize( @@ -354,6 +402,54 @@ def test_create_backfill_dry_run( response_json = response.json() assert response_json["backfills"] == expected_dates + @pytest.mark.parametrize( + "repro_act, repro_exp, run_backwards, status_code", + [ + ("none", ReprocessBehavior.NONE, False, 422), + ("completed", ReprocessBehavior.COMPLETED, False, 200), + ("completed", ReprocessBehavior.COMPLETED, True, 422), + ], + ) + def test_create_backfill_dry_run_with_depends_on_past( + self, repro_act, repro_exp, run_backwards, status_code, session, dag_maker, test_client + ): + with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag: + EmptyOperator(task_id="mytask", depends_on_past=True) + session.query(DagModel).all() + session.commit() + from_date = pendulum.parse("2024-01-01") + from_date_iso = to_iso(from_date) + to_date = pendulum.parse("2024-02-01") + to_date_iso = to_iso(to_date) + max_active_runs = 5 + data = { + "dag_id": dag.dag_id, + "from_date": f"{from_date_iso}", + "to_date": f"{to_date_iso}", + "max_active_runs": max_active_runs, + "run_backwards": run_backwards, + "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": False, + "reprocess_behavior": repro_act, + } + response = test_client.post( + url="/public/backfills/dry_run", + json=data, + ) + assert response.status_code == status_code + + if response.status_code != 200: + if run_backwards: + assert ( + response.json().get("detail") + == "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True." + ) + else: + assert ( + response.json().get("detail") + == "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed." + ) + class TestCancelBackfill(TestBackfillEndpoint): def test_cancel_backfill(self, session, test_client): diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index 27be2df009eb3..7b1625e1043ad 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -31,6 +31,8 @@ Backfill, BackfillDagRun, BackfillDagRunExceptionReason, + InvalidBackfillDirection, + InvalidReprocessBehavior, ReprocessBehavior, _create_backfill, ) @@ -74,7 +76,10 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session): session.commit() cm = nullcontext() if dep_on_past: - cm = pytest.raises(ValueError, match="cannot be run in reverse") + cm = pytest.raises( + InvalidBackfillDirection, + match="Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True.", + ) b = None with cm: b = _create_backfill( @@ -432,7 +437,10 @@ def test_depends_on_past_requires_reprocess_failed(dep_on_past, behavior, dag_ma python_callable=lambda: print, depends_on_past=dep_on_past, ) - raises_cm = pytest.raises(ValueError, match="Dag has task for which depends_on_past is true") + raises_cm = pytest.raises( + InvalidReprocessBehavior, + match="DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed.", + ) null_cm = nullcontext() cm = null_cm if dep_on_past and behavior in (ReprocessBehavior.NONE, None):