Skip to content

Commit

Permalink
Remove task to interrupt sessions as this is now handled by session e…
Browse files Browse the repository at this point in the history
…xpiration fires
  • Loading branch information
rowanseymour committed Feb 26, 2025
1 parent 4f996f4 commit 1cd7b39
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 107 deletions.
41 changes: 3 additions & 38 deletions temba/flows/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import itertools
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone as tzone
from datetime import datetime, timezone as tzone

from django_redis import get_redis_connection

from django.conf import settings
from django.utils import timezone
from django.utils.timesince import timesince

from temba import mailroom
from temba.utils.crons import cron_task
from temba.utils.models import delete_in_batches

Expand All @@ -27,49 +23,18 @@ def squash_flow_counts():

@cron_task()
def trim_flow_revisions():
start = timezone.now()

# get when the last time we trimmed was
r = get_redis_connection()
last_trim = r.get(FlowRevision.LAST_TRIM_KEY)
if not last_trim:
last_trim = 0

last_trim = datetime.utcfromtimestamp(int(last_trim)).astimezone(tzone.utc)
count = FlowRevision.trim(last_trim)
num_trimmed = FlowRevision.trim(last_trim)

r.set(FlowRevision.LAST_TRIM_KEY, int(timezone.now().timestamp()))

elapsed = timesince(start)
logger.info(f"Trimmed {count} flow revisions since {last_trim} in {elapsed}")


@cron_task()
def interrupt_flow_sessions():
"""
Interrupt old flow sessions which have exceeded the absolute time limit
"""

before = timezone.now() - timedelta(days=89)
num_interrupted = 0

# get old sessions and organize into lists by org
by_org = defaultdict(list)
sessions = (
FlowSession.objects.filter(created_on__lte=before, status=FlowSession.STATUS_WAITING)
.only("id", "contact")
.select_related("contact__org")
.order_by("id")
)
for session in sessions:
by_org[session.contact.org].append(session)

for org, sessions in by_org.items():
for batch in itertools.batched(sessions, 100):
mailroom.queue_interrupt(org, sessions=batch)
num_interrupted += len(sessions)

return {"interrupted": num_interrupted}
return {"trimmed": num_trimmed}


@cron_task()
Expand Down
45 changes: 3 additions & 42 deletions temba/flows/tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,14 @@
from datetime import datetime, timedelta, timezone as tzone
from datetime import datetime, timezone as tzone

from django.utils import timezone

from temba.flows.models import FlowRun, FlowSession
from temba.flows.tasks import interrupt_flow_sessions, trim_flow_sessions
from temba.tests import TembaTest, matchers, mock_mailroom
from temba.flows.tasks import trim_flow_sessions
from temba.tests import TembaTest
from temba.utils.uuid import uuid4


class FlowSessionTest(TembaTest):
@mock_mailroom
def test_interrupt(self, mr_mocks):
org1_contact = self.create_contact("Ben", phone="+250788123123")
org2_contact = self.create_contact("Ben", phone="+250788123123", org=self.org2)

def create_session(contact, created_on: datetime):
return FlowSession.objects.create(
uuid=uuid4(),
contact=contact,
created_on=created_on,
output_url="http://sessions.com/123.json",
status=FlowSession.STATUS_WAITING,
)

create_session(org1_contact, timezone.now() - timedelta(days=88))
session2 = create_session(org1_contact, timezone.now() - timedelta(days=90))
session3 = create_session(org1_contact, timezone.now() - timedelta(days=91))
session4 = create_session(org2_contact, timezone.now() - timedelta(days=92))

interrupt_flow_sessions()

self.assertEqual(
[
{
"type": "interrupt_sessions",
"org_id": self.org.id,
"queued_on": matchers.Datetime(),
"task": {"session_ids": [session2.id, session3.id]},
},
{
"type": "interrupt_sessions",
"org_id": self.org2.id,
"queued_on": matchers.Datetime(),
"task": {"session_ids": [session4.id]},
},
],
mr_mocks.queued_batch_tasks,
)

def test_trim(self):
contact = self.create_contact("Ben Haggerty", phone="+250788123123")
flow = self.create_flow("Test")
Expand Down
6 changes: 2 additions & 4 deletions temba/mailroom/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,18 @@ def queue_interrupt_channel(org, channel):
_queue_batch_task(org.id, BatchTask.INTERRUPT_CHANNEL, task, HIGH_PRIORITY)


def queue_interrupt(org, *, contacts=None, flow=None, sessions=None):
def queue_interrupt(org, *, contacts=None, flow=None):
"""
Queues an interrupt task for handling by mailroom
"""

assert contacts or flow or sessions, "must specify either a set of contacts or a flow or sessions"
assert contacts or flow, "must specify either a set of contacts or a flow"

task = {}
if contacts:
task["contact_ids"] = [c.id for c in contacts]
if flow:
task["flow_ids"] = [flow.id]
if sessions:
task["session_ids"] = [s.id for s in sessions]

_queue_batch_task(org.id, BatchTask.INTERRUPT_SESSIONS, task, HIGH_PRIORITY)

Expand Down
23 changes: 1 addition & 22 deletions temba/mailroom/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

from django_redis import get_redis_connection

from django.utils import timezone

from temba.flows.models import FlowSession, FlowStart
from temba.flows.models import FlowStart
from temba.mailroom.queue import queue_interrupt
from temba.tests import TembaTest, matchers
from temba.utils import json
from temba.utils.uuid import uuid4


class MailroomQueueTest(TembaTest):
Expand Down Expand Up @@ -104,24 +101,6 @@ def test_queue_interrupt_by_flow(self):
{"type": "interrupt_sessions", "task": {"flow_ids": [flow.id]}, "queued_on": matchers.ISODate()},
)

def test_queue_interrupt_by_session(self):
contact = self.create_contact("Bob", phone="+1234567890")
session = FlowSession.objects.create(
uuid=uuid4(),
contact=contact,
status=FlowSession.STATUS_WAITING,
output_url="http://sessions.com/123.json",
created_on=timezone.now(),
)

queue_interrupt(self.org, sessions=[session])

self.assert_org_queued(self.org)
self.assert_queued_batch_task(
self.org,
{"type": "interrupt_sessions", "task": {"session_ids": [session.id]}, "queued_on": matchers.ISODate()},
)

def assert_org_queued(self, org):
r = get_redis_connection()

Expand Down
1 change: 0 additions & 1 deletion temba/settings_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,6 @@
"delete-released-orgs": {"task": "delete_released_orgs", "schedule": crontab(hour=4, minute=0)},
"expire-invitations": {"task": "expire_invitations", "schedule": crontab(hour=0, minute=10)},
"fail-old-android-messages": {"task": "fail_old_android_messages", "schedule": crontab(hour=0, minute=0)},
"interrupt-flow-sessions": {"task": "interrupt_flow_sessions", "schedule": crontab(hour=23, minute=30)},
"refresh-whatsapp-tokens": {"task": "refresh_whatsapp_tokens", "schedule": crontab(hour=6, minute=0)},
"refresh-templates": {"task": "refresh_templates", "schedule": timedelta(seconds=900)},
"send-notification-emails": {"task": "send_notification_emails", "schedule": timedelta(seconds=60)},
Expand Down

0 comments on commit 1cd7b39

Please sign in to comment.