This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Lay some foundation work to allow workers to only subscribe to some kinds of messages, reducing replication traffic. #12672
Merged
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
7467a56
Listen to more Redis channels so we can split up traffic later
reivilibre f7ee5a8
Test that we listen to the right channels on the right workers (for now)
reivilibre 25698fa
Newsfile
reivilibre f97cf42
Skip Redis test if Redis not installed
reivilibre 4bc2ab1
Add channel support to Fake Redis
reivilibre cfa55ed
Restructure RedisSubscriber and the factory
reivilibre 33702fa
Restructure the way we subscribe to channels
reivilibre 15c08e5
Migrate tests to new way of doing things
reivilibre 55c9efe
Skip tests if no Redis
reivilibre 3b0a1d0
Remove obsolete file
reivilibre 8f362fb
Fix comments
reivilibre 79983c9
Fix up the Redis tests
reivilibre File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Lay some foundation work to allow workers to only subscribe to some kinds of messages, reducing replication traffic. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# Copyright 2022 The Matrix.org Foundation C.I.C. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from tests.replication._base import RedisMultiWorkerStreamTestCase | ||
|
||
|
||
class ChannelsTestCase(RedisMultiWorkerStreamTestCase): | ||
def test_subscribed_to_enough_redis_channels(self) -> None: | ||
# The default main process is subscribed to the USER_IP channel. | ||
self.assertCountEqual( | ||
self.hs.get_replication_command_handler()._channels_to_subscribe_to, | ||
["USER_IP"], | ||
) | ||
|
||
def test_background_worker_subscribed_to_user_ip(self) -> None: | ||
# The default main process is subscribed to the USER_IP channel. | ||
worker1 = self.make_worker_hs( | ||
"synapse.app.generic_worker", | ||
extra_config={ | ||
"worker_name": "worker1", | ||
"run_background_tasks_on": "worker1", | ||
"redis": {"enabled": True}, | ||
}, | ||
) | ||
self.assertIn( | ||
"USER_IP", | ||
worker1.get_replication_command_handler()._channels_to_subscribe_to, | ||
) | ||
|
||
# Advance so the Redis subscription gets processed | ||
self.pump(0.1) | ||
|
||
# The counts are 2 because both the main process and the worker are subscribed. | ||
self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2) | ||
self.assertEqual( | ||
len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 2 | ||
) | ||
|
||
def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: | ||
# The default main process is subscribed to the USER_IP channel. | ||
worker2 = self.make_worker_hs( | ||
"synapse.app.generic_worker", | ||
extra_config={ | ||
"worker_name": "worker2", | ||
"run_background_tasks_on": "worker1", | ||
"redis": {"enabled": True}, | ||
}, | ||
) | ||
self.assertNotIn( | ||
"USER_IP", | ||
worker2.get_replication_command_handler()._channels_to_subscribe_to, | ||
) | ||
|
||
# Advance so the Redis subscription gets processed | ||
self.pump(0.1) | ||
|
||
# The count is 2 because both the main process and the worker are subscribed. | ||
self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2) | ||
# For USER_IP, the count is 1 because only the main process is subscribed. | ||
self.assertEqual( | ||
len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1 | ||
) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a test that checks we do in fact connect to these channels? Presumably we can look at what's in
FakeRedisPubSubServer
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
I've added a new base test case class that enables Redis (because even if you're not using it with the main process for anything, you must enable it in order for the Fake Redis server to listen on a fake TCP port) and skips Postgres (since it turns out nothing subscribes to Redis when Postgres is not in use...).