Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sync progress tracking overflow #10561

Merged
merged 6 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 17 additions & 25 deletions kolibri/core/auth/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import getpass
import json
import logging
import math
import sys
from contextlib import contextmanager
from functools import wraps
Expand Down Expand Up @@ -605,15 +604,6 @@ def _push(
# note this is the CompositeSessionContext
post_transfer_handler(sync_client.context)

def _update_all_progress(self, progress_fraction, progress):
"""
Override parent progress update callback to report from the progress tracker we're sent
"""
if self.job:
self.job.update_progress(progress_fraction, 1.0)
self.job.extra_metadata.update(progress.extra_data)
self.job.save_meta()

def _session_tracker_adapter(self, signal_group, noninteractive):
"""
Attaches a signal handler to session creation signals
Expand Down Expand Up @@ -649,7 +639,6 @@ def _transfer_tracker_adapter(
:type sync_state: str
:type noninteractive: bool
"""
tracker = self.start_progress(total=100).progresstracker

def stats_msg(transfer_session):
transfer_total = (
Expand All @@ -662,24 +651,26 @@ def stats_msg(transfer_session):
)

def stats(transfer_session):
if transfer_session.records_total > 0:
if (
noninteractive or self.progresstracker.progressbar is None
) and transfer_session.records_total > 0:
logger.info(stats_msg(transfer_session))

def started(transfer_session):
stats(transfer_session)
self.start_progress(total=transfer_session.records_total or 100)

def handler(transfer_session):
"""
:type transfer_session: morango.models.core.TransferSession
"""
if transfer_session.records_total > 0:
progress = (
100
* transfer_session.records_transferred
/ float(transfer_session.records_total)
)
progress = transfer_session.records_transferred
else:
progress = 100

self.update_progress(
increment=math.ceil(progress - tracker.progress),
current_progress=progress,
message=stats_msg(transfer_session),
extra_data=dict(
bytes_sent=transfer_session.bytes_sent,
Expand All @@ -688,11 +679,11 @@ def handler(transfer_session):
),
)

if noninteractive or tracker.progressbar is None:
signal_group.started.connect(stats)
signal_group.in_progress.connect(stats)

signal_group.connect(handler)
signal_group.started.connect(started)
signal_group.started.connect(handler)
signal_group.in_progress.connect(stats)
signal_group.in_progress.connect(handler)
signal_group.completed.connect(handler)

def _queueing_tracker_adapter(
self, signal_group, message, sync_state, noninteractive
Expand All @@ -705,11 +696,11 @@ def _queueing_tracker_adapter(
:type sync_state: str
:type noninteractive: bool
"""
tracker = self.start_progress(total=2).progresstracker

def started(transfer_session):
self.start_progress(total=1)
dataset_cache.clear()
if noninteractive or tracker.progressbar is None:
if noninteractive or self.progresstracker.progressbar is None:
if (
not sync_state.endswith("DEQUEUING")
or transfer_session.records_total > 0
Expand All @@ -725,3 +716,4 @@ def handler(transfer_session):

signal_group.started.connect(started)
signal_group.started.connect(handler)
signal_group.completed.connect(handler)
69 changes: 69 additions & 0 deletions kolibri/core/auth/test/test_sync_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from django.test import TestCase
from mock import Mock
from morango.sync.utils import SyncSignalGroup

from kolibri.core.auth.management.utils import MorangoSyncCommand


class TestProgressTracking(TestCase):
def test_transfer_tracker_adapter(self):
# Create an instance of the class you're testing
instance = MorangoSyncCommand()

# Mock the relevant methods
instance.start_progress = Mock()

instance.progresstracker = Mock()
instance.progresstracker.progress = 0

signal_group = SyncSignalGroup()
# Mock the TransferSession
transfer_session_mock = Mock()

transfer_session_mock.records_transferred = 0
transfer_session_mock.records_total = 10
transfer_session_mock.bytes_sent = 0
transfer_session_mock.bytes_received = 0

# Connect the signal group to _transfer_tracker_adapter for testing
instance._transfer_tracker_adapter(signal_group, "message", "sync_state", False)

# Check if start_progress hasn't been called yet
instance.start_progress.assert_not_called()

# Simulate the started signal
signal_group.started.fire(transfer_session=transfer_session_mock)

# Check that start_progress has now been called
instance.start_progress.assert_called()

def test_queueing_tracker_adapter(self):
# Create an instance of the class you're testing
instance = MorangoSyncCommand()

# Mock the relevant methods
instance.start_progress = Mock()

instance.progresstracker = Mock()
instance.progresstracker.progress = 0

signal_group = SyncSignalGroup()
# Mock the TransferSession
transfer_session_mock = Mock()

transfer_session_mock.records_transferred = 0
transfer_session_mock.records_total = 10
transfer_session_mock.bytes_sent = 0
transfer_session_mock.bytes_received = 0

# Connect the signal group to _transfer_tracker_adapter for testing
instance._queueing_tracker_adapter(signal_group, "message", "sync_state", False)

# Check if start_progress hasn't been called yet
instance.start_progress.assert_not_called()

# Simulate the started signal
signal_group.started.fire(transfer_session=transfer_session_mock)

# Check that start_progress has now been called
instance.start_progress.assert_called()
7 changes: 3 additions & 4 deletions kolibri/core/content/management/commands/importchannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,11 @@ def _start_file_transfer(
):
progress_extra_data = {"channel_id": channel_id}

with filetransfer, self.start_progress(
total=filetransfer.transfer_size
) as progress_update:
with filetransfer:
self.start_progress(total=filetransfer.transfer_size)

def progress_callback(bytes):
progress_update(bytes, progress_extra_data)
self.update_progress(bytes, extra_data=progress_extra_data)

filetransfer.run(progress_callback)
# if upgrading, import the channel
Expand Down
Loading