Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimisation to StreamChangeCache #17130

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17130.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add optimisation to `StreamChangeCache.get_entities_changed(..)`.
20 changes: 19 additions & 1 deletion synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
return False

def get_entities_changed(
self, entities: Collection[EntityType], stream_pos: int
self, entities: Collection[EntityType], stream_pos: int, _perf_factor: int = 1
) -> Union[Set[EntityType], FrozenSet[EntityType]]:
"""
Returns the subset of the given entities that have had changes after the given position.
Expand All @@ -177,13 +177,31 @@ def get_entities_changed(
Args:
entities: Entities to check for changes.
stream_pos: The stream position to check for changes after.
_perf_factor: Used by unit tests to choose when to use each
optimisation.

Return:
A subset of entities which have changed after the given stream position.

This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
if not self._cache or stream_pos <= self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)

# If there have been tonnes of changes compared with the number of
# entities, it is faster to check each entities stream ordering
# one-by-one.
Comment on lines +193 to +195
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conditional is more "if there have been more changes compared with the number of entities", given the _perf_factor during runtime is 1, right? In the case that the number of entities vs. changes is roughly the same, is it still the same speed or faster to make this calculation rather than what we had before?

If not, should we set the default of _perf_factor to something else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I did a bunch of testing to figure out at what point it's faster, and it is indeed with a _perf_factor of 1.

I think that intuitively makes some sense: get_all_entities_changed essentially involves one look up per stream position, and checking each entity one by one requires one lookup per entity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sounds plausible!

max_stream_pos, _ = self._cache.peekitem()
if max_stream_pos - stream_pos > _perf_factor * len(entities):
self.metrics.inc_hits()
return {
entity
for entity in entities
if self._entity_to_key.get(entity, -1) > stream_pos
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to remember that stream_pos can go negative... is that relevant here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamChangeCache assumes they're positive incrementing numbers. The only stream that has negatives is the backfill stream that goes backwards (though in helper classes tend to be modelled as positive numbers), which doesn't have a StreamChangeCache.

}

cache_result = self.get_all_entities_changed(stream_pos)
if cache_result.hit:
# We now do an intersection, trying to do so in the most efficient
Expand Down
17 changes: 14 additions & 3 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from parameterized import parameterized

from synapse.util.caches.stream_change_cache import StreamChangeCache

from tests import unittest
Expand Down Expand Up @@ -161,7 +163,8 @@ def test_has_any_entity_changed(self) -> None:
self.assertFalse(cache.has_any_entity_changed(2))
self.assertFalse(cache.has_any_entity_changed(3))

def test_get_entities_changed(self) -> None:
@parameterized.expand([(0,), (1000000000,)])
def test_get_entities_changed(self, perf_factor: int) -> None:
"""
StreamChangeCache.get_entities_changed will return the entities in the
given list that have changed since the provided stream ID. If the
Expand All @@ -178,7 +181,9 @@ def test_get_entities_changed(self) -> None:
# get the ones after that point.
self.assertEqual(
cache.get_entities_changed(
["[email protected]", "[email protected]", "[email protected]"], stream_pos=2
["[email protected]", "[email protected]", "[email protected]"],
stream_pos=2,
_perf_factor=perf_factor,
),
{"[email protected]", "[email protected]"},
)
Expand All @@ -195,6 +200,7 @@ def test_get_entities_changed(self) -> None:
"[email protected]",
],
stream_pos=2,
_perf_factor=perf_factor,
),
{"[email protected]", "[email protected]"},
)
Expand All @@ -210,14 +216,19 @@ def test_get_entities_changed(self) -> None:
"[email protected]",
],
stream_pos=0,
_perf_factor=perf_factor,
),
{"[email protected]", "[email protected]", "[email protected]", "[email protected]"},
)

# Query a subset of the entries mid-way through the stream. We should
# only get back the subset.
self.assertEqual(
cache.get_entities_changed(["[email protected]"], stream_pos=2),
cache.get_entities_changed(
["[email protected]"],
stream_pos=2,
_perf_factor=perf_factor,
),
{"[email protected]"},
)

Expand Down
Loading