Skip to content

Commit

Permalink
AIP-84 Handle backfill for DAGs with when depends_on_past is True (ap…
Browse files Browse the repository at this point in the history
…ache#45731)

* backfill handle depends on past

* fixing backfill model tests

* Implementing review comments

* fix test

* uncomment test commneted incorrectly

* implement review comments

* handle exceptions better

* fixing exceptin handling and tests

* remove 422 from router
  • Loading branch information
vatsrahul1001 authored and niklasr22 committed Feb 8, 2025
1 parent f79c0ab commit 5ca3689
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 32 deletions.
38 changes: 16 additions & 22 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.exceptions import RequestValidationError
from sqlalchemy import select, update

from airflow.api_fastapi.common.db.common import (
Expand All @@ -45,6 +46,8 @@
Backfill,
BackfillDagRun,
DagNoScheduleException,
InvalidBackfillDirection,
InvalidReprocessBehavior,
_create_backfill,
_do_dry_run,
)
Expand Down Expand Up @@ -183,12 +186,7 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse:

@backfills_router.post(
path="",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
),
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
)
def create_backfill(
backfill_request: BackfillPostBody,
Expand All @@ -206,32 +204,29 @@ def create_backfill(
reprocess_behavior=backfill_request.reprocess_behavior,
)
return BackfillResponse.model_validate(backfill_obj)

except AlreadyRunningBackfill:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{backfill_request.dag_id} has no schedule",
)

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {backfill_request.dag_id}",
)
except (
InvalidReprocessBehavior,
InvalidBackfillDirection,
DagNoScheduleException,
) as e:
raise RequestValidationError(str(e))


@backfills_router.post(
path="/dry_run",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
),
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
)
def create_backfill_dry_run(
body: BackfillPostBody,
Expand All @@ -252,13 +247,12 @@ def create_backfill_dry_run(
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]

return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {body.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{body.dag_id} has no schedule",
)

except (InvalidReprocessBehavior, InvalidBackfillDirection, DagNoScheduleException) as e:
raise RequestValidationError(str(e))
28 changes: 21 additions & 7 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
"""


class InvalidBackfillDirection(AirflowException):
"""
Raised when backfill is attempted in reverse order with tasks that depend on past runs.
:meta private:
"""


class InvalidReprocessBehavior(AirflowException):
"""
Raised when a backfill cannot be completed because the reprocess behavior is not valid.
:meta private:
"""


class ReprocessBehavior(str, Enum):
"""
Internal enum for setting reprocess behavior in a backfill.
Expand Down Expand Up @@ -188,18 +204,16 @@ def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) ->


def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None):
depends_on_past = None
depends_on_past = any(x.depends_on_past for x in dag.tasks)
if depends_on_past:
if reverse is True:
raise ValueError(
"Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
raise InvalidBackfillDirection(
"Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True."
)
if reprocess_behavior in (None, ReprocessBehavior.NONE):
raise ValueError(
"Dag has task for which depends_on_past is true. "
"You must set reprocess behavior to reprocess completed or "
"reprocess failed"
raise InvalidReprocessBehavior(
"DAG has tasks for which depends_on_past=True. "
"You must set reprocess behavior to reprocess completed or reprocess failed."
)


Expand Down
98 changes: 97 additions & 1 deletion tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,57 @@ def test_no_schedule_dag(self, session, dag_maker, test_client):
url="/public/backfills",
json=data,
)
assert response.status_code == 409
assert response.status_code == 422
assert response.json().get("detail") == f"{dag.dag_id} has no schedule"

@pytest.mark.parametrize(
"repro_act, repro_exp, run_backwards, status_code",
[
("none", ReprocessBehavior.NONE, False, 422),
("completed", ReprocessBehavior.COMPLETED, False, 200),
("completed", ReprocessBehavior.COMPLETED, True, 422),
],
)
def test_create_backfill_with_depends_on_past(
self, repro_act, repro_exp, run_backwards, status_code, session, dag_maker, test_client
):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag:
EmptyOperator(task_id="mytask", depends_on_past=True)
session.query(DagModel).all()
session.commit()
from_date = pendulum.parse("2024-01-01")
from_date_iso = to_iso(from_date)
to_date = pendulum.parse("2024-02-01")
to_date_iso = to_iso(to_date)
max_active_runs = 5
data = {
"dag_id": dag.dag_id,
"from_date": f"{from_date_iso}",
"to_date": f"{to_date_iso}",
"max_active_runs": max_active_runs,
"run_backwards": run_backwards,
"dag_run_conf": {"param1": "val1", "param2": True},
"dry_run": False,
"reprocess_behavior": repro_act,
}
response = test_client.post(
url="/public/backfills",
json=data,
)
assert response.status_code == status_code

if response.status_code != 200:
if run_backwards:
assert (
response.json().get("detail")
== "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True."
)
else:
assert (
response.json().get("detail")
== "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed."
)


class TestCreateBackfillDryRun(TestBackfillEndpoint):
@pytest.mark.parametrize(
Expand Down Expand Up @@ -354,6 +402,54 @@ def test_create_backfill_dry_run(
response_json = response.json()
assert response_json["backfills"] == expected_dates

@pytest.mark.parametrize(
"repro_act, repro_exp, run_backwards, status_code",
[
("none", ReprocessBehavior.NONE, False, 422),
("completed", ReprocessBehavior.COMPLETED, False, 200),
("completed", ReprocessBehavior.COMPLETED, True, 422),
],
)
def test_create_backfill_dry_run_with_depends_on_past(
self, repro_act, repro_exp, run_backwards, status_code, session, dag_maker, test_client
):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag:
EmptyOperator(task_id="mytask", depends_on_past=True)
session.query(DagModel).all()
session.commit()
from_date = pendulum.parse("2024-01-01")
from_date_iso = to_iso(from_date)
to_date = pendulum.parse("2024-02-01")
to_date_iso = to_iso(to_date)
max_active_runs = 5
data = {
"dag_id": dag.dag_id,
"from_date": f"{from_date_iso}",
"to_date": f"{to_date_iso}",
"max_active_runs": max_active_runs,
"run_backwards": run_backwards,
"dag_run_conf": {"param1": "val1", "param2": True},
"dry_run": False,
"reprocess_behavior": repro_act,
}
response = test_client.post(
url="/public/backfills/dry_run",
json=data,
)
assert response.status_code == status_code

if response.status_code != 200:
if run_backwards:
assert (
response.json().get("detail")
== "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True."
)
else:
assert (
response.json().get("detail")
== "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed."
)


class TestCancelBackfill(TestBackfillEndpoint):
def test_cancel_backfill(self, session, test_client):
Expand Down
12 changes: 10 additions & 2 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
Backfill,
BackfillDagRun,
BackfillDagRunExceptionReason,
InvalidBackfillDirection,
InvalidReprocessBehavior,
ReprocessBehavior,
_create_backfill,
)
Expand Down Expand Up @@ -74,7 +76,10 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session):
session.commit()
cm = nullcontext()
if dep_on_past:
cm = pytest.raises(ValueError, match="cannot be run in reverse")
cm = pytest.raises(
InvalidBackfillDirection,
match="Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True.",
)
b = None
with cm:
b = _create_backfill(
Expand Down Expand Up @@ -432,7 +437,10 @@ def test_depends_on_past_requires_reprocess_failed(dep_on_past, behavior, dag_ma
python_callable=lambda: print,
depends_on_past=dep_on_past,
)
raises_cm = pytest.raises(ValueError, match="Dag has task for which depends_on_past is true")
raises_cm = pytest.raises(
InvalidReprocessBehavior,
match="DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed.",
)
null_cm = nullcontext()
cm = null_cm
if dep_on_past and behavior in (ReprocessBehavior.NONE, None):
Expand Down

0 comments on commit 5ca3689

Please sign in to comment.