Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Handle backfill for DAGs with when depends_on_past is True #45731

Merged
merged 18 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.exceptions import RequestValidationError
from sqlalchemy import select, update

from airflow.api_fastapi.common.db.common import (
Expand All @@ -45,6 +46,8 @@
Backfill,
BackfillDagRun,
DagNoScheduleException,
InvalidBackfillDirection,
InvalidReprocessBehavior,
_create_backfill,
_do_dry_run,
)
Expand Down Expand Up @@ -183,12 +186,7 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse:

@backfills_router.post(
path="",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
),
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
)
def create_backfill(
backfill_request: BackfillPostBody,
Expand All @@ -206,32 +204,29 @@ def create_backfill(
reprocess_behavior=backfill_request.reprocess_behavior,
)
return BackfillResponse.model_validate(backfill_obj)

except AlreadyRunningBackfill:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{backfill_request.dag_id} has no schedule",
)

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {backfill_request.dag_id}",
)
except (
InvalidReprocessBehavior,
InvalidBackfillDirection,
DagNoScheduleException,
) as e:
raise RequestValidationError(str(e))


@backfills_router.post(
path="/dry_run",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
),
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
)
def create_backfill_dry_run(
body: BackfillPostBody,
Expand All @@ -252,13 +247,12 @@ def create_backfill_dry_run(
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]

return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {body.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{body.dag_id} has no schedule",
)

except (InvalidReprocessBehavior, InvalidBackfillDirection, DagNoScheduleException) as e:
raise RequestValidationError(str(e))
28 changes: 21 additions & 7 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
"""


class InvalidBackfillDirection(AirflowException):
"""
Raised when backfill is attempted in reverse order with tasks that depend on past runs.

:meta private:
"""


class InvalidReprocessBehavior(AirflowException):
"""
Raised when a backfill cannot be completed because the reprocess behavior is not valid.

:meta private:
"""


class ReprocessBehavior(str, Enum):
"""
Internal enum for setting reprocess behavior in a backfill.
Expand Down Expand Up @@ -188,18 +204,16 @@ def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) ->


def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None):
depends_on_past = None
depends_on_past = any(x.depends_on_past for x in dag.tasks)
if depends_on_past:
if reverse is True:
raise ValueError(
"Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
raise InvalidBackfillDirection(
"Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True."
)
if reprocess_behavior in (None, ReprocessBehavior.NONE):
raise ValueError(
"Dag has task for which depends_on_past is true. "
"You must set reprocess behavior to reprocess completed or "
"reprocess failed"
raise InvalidReprocessBehavior(
"DAG has tasks for which depends_on_past=True. "
"You must set reprocess behavior to reprocess completed or reprocess failed."
)


Expand Down
98 changes: 97 additions & 1 deletion tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,57 @@ def test_no_schedule_dag(self, session, dag_maker, test_client):
url="/public/backfills",
json=data,
)
assert response.status_code == 409
assert response.status_code == 422
assert response.json().get("detail") == f"{dag.dag_id} has no schedule"

@pytest.mark.parametrize(
"repro_act, repro_exp, run_backwards, status_code",
[
("none", ReprocessBehavior.NONE, False, 422),
("completed", ReprocessBehavior.COMPLETED, False, 200),
("completed", ReprocessBehavior.COMPLETED, True, 422),
],
)
def test_create_backfill_with_depends_on_past(
self, repro_act, repro_exp, run_backwards, status_code, session, dag_maker, test_client
):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag:
EmptyOperator(task_id="mytask", depends_on_past=True)
session.query(DagModel).all()
session.commit()
from_date = pendulum.parse("2024-01-01")
from_date_iso = to_iso(from_date)
to_date = pendulum.parse("2024-02-01")
to_date_iso = to_iso(to_date)
max_active_runs = 5
data = {
"dag_id": dag.dag_id,
"from_date": f"{from_date_iso}",
"to_date": f"{to_date_iso}",
"max_active_runs": max_active_runs,
"run_backwards": run_backwards,
"dag_run_conf": {"param1": "val1", "param2": True},
"dry_run": False,
"reprocess_behavior": repro_act,
}
response = test_client.post(
url="/public/backfills",
json=data,
)
assert response.status_code == status_code

if response.status_code != 200:
if run_backwards:
assert (
response.json().get("detail")
== "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True."
)
else:
assert (
response.json().get("detail")
== "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed."
)


class TestCreateBackfillDryRun(TestBackfillEndpoint):
@pytest.mark.parametrize(
Expand Down Expand Up @@ -354,6 +402,54 @@ def test_create_backfill_dry_run(
response_json = response.json()
assert response_json["backfills"] == expected_dates

@pytest.mark.parametrize(
"repro_act, repro_exp, run_backwards, status_code",
[
("none", ReprocessBehavior.NONE, False, 422),
("completed", ReprocessBehavior.COMPLETED, False, 200),
("completed", ReprocessBehavior.COMPLETED, True, 422),
],
)
def test_create_backfill_dry_run_with_depends_on_past(
self, repro_act, repro_exp, run_backwards, status_code, session, dag_maker, test_client
):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag:
EmptyOperator(task_id="mytask", depends_on_past=True)
session.query(DagModel).all()
session.commit()
from_date = pendulum.parse("2024-01-01")
from_date_iso = to_iso(from_date)
to_date = pendulum.parse("2024-02-01")
to_date_iso = to_iso(to_date)
max_active_runs = 5
data = {
"dag_id": dag.dag_id,
"from_date": f"{from_date_iso}",
"to_date": f"{to_date_iso}",
"max_active_runs": max_active_runs,
"run_backwards": run_backwards,
"dag_run_conf": {"param1": "val1", "param2": True},
"dry_run": False,
"reprocess_behavior": repro_act,
}
response = test_client.post(
url="/public/backfills/dry_run",
json=data,
)
assert response.status_code == status_code

if response.status_code != 200:
if run_backwards:
assert (
response.json().get("detail")
== "Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True."
)
else:
assert (
response.json().get("detail")
== "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed."
)


class TestCancelBackfill(TestBackfillEndpoint):
def test_cancel_backfill(self, session, test_client):
Expand Down
12 changes: 10 additions & 2 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
Backfill,
BackfillDagRun,
BackfillDagRunExceptionReason,
InvalidBackfillDirection,
InvalidReprocessBehavior,
ReprocessBehavior,
_create_backfill,
)
Expand Down Expand Up @@ -74,7 +76,10 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session):
session.commit()
cm = nullcontext()
if dep_on_past:
cm = pytest.raises(ValueError, match="cannot be run in reverse")
cm = pytest.raises(
InvalidBackfillDirection,
match="Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True.",
)
b = None
with cm:
b = _create_backfill(
Expand Down Expand Up @@ -432,7 +437,10 @@ def test_depends_on_past_requires_reprocess_failed(dep_on_past, behavior, dag_ma
python_callable=lambda: print,
depends_on_past=dep_on_past,
)
raises_cm = pytest.raises(ValueError, match="Dag has task for which depends_on_past is true")
raises_cm = pytest.raises(
InvalidReprocessBehavior,
match="DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed.",
)
null_cm = nullcontext()
cm = null_cm
if dep_on_past and behavior in (ReprocessBehavior.NONE, None):
Expand Down