This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Ensure that we do not cache empty sync responses after a timeout #10158

Merged: 4 commits, Jun 17, 2021
1 change: 1 addition & 0 deletions changelog.d/10157.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.21.0 which could cause `/sync` to return immediately with an empty response.
1 change: 0 additions & 1 deletion changelog.d/10157.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/10158.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.21.0 which could cause `/sync` to return immediately with an empty response.
36 changes: 26 additions & 10 deletions synapse/handlers/sync.py
@@ -49,7 +49,7 @@
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.visibility import filter_events_for_client

@@ -83,12 +83,15 @@
 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100


+SyncRequestKey = Tuple[Any, ...]
+
+
 @attr.s(slots=True, frozen=True)
 class SyncConfig:
     user = attr.ib(type=UserID)
     filter_collection = attr.ib(type=FilterCollection)
     is_guest = attr.ib(type=bool)
-    request_key = attr.ib(type=Tuple[Any, ...])
+    request_key = attr.ib(type=SyncRequestKey)
     device_id = attr.ib(type=Optional[str])


@@ -266,9 +269,9 @@ def __init__(self, hs: "HomeServer"):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(
-            hs.get_clock(), "sync"
-        )  # type: ResponseCache[Tuple[Any, ...]]
+        self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
+            hs.get_clock(), "sync"
+        )
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
         self.storage = hs.get_storage()
@@ -307,16 +310,18 @@ async def wait_for_sync_for_user(
             since_token,
             timeout,
             full_state,
+            cache_context=True,
         )
         logger.debug("Returning sync response for %s", user_id)
         return res

     async def _wait_for_sync_for_user(
         self,
         sync_config: SyncConfig,
-        since_token: Optional[StreamToken] = None,
-        timeout: int = 0,
-        full_state: bool = False,
+        since_token: Optional[StreamToken],
+        timeout: int,
+        full_state: bool,
+        cache_context: ResponseCacheContext[SyncRequestKey],
     ) -> SyncResult:
         if since_token is None:
             sync_type = "initial_sync"
@@ -343,13 +348,13 @@ async def _wait_for_sync_for_user(
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
-            result = await self.current_sync_for_user(
+            result: SyncResult = await self.current_sync_for_user(
                 sync_config, since_token, full_state=full_state
             )
         else:

-            def current_sync_callback(before_token, after_token):
-                return self.current_sync_for_user(sync_config, since_token)
+            async def current_sync_callback(before_token, after_token) -> SyncResult:
+                return await self.current_sync_for_user(sync_config, since_token)

             result = await self.notifier.wait_for_events(
                 sync_config.user.to_string(),
Expand All @@ -358,6 +363,17 @@ def current_sync_callback(before_token, after_token):
from_token=since_token,
)

# if nothing has happened in any of the users' rooms since /sync was called,
# the resultant next_batch will be the same as since_token (since the result
# is generated when wait_for_events is first called, and not regenerated
# when wait_for_events times out).
#
# If that happens, we mustn't cache it, so that when the client comes back
# with the same cache token, we don't immediately return the same empty
# result, causing a tightloop. (#8518)
if result.next_batch == since_token:
cache_context.should_cache = False

if result:
if sync_config.filter_collection.lazy_load_members():
lazy_loaded = "true"
Expand Down
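For context, here is a minimal sketch of the `cache_context` mechanism this hunk relies on. It is an illustration of the idea only, not the real `synapse.util.caches.response_cache` implementation (which also deduplicates concurrent requests for the same key): `wrap` hands the callback a context object, and skips caching if the callback clears `should_cache`.

```python
from typing import Any, Awaitable, Callable, Dict, Generic, Hashable, TypeVar

KV = TypeVar("KV", bound=Hashable)


class ResponseCacheContext(Generic[KV]):
    """Handed to the wrapped callback so it can veto caching of its result."""

    def __init__(self, cache_key: KV) -> None:
        self.cache_key = cache_key
        self.should_cache = True  # the callback may set this to False


class ResponseCache(Generic[KV]):
    def __init__(self) -> None:
        self._cache: Dict[KV, Any] = {}

    async def wrap(
        self,
        key: KV,
        callback: Callable[..., Awaitable[Any]],
        *args: Any,
        cache_context: bool = False,
        **kwargs: Any,
    ) -> Any:
        # serve a previously cached response for this key, if any
        if key in self._cache:
            return self._cache[key]
        context: ResponseCacheContext[KV] = ResponseCacheContext(key)
        if cache_context:
            kwargs["cache_context"] = context
        result = await callback(*args, **kwargs)
        # only cache if the callback did not opt out (e.g. an empty /sync)
        if context.should_cache:
            self._cache[key] = result
        return result
```

With this shape, `_wait_for_sync_for_user` can mark a timed-out, empty sync as uncacheable without the cache needing to know anything about sync tokens.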
6 changes: 2 additions & 4 deletions synapse/python_dependencies.py
@@ -75,11 +75,9 @@
     "phonenumbers>=8.2.0",
     # we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
     "prometheus_client>=0.4.0",
-    # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
-    # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
-    # is out in November.)
+    # we use `order`, which arrived in attrs 19.2.0.
     # Note: 21.1.0 broke `/sync`, see #9936
-    "attrs>=19.1.0,!=21.1.0",
+    "attrs>=19.2.0,!=21.1.0",
     "netaddr>=0.7.18",
     "Jinja2>=2.9",
     "bleach>=1.4.3",
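As a quick sanity check on the new pin, the specifier can be exercised with the `packaging` library (used here purely for illustration; it is not something this PR adds):

```python
from packaging.specifiers import SpecifierSet

spec = SpecifierSet(">=19.2.0,!=21.1.0")

assert "19.2.0" in spec      # minimum version that provides `order`
assert "21.0.0" in spec      # ordinary newer releases are fine
assert "19.1.0" not in spec  # too old: lacks `order`
assert "21.1.0" not in spec  # excluded: broke `/sync` (#9936)
```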
2 changes: 1 addition & 1 deletion synapse/types.py
@@ -404,7 +404,7 @@ def f2(m):
     return username.decode("ascii")


-@attr.s(frozen=True, slots=True, cmp=False)
+@attr.s(frozen=True, slots=True, order=False)
 class RoomStreamToken:
     """Tokens are positions between events. The token "s1" comes after event 1.

Comment from the PR author on this line: I had to fix this to make the `if result.next_batch == since_token` conditional work. As I wrote in #8518, I think it might have been exacerbating the situation.
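The `cmp=False` to `order=False` switch is subtle but load-bearing: `cmp=False` suppresses the generated `__eq__` as well as the ordering methods, so structurally identical tokens compare by identity, whereas `order=False` keeps `__eq__` and drops only `<`/`<=`/`>`/`>=`. A hypothetical `Token` class (not Synapse code; behavior as of attrs 19.2+) shows the difference:

```python
import attr


@attr.s(frozen=True, slots=True, cmp=False)  # old style: no __eq__ generated
class OldToken:
    stream = attr.ib(type=int)


@attr.s(frozen=True, slots=True, order=False)  # new style: __eq__ kept
class NewToken:
    stream = attr.ib(type=int)


# with cmp=False, == falls back to object identity, so equal fields still differ
assert OldToken(1) != OldToken(1)
# with order=False, == compares field values as expected
assert NewToken(1) == NewToken(1)
# NewToken(1) < NewToken(2) would raise TypeError, since ordering is disabled
```

That is presumably why the `result.next_batch == since_token` conditional needed this change in order to fire.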
50 changes: 50 additions & 0 deletions tests/rest/client/v2_alpha/test_sync.py
@@ -558,3 +558,53 @@ def _check_unread_count(self, expected_count: int):

         # Store the next batch for the next request.
         self.next_batch = channel.json_body["next_batch"]
+
+
+class SyncCacheTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        sync.register_servlets,
+    ]
+
+    def test_noop_sync_does_not_tightloop(self):
+        """If the sync times out, we shouldn't cache the result
+
+        Essentially a regression test for #8518.
+        """
+        self.user_id = self.register_user("kermit", "monkey")
+        self.tok = self.login("kermit", "monkey")
+
+        # we should immediately get an initial sync response
+        channel = self.make_request("GET", "/sync", access_token=self.tok)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # now, make an incremental sync request, with a timeout
+        next_batch = channel.json_body["next_batch"]
+        channel = self.make_request(
+            "GET",
+            f"/sync?since={next_batch}&timeout=10000",
+            access_token=self.tok,
+            await_result=False,
+        )
+        # that should block for 10 seconds
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=9900)
+        channel.await_result(timeout_ms=200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # we expect the next_batch in the result to be the same as before
+        self.assertEqual(channel.json_body["next_batch"], next_batch)
+
+        # another incremental sync should also block.
+        channel = self.make_request(
+            "GET",
+            f"/sync?since={next_batch}&timeout=10000",
+            access_token=self.tok,
+            await_result=False,
+        )
+        # that should block for 10 seconds
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=9900)
+        channel.await_result(timeout_ms=200)
+        self.assertEqual(channel.code, 200, channel.json_body)
8 changes: 3 additions & 5 deletions tests/server.py
@@ -138,21 +138,19 @@ def isSecure(self):
     def transport(self):
         return self

-    def await_result(self, timeout: int = 100) -> None:
+    def await_result(self, timeout_ms: int = 1000) -> None:
         """
         Wait until the request is finished.
         """
+        end_time = self._reactor.seconds() + timeout_ms / 1000.0
         self._reactor.run()
-        x = 0

         while not self.is_finished():
             # If there's a producer, tell it to resume producing so we get content
             if self._producer:
                 self._producer.resumeProducing()

-            x += 1
-
-            if x > timeout:
+            if self._reactor.seconds() > end_time:
                 raise TimedOutException("Timed out waiting for request to finish.")

             self._reactor.advance(0.1)
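The rewritten helper measures the timeout against the test reactor's fake clock rather than counting loop iterations, so callers can express waits in (simulated) milliseconds. A standalone sketch of that deadline pattern, assuming Twisted's `MemoryReactorClock` (importable from `twisted.internet.testing` in recent Twisted, or `twisted.test.proto_helpers` in older releases):

```python
from twisted.internet.testing import MemoryReactorClock

reactor = MemoryReactorClock()
timeout_ms = 1000
end_time = reactor.seconds() + timeout_ms / 1000.0

timed_out = False
while True:  # nothing ever "finishes" in this sketch, so we must hit the deadline
    if reactor.seconds() > end_time:
        timed_out = True
        break
    reactor.advance(0.1)  # each pass moves fake time forward by 100ms

assert timed_out  # fires after roughly timeout_ms / 100 iterations of fake time
```

Because only fake time advances, the test above that waits 9900ms and then 200ms runs instantly in real time while still exercising a 10-second `/sync` timeout.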