diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc new file mode 100644 index 000000000000..c42baf0e56c6 --- /dev/null +++ b/changelog.d/8196.misc @@ -0,0 +1 @@ +Fix `wait_for_stream_position` to allow multiple waiters on same stream ID. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index dd77a44b8db0..2d995ec456a5 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,7 +66,9 @@ "msgpack>=0.5.2", "phonenumbers>=8.2.0", "prometheus_client>=0.0.18,<0.9.0", - # we use attr.validators.deep_iterable, which arrived in 19.1.0 + # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note: + # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33 + # is out in November.) "attrs>=19.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fcf8ebf1e74f..d6ecf5b32703 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,7 +14,6 @@ # limitations under the License. """A replication client for use by synapse workers. """ -import heapq import logging from typing import TYPE_CHECKING, Dict, List, Tuple @@ -219,9 +218,8 @@ async def wait_for_stream_position( waiting_list = self._streams_to_waiters.setdefault(stream_name, []) - # We insert into the list using heapq as it is more efficient than - # pushing then resorting each time. - heapq.heappush(waiting_list, (position, deferred)) + waiting_list.append((position, deferred)) + waiting_list.sort(key=lambda t: t[0]) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"):