Skip to content

Commit

Permalink
Remove ds/ts etc from Template/run context when no logical date i…
Browse files Browse the repository at this point in the history
…s defined (#46522)

This is part of the AIP-83 amendments.
  • Loading branch information
ashb authored Feb 6, 2025
1 parent 0585b29 commit 993a009
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class DagRun(StrictBaseModel):
dag_id: str
run_id: str

logical_date: UtcDateTime
logical_date: UtcDateTime | None
data_interval_start: UtcDateTime | None
data_interval_end: UtcDateTime | None
run_after: UtcDateTime
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,8 @@ def get_previous_dagrun(
:param session: SQLAlchemy ORM Session
:param state: the dag run state
"""
if dag_run.logical_date is None:
return None
filters = [
DagRun.dag_id == dag_run.dag_id,
DagRun.logical_date < dag_run.logical_date,
Expand Down
25 changes: 16 additions & 9 deletions docs/apache-airflow/templates-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,9 @@ Variable Type Description
``{{ logical_date }}`` `pendulum.DateTime`_ | A date-time that logically identifies the current DAG run. This value does not contain any semantics, but is simply a value for identification.
| Use ``data_interval_start`` and ``data_interval_end`` instead if you want a value that has real-world semantics,
| such as to get a slice of rows from the database based on timestamps.
``{{ ds }}`` str | The DAG run's logical date as ``YYYY-MM-DD``.
| Same as ``{{ logical_date | ds }}``.
``{{ ds_nodash }}`` str Same as ``{{ logical_date | ds_nodash }}``.
``{{ exception }}`` None | str | | Error occurred while running task instance.
Exception |
KeyboardInterrupt |
``{{ ts }}`` str | Same as ``{{ logical_date | ts }}``.
| Example: ``2018-01-01T00:00:00+00:00``.
``{{ ts_nodash_with_tz }}`` str | Same as ``{{ logical_date | ts_nodash_with_tz }}``.
| Example: ``20180101T000000+0000``.
``{{ ts_nodash }}`` str | Same as ``{{ logical_date | ts_nodash }}``.
| Example: ``20180101T000000``.
``{{ prev_data_interval_start_success }}`` `pendulum.DateTime`_ | Start of the data interval of the prior successful :class:`~airflow.models.dagrun.DagRun`.
| ``None`` | Added in version 2.2.
``{{ prev_data_interval_end_success }}`` `pendulum.DateTime`_ | End of the data interval of the prior successful :class:`~airflow.models.dagrun.DagRun`.
Expand Down Expand Up @@ -92,6 +83,22 @@ Variable Type Description
| Added in version 2.4.
=========================================== ===================== ===================================================================

The following variables are only available when the DAG run has a ``logical_date``:

=========================================== ===================== ===================================================================
Variable Type Description
=========================================== ===================== ===================================================================
``{{ ds }}`` str | The DAG run's logical date as ``YYYY-MM-DD``.
| Same as ``{{ logical_date | ds }}``.
``{{ ds_nodash }}`` str Same as ``{{ logical_date | ds_nodash }}``.
``{{ ts }}`` str | Same as ``{{ logical_date | ts }}``.
| Example: ``2018-01-01T00:00:00+00:00``.
``{{ ts_nodash_with_tz }}`` str | Same as ``{{ logical_date | ts_nodash_with_tz }}``.
| Example: ``20180101T000000+0000``.
``{{ ts_nodash }}`` str | Same as ``{{ logical_date | ts_nodash }}``.
| Example: ``20180101T000000``.
=========================================== ===================== ===================================================================

.. note::

The DAG run's logical date, and values derived from it, such as ``ds`` and
Expand Down
2 changes: 2 additions & 0 deletions newsfragments/42404.significant.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ The shift towards using ``run_id`` as the sole identifier for DAG runs eliminate

- Removed ``logical_date`` arguments from public APIs and Python functions related to DAG run lookups.
- ``run_id`` is now the exclusive identifier for DAG runs in these contexts.
- ``ds``, ``ds_nodash``, ``ts``, ``ts_nodash``, ``ts_nodash_with_tz`` (and ``logical_date``) are no longer available in the template context for non-scheduled DAG runs (i.e. manually triggered runs), because those runs have no logical date.
- The ``task_instance_key_str`` template variable now uses ``run_id`` instead of ``logical_date``. As a result, its value differs from 2.x, even for old runs.

* Types of change

Expand Down
5 changes: 4 additions & 1 deletion scripts/ci/pre_commit/template_context_key_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
CONTEXT_HINT = ROOT_DIR.joinpath("task_sdk", "src", "airflow", "sdk", "definitions", "context.py")
TEMPLATES_REF_RST = ROOT_DIR.joinpath("docs", "apache-airflow", "templates-ref.rst")

# These are only conditionally set
IGNORE = {"ds", "ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz", "logical_date"}


def _iter_template_context_keys_from_original_return() -> typing.Iterator[str]:
ti_mod = ast.parse(TASKRUNNER_PY.read_text("utf-8"), str(TASKRUNNER_PY))
Expand Down Expand Up @@ -154,7 +157,7 @@ def _compare_keys(retn_keys: set[str], decl_keys: set[str], hint_keys: set[str],
("Context type hint", hint_keys),
("templates-ref", docs_keys),
]
canonical_keys = set.union(*(s for _, s in check_candidates))
canonical_keys = set.union(*(s for _, s in check_candidates)) - IGNORE

def _check_one(identifier: str, keys: set[str]) -> int:
if missing := canonical_keys.difference(keys):
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class DagRun(BaseModel):
)
dag_id: Annotated[str, Field(title="Dag Id")]
run_id: Annotated[str, Field(title="Run Id")]
logical_date: Annotated[datetime, Field(title="Logical Date")]
logical_date: Annotated[datetime | None, Field(title="Logical Date")]
data_interval_start: Annotated[datetime | None, Field(title="Data Interval Start")] = None
data_interval_end: Annotated[datetime | None, Field(title="Data Interval End")] = None
run_after: Annotated[datetime, Field(title="Run After")]
Expand Down
39 changes: 23 additions & 16 deletions task_sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,12 @@ def get_template_context(self) -> Context:
if self._ti_context_from_server:
dag_run = self._ti_context_from_server.dag_run

logical_date = dag_run.logical_date
ds = logical_date.strftime("%Y-%m-%d")
ds_nodash = ds.replace("-", "")
ts = logical_date.isoformat()
ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
ts_nodash_with_tz = ts.replace("-", "").replace(":", "")

context_from_server: Context = {
# TODO: Assess if we need to pass these through timezone.coerce_datetime
"dag_run": dag_run,
"data_interval_end": dag_run.data_interval_end,
"data_interval_start": dag_run.data_interval_start,
"logical_date": logical_date,
"ds": ds,
"ds_nodash": ds_nodash,
"task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{ds_nodash}",
"ts": ts,
"ts_nodash": ts_nodash,
"ts_nodash_with_tz": ts_nodash_with_tz,
"task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{dag_run.run_id}",
"prev_data_interval_start_success": lazy_object_proxy.Proxy(
lambda: get_previous_dagrun_success(self.id).data_interval_start
),
Expand All @@ -160,6 +147,24 @@ def get_template_context(self) -> Context:
}
context.update(context_from_server)

if logical_date := dag_run.logical_date:
ds = logical_date.strftime("%Y-%m-%d")
ds_nodash = ds.replace("-", "")
ts = logical_date.isoformat()
ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
ts_nodash_with_tz = ts.replace("-", "").replace(":", "")
context.update(
{
"logical_date": logical_date,
"ds": ds,
"ds_nodash": ds_nodash,
"task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{ds_nodash}",
"ts": ts,
"ts_nodash": ts_nodash,
"ts_nodash_with_tz": ts_nodash_with_tz,
}
)

return context

def render_templates(
Expand Down Expand Up @@ -500,7 +505,7 @@ def _process_outlets(context: Context, outlets: list[AssetProfile]):
return task_outlets, outlet_events


def run(ti: RuntimeTaskInstance, log: Logger):
def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
"""Run the task in this process."""
from airflow.exceptions import (
AirflowException,
Expand Down Expand Up @@ -534,7 +539,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
return
return msg
context = ti.get_template_context()
with set_current_context(context):
jinja_env = ti.task.dag.get_template_env()
Expand Down Expand Up @@ -620,6 +625,8 @@ def run(ti: RuntimeTaskInstance, log: Logger):
finally:
if msg:
SUPERVISOR_COMMS.send_request(msg=msg, log=log)
# Return the message to make unit tests easier too
return msg


def _execute_task(context: Context, task: BaseOperator):
Expand Down
3 changes: 2 additions & 1 deletion task_sdk/src/airflow/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ class DagRunProtocol(Protocol):

dag_id: str
run_id: str
logical_date: datetime
logical_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
start_date: datetime
end_date: datetime | None
run_type: Any
run_after: datetime
conf: dict[str, Any] | None
external_trigger: bool

Expand Down
36 changes: 36 additions & 0 deletions task_sdk/tests/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import contextlib
import json
import os
import uuid
Expand Down Expand Up @@ -1088,6 +1089,41 @@ def execute(self, context):
"a_simple_list": ["one", "two", "three", "actually one value is made per line"],
}

@pytest.mark.parametrize(
("logical_date", "check"),
(
pytest.param(None, pytest.raises(KeyError), id="no-logical-date"),
pytest.param(timezone.datetime(2024, 12, 3), contextlib.nullcontext(), id="with-logical-date"),
),
)
def test_no_logical_date_key_error(
self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti, logical_date, check
):
"""Test that the ``ds``/``ts`` context keys raise ``KeyError`` when the DAG run has no logical date, and are present when it has one."""

class CustomOperator(BaseOperator):
def execute(self, context):
for key in ("ds", "ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz"):
with check:
context[key]
# We should always be able to get this
assert context["task_instance_key_str"]

task = CustomOperator(task_id="print-params")
runtime_ti = create_runtime_ti(
dag_id="basic_param_dag",
logical_date=logical_date,
task=task,
conf={
"x": 3,
"text": "Hello World!",
"flag": False,
"a_simple_list": ["one", "two", "three", "actually one value is made per line"],
},
)
msg = run(runtime_ti, log=mock.MagicMock())
assert isinstance(msg, SucceedTask)


class TestXComAfterTaskExecution:
@pytest.mark.parametrize(
Expand Down

0 comments on commit 993a009

Please sign in to comment.