Skip to content

Commit

Permalink
Fix assignment of unassigned triggers (#21770)
Browse files Browse the repository at this point in the history
Previously, the query returned no alive triggerers which resulted
in all triggers to be assigned to the current triggerer. This works
fine, despite the logic bug, in the case where there's a single
triggerer. But with multiple triggerers, concurrent iterations of
the TriggerJob loop would bounce trigger ownership to whichever
loop ran last.

Addresses #21616

(cherry picked from commit b26d4d8)
  • Loading branch information
jkramer-ginkgo authored and ephraimbuddy committed Mar 22, 2022
1 parent 4a12aa3 commit 6fd4c42
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
10 changes: 7 additions & 3 deletions airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy import Column, Integer, String, func
from sqlalchemy import Column, Integer, String, func, or_

from airflow.models.base import Base
from airflow.models.taskinstance import TaskInstance
Expand Down Expand Up @@ -175,7 +175,7 @@ def assign_unassigned(cls, triggerer_id, capacity, session=None):
alive_triggerer_ids = [
row[0]
for row in session.query(BaseJob.id).filter(
BaseJob.end_date is None,
BaseJob.end_date.is_(None),
BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
BaseJob.job_type == "TriggererJob",
)
Expand All @@ -184,7 +184,11 @@ def assign_unassigned(cls, triggerer_id, capacity, session=None):
# Find triggers who do NOT have an alive triggerer_id, and then assign
# up to `capacity` of those to us.
trigger_ids_query = (
session.query(cls.id).filter(cls.triggerer_id.notin_(alive_triggerer_ids)).limit(capacity).all()
session.query(cls.id)
# notin_ doesn't find NULL rows
.filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
.limit(capacity)
.all()
)
session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
{cls.triggerer_id: triggerer_id},
Expand Down
52 changes: 52 additions & 0 deletions tests/models/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import datetime

import pytest

from airflow.jobs.triggerer_job import TriggererJob
from airflow.models import TaskInstance, Trigger
from airflow.operators.dummy import DummyOperator
from airflow.triggers.base import TriggerEvent
Expand All @@ -36,9 +39,11 @@ def session():
def clear_db(session):
session.query(TaskInstance).delete()
session.query(Trigger).delete()
session.query(TriggererJob).delete()
yield session
session.query(TaskInstance).delete()
session.query(Trigger).delete()
session.query(TriggererJob).delete()
session.commit()


Expand Down Expand Up @@ -124,3 +129,50 @@ def test_submit_failure(session, create_task_instance):
updated_task_instance = session.query(TaskInstance).one()
assert updated_task_instance.state == State.SCHEDULED
assert updated_task_instance.next_method == "__fail__"


def test_assign_unassigned(session, create_task_instance):
"""
Tests that unassigned triggers of all appropriate states are assigned.
"""
finished_triggerer = TriggererJob(None, heartrate=10, state=State.SUCCESS)
finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1)
session.add(finished_triggerer)
assert not finished_triggerer.is_alive()
healthy_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
session.add(healthy_triggerer)
assert healthy_triggerer.is_alive()
new_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
session.add(new_triggerer)
assert new_triggerer.is_alive()
session.commit()
trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_healthy_triggerer.id = 1
trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_killed_triggerer.id = 2
trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_unassigned_to_triggerer.id = 3
assert trigger_unassigned_to_triggerer.triggerer_id is None
session.add(trigger_on_healthy_triggerer)
session.add(trigger_on_killed_triggerer)
session.add(trigger_unassigned_to_triggerer)
session.commit()
assert session.query(Trigger).count() == 3
Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
session.expire_all()
# Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer
assert (
session.query(Trigger).filter(Trigger.id == trigger_on_killed_triggerer.id).one().triggerer_id
== new_triggerer.id
)
assert (
session.query(Trigger).filter(Trigger.id == trigger_unassigned_to_triggerer.id).one().triggerer_id
== new_triggerer.id
)
# Check that trigger on healthy triggerer still assigned to existing triggerer
assert (
session.query(Trigger).filter(Trigger.id == trigger_on_healthy_triggerer.id).one().triggerer_id
== healthy_triggerer.id
)

0 comments on commit 6fd4c42

Please sign in to comment.