Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that we can still render log templates even when a DagRun hasn't yet started. #46720

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.sdk.definitions.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.helpers import parse_template_string, render_template
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState

if TYPE_CHECKING:
from pendulum import DateTime

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
Expand Down Expand Up @@ -269,30 +266,17 @@ def _render_filename(self, ti: TaskInstance, try_number: int, session=NEW_SESSIO
"""Return the worker log filename."""
ti = _ensure_ti(ti, session)
dag_run = ti.get_dagrun(session=session)

date = dag_run.logical_date or dag_run.run_after
date = date.isoformat()

template = dag_run.get_log_template(session=session).filename
str_tpl, jinja_tpl = parse_template_string(template)
filename = None
if jinja_tpl:
if getattr(ti, "task", None) is not None:
context = ti.get_template_context(session=session)
else:
context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
context["try_number"] = try_number
filename = render_template_to_string(jinja_tpl, context)
if filename:
return filename
if str_tpl:
if ti.task is not None and ti.task.dag is not None:
dag = ti.task.dag
# TODO: TaskSDK: why do we need this on the DAG! Where is this render fn called from. Revisit
data_interval = dag.get_run_data_interval(dag_run) # type: ignore[attr-defined]
else:
from airflow.timetables.base import DataInterval
return render_template(jinja_tpl, {"ti": ti, "ts": date, "try_number": try_number}, native=False)

if TYPE_CHECKING:
assert isinstance(dag_run.data_interval_start, DateTime)
assert isinstance(dag_run.data_interval_end, DateTime)
data_interval = DataInterval(dag_run.data_interval_start, dag_run.data_interval_end)
if str_tpl:
data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
if data_interval[0]:
data_interval_start = data_interval[0].isoformat()
else:
Expand All @@ -307,7 +291,7 @@ def _render_filename(self, ti: TaskInstance, try_number: int, session=NEW_SESSIO
run_id=ti.run_id,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
logical_date=ti.get_dagrun().logical_date.isoformat(),
logical_date=date,
try_number=try_number,
)
else:
Expand Down
11 changes: 7 additions & 4 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,14 +520,16 @@ def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, se
assert actual == os.fspath(tmp_path / expected)


@pytest.mark.parametrize("logical_date", ((None), (DEFAULT_DATE)))
class TestFilenameRendering:
def test_python_formatting(self, create_log_template, create_task_instance):
def test_python_formatting(self, create_log_template, create_task_instance, logical_date):
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
run_after=DEFAULT_DATE,
logical_date=logical_date,
)

expected_filename = (
Expand All @@ -538,13 +540,14 @@ def test_python_formatting(self, create_log_template, create_task_instance):
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename

def test_jinja_rendering(self, create_log_template, create_task_instance):
def test_jinja_rendering(self, create_log_template, create_task_instance, logical_date):
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
run_after=DEFAULT_DATE,
logical_date=logical_date,
)

expected_filename = (
Expand Down
47 changes: 34 additions & 13 deletions tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ def __exit__(self, type, value, traceback):
def create_dagrun(self, *, logical_date=None, **kwargs):
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from airflow.utils.types import NOTSET, DagRunType

if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType
Expand All @@ -911,21 +911,26 @@ def create_dagrun(self, *, logical_date=None, **kwargs):
if not isinstance(run_type, DagRunType):
run_type = DagRunType(run_type)

if logical_date is None:
if logical_date is NOTSET:
# Explicit non requested
logical_date = None
elif logical_date is None:
if run_type == DagRunType.MANUAL:
logical_date = self.start_date
else:
logical_date = dag.next_dagrun_info(None).logical_date
logical_date = timezone.coerce_datetime(logical_date)

data_interval = None
try:
data_interval = kwargs["data_interval"]
except KeyError:
if run_type == DagRunType.MANUAL:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
else:
data_interval = dag.infer_automated_data_interval(logical_date)
kwargs["data_interval"] = data_interval
if logical_date is not None:
if run_type == DagRunType.MANUAL:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
else:
data_interval = dag.infer_automated_data_interval(logical_date)
kwargs["data_interval"] = data_interval

if "run_id" not in kwargs:
if "run_type" not in kwargs:
Expand Down Expand Up @@ -1229,9 +1234,13 @@ def create_task_instance(dag_maker: DagMaker, create_dummy_dag: CreateDummyDAG)
Uses ``create_dummy_dag`` to create the dag structure.
"""
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.types import NOTSET, ArgNotSet

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

def maker(
logical_date=None,
logical_date: datetime | None | ArgNotSet = NOTSET,
run_after=None,
dagrun_state=None,
state=None,
run_id=None,
Expand All @@ -1257,7 +1266,12 @@ def maker(
last_heartbeat_at=None,
**kwargs,
) -> TaskInstance:
if logical_date is None:
if run_after is None:
from airflow.utils import timezone

run_after = timezone.utcnow()
if logical_date is NOTSET:
# For now: default to having a logical date if None is not explicitly passed.
from airflow.utils import timezone

logical_date = timezone.utcnow()
Expand All @@ -1278,10 +1292,17 @@ def maker(
trigger_rule=trigger_rule,
**op_kwargs,
)
dagrun_kwargs = {
"logical_date": logical_date,
"state": dagrun_state,
}
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": logical_date,
"run_after": run_after,
"state": dagrun_state,
}
else:
dagrun_kwargs = {
"logical_date": logical_date if logical_date not in (None, NOTSET) else run_after,
"state": dagrun_state,
}
if run_id is not None:
dagrun_kwargs["run_id"] = run_id
if run_type is not None:
Expand Down
Loading