Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In sync, wait for the worker to catch up with the since token #17215

Merged
merged 9 commits into from
May 30, 2024
1 change: 1 addition & 0 deletions changelog.d/17215.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where duplicate events could be sent down sync when using workers that are overloaded.
35 changes: 35 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,23 @@ def __bool__(self) -> bool:
or self.device_lists
)

@staticmethod
def empty(next_batch: StreamToken) -> "SyncResult":
    """Build a `SyncResult` that carries no updates.

    Every collection field is set to a fresh empty container and
    `device_lists` to a default-constructed `DeviceListUpdates`.

    Args:
        next_batch: The stream token to store as the result's `next_batch`.

    Returns:
        A new `SyncResult` in which only `next_batch` is populated.
    """
    return SyncResult(
        next_batch=next_batch,
        # Per-user data.
        presence=[],
        account_data=[],
        # Rooms, grouped by membership.
        joined=[],
        invited=[],
        knocked=[],
        archived=[],
        # Device-related data.
        to_device=[],
        device_lists=DeviceListUpdates(),
        device_one_time_keys_count={},
        device_unused_fallback_key_types=[],
    )


class SyncHandler:
def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -401,6 +418,24 @@ async def _wait_for_sync_for_user(
if context:
context.tag = sync_label

if since_token is not None:
# We need to make sure this worker has caught up with the token. If
# this returns false it means we timed out waiting, and we should
# just return an empty response.
start = self.clock.time_msec()
if not await self.notifier.wait_for_stream_token(since_token):
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
return SyncResult.empty(since_token)

# If we've spent significant time waiting to catch up, take it off
# the timeout.
now = self.clock.time_msec()
if now - start > 1_000:
timeout -= now - start
timeout = max(timeout, 0)

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
if since_token is not None:
Expand Down
19 changes: 19 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,25 @@ async def check_for_updates(

return result

async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
    """Poll until this worker's current token has reached `stream_token`.

    The current token is re-read from `self.event_sources` on every
    iteration and compared using `stream_token.is_before_or_eq(...)`.

    Args:
        stream_token: The token this worker must have caught up with.

    Returns:
        True as soon as `stream_token` is before or equal to the current
        token. False if, after a failed check, more than 10_000 units of
        `self.clock.time_msec()` (presumably milliseconds) have passed since
        this call began.
    """
    began_at = self.clock.time_msec()

    while not stream_token.is_before_or_eq(self.event_sources.get_current_token()):
        waited = self.clock.time_msec() - began_at
        if waited > 10_000:
            # Give up; the caller decides what to do about the timeout.
            return False

        logger.info("Waiting for current token to reach %s", stream_token)

        # TODO: be better
        await self.clock.sleep(0.5)

    return True

async def _get_room_ids(
self, user: UserID, explicit_room_id: Optional[str]
) -> Tuple[StrCollection, bool]:
Expand Down
52 changes: 51 additions & 1 deletion synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from immutabledict import immutabledict
from signedjson.key import decode_verify_key_bytes
from signedjson.types import VerifyKey
from typing_extensions import TypedDict
from typing_extensions import Self, TypedDict
from unpaddedbase64 import decode_base64
from zope.interface import Interface

Expand Down Expand Up @@ -515,6 +515,27 @@ def get_stream_pos_for_instance(self, instance_name: str) -> int:
# at `self.stream`.
return self.instance_map.get(instance_name, self.stream)

def is_before_or_eq(self, other_token: Self) -> bool:
    """Whether this token is before or equal to the other token, i.e. every
    constituent part is before or equal to the matching part of the other.

    Essentially it is `self <= other`.

    An instance missing from a token's `instance_map` is treated as being at
    that token's `stream` position.

    Note: if `self.is_before_or_eq(other_token) is False` then that does not
    imply that the reverse is True.
    """
    if self.stream > other_token.stream:
        return False

    # Compare every instance that either token knows about.
    known_instances = self.instance_map.keys() | other_token.instance_map.keys()
    return not any(
        self.instance_map.get(name, self.stream)
        > other_token.instance_map.get(name, other_token.stream)
        for name in known_instances
    )


@attr.s(frozen=True, slots=True, order=False)
class RoomStreamToken(AbstractMultiWriterStreamToken):
Expand Down Expand Up @@ -1008,6 +1029,35 @@ def get_field(
"""Returns the stream ID for the given key."""
return getattr(self, key.value)

def is_before_or_eq(self, other_token: "StreamToken") -> bool:
    """Whether this token is before or equal to the other token, i.e. every
    constituent part is before or equal to the matching part of the other.

    Essentially it is `self <= other`.

    Each `StreamKeyType` member is looked up on both tokens via `get_field`.
    `RoomStreamToken` and `MultiWriterStreamToken` fields are compared with
    their own `is_before_or_eq`; any other field is compared as an integer.

    Note: if `self.is_before_or_eq(other_token) is False` then that does not
    imply that the reverse is True.
    """
    for key in StreamKeyType.__members__.values():
        ours = self.get_field(key)
        theirs = other_token.get_field(key)

        if isinstance(ours, RoomStreamToken):
            assert isinstance(theirs, RoomStreamToken)
            in_order = ours.is_before_or_eq(theirs)
        elif isinstance(ours, MultiWriterStreamToken):
            assert isinstance(theirs, MultiWriterStreamToken)
            in_order = ours.is_before_or_eq(theirs)
        else:
            assert isinstance(theirs, int)
            in_order = not (ours > theirs)

        # A single field that is ahead makes the whole token "not before".
        if not in_order:
            return False

    return True


StreamToken.START = StreamToken(
RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0
Expand Down
Loading