-
Notifications
You must be signed in to change notification settings - Fork 230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add optimisation to StreamChangeCache
#17130
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add optimisation to `StreamChangeCache.get_entities_changed(..)`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -165,7 +165,7 @@ def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: | |
return False | ||
|
||
def get_entities_changed( | ||
self, entities: Collection[EntityType], stream_pos: int | ||
self, entities: Collection[EntityType], stream_pos: int, _perf_factor: int = 1 | ||
) -> Union[Set[EntityType], FrozenSet[EntityType]]: | ||
""" | ||
Returns the subset of the given entities that have had changes after the given position. | ||
|
@@ -177,13 +177,31 @@ def get_entities_changed( | |
Args: | ||
entities: Entities to check for changes. | ||
stream_pos: The stream position to check for changes after. | ||
_perf_factor: Used by unit tests to choose when to use each | ||
optimisation. | ||
|
||
Return: | ||
A subset of entities which have changed after the given stream position. | ||
|
||
This will be all entities if the given stream position is at or earlier | ||
than the earliest known stream position. | ||
""" | ||
if not self._cache or stream_pos <= self._earliest_known_stream_pos: | ||
self.metrics.inc_misses() | ||
return set(entities) | ||
|
||
# If there have been tonnes of changes compared with the number of | ||
# entities, it is faster to check each entities stream ordering | ||
# one-by-one. | ||
max_stream_pos, _ = self._cache.peekitem() | ||
if max_stream_pos - stream_pos > _perf_factor * len(entities): | ||
self.metrics.inc_hits() | ||
return { | ||
entity | ||
for entity in entities | ||
if self._entity_to_key.get(entity, -1) > stream_pos | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I seem to remember that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
cache_result = self.get_all_entities_changed(stream_pos) | ||
if cache_result.hit: | ||
# We now do an intersection, trying to do so in the most efficient | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
from parameterized import parameterized | ||
|
||
from synapse.util.caches.stream_change_cache import StreamChangeCache | ||
|
||
from tests import unittest | ||
|
@@ -161,7 +163,8 @@ def test_has_any_entity_changed(self) -> None: | |
self.assertFalse(cache.has_any_entity_changed(2)) | ||
self.assertFalse(cache.has_any_entity_changed(3)) | ||
|
||
def test_get_entities_changed(self) -> None: | ||
@parameterized.expand([(0,), (1000000000,)]) | ||
def test_get_entities_changed(self, perf_factor: int) -> None: | ||
""" | ||
StreamChangeCache.get_entities_changed will return the entities in the | ||
given list that have changed since the provided stream ID. If the | ||
|
@@ -178,7 +181,9 @@ def test_get_entities_changed(self) -> None: | |
# get the ones after that point. | ||
self.assertEqual( | ||
cache.get_entities_changed( | ||
["[email protected]", "[email protected]", "[email protected]"], stream_pos=2 | ||
["[email protected]", "[email protected]", "[email protected]"], | ||
stream_pos=2, | ||
_perf_factor=perf_factor, | ||
), | ||
{"[email protected]", "[email protected]"}, | ||
) | ||
|
@@ -195,6 +200,7 @@ def test_get_entities_changed(self) -> None: | |
"[email protected]", | ||
], | ||
stream_pos=2, | ||
_perf_factor=perf_factor, | ||
), | ||
{"[email protected]", "[email protected]"}, | ||
) | ||
|
@@ -210,14 +216,19 @@ def test_get_entities_changed(self) -> None: | |
"[email protected]", | ||
], | ||
stream_pos=0, | ||
_perf_factor=perf_factor, | ||
), | ||
{"[email protected]", "[email protected]", "[email protected]", "[email protected]"}, | ||
) | ||
|
||
# Query a subset of the entries mid-way through the stream. We should | ||
# only get back the subset. | ||
self.assertEqual( | ||
cache.get_entities_changed(["[email protected]"], stream_pos=2), | ||
cache.get_entities_changed( | ||
["[email protected]"], | ||
stream_pos=2, | ||
_perf_factor=perf_factor, | ||
), | ||
{"[email protected]"}, | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This conditional is more "if there have been more changes compared with the number of entities", given the
_perf_factor
during runtime is1
, right? In the case that the number of entities vs. changes is roughly the same, is it still the same speed or faster to make this calculation rather than what we had before?If not, should we set the default of
_perf_factor
to something else?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I did a bunch of testing to figure out at what point it's faster, and it is indeed with a
_perf_factor
of 1.I think that intuitively makes some sense:
get_all_entities_changed
essentially involves one look up per stream position, and checking each entity one by one requires one lookup per entity.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, sounds plausible!