Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#16692 show schedule_interval/timetable description in UI #16931

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Added timetable description column

Revision ID: 786e3737b18f
Revises: 5e3ec427fdd3
Create Date: 2021-10-15 13:33:04.754052

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = '786e3737b18f'
down_revision = '5e3ec427fdd3'
branch_labels = None
depends_on = None


def upgrade():
"""Apply Added timetable description column"""
with op.batch_alter_table('dag', schema=None) as batch_op:
batch_op.add_column(sa.Column('timetable_description', sa.String(length=1000), nullable=True))


def downgrade():
"""Unapply Added timetable description column"""
with op.batch_alter_table('dag', schema=None) as batch_op:
batch_op.drop_column('timetable_description')
4 changes: 4 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2481,6 +2481,7 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=NEW_SESSION):
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.max_active_runs = dag.max_active_runs
orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)
orm_dag.timetable_description = dag.timetable.description

run: Optional[DagRun] = most_recent_runs.get(dag.dag_id)
if run is None:
Expand Down Expand Up @@ -2727,6 +2728,9 @@ class DagModel(Base):
default_view = Column(String(25))
# Schedule interval
schedule_interval = Column(Interval)
# Timetable/Schedule Interval description
timetable_description = Column(String(1000), nullable=True)

# Tags for view filter
tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag'))

Expand Down
7 changes: 7 additions & 0 deletions airflow/timetables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ def logical_date(self) -> DateTime:
class Timetable(Protocol):
"""Protocol that all Timetable classes are expected to implement."""

description: str = ""
"""Human-readable description of the timetable.

For example, this can produce something like ``'At 21:30, only on Friday'``
from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
"""

periodic: bool = True
"""Whether this timetable runs periodically.

Expand Down
14 changes: 14 additions & 0 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datetime
from typing import Any, Dict, Optional, Union

from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter
from dateutil.relativedelta import relativedelta
from pendulum import DateTime
Expand Down Expand Up @@ -130,6 +131,19 @@ def __init__(self, cron: str, timezone: Timezone) -> None:
self._expression = cron_presets.get(cron, cron)
self._timezone = timezone

descriptor = ExpressionDescriptor(
expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
)
try:
# checking for more than 5 parameters in Cron and avoiding evaluation for now,
# as Croniter has inconsistent evaluation with other libraries
if len(croniter(self._expression).expanded) > 5:
raise FormatException()
interval_description = descriptor.get_description()
except (CroniterBadCronError, FormatException, MissingFieldException):
interval_description = ""
self.description = interval_description

@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
from airflow.serialization.serialized_objects import decode_timezone
Expand Down
4 changes: 4 additions & 0 deletions airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class NullTimetable(_TrivialTimetable):
This corresponds to ``schedule_interval=None``.
"""

description: str = "Never, external triggers only"

@property
def summary(self) -> str:
return "None"
Expand All @@ -73,6 +75,8 @@ class OnceTimetable(_TrivialTimetable):
This corresponds to ``schedule_interval="@once"``.
"""

description: str = "Once, as soon as possible"

@property
def summary(self) -> str:
return "@once"
Expand Down
3 changes: 3 additions & 0 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ <h4 class="pull-right" style="user-select: none;-moz-user-select: auto;">
<a class="label label-default" href="{{ url_for('DagRunModelView.list') }}?_flt_3_dag_id={{ dag.dag_id }}">
Schedule: {{ dag.schedule_interval }}
</a>
{% if dag_model is defined and dag_model and dag_model.timetable_description %}
<span class="material-icons text-muted js-tooltip" aria-hidden="true" data-original-title="Schedule: {{ dag_model.timetable_description|string }}">info</span>
{% endif %}
{% if dag_model is defined and dag_model.next_dagrun is defined %}
<p class="label label-default js-tooltip" style="margin-left: 5px" id="next-run" data-html="true" data-placement="bottom">
Next Run: <time datetime="{{ dag_model.next_dagrun }}">{{ dag_model.next_dagrun }}</time>
Expand Down
3 changes: 3 additions & 0 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ <h2>{{ page_title }}</h2>
<a class="label label-default schedule" href="{{ url_for('DagRunModelView.list') }}?_flt_3_dag_id={{ dag.dag_id }}" data-dag-id="{{ dag.dag_id }}">
{{ dag.schedule_interval }}
</a>
{% if dag is defined and dag.timetable_description %}
<span class="material-icons text-muted js-tooltip" aria-hidden="true" data-original-title="Schedule: {{ dag.timetable_description|string }}">info</span>
{% endif %}
</td>
<td id="last-run-{{ dag.safe_dag_id }}" class="text-nowrap latest_dag_run">
{{ loading_dots(classes='js-loading-last-run text-muted') }}
Expand Down
40 changes: 40 additions & 0 deletions docs/apache-airflow/howto/timetable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,43 @@ The *Schedule* column would say ``after each workday, at 08:00:00``.
Module :mod:`airflow.timetables.base`
The public interface is heavily documented to explain what should be
implemented by subclasses.


Timetable Description Display in UI
-----------------------------------

You can also provide a description for your Timetable Implementation
by overriding the ``description`` property.
This is especially useful for providing comprehensive description for your implementation in UI.
For our ``SometimeAfterWorkdayTimetable`` class, for example, we could have:

.. code-block:: python

description = "Schedule: after each workday, at {_schedule_at}"

You can also wrap this inside ``__init__``, if you want to derive description.

.. code-block:: python

def __init__(self) -> None:
self.description = "Schedule: after each workday, at {self._schedule_at}"


This is specially useful when you want to provide comprehensive description which is different from ``summary`` property.

So for a DAG declared like this:

.. code-block:: python

with DAG(
timetable=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
) as dag:
...

The *i* icon would show, ``Schedule: after each workday, at 08:00:00``.


.. seealso::
Module :mod:`airflow.timetables.interval`
check ``CronDataIntervalTimetable`` description implementation which provides comprehensive cron description in UI.
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``5e3ec427fdd3`` (head) | ``be2bfac3da23`` | ``2.3.0`` | Increase length of email and username in ``ab_user`` and ``ab_register_user`` table |
| ``786e3737b18f`` (head) | ``5e3ec427fdd3`` | ``2.3.0`` | Add ``timetable_description`` column to DagModel for UI. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``5e3ec427fdd3`` | ``be2bfac3da23`` | ``2.3.0`` | Increase length of email and username in ``ab_user`` and ``ab_register_user`` table |
| | | | to ``256`` characters |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``be2bfac3da23`` | ``7b2661a43ba3`` | ``2.2.3`` | Add has_import_errors column to DagModel |
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ install_requires =
# Required by vendored-in connexion
clickclick>=1.2
colorlog>=4.0.2, <6.0
cron-descriptor>=1.2.24
croniter>=0.3.17, <1.1
cryptography>=0.9.3
dataclasses;python_version<"3.7"
Expand Down
48 changes: 39 additions & 9 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1331,19 +1331,49 @@ def test_get_paused_dag_ids(self):

@parameterized.expand(
[
(None, NullTimetable()),
("@daily", cron_timetable("0 0 * * *")),
("@weekly", cron_timetable("0 0 * * 0")),
("@monthly", cron_timetable("0 0 1 * *")),
("@quarterly", cron_timetable("0 0 1 */3 *")),
("@yearly", cron_timetable("0 0 1 1 *")),
("@once", OnceTimetable()),
(datetime.timedelta(days=1), delta_timetable(datetime.timedelta(days=1))),
(None, NullTimetable(), "Never, external triggers only"),
("@daily", cron_timetable("0 0 * * *"), "At 00:00"),
("@weekly", cron_timetable("0 0 * * 0"), "At 00:00, only on Sunday"),
("@monthly", cron_timetable("0 0 1 * *"), "At 00:00, on day 1 of the month"),
("@quarterly", cron_timetable("0 0 1 */3 *"), "At 00:00, on day 1 of the month, every 3 months"),
("@yearly", cron_timetable("0 0 1 1 *"), "At 00:00, on day 1 of the month, only in January"),
("5 0 * 8 *", cron_timetable("5 0 * 8 *"), "At 00:05, only in August"),
("@once", OnceTimetable(), "Once, as soon as possible"),
(datetime.timedelta(days=1), delta_timetable(datetime.timedelta(days=1)), ""),
("30 21 * * 5 1", cron_timetable("30 21 * * 5 1"), ""),
]
)
def test_timetable_from_schedule_interval(self, schedule_interval, expected_timetable):
def test_timetable_and_description_from_schedule_interval(
self, schedule_interval, expected_timetable, interval_description
):
dag = DAG("test_schedule_interval", schedule_interval=schedule_interval)
assert dag.timetable == expected_timetable
assert dag.schedule_interval == schedule_interval
assert dag.timetable.description == interval_description

@parameterized.expand(
[
(NullTimetable(), "Never, external triggers only"),
(cron_timetable("0 0 * * *"), "At 00:00"),
(cron_timetable("@daily"), "At 00:00"),
(cron_timetable("0 0 * * 0"), "At 00:00, only on Sunday"),
(cron_timetable("@weekly"), "At 00:00, only on Sunday"),
(cron_timetable("0 0 1 * *"), "At 00:00, on day 1 of the month"),
(cron_timetable("@monthly"), "At 00:00, on day 1 of the month"),
(cron_timetable("0 0 1 */3 *"), "At 00:00, on day 1 of the month, every 3 months"),
(cron_timetable("@quarterly"), "At 00:00, on day 1 of the month, every 3 months"),
(cron_timetable("0 0 1 1 *"), "At 00:00, on day 1 of the month, only in January"),
(cron_timetable("@yearly"), "At 00:00, on day 1 of the month, only in January"),
(cron_timetable("5 0 * 8 *"), "At 00:05, only in August"),
(OnceTimetable(), "Once, as soon as possible"),
(delta_timetable(datetime.timedelta(days=1)), ""),
(cron_timetable("30 21 * * 5 1"), ""),
]
)
def test_description_from_timetable(self, timetable, expected_description):
dag = DAG("test_schedule_interval_description", timetable=timetable)
assert dag.timetable == timetable
assert dag.timetable.description == expected_description

def test_create_dagrun_run_id_is_generated(self):
dag = DAG(dag_id="run_id_is_generated")
Expand Down