Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add config flags to allow for cache auto-tuning #12701

Merged
merged 25 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8dc1836
Refactor jemalloc
erikjohnston May 3, 2022
8af38b8
add config option, read from config, apply logic in _expire_old_caches
H-Shay May 5, 2022
877712e
Merge remote-tracking branch 'origin/erikj/mem_limit_caches' into sha…
H-Shay May 5, 2022
0e9e9f5
import jemalloc
H-Shay May 6, 2022
7b1c638
more work
H-Shay May 9, 2022
96e83cb
add tests and refine logic
H-Shay May 10, 2022
f09485d
lints
H-Shay May 10, 2022
26e5329
newsfragment
H-Shay May 10, 2022
4ae489f
better guard clause
H-Shay May 10, 2022
628fdbd
Merge branch 'develop' into shay/cache_tuning
H-Shay May 10, 2022
c629938
regenerate sample config
H-Shay May 10, 2022
4aa6740
less stupid mypy situation
H-Shay May 10, 2022
ef9824a
Update lrucache.py
H-Shay May 10, 2022
864c06f
Fix typo refreshing stats
erikjohnston May 11, 2022
e2a441e
Call refresh_stats() to update stats
erikjohnston May 11, 2022
bc0de12
stop evicting if can't read memory
H-Shay May 11, 2022
6f57dc0
do less work in try block
H-Shay May 11, 2022
308557a
apply suggestions from code review
H-Shay May 11, 2022
69f765f
Merge branch 'shay/cache_tuning' of https://github.com/matrix-org/syn…
H-Shay May 11, 2022
a50cbb6
lints + regenerate sample config
H-Shay May 11, 2022
63680b2
Merge branch 'develop' into shay/cache_tuning
H-Shay May 11, 2022
06e712f
more revision
H-Shay May 11, 2022
99cc533
Merge branch 'shay/cache_tuning' of https://github.com/matrix-org/syn…
H-Shay May 11, 2022
9ae7671
move sleep
H-Shay May 11, 2022
13e0f2c
add config manual entry
H-Shay May 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12701.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a config options to allow for auto-tuning of caches.
18 changes: 18 additions & 0 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,24 @@ caches:
#
#cache_entry_ttl: 30m

# This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`,
# `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain
# a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize
# this option, and all three of the options must be specified for this feature to work.
#cache_autotuning:
# This flag sets a ceiling on much memory the cache can use before caches begin to be continuously evicted.
# They will continue to be evicted until the memory usage drops below the `target_memory_usage`, set in
# the flag below, or until the `min_cache_ttl` is hit.
#max_cache_memory_usage: 1024M

# This flag sets a rough target for the desired memory usage of the caches.
#target_cache_memory_usage: 758M

# 'min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when
# caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches
# from being emptied while Synapse is evicting due to memory.
#min_cache_ttl: 5m

# Controls how long the results of a /sync request are cached for after
# a successful response is returned. A higher duration can help clients with
# intermittent connections, at the cost of higher memory usage.
Expand Down
33 changes: 33 additions & 0 deletions synapse/config/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,24 @@ def generate_config_section(self, **kwargs: Any) -> str:
#
#cache_entry_ttl: 30m

# This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`,
# `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain
# a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize
# this option, and all three of the options must be specified for this feature to work.
#cache_autotuning:
# This flag sets a ceiling on much memory the cache can use before caches begin to be continuously evicted.
# They will continue to be evicted until the memory usage drops below the `target_memory_usage`, set in
# the flag below, or until the `min_cache_ttl` is hit.
#max_cache_memory_usage: 1024M

# This flag sets a rough target for the desired memory usage of the caches.
#target_cache_memory_usage: 758M

# 'min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when
# caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches
# from being emptied while Synapse is evicting due to memory.
#min_cache_ttl: 5m

# Controls how long the results of a /sync request are cached for after
# a successful response is returned. A higher duration can help clients with
# intermittent connections, at the cost of higher memory usage.
Expand Down Expand Up @@ -263,6 +281,21 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
)
self.expiry_time_msec = self.parse_duration(expiry_time)

self.cache_autotuning = cache_config.get("cache_autotuning")
if self.cache_autotuning:
max_memory_usage = self.cache_autotuning.get("max_cache_memory_usage")
self.cache_autotuning["max_cache_memory_usage"] = self.parse_size(
max_memory_usage
)

target_mem_size = self.cache_autotuning.get("target_cache_memory_usage")
self.cache_autotuning["target_cache_memory_usage"] = self.parse_size(
target_mem_size
)

min_cache_ttl = self.cache_autotuning.get("min_cache_ttl")
self.cache_autotuning["min_cache_ttl"] = self.parse_duration(min_cache_ttl)

self.sync_response_cache_duration = self.parse_duration(
cache_config.get("sync_response_cache_duration", 0)
)
Expand Down
114 changes: 73 additions & 41 deletions synapse/metrics/jemalloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import re
from typing import Iterable, Optional, overload

import attr
from prometheus_client import REGISTRY, Metric
from typing_extensions import Literal

Expand All @@ -27,52 +28,24 @@
logger = logging.getLogger(__name__)


def _setup_jemalloc_stats() -> None:
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""

# Try to find the loaded jemalloc shared library, if any. We need to
# introspect into what is loaded, rather than loading whatever is on the
# path, as if we load a *different* jemalloc version things will seg fault.

# We look in `/proc/self/maps`, which only exists on linux.
if not os.path.exists("/proc/self/maps"):
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
return

# We're looking for a path at the end of the line that includes
# "libjemalloc".
regex = re.compile(r"/\S+/libjemalloc.*$")

jemalloc_path = None
with open("/proc/self/maps") as f:
for line in f:
match = regex.search(line.strip())
if match:
jemalloc_path = match.group()

if not jemalloc_path:
# No loaded jemalloc was found.
logger.debug("jemalloc not found")
return

logger.debug("Found jemalloc at %s", jemalloc_path)

jemalloc = ctypes.CDLL(jemalloc_path)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class JemallocStats:
jemalloc: ctypes.CDLL

@overload
def _mallctl(
name: str, read: Literal[True] = True, write: Optional[int] = None
self, name: str, read: Literal[True] = True, write: Optional[int] = None
) -> int:
...

@overload
def _mallctl(name: str, read: Literal[False], write: Optional[int] = None) -> None:
def _mallctl(
self, name: str, read: Literal[False], write: Optional[int] = None
) -> None:
...

def _mallctl(
name: str, read: bool = True, write: Optional[int] = None
self, name: str, read: bool = True, write: Optional[int] = None
) -> Optional[int]:
"""Wrapper around `mallctl` for reading and writing integers to
jemalloc.
Expand Down Expand Up @@ -120,7 +93,7 @@ def _mallctl(
# Where oldp/oldlenp is a buffer where the old value will be written to
# (if not null), and newp/newlen is the buffer with the new value to set
# (if not null). Note that they're all references *except* newlen.
result = jemalloc.mallctl(
result = self.jemalloc.mallctl(
name.encode("ascii"),
input_var_ref,
input_len_ref,
Expand All @@ -136,21 +109,80 @@ def _mallctl(

return input_var.value

def _jemalloc_refresh_stats() -> None:
def refresh_stats(self) -> None:
"""Request that jemalloc updates its internal statistics. This needs to
be called before querying for stats, otherwise it will return stale
values.
"""
try:
_mallctl("epoch", read=False, write=1)
self._mallctl("epoch", read=False, write=1)
except Exception as e:
logger.warning("Failed to reload jemalloc stats: %s", e)

def get_stat(self, name: str) -> int:
"""Request the stat of the given name at the time of the last
`refresh_stats` call. This may throw if we fail to read
the stat.
"""
return self._mallctl(f"stats.{name}")


_JEMALLOC_STATS: Optional[JemallocStats] = None


def get_jemalloc_stats() -> Optional[JemallocStats]:
"""Returns an interface to jemalloc, if it is being used.

Note that this will always return None until `setup_jemalloc_stats` has been
called.
"""
return _JEMALLOC_STATS


def _setup_jemalloc_stats() -> None:
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""

global _JEMALLOC_STATS

# Try to find the loaded jemalloc shared library, if any. We need to
# introspect into what is loaded, rather than loading whatever is on the
# path, as if we load a *different* jemalloc version things will seg fault.

# We look in `/proc/self/maps`, which only exists on linux.
if not os.path.exists("/proc/self/maps"):
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
return

# We're looking for a path at the end of the line that includes
# "libjemalloc".
regex = re.compile(r"/\S+/libjemalloc.*$")

jemalloc_path = None
with open("/proc/self/maps") as f:
for line in f:
match = regex.search(line.strip())
if match:
jemalloc_path = match.group()

if not jemalloc_path:
# No loaded jemalloc was found.
logger.debug("jemalloc not found")
return

logger.debug("Found jemalloc at %s", jemalloc_path)

jemalloc_dll = ctypes.CDLL(jemalloc_path)

stats = JemallocStats(jemalloc_dll)
_JEMALLOC_STATS = stats

class JemallocCollector(Collector):
"""Metrics for internal jemalloc stats."""

def collect(self) -> Iterable[Metric]:
_jemalloc_refresh_stats()
stats.refresh_stats()

g = GaugeMetricFamily(
"jemalloc_stats_app_memory_bytes",
Expand Down Expand Up @@ -184,7 +216,7 @@ def collect(self) -> Iterable[Metric]:
"metadata",
):
try:
value = _mallctl(f"stats.{t}")
value = stats.get_stat(t)
except Exception as e:
# There was an error fetching the value, skip.
logger.warning("Failed to read jemalloc stats.%s: %s", t, e)
Expand Down
79 changes: 69 additions & 10 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import math
import threading
import weakref
from enum import Enum
Expand Down Expand Up @@ -40,6 +41,7 @@

from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
Expand Down Expand Up @@ -106,10 +108,16 @@ def update_last_access(self, clock: Clock) -> None:


@wrap_as_background_process("LruCache._expire_old_entries")
async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
async def _expire_old_entries(
clock: Clock, expiry_seconds: int, autotune_config: Optional[dict]
) -> None:
"""Walks the global cache list to find cache entries that haven't been
accessed in the given number of seconds.
accessed in the given number of seconds, or if a given memory threshold has been breached.
"""
if autotune_config:
max_cache_memory_usage = autotune_config["max_cache_memory_usage"]
target_cache_memory_usage = autotune_config["target_cache_memory_usage"]
min_cache_ttl = autotune_config["min_cache_ttl"] / 1000

now = int(clock.time())
node = GLOBAL_ROOT.prev_node
Expand All @@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:

logger.debug("Searching for stale caches")

evicting_due_to_memory = False

# determine if we're evicting due to memory
jemalloc_interface = get_jemalloc_stats()
if jemalloc_interface and autotune_config:
try:
jemalloc_interface.refresh_stats()
mem_usage = jemalloc_interface.get_stat("allocated")
if mem_usage > max_cache_memory_usage:
logger.info("Begin memory-based cache eviction.")
evicting_due_to_memory = True
except Exception:
logger.warning(
"Unable to read allocated memory, skipping memory-based cache eviction."
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
while node is not GLOBAL_ROOT:
# Only the root node isn't a `_TimedListNode`.
assert isinstance(node, _TimedListNode)

if node.last_access_ts_secs > now - expiry_seconds:
# if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's
# nothing to do here
if (
node.last_access_ts_secs > now - expiry_seconds
and not evicting_due_to_memory
):
break

# if entry is newer than min_cache_entry_ttl then do not evict and don't evict anything newer
if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl:
break

cache_entry = node.get_cache_entry()
Expand All @@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
assert cache_entry is not None
cache_entry.drop_from_cache()

# Check mem allocation periodically if we are evicting a bunch of caches
if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0:
try:
jemalloc_interface.refresh_stats()
mem_usage = jemalloc_interface.get_stat("allocated")
if mem_usage < target_cache_memory_usage:
evicting_due_to_memory = False
logger.info("Stop memory-based cache eviction.")
except Exception:
logger.warning(
"Unable to read allocated memory, this may affect memory-based cache eviction."
)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
# If we've failed to read the current memory usage then we
# should stop trying to evict based on memory usage
evicting_due_to_memory = False

# If we do lots of work at once we yield to allow other stuff to happen.
if (i + 1) % 10000 == 0:
logger.debug("Waiting during drop")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some logic such as:

@@ -185,7 +185,12 @@ async def _expire_old_entries(
         # If we do lots of work at once we yield to allow other stuff to happen.
         if (i + 1) % 10000 == 0:
             logger.debug("Waiting during drop")
-            await clock.sleep(0)
+            if node.last_access_ts_secs > now - expiry_seconds:
+                await clock.sleep(0.5)
+            else:
+                await clock.sleep(0)
             logger.debug("Waking during drop")
 
         node = next_node

So that we sleep a bit when we're evicting due to memory usage. Testing this on jki.re the memory often wasn't deallocated immediately, so giving a bit of a pause allows the memory usage to drop.

await clock.sleep(0)
if node.last_access_ts_secs > now - expiry_seconds:
await clock.sleep(0.5)
else:
await clock.sleep(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant updating the logic below where we already do a await clock.sleep(0) 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol that's what I thought at first and then I second-guessed myself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, its certainly a legitimate choice to put it there, and sleeping more often may actually be better in these circumstances 🙂

logger.debug("Waking during drop")

node = next_node
Expand All @@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:

def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
"""Start a background job that expires all cache entries if they have not
been accessed for the given number of seconds.
been accessed for the given number of seconds, or if a given memory usage threshold has been
breached.
"""
if not hs.config.caches.expiry_time_msec:
if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning:
return

logger.info(
"Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000
)
if hs.config.caches.expiry_time_msec:
expiry_time = hs.config.caches.expiry_time_msec / 1000
logger.info("Expiring LRU caches after %d seconds", expiry_time)
else:
expiry_time = math.inf

global USE_GLOBAL_LIST
USE_GLOBAL_LIST = True

clock = hs.get_clock()
clock.looping_call(
_expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000
_expire_old_entries,
30 * 1000,
clock,
expiry_time,
hs.config.caches.cache_autotuning,
)


Expand Down
Loading