Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where Sliding Sync could get stuck when using workers #17438

Merged
merged 10 commits into from
Jul 15, 2024
26 changes: 20 additions & 6 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
#
import abc
import logging
import re
import string
from enum import Enum
Expand Down Expand Up @@ -74,6 +75,9 @@
from synapse.storage.databases.main import DataStore, PurgeEventsStore
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore


logger = logging.getLogger(__name__)

# Define a state map type from type/state_key to T (usually an event ID or
# event)
T = TypeVar("T")
Expand Down Expand Up @@ -666,7 +670,10 @@ async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken
except CancelledError:
raise
except Exception:
pass
# We log an exception here as even though this *might* be a client
# handing a bad token, it's more likely that Synapse returned a bad
# token (and we really want to catch those!).
logger.exception("Failed to parse stream token: %r", string)
raise SynapseError(400, "Invalid room stream token %r" % (string,))

@classmethod
Expand Down Expand Up @@ -727,8 +734,10 @@ async def to_string(self, store: "DataStore") -> str:
instance_id = await store.get_id_for_instance(name)
entries.append(f"{instance_id}.{pos}")

encoded_map = "~".join(entries)
return f"m{self.stream}~{encoded_map}"
if entries:
encoded_map = "~".join(entries)
return f"m{self.stream}~{encoded_map}"
return f"s{self.stream}"
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
else:
return "s%d" % (self.stream,)

Expand Down Expand Up @@ -770,7 +779,10 @@ async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken
except CancelledError:
raise
except Exception:
pass
# We log an exception here as even though this *might* be a client
# handing a bad token, it's more likely that Synapse returned a bad
# token (and we really want to catch those!).
logger.exception("Failed to parse stream token: %r", string)
raise SynapseError(400, "Invalid stream token %r" % (string,))

async def to_string(self, store: "DataStore") -> str:
Expand All @@ -786,8 +798,10 @@ async def to_string(self, store: "DataStore") -> str:
instance_id = await store.get_id_for_instance(name)
entries.append(f"{instance_id}.{pos}")

encoded_map = "~".join(entries)
return f"m{self.stream}~{encoded_map}"
if entries:
encoded_map = "~".join(entries)
return f"m{self.stream}~{encoded_map}"
return str(self.stream)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
else:
return str(self.stream)

Expand Down
47 changes: 47 additions & 0 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
#
#

from unittest import skipUnless

from immutabledict import immutabledict

from synapse.api.errors import SynapseError
from synapse.types import (
RoomAlias,
RoomStreamToken,
UserID,
get_domain_from_id,
get_localpart_from_id,
map_username_to_mxid_localpart,
)

from tests import unittest
from tests.utils import USE_POSTGRES_FOR_TESTS


class IsMineIDTests(unittest.HomeserverTestCase):
Expand Down Expand Up @@ -127,3 +133,44 @@ def test_non_ascii(self) -> None:
# this should work with either a unicode or a bytes
self.assertEqual(map_username_to_mxid_localpart("têst"), "t=c3=aast")
self.assertEqual(map_username_to_mxid_localpart("têst".encode()), "t=c3=aast")


class RoomStreamTokenTestCase(unittest.HomeserverTestCase):
    """Round-trip (serialize then parse) tests for `RoomStreamToken`."""

    def test_basic_token(self) -> None:
        """Test that a simple stream token can be serialized and unserialized."""
        store = self.hs.get_datastores().main

        token = RoomStreamToken(stream=5)

        # A token with no instance map serializes in the simple "s<stream>" form.
        string_token = self.get_success(token.to_string(store))
        self.assertEqual(string_token, "s5")

        parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
        self.assertEqual(parsed_token, token)

    @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres")
    def test_instance_map(self) -> None:
        """Test for stream token with an instance map."""
        store = self.hs.get_datastores().main

        token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 6}))

        # Instance map entries ahead of the stream position are kept, giving
        # the "m<stream>~<instance_id>.<pos>" form.
        string_token = self.get_success(token.to_string(store))
        self.assertEqual(string_token, "m5~1.6")

        parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
        self.assertEqual(parsed_token, token)

    @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres")
    def test_instance_map_behind(self) -> None:
        """Test for stream token with an instance map, where the instance map
        entries are from before the stream token position."""
        store = self.hs.get_datastores().main

        token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4}))

        # Entries at or behind the stream position carry no extra information,
        # so the token serializes back to the plain "s<stream>" form.
        string_token = self.get_success(token.to_string(store))
        self.assertEqual(string_token, "s5")

        parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
        self.assertEqual(parsed_token, RoomStreamToken(stream=5))