From 3f18263afc170bd8bf82c20e96ccabc33d230c12 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Fri, 12 Apr 2019 11:00:52 +0100
Subject: [PATCH 1/8] Prevent multiple upgrades on the same room at once

---
 synapse/handlers/room.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 17628e268438..4c979657289d 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -70,6 +70,7 @@ def __init__(self, hs):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.currently_upgrading_rooms = {}
 
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -90,6 +91,12 @@ def upgrade_room(self, requester, old_room_id, new_version):
 
         user_id = requester.user.to_string()
 
+        if old_room_id in self.currently_upgrading_rooms:
+            raise SynapseError(
+                400, "An upgrade for this room is currently in progress."
+            )
+        self.currently_upgrading_rooms[old_room_id] = True
+
         with (yield self._upgrade_linearizer.queue(old_room_id)):
             # start by allocating a new room id
             r = yield self.store.get_room(old_room_id)
@@ -149,6 +156,8 @@ def upgrade_room(self, requester, old_room_id, new_version):
                 requester, old_room_id, new_room_id, old_room_state,
             )
 
+            del self.currently_upgrading_rooms[old_room_id]
+
             defer.returnValue(new_room_id)
 
     @defer.inlineCallbacks

From fcc26355b36473f9272ab6b0fdeb229f0f27567d Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Fri, 12 Apr 2019 11:17:00 +0100
Subject: [PATCH 2/8] changelog

---
 changelog.d/5051.bugfix | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/5051.bugfix

diff --git a/changelog.d/5051.bugfix b/changelog.d/5051.bugfix
new file mode 100644
index 000000000000..bfa22cc75961
--- /dev/null
+++ b/changelog.d/5051.bugfix
@@ -0,0 +1 @@
+Prevent >1 room upgrades happening simultaneously on the same room.

From b239509bf87b936fec3a3638bdb10adea46bd865 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Fri, 24 May 2019 11:33:39 +0100
Subject: [PATCH 3/8] Better rules for how simultaneous upgrades behave

---
 synapse/handlers/room.py | 68 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 62 insertions(+), 6 deletions(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5a8b8da04465..dec20c259b5b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -70,9 +70,17 @@ def __init__(self, hs):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
-        self.currently_upgrading_rooms = {}
         self.config = hs.config
 
+        # Dict of {room_id: {user_id: 1}}
+        # Used for keeping track of rooms that are currently being upgraded
+        self.currently_upgrading_rooms = {}
+
+        # Dict of {room_id: request_response}
+        # Used for saving responses to requests to relay them to subsequent
+        # requests
+        self.upgrade_responses = {}
+
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
 
@@ -92,11 +100,52 @@ def upgrade_room(self, requester, old_room_id, new_version):
 
         user_id = requester.user.to_string()
 
+        # Check if this room is already being upgraded
         if old_room_id in self.currently_upgrading_rooms:
-            raise SynapseError(
-                400, "An upgrade for this room is currently in progress."
-            )
-        self.currently_upgrading_rooms[old_room_id] = True
+            if self.currently_upgrading_rooms[old_room_id]["user"] == requester.user:
+                # This user is trying to update the same room before the
+                # previous request is done. Send the same response as the first
+                # attempt.
+
+                # Say we're waiting on the response from another request
+                self.currently_upgrading_rooms[old_room_id]["pending_requests"] += 1
+
+                # Wait on the linearizer for this old room id
+                with (yield self._upgrade_linearizer.queue(old_room_id)):
+                    # Each run of the linearizer needs to wait on something or
+                    # else it will get stuck
+                    pass
+
+                logging.info("Room upgrade attempted again simultaneously by the same user. "
+                             "Responding to this request with the result of the previous attempt")
+
+                # Return what was responded to the previous request
+                response = self.upgrade_responses[old_room_id]
+
+                # Decrement upgrading entry and remove if necessary
+                self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1
+                if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0:
+                    del self.currently_upgrading_rooms[old_room_id]
+
+                    # Remove saved response as well if nothing is waiting on it
+                    del self.upgrade_responses[old_room_id]
+
+                defer.returnValue(response)
+            else:
+                # Two different people are trying to upgrade the same room.
+                # Send the second an error.
+                #
+                # Of course this only gets caught if both users are on the same
+                # homeserver.
+                raise SynapseError(
+                    # 409 - HTTP for "Conflict"
+                    409, "An upgrade for this room is currently in progress",
+                )
+
+        # Mark this room as currently being upgraded by this user
+        self.currently_upgrading_rooms[old_room_id] = {
+            "user": requester.user, "pending_requests": 1,
+        }
 
         with (yield self._upgrade_linearizer.queue(old_room_id)):
             # start by allocating a new room id
@@ -157,7 +206,14 @@ def upgrade_room(self, requester, old_room_id, new_version):
                 requester, old_room_id, new_room_id, old_room_state,
             )
 
-            del self.currently_upgrading_rooms[old_room_id]
+            # Remove the pending request and the entry for this room id if
+            # necessary
+            self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1
+            if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0:
+                del self.currently_upgrading_rooms[old_room_id]
+            else:
+                # Save this response if another request is waiting on it
+                self.upgrade_responses[old_room_id] = new_room_id
 
             defer.returnValue(new_room_id)
 

From 0b244523b92aaa12fb59aaaac12e1cc7712ece39 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Fri, 24 May 2019 11:36:07 +0100
Subject: [PATCH 4/8] lint

---
 synapse/handlers/room.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index dec20c259b5b..23bbbd1c80d6 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -116,8 +116,9 @@ def upgrade_room(self, requester, old_room_id, new_version):
                     # else it will get stuck
                     pass
 
-                logging.info("Room upgrade attempted again simultaneously by the same user. "
-                             "Responding to this request with the result of the previous attempt")
+                logging.info("Room upgrade attempted again simultaneously "
+                             "by the same user. Responding to this request "
+                             "with the result of the previous attempt")
 
                 # Return what was responded to the previous request
                 response = self.upgrade_responses[old_room_id]

From 91c55b117358bb207ef3610e9fd3570ef4fa66bb Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Fri, 24 May 2019 17:17:36 +0100
Subject: [PATCH 5/8] Refactor to use ResponseCache

---
 synapse/handlers/room.py              | 189 +++++++++++---------------
 synapse/util/caches/response_cache.py |   2 +-
 2 files changed, 81 insertions(+), 110 deletions(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23bbbd1c80d6..e75d5b21af30 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -33,6 +33,7 @@
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
 from synapse.visibility import filter_events_for_client
+from synapse.util.caches.response_cache import ResponseCache
 
 from ._base import BaseHandler
 
@@ -40,6 +41,8 @@
 
 id_server_scheme = "https://"
 
+FIVE_MINUTES_IN_MS = 5 * 60 * 1000
+
 
 class RoomCreationHandler(BaseHandler):
 
@@ -72,18 +75,15 @@ def __init__(self, hs):
         self.room_member_handler = hs.get_room_member_handler()
         self.config = hs.config
 
-        # Dict of {room_id: {user_id: 1}}
-        # Used for keeping track of rooms that are currently being upgraded
-        self.currently_upgrading_rooms = {}
-
-        # Dict of {room_id: request_response}
-        # Used for saving responses to requests to relay them to subsequent
-        # requests
-        self.upgrade_responses = {}
-
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
 
+        # If a user tries to update the same room multiple times in quick
+        # succession, only process the first attempt and return its result to
+        # subsequent requests
+        self._upgrade_response_cache = ResponseCache(hs, "room_upgrade",
+                                                     timeout_ms=FIVE_MINUTES_IN_MS)
+
     @defer.inlineCallbacks
     def upgrade_room(self, requester, old_room_id, new_version):
         """Replace a room with a new room with a different version
@@ -100,123 +100,94 @@ def upgrade_room(self, requester, old_room_id, new_version):
 
         user_id = requester.user.to_string()
 
-        # Check if this room is already being upgraded
-        if old_room_id in self.currently_upgrading_rooms:
-            if self.currently_upgrading_rooms[old_room_id]["user"] == requester.user:
-                # This user is trying to update the same room before the
-                # previous request is done. Send the same response as the first
-                # attempt.
-
-                # Say we're waiting on the response from another request
-                self.currently_upgrading_rooms[old_room_id]["pending_requests"] += 1
-
-                # Wait on the linearizer for this old room id
-                with (yield self._upgrade_linearizer.queue(old_room_id)):
-                    # Each run of the linearizer needs to wait on something or
-                    # else it will get stuck
-                    pass
-
-                logging.info("Room upgrade attempted again simultaneously "
-                             "by the same user. Responding to this request "
-                             "with the result of the previous attempt")
-
-                # Return what was responded to the previous request
-                response = self.upgrade_responses[old_room_id]
-
-                # Decrement upgrading entry and remove if necessary
-                self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1
-                if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0:
-                    del self.currently_upgrading_rooms[old_room_id]
-
-                    # Remove saved response as well if nothing is waiting on it
-                    del self.upgrade_responses[old_room_id]
-
-                defer.returnValue(response)
-            else:
+        # Check if this room is already being upgraded by another person
+        for key in self._upgrade_response_cache.pending_result_cache:
+            if key[0] == old_room_id and key[1] != user_id:
                 # Two different people are trying to upgrade the same room.
                 # Send the second an error.
                 #
-                # Of course this only gets caught if both users are on the same
-                # homeserver.
+                # Note that this of course only gets caught if both users are
+                # on the same homeserver.
                 raise SynapseError(
-                    # 409 - HTTP for "Conflict"
-                    409, "An upgrade for this room is currently in progress",
+                    400, "An upgrade for this room is currently in progress",
                 )
 
-        # Mark this room as currently being upgraded by this user
-        self.currently_upgrading_rooms[old_room_id] = {
-            "user": requester.user, "pending_requests": 1,
-        }
+        # Upgrade the room
+        #
+        # If this user has sent multiple upgrade requests for the same room
+        # and one of them is not complete yet, cache the response and
+        # return it to all subsequent requests
+        ret = yield self._upgrade_response_cache.wrap(
+            (old_room_id, user_id, new_version),
+            self._upgrade_room,
+            requester, old_room_id, new_version, # args for _upgrade_room
+        )
 
-        with (yield self._upgrade_linearizer.queue(old_room_id)):
-            # start by allocating a new room id
-            r = yield self.store.get_room(old_room_id)
-            if r is None:
-                raise NotFoundError("Unknown room id %s" % (old_room_id,))
-            new_room_id = yield self._generate_room_id(
-                creator_id=user_id, is_public=r["is_public"],
-            )
+        defer.returnValue(ret)
 
-            logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+    @defer.inlineCallbacks
+    def _upgrade_room(self, requester, old_room_id, new_version):
+        user_id = requester.user.to_string()
 
-            # we create and auth the tombstone event before properly creating the new
-            # room, to check our user has perms in the old room.
-            tombstone_event, tombstone_context = (
-                yield self.event_creation_handler.create_event(
-                    requester, {
-                        "type": EventTypes.Tombstone,
-                        "state_key": "",
-                        "room_id": old_room_id,
-                        "sender": user_id,
-                        "content": {
-                            "body": "This room has been replaced",
-                            "replacement_room": new_room_id,
-                        }
-                    },
-                    token_id=requester.access_token_id,
-                )
-            )
-            old_room_version = yield self.store.get_room_version(old_room_id)
-            yield self.auth.check_from_context(
-                old_room_version, tombstone_event, tombstone_context,
-            )
+        # start by allocating a new room id
+        r = yield self.store.get_room(old_room_id)
+        if r is None:
+            raise NotFoundError("Unknown room id %s" % (old_room_id,))
+        new_room_id = yield self._generate_room_id(
+            creator_id=user_id, is_public=r["is_public"],
+        )
 
-            yield self.clone_existing_room(
-                requester,
-                old_room_id=old_room_id,
-                new_room_id=new_room_id,
-                new_room_version=new_version,
-                tombstone_event_id=tombstone_event.event_id,
-            )
+        logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
 
-            # now send the tombstone
-            yield self.event_creation_handler.send_nonmember_event(
-                requester, tombstone_event, tombstone_context,
+        # we create and auth the tombstone event before properly creating the new
+        # room, to check our user has perms in the old room.
+        tombstone_event, tombstone_context = (
+            yield self.event_creation_handler.create_event(
+                requester, {
+                    "type": EventTypes.Tombstone,
+                    "state_key": "",
+                    "room_id": old_room_id,
+                    "sender": user_id,
+                    "content": {
+                        "body": "This room has been replaced",
+                        "replacement_room": new_room_id,
+                    }
+                },
+                token_id=requester.access_token_id,
             )
+        )
+        old_room_version = yield self.store.get_room_version(old_room_id)
+        yield self.auth.check_from_context(
+            old_room_version, tombstone_event, tombstone_context,
+        )
 
-            old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+        yield self.clone_existing_room(
+            requester,
+            old_room_id=old_room_id,
+            new_room_id=new_room_id,
+            new_room_version=new_version,
+            tombstone_event_id=tombstone_event.event_id,
+        )
 
-            # update any aliases
-            yield self._move_aliases_to_new_room(
-                requester, old_room_id, new_room_id, old_room_state,
-            )
+        # now send the tombstone
+        yield self.event_creation_handler.send_nonmember_event(
+            requester, tombstone_event, tombstone_context,
+        )
 
-            # and finally, shut down the PLs in the old room, and update them in the new
-            # room.
-            yield self._update_upgraded_room_pls(
-                requester, old_room_id, new_room_id, old_room_state,
-            )
+        old_room_state = yield tombstone_context.get_current_state_ids(self.store)
 
-            # Remove the pending request and the entry for this room id if
-            # necessary
-            self.currently_upgrading_rooms[old_room_id]["pending_requests"] -= 1
-            if self.currently_upgrading_rooms[old_room_id]["pending_requests"] <= 0:
-                del self.currently_upgrading_rooms[old_room_id]
-            else:
-                # Save this response if another request is waiting on it
-                self.upgrade_responses[old_room_id] = new_room_id
+        # update any aliases
+        yield self._move_aliases_to_new_room(
+            requester, old_room_id, new_room_id, old_room_state,
+        )
+
+        # and finally, shut down the PLs in the old room, and update them in the new
+        # room.
+        yield self._update_upgraded_room_pls(
+            requester, old_room_id, new_room_id, old_room_state,
+        )
 
-            defer.returnValue(new_room_id)
+        defer.returnValue(new_room_id)
 
     @defer.inlineCallbacks
     def _update_upgraded_room_pls(
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index afb03b2e1b7c..9db29c75841d 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -140,7 +140,7 @@ def handle_request(request):
 
             *args: positional parameters to pass to the callback, if it is used
 
-            **kwargs: named paramters to pass to the callback, if it is used
+            **kwargs: named parameters to pass to the callback, if it is used
 
         Returns:
             twisted.internet.defer.Deferred: yieldable result

From 5214a05663622b9fe5df8af0d7afe1b7d92b9743 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Fri, 24 May 2019 17:18:16 +0100
Subject: [PATCH 6/8] lint

---
 synapse/handlers/room.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e75d5b21af30..a5d12022e4a0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -32,8 +32,8 @@
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
-from synapse.visibility import filter_events_for_client
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -120,7 +120,7 @@ def upgrade_room(self, requester, old_room_id, new_version):
         ret = yield self._upgrade_response_cache.wrap(
             (old_room_id, user_id, new_version),
             self._upgrade_room,
-            requester, old_room_id, new_version, # args for _upgrade_room
+            requester, old_room_id, new_version,  # args for _upgrade_room
         )
 
         defer.returnValue(ret)

From 3e4dc0a11197c5be03e5c6e7c46be0c7c7e4914e Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Thu, 20 Jun 2019 15:58:13 +0100
Subject: [PATCH 7/8] Don't filter response cache by new room version

---
 synapse/handlers/room.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 093d390a8e0a..940dce835045 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -121,7 +121,7 @@ def upgrade_room(self, requester, old_room_id, new_version):
         # and one of them is not complete yet, cache the response and
         # return it to all subsequent requests
         ret = yield self._upgrade_response_cache.wrap(
-            (old_room_id, user_id, new_version),
+            (old_room_id, user_id),
             self._upgrade_room,
             requester, old_room_id, new_version,  # args for _upgrade_room
         )

From b87212e975aee9201ccec232edcb88c99360f87c Mon Sep 17 00:00:00 2001
From: Andrew Morgan <andrew@amorgan.xyz>
Date: Thu, 20 Jun 2019 16:41:02 +0100
Subject: [PATCH 8/8] black'd

---
 synapse/handlers/room.py | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 940dce835045..db3f8cb76b9f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -81,8 +81,9 @@ def __init__(self, hs):
         # If a user tries to update the same room multiple times in quick
         # succession, only process the first attempt and return its result to
         # subsequent requests
-        self._upgrade_response_cache = ResponseCache(hs, "room_upgrade",
-                                                     timeout_ms=FIVE_MINUTES_IN_MS)
+        self._upgrade_response_cache = ResponseCache(
+            hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
+        )
         self._server_notices_mxid = hs.config.server_notices_mxid
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
@@ -112,7 +113,7 @@ def upgrade_room(self, requester, old_room_id, new_version):
                 # Note that this of course only gets caught if both users are
                 # on the same homeserver.
                 raise SynapseError(
-                    400, "An upgrade for this room is currently in progress",
+                    400, "An upgrade for this room is currently in progress"
                 )
 
         # Upgrade the room
@@ -123,7 +124,9 @@ def upgrade_room(self, requester, old_room_id, new_version):
         ret = yield self._upgrade_response_cache.wrap(
             (old_room_id, user_id),
             self._upgrade_room,
-            requester, old_room_id, new_version,  # args for _upgrade_room
+            requester,
+            old_room_id,
+            new_version,  # args for _upgrade_room
         )
         defer.returnValue(ret)
 
@@ -136,7 +139,7 @@ def _upgrade_room(self, requester, old_room_id, new_version):
         if r is None:
             raise NotFoundError("Unknown room id %s" % (old_room_id,))
         new_room_id = yield self._generate_room_id(
-            creator_id=user_id, is_public=r["is_public"],
+            creator_id=user_id, is_public=r["is_public"]
         )
 
         logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
@@ -145,7 +148,8 @@ def _upgrade_room(self, requester, old_room_id, new_version):
         # room, to check our user has perms in the old room.
         tombstone_event, tombstone_context = (
             yield self.event_creation_handler.create_event(
-                requester, {
+                requester,
+                {
                     "type": EventTypes.Tombstone,
                     "state_key": "",
                     "room_id": old_room_id,
@@ -153,14 +157,14 @@ def _upgrade_room(self, requester, old_room_id, new_version):
                     "content": {
                         "body": "This room has been replaced",
                         "replacement_room": new_room_id,
-                    }
+                    },
                 },
                 token_id=requester.access_token_id,
             )
         )
         old_room_version = yield self.store.get_room_version(old_room_id)
         yield self.auth.check_from_context(
-            old_room_version, tombstone_event, tombstone_context,
+            old_room_version, tombstone_event, tombstone_context
         )
 
         yield self.clone_existing_room(
@@ -173,20 +177,20 @@ def _upgrade_room(self, requester, old_room_id, new_version):
 
         # now send the tombstone
         yield self.event_creation_handler.send_nonmember_event(
-            requester, tombstone_event, tombstone_context,
+            requester, tombstone_event, tombstone_context
         )
 
         old_room_state = yield tombstone_context.get_current_state_ids(self.store)
 
         # update any aliases
         yield self._move_aliases_to_new_room(
-            requester, old_room_id, new_room_id, old_room_state,
+            requester, old_room_id, new_room_id, old_room_state
         )
 
         # and finally, shut down the PLs in the old room, and update them in the new
         # room.
         yield self._update_upgraded_room_pls(
-            requester, old_room_id, new_room_id, old_room_state,
+            requester, old_room_id, new_room_id, old_room_state
         )
 
         defer.returnValue(new_room_id)