Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decrease time-to-sync for learn-only devices joining the network #11271

Merged
merged 6 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions kolibri/core/auth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.exceptions import UserCancelledError
from kolibri.core.tasks.job import JobStatus
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
from kolibri.core.tasks.main import job_storage
from kolibri.core.tasks.permissions import IsAdminForJob
Expand Down Expand Up @@ -627,9 +628,7 @@ def stop_request_soud_sync(server, user):
stoppeerusersync(server, user)


@register_task(
queue=soud_sync_queue,
)
@register_task(queue=soud_sync_queue, priority=Priority.HIGH, status_fn=status_fn)
def request_soud_sync(server, user, queue_id=None, ttl=4):
"""
Make a request to the serverurl endpoint to sync this SoUD (Subset of Users Device)
Expand Down
71 changes: 62 additions & 9 deletions kolibri/core/discovery/test/test_network_search.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import mock
from django.test import TransactionTestCase

from ..utils.network.broadcast import KolibriBroadcast
from ..utils.network.broadcast import KolibriInstance
from ..utils.network.search import NetworkLocationListener

from kolibri.core.tasks.job import Priority

MOCK_INTERFACE_IP = "111.222.111.222"
MOCK_PORT = 555
MOCK_ID = "abba"
SEARCH_MODULE = "kolibri.core.discovery.utils.network.search."
DYNAMIC_NETWORK_LOCATION_TASK_PRIORITY_METHOD = (
SEARCH_MODULE
+ "NetworkLocationListener._get_dynamic_network_location_task_priority"
)


class NetworkLocationListenerTestCase(TransactionTestCase):
Expand All @@ -24,34 +29,82 @@ def setUp(self):
"instance_id": MOCK_ID,
},
)
self.mock_broadcast = mock.MagicMock(id="abc123")
self.listener = NetworkLocationListener(self.mock_broadcast)
self.broadcast_instance = KolibriInstance(
"abcd",
ip=MOCK_INTERFACE_IP,
port=MOCK_PORT,
device_info={
"instance_id": "abcd",
},
)

self.broadcast = KolibriBroadcast(instance=self.broadcast_instance)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep using the mock, to keep the unit testing scope limited to the listeners. You can use spec or spec_set on the mock to constrain its usage to align with a particular class, in this case KolibriBroadcast. All that said, the broadcast class doesn't do much without start_broadcast called.

self.broadcast.id = "abc123"

self.listener = NetworkLocationListener(self.broadcast)

@mock.patch(SEARCH_MODULE + "reset_connection_states.enqueue")
def test_register_instance(self, mock_enqueue):
self.listener.register_instance(self.instance)
mock_enqueue.assert_called_once_with(args=(self.mock_broadcast.id,))
mock_enqueue.assert_called_once_with(args=(self.broadcast.id,))

def test_dynamic_network_location_task_priority_self_no_lod(self):
# The current device is not a LOD.
self.broadcast_instance.device_info["subset_of_users_device"] = False

priority = self.listener._get_dynamic_network_location_task_priority(
self.instance
)
self.assertEqual(priority, Priority.HIGH)

def test_dynamic_network_location_task_priority_self_discovered_both_lod(self):
# The current device is a LOD.
self.broadcast_instance.device_info["subset_of_users_device"] = True
# The discovered device is LOD as well.
self.instance.device_info["subset_of_users_device"] = True

priority = self.listener._get_dynamic_network_location_task_priority(
self.instance
)
self.assertEqual(priority, Priority.REGULAR)

def test_dynamic_network_location_task_priority_self_lod_discovered_not_lod(self):
# The current device is a LOD.
self.broadcast_instance.device_info["subset_of_users_device"] = True
# The discovered device is not a LOD.
self.instance.device_info["subset_of_users_device"] = False

priority = self.listener._get_dynamic_network_location_task_priority(
self.instance
)
self.assertEqual(priority, Priority.HIGH)

@mock.patch(SEARCH_MODULE + "add_dynamic_network_location.enqueue")
def test_add_instance(self, mock_enqueue):
@mock.patch(DYNAMIC_NETWORK_LOCATION_TASK_PRIORITY_METHOD)
def test_add_instance(self, mock_priority_method, mock_enqueue):
self.listener.add_instance(self.instance)
mock_priority_method.assert_called_once_with(self.instance)
mock_enqueue.assert_called_once_with(
job_id="9e89d3ea5256721c9cd631eac36feafe",
args=(self.mock_broadcast.id, self.instance.to_dict()),
args=(self.broadcast.id, self.instance.to_dict()),
priority=mock_priority_method(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using mock_priority_method.return_value can be a bit clearer for test assertions, and also as a practice avoids potential gotchas asserting the number of times a mock is called.

)

@mock.patch(SEARCH_MODULE + "add_dynamic_network_location.enqueue")
def test_update_instance(self, mock_enqueue):
@mock.patch(DYNAMIC_NETWORK_LOCATION_TASK_PRIORITY_METHOD)
def test_update_instance(self, mock_priority_method, mock_enqueue):
self.listener.update_instance(self.instance)
mock_priority_method.assert_called_once_with(self.instance)
mock_enqueue.assert_called_once_with(
job_id="9e89d3ea5256721c9cd631eac36feafe",
args=(self.mock_broadcast.id, self.instance.to_dict()),
args=(self.broadcast.id, self.instance.to_dict()),
priority=mock_priority_method(),
)

@mock.patch(SEARCH_MODULE + "remove_dynamic_network_location.enqueue")
def test_remove_instance(self, mock_enqueue):
self.listener.remove_instance(self.instance)
mock_enqueue.assert_called_once_with(
job_id="c5e88d1cb4a342ad3d23081022248fbc",
args=(self.mock_broadcast.id, self.instance.to_dict()),
args=(self.broadcast.id, self.instance.to_dict()),
)
26 changes: 26 additions & 0 deletions kolibri/core/discovery/utils/network/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from kolibri.core.discovery.tasks import TYPE_ADD
from kolibri.core.discovery.tasks import TYPE_REMOVE
from kolibri.core.discovery.utils.network.broadcast import KolibriInstanceListener
from kolibri.core.tasks.job import Priority

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -37,13 +38,35 @@ def unregister_instance(self, instance):
# when we stop broadcasting, enqueue task to reset all connection states
reset_connection_states.enqueue(args=(self.broadcast.id,))

def _get_dynamic_network_location_task_priority(self, instance):
priority = Priority.REGULAR
is_current_device_lod = self.broadcast.instance.device_info.get(
"subset_of_users_device", False
)
is_discovered_device_lod = instance.device_info.get(
"subset_of_users_device", False
)

# If the current device is not an LOD,
# OR
# the current device is an LOD and the discovered device is not an LOD,
# then enqueue with high priority.
if (not is_current_device_lod) or (
is_current_device_lod and not is_discovered_device_lod
):
priority = Priority.HIGH
return priority

def add_instance(self, instance):
"""
:type instance: kolibri.core.discovery.utils.network.broadcast.KolibriInstance
"""
priority = self._get_dynamic_network_location_task_priority(instance)

add_dynamic_network_location.enqueue(
job_id=generate_job_id(TYPE_ADD, self.broadcast.id, instance.id),
args=(self.broadcast.id, instance.to_dict()),
priority=priority,
)

def update_instance(self, instance):
Expand All @@ -52,9 +75,12 @@ def update_instance(self, instance):
"""
# enqueue as 'add' because update event could fire immediately after 'add', so this dedupes
# the tasks, and it also doesn't do anything differently anyway
priority = self._get_dynamic_network_location_task_priority(instance)

add_dynamic_network_location.enqueue(
job_id=generate_job_id(TYPE_ADD, self.broadcast.id, instance.id),
args=(self.broadcast.id, instance.to_dict()),
priority=priority,
)

def remove_instance(self, instance):
Expand Down