Skip to content

Commit

Permalink
Merge branch 'main' into allauth-progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ericnewcomer committed Feb 26, 2025
2 parents e3a2874 + 8f4cc9e commit 421a424
Show file tree
Hide file tree
Showing 27 changed files with 897 additions and 889 deletions.
1,323 changes: 670 additions & 653 deletions CHANGELOG.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "temba"
version = "10.1.71"
version = "10.1.74"
description = "Hosted service for visually building interactive messaging applications"
authors = [
{"name" = "Nyaruka", "email" = "[email protected]"}
Expand Down
2 changes: 1 addition & 1 deletion temba/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "10.1.71"
__version__ = "10.1.74"

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
Expand Down
73 changes: 73 additions & 0 deletions temba/contacts/migrations/0205_create_session_expires_fires.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Generated by Django 5.1.4 on 2025-02-26 16:27

import random
from datetime import timedelta

from django.db import migrations
from django.db.models import Exists, OuterRef


def create_session_expires_fires(apps, schema_editor):
Contact = apps.get_model("contacts", "Contact")
ContactFire = apps.get_model("contacts", "ContactFire")
FlowSession = apps.get_model("flows", "FlowSession")

num_created, num_skipped = 0, 0

while True:
# find contacts with waiting sessions that don't have a corresponding session expiration fire
batch = list(
Contact.objects.filter(current_session_uuid__isnull=False)
.filter(~Exists(ContactFire.objects.filter(contact=OuterRef("pk"), fire_type="S")))
.only("id", "org_id", "current_session_uuid")[:1000]
)
if not batch:
break

sessions = FlowSession.objects.filter(uuid__in=[c.current_session_uuid for c in batch], status="W").only(
"uuid", "created_on"
)
created_on_by_uuid = {s.uuid: s.created_on for s in sessions}

to_create = []
for contact in batch:
session_created_on = created_on_by_uuid.get(contact.current_session_uuid)
if session_created_on:
to_create.append(
ContactFire(
org_id=contact.org_id,
contact=contact,
fire_type="S",
scope="",
fire_on=session_created_on + timedelta(days=30) + timedelta(seconds=random.randint(0, 86400)),
session_uuid=contact.current_session_uuid,
)
)
else:
contact.current_session_uuid = None
contact.current_flow = None
contact.save(update_fields=("current_session_uuid", "current_flow"))

num_skipped += 1

if to_create:
ContactFire.objects.bulk_create(to_create)
num_created += len(to_create)
print(f"Created {num_created} session expiration fires ({num_skipped} skipped)")


def apply_manual(): # pragma: no cover
from django.apps import apps

create_session_expires_fires(apps, None)


class Migration(migrations.Migration):

dependencies = [
("contacts", "0204_alter_contactfire_fire_type"),
]

operations = [
migrations.RunPython(create_session_expires_fires, migrations.RunPython.noop),
]
2 changes: 1 addition & 1 deletion temba/contacts/tests/test_contactcrudl.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ def test_history(self):

# fetch our contact history
self.login(self.admin)
with self.assertNumQueries(23):
with self.assertNumQueries(22):
response = self.client.get(history_url + "?limit=100")

# history should include all messages in the last 90 days, the channel event, the call, and the flow run
Expand Down
88 changes: 88 additions & 0 deletions temba/contacts/tests/test_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from datetime import timedelta

from django.utils import timezone

from temba.contacts.models import ContactFire
from temba.flows.models import FlowSession
from temba.tests import MigrationTest
from temba.utils.uuid import uuid4


class CreateSessionExpiresFiresTest(MigrationTest):
app = "contacts"
migrate_from = "0204_alter_contactfire_fire_type"
migrate_to = "0205_create_session_expires_fires"

def setUpBeforeMigration(self, apps):
def create_contact_and_sessions(name, phone, current_session_uuid):
contact = self.create_contact(name, phone=phone, current_session_uuid=current_session_uuid)
FlowSession.objects.create(
uuid=uuid4(),
contact=contact,
status=FlowSession.STATUS_COMPLETED,
output_url="http://sessions.com/123.json",
created_on=timezone.now(),
ended_on=timezone.now(),
)
FlowSession.objects.create(
uuid=current_session_uuid,
contact=contact,
status=FlowSession.STATUS_WAITING,
output_url="http://sessions.com/123.json",
created_on=timezone.now(),
)
return contact

# contacts with waiting sessions but no session expiration fire
self.contact1 = create_contact_and_sessions("Ann", "+1234567001", "a0e707ef-ae06-4e39-a9b1-49eed0273dae")
self.contact2 = create_contact_and_sessions("Bob", "+1234567002", "4a675e5d-ebc1-4fe7-be74-0450f550f8ee")

# contact with waiting session and already has a session expiration fire
self.contact3 = create_contact_and_sessions("Cat", "+1234567003", "a83a82f4-6a25-4662-a8e1-b53ee7d259a2")
ContactFire.objects.create(
org=self.org,
contact=self.contact3,
fire_type="S",
scope="",
fire_on=timezone.now() + timedelta(days=30),
session_uuid="a83a82f4-6a25-4662-a8e1-b53ee7d259a2",
)

# contact with no waiting session
self.contact4 = self.create_contact("Dan", phone="+1234567004")

# contact with session mismatch
self.contact5 = self.create_contact(
"Dan", phone="+1234567004", current_session_uuid="ffca65c7-42ac-40cd-bef0-63aedc099ec9"
)
FlowSession.objects.create(
uuid="80466ed4-de5c-49e8-acad-2432b4e9cdf9",
contact=self.contact5,
status=FlowSession.STATUS_WAITING,
output_url="http://sessions.com/123.json",
created_on=timezone.now(),
)

def test_migration(self):
def assert_session_expire(contact):
self.assertTrue(contact.fires.exists())

session = contact.sessions.filter(status="W").get()
fire = contact.fires.get()

self.assertEqual(fire.org, contact.org)
self.assertEqual(fire.fire_type, "S")
self.assertEqual(fire.scope, "")
self.assertGreaterEqual(fire.fire_on, session.created_on + timedelta(days=30))
self.assertLess(fire.fire_on, session.created_on + timedelta(days=31))
self.assertEqual(fire.session_uuid, session.uuid)

assert_session_expire(self.contact1)
assert_session_expire(self.contact2)
assert_session_expire(self.contact3)

self.assertFalse(self.contact4.fires.exists())
self.assertFalse(self.contact5.fires.exists())

self.contact5.refresh_from_db()
self.assertIsNone(self.contact5.current_session_uuid)
4 changes: 2 additions & 2 deletions temba/flows/management/commands/undo_footgun.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ def handle(self, start_id: int, event_types: list, dry_run: bool, quiet: bool, *

def undo_for_batch(self, runs: list, undoers: dict, dry_run: bool):
contact_ids = {r.contact_id for r in runs}
session_ids = {r.session_id for r in runs}
session_uuids = {r.session_uuid for r in runs}

if undoers:
contacts_by_uuid = {str(c.uuid): c for c in Contact.objects.filter(id__in=contact_ids)}

for session in FlowSession.objects.filter(id__in=session_ids):
for session in FlowSession.objects.filter(uuid__in=session_uuids):
contact = contacts_by_uuid[str(session.contact.uuid)]
for run in reversed(session.output_json["runs"]):
for event in reversed(run["events"]):
Expand Down
2 changes: 1 addition & 1 deletion temba/flows/migrations/0374_backfill_run_session_uuid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.db import migrations, transaction


def backfill_run_session_uuid(apps, schema_editor):
def backfill_run_session_uuid(apps, schema_editor): # pragma: no cover
FlowSession = apps.get_model("flows", "FlowSession")
FlowRun = apps.get_model("flows", "FlowRun")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 5.1.4 on 2025-02-26 20:32

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("flows", "0378_alter_flowrun_session_alter_flowsession_status"),
]

operations = [
migrations.RemoveConstraint(
model_name="flowrun",
name="flows_run_active_or_waiting_has_session",
),
]
5 changes: 0 additions & 5 deletions temba/flows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,11 +1207,6 @@ class Meta:
models.Index(name="flowruns_by_session", fields=("session_uuid",), condition=Q(status__in=("A", "W"))),
]
constraints = [
# all active/waiting runs must have a session
models.CheckConstraint(
check=~Q(status__in=("A", "W")) | Q(session__isnull=False),
name="flows_run_active_or_waiting_has_session",
),
# all non-active/waiting runs must have an exited_on
models.CheckConstraint(
check=Q(status__in=("A", "W")) | Q(exited_on__isnull=False),
Expand Down
41 changes: 3 additions & 38 deletions temba/flows/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import itertools
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone as tzone
from datetime import datetime, timezone as tzone

from django_redis import get_redis_connection

from django.conf import settings
from django.utils import timezone
from django.utils.timesince import timesince

from temba import mailroom
from temba.utils.crons import cron_task
from temba.utils.models import delete_in_batches

Expand All @@ -27,49 +23,18 @@ def squash_flow_counts():

@cron_task()
def trim_flow_revisions():
start = timezone.now()

# get when the last time we trimmed was
r = get_redis_connection()
last_trim = r.get(FlowRevision.LAST_TRIM_KEY)
if not last_trim:
last_trim = 0

last_trim = datetime.utcfromtimestamp(int(last_trim)).astimezone(tzone.utc)
count = FlowRevision.trim(last_trim)
num_trimmed = FlowRevision.trim(last_trim)

r.set(FlowRevision.LAST_TRIM_KEY, int(timezone.now().timestamp()))

elapsed = timesince(start)
logger.info(f"Trimmed {count} flow revisions since {last_trim} in {elapsed}")


@cron_task()
def interrupt_flow_sessions():
"""
Interrupt old flow sessions which have exceeded the absolute time limit
"""

before = timezone.now() - timedelta(days=89)
num_interrupted = 0

# get old sessions and organize into lists by org
by_org = defaultdict(list)
sessions = (
FlowSession.objects.filter(created_on__lte=before, status=FlowSession.STATUS_WAITING)
.only("id", "contact")
.select_related("contact__org")
.order_by("id")
)
for session in sessions:
by_org[session.contact.org].append(session)

for org, sessions in by_org.items():
for batch in itertools.batched(sessions, 100):
mailroom.queue_interrupt(org, sessions=batch)
num_interrupted += len(sessions)

return {"interrupted": num_interrupted}
return {"trimmed": num_trimmed}


@cron_task()
Expand Down
9 changes: 1 addition & 8 deletions temba/flows/tests/test_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,15 @@ class FlowActivityCountTest(TembaTest):
def test_node_counts(self):
flow = self.create_flow("Test 1")
contact = self.create_contact("Bob", phone="+1234567890")
session = FlowSession.objects.create(
uuid=uuid4(),
contact=contact,
status=FlowSession.STATUS_WAITING,
output_url="http://sessions.com/123.json",
created_on=timezone.now(),
)

def create_run(status, node_uuid):
return FlowRun.objects.create(
uuid=uuid4(),
org=self.org,
session=session,
flow=flow,
contact=contact,
status=status,
session_uuid="082cb7a8-a8fc-468d-b0a4-06f5a5179e2b",
created_on=timezone.now(),
modified_on=timezone.now(),
exited_on=timezone.now() if status not in ("A", "W") else None,
Expand Down
Loading

0 comments on commit 421a424

Please sign in to comment.