Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Handle backfill for DAGs with when depends_on_past is True #45731

Merged
merged 18 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1285,11 +1285,11 @@ paths:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/backfills/{backfill_id}:
get:
tags:
Expand Down Expand Up @@ -1525,11 +1525,11 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'422':
description: Validation Error
description: Unprocessable Entity
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
$ref: '#/components/schemas/HTTPExceptionResponse'
/public/connections/{connection_id}:
delete:
tags:
Expand Down
40 changes: 30 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
Backfill,
BackfillDagRun,
DagNoScheduleException,
InvalidBackfillDirection,
InvalidReprocessBehavior,
_create_backfill,
_do_dry_run,
)
Expand Down Expand Up @@ -184,10 +186,7 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
@backfills_router.post(
path="",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
[status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT, status.HTTP_422_UNPROCESSABLE_ENTITY]
),
)
def create_backfill(
Expand All @@ -213,9 +212,21 @@ def create_backfill(
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{backfill_request.dag_id} has no schedule",
)
except InvalidReprocessBehavior:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{backfill_request.dag_id} has tasks for which depends_on_past=True. "
"You must set reprocess behavior to reprocess completed or reprocess failed.",
)

except InvalidBackfillDirection:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True.",
)

except DagNotFound:
raise HTTPException(
Expand All @@ -227,10 +238,7 @@ def create_backfill(
@backfills_router.post(
path="/dry_run",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
[status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT, status.HTTP_422_UNPROCESSABLE_ENTITY]
),
)
def create_backfill_dry_run(
Expand Down Expand Up @@ -259,6 +267,18 @@ def create_backfill_dry_run(
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{body.dag_id} has no schedule",
)

except InvalidReprocessBehavior:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{body.dag_id} has tasks for which depends_on_past=True. "
"You must set reprocess behavior to reprocess completed or reprocess failed.",
)
except InvalidBackfillDirection:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True.",
)
28 changes: 21 additions & 7 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
"""


class InvalidBackfillDirection(AirflowException):
"""
Raised when backfill is attempted in reverse order with tasks that depend on past runs.

:meta private:
"""


class InvalidReprocessBehavior(AirflowException):
"""
Raised when reprocess behavior is not set for tasks with depends_on_past=True.

:meta private:
"""


class ReprocessBehavior(str, Enum):
"""
Internal enum for setting reprocess behavior in a backfill.
Expand Down Expand Up @@ -188,18 +204,16 @@ def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) ->


def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None):
depends_on_past = None
depends_on_past = any(x.depends_on_past for x in dag.tasks)
if depends_on_past:
if reverse is True:
raise ValueError(
"Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
raise InvalidBackfillDirection(
"Backfill cannot be run in reverse when the DAG has tasks where depends_on_past=True"
)
if reprocess_behavior in (None, ReprocessBehavior.NONE):
raise ValueError(
"Dag has task for which depends_on_past is true. "
"You must set reprocess behavior to reprocess completed or "
"reprocess failed"
raise InvalidReprocessBehavior(
"DAG has tasks for which depends_on_past=True. "
"You must set reprocess behavior to reprocess completed or reprocess failed."
)


Expand Down
4 changes: 2 additions & 2 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ export class BackfillService {
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
422: "Unprocessable Entity",
},
});
}
Expand Down Expand Up @@ -950,7 +950,7 @@ export class BackfillService {
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
422: "Unprocessable Entity",
},
});
}
Expand Down
8 changes: 4 additions & 4 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2792,9 +2792,9 @@ export type $OpenApiTs = {
*/
409: HTTPExceptionResponse;
/**
* Validation Error
* Unprocessable Entity
*/
422: HTTPValidationError;
422: HTTPExceptionResponse;
};
};
};
Expand Down Expand Up @@ -2943,9 +2943,9 @@ export type $OpenApiTs = {
*/
409: HTTPExceptionResponse;
/**
* Validation Error
* Unprocessable Entity
*/
422: HTTPValidationError;
422: HTTPExceptionResponse;
};
};
};
Expand Down
Loading