From 3f18263afc170bd8bf82c20e96ccabc33d230c12 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Fri, 12 Apr 2019 11:00:52 +0100 Subject: [PATCH 1/8] Prevent multiple upgrades on the same room at once --- synapse/handlers/room.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 17628e268438..4c979657289d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -70,6 +70,7 @@ def __init__(self, hs): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() + self.currently_upgrading_rooms = {} # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -90,6 +91,12 @@ def upgrade_room(self, requester, old_room_id, new_version): user_id = requester.user.to_string() + if old_room_id in self.currently_upgrading_rooms: + raise SynapseError( + 400, "An upgrade for this room is currently in progress." + ) + self.currently_upgrading_rooms[old_room_id] = True + with (yield self._upgrade_linearizer.queue(old_room_id)): # start by allocating a new room id r = yield self.store.get_room(old_room_id) @@ -149,6 +156,8 @@ def upgrade_room(self, requester, old_room_id, new_version): requester, old_room_id, new_room_id, old_room_state, ) + del self.currently_upgrading_rooms[old_room_id] + defer.returnValue(new_room_id) @defer.inlineCallbacks From fcc26355b36473f9272ab6b0fdeb229f0f27567d Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Fri, 12 Apr 2019 11:17:00 +0100 Subject: [PATCH 2/8] changelog --- changelog.d/5051.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5051.bugfix diff --git a/changelog.d/5051.bugfix b/changelog.d/5051.bugfix new file mode 100644 index 000000000000..bfa22cc75961 --- /dev/null +++ b/changelog.d/5051.bugfix @@ -0,0 +1 @@ +Prevent >1 room upgrades happening simultaneously on the same room. From b239509bf87b936fec3a3638bdb10adea46bd865 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Fri, 24 May 2019 11:33:39 +0100 Subject: [PATCH 3/8] Better rules for how simultaneous upgrades behave --- synapse/handlers/room.py | 68 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a8b8da04465..dec20c259b5b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -70,9 +70,17 @@ def __init__(self, hs): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() - self.currently_upgrading_rooms = {} self.config = hs.config + # Dict of {room_id: {user_id: 1}} + # Used for keeping track of rooms that are currently being upgraded + self.currently_upgrading_rooms = {} + + # Dict of {room_id: request_response} + # Used for saving responses to requests to relay them to subsequent + # requests + self.upgrade_responses = {} + # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -92,11 +100,52 @@ def upgrade_room(self, requester, old_room_id, new_version): user_id = requester.user.to_string() + # Check if this room is already being upgraded if old_room_id in self.currently_upgrading_rooms: - raise SynapseError( - 400, "An upgrade for this room is currently in progress." - ) - self.currently_upgrading_rooms[old_room_id] = True + if self.currently_upgrading_rooms[old_room_id]["user"] == requester.user: + # This user is trying to update the same room before the + # previous request is done. Send the same response as the first + # attempt. + + # Say we're waiting on the response from another request + self.currently_upgrading_rooms[old_room_id]["pending_requests"] += 1 + + # Wait on the linearizer for this old room id + with (yield self._upgrade_linearizer.queue(old_room_id)): + # Each run of the linearizer needs to wait on something or + # else it will get stuck + pass + + logging.info("Room upgrade attempted again simultaneously by the same user. " + "Responding to this request with the result of the previous attempt") + + # Return what was responded to the previous request + response = self.upgrade_responses[old_room_id] + + # Decrement upgrading entry and remove if necessary + self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1 + if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0: + del self.currently_upgrading_rooms[old_room_id] + + # Remove saved response as well if nothing is waiting on it + del self.upgrade_responses[old_room_id] + + defer.returnValue(response) + else: + # Two different people are trying to upgrade the same room. + # Send the second an error. + # + # Of course this only gets caught if both users are on the same + # homeserver. + raise SynapseError( + # 409 - HTTP for "Conflict" + 409, "An upgrade for this room is currently in progress", + ) + + # Mark this room as currently being upgraded by this user + self.currently_upgrading_rooms[old_room_id] = { + "user": requester.user, "pending_requests": 1, + } with (yield self._upgrade_linearizer.queue(old_room_id)): # start by allocating a new room id @@ -157,7 +206,14 @@ def upgrade_room(self, requester, old_room_id, new_version): requester, old_room_id, new_room_id, old_room_state, ) - del self.currently_upgrading_rooms[old_room_id] + # Remove the pending request and the entry for this room id if + # necessary + self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1 + if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0: + del self.currently_upgrading_rooms[old_room_id] + else: + # Save this response if another request is waiting on it + self.upgrade_responses[old_room_id] = new_room_id defer.returnValue(new_room_id) From 0b244523b92aaa12fb59aaaac12e1cc7712ece39 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Fri, 24 May 2019 11:36:07 +0100 Subject: [PATCH 4/8] lint --- synapse/handlers/room.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dec20c259b5b..23bbbd1c80d6 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -116,8 +116,9 @@ def upgrade_room(self, requester, old_room_id, new_version): # else it will get stuck pass - logging.info("Room upgrade attempted again simultaneously by the same user. " - "Responding to this request with the result of the previous attempt") + logging.info("Room upgrade attempted again simultaneously " + "by the same user. Responding to this request " + "with the result of the previous attempt") # Return what was responded to the previous request response = self.upgrade_responses[old_room_id] From 91c55b117358bb207ef3610e9fd3570ef4fa66bb Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Fri, 24 May 2019 17:17:36 +0100 Subject: [PATCH 5/8] Refactor to use ResponseCache --- synapse/handlers/room.py | 189 +++++++++++--------------- synapse/util/caches/response_cache.py | 2 +- 2 files changed, 81 insertions(+), 110 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23bbbd1c80d6..e75d5b21af30 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -33,6 +33,7 @@ from synapse.util import stringutils from synapse.util.async_helpers import Linearizer from synapse.visibility import filter_events_for_client +from synapse.util.caches.response_cache import ResponseCache from ._base import BaseHandler @@ -40,6 +41,8 @@ id_server_scheme = "https://" +FIVE_MINUTES_IN_MS = 5 * 60 * 1000 + class RoomCreationHandler(BaseHandler): @@ -72,18 +75,15 @@ def __init__(self, hs): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config - # Dict of {room_id: {user_id: 1}} - # Used for keeping track of rooms that are currently being upgraded - self.currently_upgrading_rooms = {} - - # Dict of {room_id: request_response} - # Used for saving responses to requests to relay them to subsequent - # requests - self.upgrade_responses = {} - # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") + # If a user tries to update the same room multiple times in quick + # succession, only process the first attempt and return its result to + # subsequent requests + self._upgrade_response_cache = ResponseCache(hs, "room_upgrade", + timeout_ms=FIVE_MINUTES_IN_MS) + @defer.inlineCallbacks def upgrade_room(self, requester, old_room_id, new_version): """Replace a room with a new room with a different version @@ -100,123 +100,94 @@ def upgrade_room(self, requester, old_room_id, new_version): user_id = requester.user.to_string() - # Check if this room is already being upgraded - if old_room_id in self.currently_upgrading_rooms: - if self.currently_upgrading_rooms[old_room_id]["user"] == requester.user: - # This user is trying to update the same room before the - # previous request is done. Send the same response as the first - # attempt. - - # Say we're waiting on the response from another request - self.currently_upgrading_rooms[old_room_id]["pending_requests"] += 1 - - # Wait on the linearizer for this old room id - with (yield self._upgrade_linearizer.queue(old_room_id)): - # Each run of the linearizer needs to wait on something or - # else it will get stuck - pass - - logging.info("Room upgrade attempted again simultaneously " - "by the same user. Responding to this request " - "with the result of the previous attempt") - - # Return what was responded to the previous request - response = self.upgrade_responses[old_room_id] - - # Decrement upgrading entry and remove if necessary - self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1 - if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0: - del self.currently_upgrading_rooms[old_room_id] - - # Remove saved response as well if nothing is waiting on it - del self.upgrade_responses[old_room_id] - - defer.returnValue(response) - else: + # Check if this room is already being upgraded by another person + for key in self._upgrade_response_cache.pending_result_cache: + if key[0] == old_room_id and key[1] != user_id: # Two different people are trying to upgrade the same room. # Send the second an error. # - # Of course this only gets caught if both users are on the same - # homeserver. + # Note that this of course only gets caught if both users are + # on the same homeserver. raise SynapseError( - # 409 - HTTP for "Conflict" - 409, "An upgrade for this room is currently in progress", + 400, "An upgrade for this room is currently in progress", ) - # Mark this room as currently being upgraded by this user - self.currently_upgrading_rooms[old_room_id] = { - "user": requester.user, "pending_requests": 1, - } + # Upgrade the room + # + # If this user has sent multiple upgrade requests for the same room + # and one of them is not complete yet, cache the response and + # return it to all subsequent requests + ret = yield self._upgrade_response_cache.wrap( + (old_room_id, user_id, new_version), + self._upgrade_room, + requester, old_room_id, new_version, # args for _upgrade_room + ) - with (yield self._upgrade_linearizer.queue(old_room_id)): - # start by allocating a new room id - r = yield self.store.get_room(old_room_id) - if r is None: - raise NotFoundError("Unknown room id %s" % (old_room_id,)) - new_room_id = yield self._generate_room_id( - creator_id=user_id, is_public=r["is_public"], - ) + defer.returnValue(ret) - logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) + @defer.inlineCallbacks + def _upgrade_room(self, requester, old_room_id, new_version): + user_id = requester.user.to_string() - # we create and auth the tombstone event before properly creating the new - # room, to check our user has perms in the old room. - tombstone_event, tombstone_context = ( - yield self.event_creation_handler.create_event( - requester, { - "type": EventTypes.Tombstone, - "state_key": "", - "room_id": old_room_id, - "sender": user_id, - "content": { - "body": "This room has been replaced", - "replacement_room": new_room_id, - } - }, - token_id=requester.access_token_id, - ) - ) - old_room_version = yield self.store.get_room_version(old_room_id) - yield self.auth.check_from_context( - old_room_version, tombstone_event, tombstone_context, - ) + # start by allocating a new room id + r = yield self.store.get_room(old_room_id) + if r is None: + raise NotFoundError("Unknown room id %s" % (old_room_id,)) + new_room_id = yield self._generate_room_id( + creator_id=user_id, is_public=r["is_public"], + ) - yield self.clone_existing_room( - requester, - old_room_id=old_room_id, - new_room_id=new_room_id, - new_room_version=new_version, - tombstone_event_id=tombstone_event.event_id, - ) + logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) - # now send the tombstone - yield self.event_creation_handler.send_nonmember_event( - requester, tombstone_event, tombstone_context, + # we create and auth the tombstone event before properly creating the new + # room, to check our user has perms in the old room. + tombstone_event, tombstone_context = ( + yield self.event_creation_handler.create_event( + requester, { + "type": EventTypes.Tombstone, + "state_key": "", + "room_id": old_room_id, + "sender": user_id, + "content": { + "body": "This room has been replaced", + "replacement_room": new_room_id, + } + }, + token_id=requester.access_token_id, ) + ) + old_room_version = yield self.store.get_room_version(old_room_id) + yield self.auth.check_from_context( + old_room_version, tombstone_event, tombstone_context, + ) - old_room_state = yield tombstone_context.get_current_state_ids(self.store) + yield self.clone_existing_room( + requester, + old_room_id=old_room_id, + new_room_id=new_room_id, + new_room_version=new_version, + tombstone_event_id=tombstone_event.event_id, + ) - # update any aliases - yield self._move_aliases_to_new_room( - requester, old_room_id, new_room_id, old_room_state, - ) + # now send the tombstone + yield self.event_creation_handler.send_nonmember_event( + requester, tombstone_event, tombstone_context, + ) - # and finally, shut down the PLs in the old room, and update them in the new - # room. - yield self._update_upgraded_room_pls( - requester, old_room_id, new_room_id, old_room_state, - ) + old_room_state = yield tombstone_context.get_current_state_ids(self.store) - # Remove the pending request and the entry for this room id if - # necessary - self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1 - if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0: - del self.currently_upgrading_rooms[old_room_id] - else: - # Save this response if another request is waiting on it - self.upgrade_responses[old_room_id] = new_room_id + # update any aliases + yield self._move_aliases_to_new_room( + requester, old_room_id, new_room_id, old_room_state, + ) + + # and finally, shut down the PLs in the old room, and update them in the new + # room. + yield self._update_upgraded_room_pls( + requester, old_room_id, new_room_id, old_room_state, + ) - defer.returnValue(new_room_id) + defer.returnValue(new_room_id) @defer.inlineCallbacks def _update_upgraded_room_pls( diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index afb03b2e1b7c..9db29c75841d 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -140,7 +140,7 @@ def handle_request(request): *args: positional parameters to pass to the callback, if it is used - **kwargs: named paramters to pass to the callback, if it is used + **kwargs: named parameters to pass to the callback, if it is used Returns: twisted.internet.defer.Deferred: yieldable result From 5214a05663622b9fe5df8af0d7afe1b7d92b9743 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Fri, 24 May 2019 17:18:16 +0100 Subject: [PATCH 6/8] lint --- synapse/handlers/room.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e75d5b21af30..a5d12022e4a0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -32,8 +32,8 @@ from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils from synapse.util.async_helpers import Linearizer -from synapse.visibility import filter_events_for_client from synapse.util.caches.response_cache import ResponseCache +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -120,7 +120,7 @@ def upgrade_room(self, requester, old_room_id, new_version): ret = yield self._upgrade_response_cache.wrap( (old_room_id, user_id, new_version), self._upgrade_room, - requester, old_room_id, new_version, # args for _upgrade_room + requester, old_room_id, new_version, # args for _upgrade_room ) defer.returnValue(ret) From 3e4dc0a11197c5be03e5c6e7c46be0c7c7e4914e Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Thu, 20 Jun 2019 15:58:13 +0100 Subject: [PATCH 7/8] Don't filter response cache by new room version --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 093d390a8e0a..940dce835045 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -121,7 +121,7 @@ def upgrade_room(self, requester, old_room_id, new_version): # and one of them is not complete yet, cache the response and # return it to all subsequent requests ret = yield self._upgrade_response_cache.wrap( - (old_room_id, user_id, new_version), + (old_room_id, user_id), self._upgrade_room, requester, old_room_id, new_version, # args for _upgrade_room ) From b87212e975aee9201ccec232edcb88c99360f87c Mon Sep 17 00:00:00 2001 From: Andrew Morgan <andrew@amorgan.xyz> Date: Thu, 20 Jun 2019 16:41:02 +0100 Subject: [PATCH 8/8] black'd --- synapse/handlers/room.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 940dce835045..db3f8cb76b9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -81,8 +81,9 @@ def __init__(self, hs): # If a user tries to update the same room multiple times in quick # succession, only process the first attempt and return its result to # subsequent requests - self._upgrade_response_cache = ResponseCache(hs, "room_upgrade", - timeout_ms=FIVE_MINUTES_IN_MS) + self._upgrade_response_cache = ResponseCache( + hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS + ) self._server_notices_mxid = hs.config.server_notices_mxid self.third_party_event_rules = hs.get_third_party_event_rules() @@ -112,7 +113,7 @@ def upgrade_room(self, requester, old_room_id, new_version): # Note that this of course only gets caught if both users are # on the same homeserver. raise SynapseError( - 400, "An upgrade for this room is currently in progress", + 400, "An upgrade for this room is currently in progress" ) # Upgrade the room @@ -123,7 +124,9 @@ def upgrade_room(self, requester, old_room_id, new_version): ret = yield self._upgrade_response_cache.wrap( (old_room_id, user_id), self._upgrade_room, - requester, old_room_id, new_version, # args for _upgrade_room + requester, + old_room_id, + new_version, # args for _upgrade_room ) defer.returnValue(ret) @@ -136,7 +139,7 @@ def _upgrade_room(self, requester, old_room_id, new_version): if r is None: raise NotFoundError("Unknown room id %s" % (old_room_id,)) new_room_id = yield self._generate_room_id( - creator_id=user_id, is_public=r["is_public"], + creator_id=user_id, is_public=r["is_public"] ) logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) @@ -145,7 +148,8 @@ def _upgrade_room(self, requester, old_room_id, new_version): # room, to check our user has perms in the old room. tombstone_event, tombstone_context = ( yield self.event_creation_handler.create_event( - requester, { + requester, + { "type": EventTypes.Tombstone, "state_key": "", "room_id": old_room_id, @@ -153,14 +157,14 @@ def _upgrade_room(self, requester, old_room_id, new_version): "content": { "body": "This room has been replaced", "replacement_room": new_room_id, - } + }, }, token_id=requester.access_token_id, ) ) old_room_version = yield self.store.get_room_version(old_room_id) yield self.auth.check_from_context( - old_room_version, tombstone_event, tombstone_context, + old_room_version, tombstone_event, tombstone_context ) yield self.clone_existing_room( @@ -173,20 +177,20 @@ def _upgrade_room(self, requester, old_room_id, new_version): # now send the tombstone yield self.event_creation_handler.send_nonmember_event( - requester, tombstone_event, tombstone_context, + requester, tombstone_event, tombstone_context ) old_room_state = yield tombstone_context.get_current_state_ids(self.store) # update any aliases yield self._move_aliases_to_new_room( - requester, old_room_id, new_room_id, old_room_state, + requester, old_room_id, new_room_id, old_room_state ) # and finally, shut down the PLs in the old room, and update them in the new # room. yield self._update_upgraded_room_pls( - requester, old_room_id, new_room_id, old_room_state, + requester, old_room_id, new_room_id, old_room_state ) defer.returnValue(new_room_id)