-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #16 from edx/ammar/trigger-segment-events-for-lear…
…ning-time feat: Trigger segment events for learners who have achieved 30 minutes of learning
- Loading branch information
Showing
17 changed files
with
912 additions
and
302 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
201 changes: 201 additions & 0 deletions
201
outcome_surveys/management/commands/send_learning_time_achieved_segment_events.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
""" | ||
Send segment events for passed learners so that Braze can send 90 day follow up email. | ||
""" | ||
|
||
import logging | ||
|
||
import snowflake.connector | ||
from django.conf import settings | ||
from django.core.management.base import BaseCommand | ||
from django.utils import timezone | ||
from snowflake.connector import DictCursor | ||
|
||
from outcome_surveys.constants import SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE | ||
from outcome_surveys.models import LearnerCourseEvent | ||
|
||
try: | ||
from common.djangoapps.track.segment import track | ||
except ImportError: | ||
track = None | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
ENTERPRISE = settings.ENTERPRISE_VSF_UUID | ||
QUERY = f''' | ||
WITH prepared_learners as ( | ||
-- get users who have hit :30 | ||
-- minutes threshold, but haven't been | ||
-- surveyed yet. | ||
SELECT | ||
user_id, | ||
SUM(learning_time_seconds) as learning_time_seconds | ||
FROM | ||
PROD.BUSINESS_INTELLIGENCE.LEARNING_TIME | ||
WHERE | ||
enterprise_customer_uuid='{ENTERPRISE}' | ||
AND | ||
user_id not in ( | ||
-- filter learners who already emitted this event. | ||
SELECT | ||
user_id | ||
FROM | ||
PROD.LMS.OUTCOME_SURVEYS_LEARNERCOURSEEVENT | ||
WHERE | ||
event_type = 'edx.course.learner.achieved.learning.time' | ||
AND | ||
already_sent = TRUE | ||
) | ||
GROUP BY | ||
user_id | ||
HAVING | ||
-- filter learners who haven't hit the threshold. | ||
SUM(learning_time_seconds) >= 1800 | ||
), | ||
last_course as ( | ||
-- Get the last courserun the user interacted with. | ||
-- Since it is captured at the date levels, ties will be common | ||
-- in the dataset. Break the ties by getting the course run | ||
-- with the most engagement on the last day of engagement. | ||
select | ||
user_id, | ||
courserun_key | ||
FROM | ||
PROD.BUSINESS_INTELLIGENCE.LEARNING_TIME | ||
WHERE | ||
enterprise_customer_uuid='{ENTERPRISE}' | ||
QUALIFY | ||
-- get latest date, highest learning on date, per learner. | ||
ROW_NUMBER() OVER (PARTITION by user_id ORDER BY date DESC, learning_time_seconds DESC) = 1 | ||
) | ||
-- join it all together. | ||
select | ||
pl.user_id, | ||
pl.learning_time_seconds, | ||
runs.courserun_key, | ||
runs.course_key, | ||
runs.courserun_title | ||
from | ||
prepared_learners pl | ||
left join | ||
last_course lc | ||
on | ||
pl.user_id = lc.user_id | ||
left join | ||
prod.core.dim_courseruns runs | ||
on | ||
lc.courserun_key = runs.courserun_key | ||
''' | ||
NUM_ROWS_TO_FETCH = 5000 | ||
BULK_CREATE_BATCH_SIZE = 500 | ||
|
||
|
||
class Command(BaseCommand): | ||
""" | ||
Example usage: | ||
$ ./manage.py send_learning_time_achieved_segment_events | ||
""" | ||
|
||
help = 'Send follow up segment events for passed learners.' | ||
|
||
def add_arguments(self, parser): | ||
""" | ||
Entry point to add arguments. | ||
""" | ||
parser.add_argument( | ||
'--dry-run', | ||
action='store_true', | ||
dest='dry_run', | ||
default=False, | ||
help='Dry Run, print log messages without firing the segment event.', | ||
) | ||
|
||
def fetch_data_from_snowflake(self, log_prefix): | ||
""" | ||
Get query results from Snowflake and yield each row. | ||
""" | ||
connection = snowflake.connector.connect( | ||
user=settings.SNOWFLAKE_SERVICE_USER, | ||
password=settings.SNOWFLAKE_SERVICE_USER_PASSWORD, | ||
account='edx.us-east-1', | ||
database='prod' | ||
) | ||
cursor = connection.cursor(DictCursor) | ||
try: | ||
log.info('%s Executing query', log_prefix) | ||
cursor.execute(QUERY) | ||
while True: | ||
log.info('%s Fetching results', log_prefix) | ||
rows = cursor.fetchmany(NUM_ROWS_TO_FETCH) | ||
log.info('%s Rows Fetched: [%s]', log_prefix, len(rows)) | ||
if len(rows) == 0: | ||
break | ||
|
||
yield rows | ||
finally: | ||
log.info('%s Closing cursor', log_prefix) | ||
cursor.close() | ||
log.info('%s Closing connection', log_prefix) | ||
connection.close() | ||
|
||
def handle(self, *args, **options): | ||
""" | ||
Command's entry point. | ||
""" | ||
fire_event = not options['dry_run'] | ||
|
||
log_prefix = '[SEND_LEARNING_TIME_ACHIEVED_SEGMENT_EVENTS]' | ||
if not fire_event: | ||
log_prefix = '[DRY RUN]' | ||
|
||
log.info('%s Command started.', log_prefix) | ||
|
||
user_ids = [] | ||
for rows_chunk in self.fetch_data_from_snowflake(log_prefix): | ||
log.info('%s Processing [%s] rows', log_prefix, len(rows_chunk)) | ||
|
||
triggered_event_records = [] | ||
for row in rows_chunk: | ||
log.info('%s Processing %s', log_prefix, row) | ||
|
||
user_id = row['USER_ID'] | ||
course_key = row['COURSE_KEY'] | ||
courserun_key = row['COURSERUN_KEY'] | ||
course_title = row['COURSERUN_TITLE'] | ||
event_properties = { | ||
'course_key': course_key, | ||
'course_title': course_title, | ||
} | ||
user_ids.append(user_id) | ||
|
||
if fire_event: | ||
track( | ||
user_id, | ||
SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
event_properties | ||
) | ||
triggered_event_records.append( | ||
LearnerCourseEvent( | ||
user_id=user_id, | ||
course_id=courserun_key, | ||
data=event_properties, | ||
follow_up_date=timezone.now().date(), | ||
event_type=SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
already_sent=True, | ||
) | ||
) | ||
log.info( | ||
"%s Segment event triggered. Event: [%s], Properties: [%s]", | ||
log_prefix, | ||
SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
event_properties | ||
) | ||
|
||
if triggered_event_records: | ||
LearnerCourseEvent.objects.bulk_create(triggered_event_records, batch_size=BULK_CREATE_BATCH_SIZE) | ||
|
||
log.info('%s Processing completed of [%s] rows', log_prefix, len(rows_chunk)) | ||
|
||
log.info("%s Command completed. Segment events triggered for user ids: %s", log_prefix, user_ids) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
115 changes: 115 additions & 0 deletions
115
outcome_surveys/management/commands/tests/test_send_learning_time_achieved_segment_events.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
""" | ||
Tests for `send_learning_time_achieved_segment_events` management command. | ||
""" | ||
|
||
from unittest import TestCase, mock | ||
from unittest.mock import call, patch | ||
|
||
import pytest | ||
from django.core.management import call_command | ||
from django.utils import timezone | ||
|
||
from outcome_surveys.constants import SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE | ||
from outcome_surveys.management.commands import send_learning_time_achieved_segment_events | ||
from outcome_surveys.management.commands.tests.mock_responses import MOCK_QUERY_DATA | ||
from outcome_surveys.models import LearnerCourseEvent | ||
|
||
|
||
@pytest.mark.django_db | ||
class TestSendSegmentEventsForPreparedLearnersCommand(TestCase): | ||
""" | ||
Tests `send_learning_time_achieved_segment_events` management command. | ||
""" | ||
|
||
def setUp(self): | ||
super().setUp() | ||
self.command = send_learning_time_achieved_segment_events.Command() | ||
|
||
@staticmethod | ||
def generate_query_data(size=2): | ||
""" | ||
Generator to return records for processing. | ||
""" | ||
for i in range(0, len(MOCK_QUERY_DATA), size): | ||
yield MOCK_QUERY_DATA[i:i + size] | ||
|
||
@patch('outcome_surveys.management.commands.send_learning_time_achieved_segment_events.track') | ||
@mock.patch( | ||
'outcome_surveys.management.commands.send_learning_time_achieved_segment_events.Command.fetch_data_from_snowflake' # nopep8 pylint: disable=line-too-long | ||
) | ||
def test_command_dry_run(self, mock_fetch_data_from_snowflake, segment_track_mock): | ||
""" | ||
Verify that management command does not fire any segment event in dry run mode. | ||
""" | ||
mock_fetch_data_from_snowflake.return_value = self.generate_query_data() | ||
mock_path = 'outcome_surveys.management.commands.send_learning_time_achieved_segment_events.log.info' | ||
|
||
with mock.patch(mock_path) as mock_logger: | ||
call_command(self.command, '--dry-run') | ||
segment_track_mock.assert_has_calls([]) | ||
assert LearnerCourseEvent.objects.count() == 0 | ||
|
||
user_ids = [record['USER_ID'] for record in MOCK_QUERY_DATA] | ||
mock_logger.assert_has_calls( | ||
[ | ||
call('%s Command started.', '[DRY RUN]'), | ||
call('%s Processing [%s] rows', '[DRY RUN]', 2), | ||
call('%s Processing %s', '[DRY RUN]', MOCK_QUERY_DATA[0]), | ||
call('%s Processing %s', '[DRY RUN]', MOCK_QUERY_DATA[1]), | ||
call('%s Processing completed of [%s] rows', '[DRY RUN]', 2), | ||
call('%s Processing [%s] rows', '[DRY RUN]', 2), | ||
call('%s Processing %s', '[DRY RUN]', MOCK_QUERY_DATA[2]), | ||
call('%s Processing %s', '[DRY RUN]', MOCK_QUERY_DATA[3]), | ||
call('%s Processing completed of [%s] rows', '[DRY RUN]', 2), | ||
call('%s Command completed. Segment events triggered for user ids: %s', '[DRY RUN]', user_ids) | ||
] | ||
) | ||
|
||
@patch('outcome_surveys.management.commands.send_learning_time_achieved_segment_events.track') | ||
@mock.patch( | ||
'outcome_surveys.management.commands.send_learning_time_achieved_segment_events.Command.fetch_data_from_snowflake' # nopep8 pylint: disable=line-too-long | ||
) | ||
def test_command(self, mock_fetch_data_from_snowflake, segment_track_mock): | ||
""" | ||
Verify that management command fires segment events with correct data. | ||
""" | ||
mock_fetch_data_from_snowflake.return_value = self.generate_query_data() | ||
|
||
call_command(self.command) | ||
|
||
expected_segment_calls = [ | ||
call( | ||
5000, | ||
SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
{'course_key': 'UUX+ITAx', 'course_title': 'Intro to Accounting'} | ||
), | ||
call( | ||
5001, | ||
SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
{'course_key': 'BCC+ITC', 'course_title': 'Intro to Calculus'} | ||
), | ||
call( | ||
5002, | ||
SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
{'course_key': 'ABC+CSA', 'course_title': 'Intro to Computer Architecture'} | ||
), | ||
call( | ||
5003, | ||
SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE, | ||
{'course_key': 'BCC+ITC', 'course_title': 'Intro to Quantum Computing'} | ||
) | ||
] | ||
segment_track_mock.assert_has_calls(expected_segment_calls) | ||
|
||
sent_events = LearnerCourseEvent.objects.all() | ||
assert sent_events.count() == len(MOCK_QUERY_DATA) | ||
for record in MOCK_QUERY_DATA: | ||
tracked_event = LearnerCourseEvent.objects.get(user_id=record['USER_ID']) | ||
assert tracked_event.already_sent | ||
assert str(tracked_event.course_id) == record['COURSERUN_KEY'] | ||
assert tracked_event.data == { | ||
'course_key': record['COURSE_KEY'], | ||
'course_title': record['COURSERUN_TITLE'] | ||
} | ||
assert tracked_event.follow_up_date == timezone.now().date() | ||
assert tracked_event.event_type == SEGMENT_LEARNER_ACHIEVED_LEARNING_TIME_EVENT_TYPE |
18 changes: 18 additions & 0 deletions
18
outcome_surveys/migrations/0006_alter_learnercourseevent_event_type.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Generated by Django 3.2.22 on 2023-10-31 16:55 | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('outcome_surveys', '0005_auto_20230220_0433'), | ||
] | ||
|
||
operations = [ | ||
migrations.AlterField( | ||
model_name='learnercourseevent', | ||
name='event_type', | ||
field=models.CharField(choices=[('edx.course.learner.passed.first_time', 'edx.course.learner.passed.first_time'), ('edx.course.learner.achieved.learning.time.achieved', 'edx.course.learner.achieved.learning.time.achieved')], default='edx.course.learner.passed.first_time', max_length=255), | ||
), | ||
] |
Oops, something went wrong.