Skip to content

Commit

Permalink
add periodic gc.collect() callback to workers
Browse files Browse the repository at this point in the history
  • Loading branch information
pfackeldey committed Oct 17, 2024
1 parent 4960e93 commit acb073b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,5 @@ _readthedocs
**/charts/coffea-casa/charts
**/Chart.lock

# pyright lsp
pyrightconfig.json
60 changes: 58 additions & 2 deletions coffea_casa/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import logging
import uuid
import subprocess
import gc
import datetime

from distributed.diagnostics.plugin import NannyPlugin
from dask.utils import tmpfile
from distributed.compatibility import PeriodicCallback
from distributed.diagnostics.plugin import NannyPlugin, WorkerPlugin
from dask.utils import tmpfile, parse_bytes

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -128,3 +131,56 @@ def teardown(self, nanny):
return

return



class PeriodicGC(WorkerPlugin):
"""
A WorkerPlugin that periodically triggers garbage collection (GC) on a worker node.
The GC is triggered if the process memory exceeds a specified threshold.
Attributes
----------
freq : datetime.timedelta
The frequency of garbage collection. Default is 1ms.
tresh : int
The threshold memory in bytes. If the process memory exceeds this value, garbage collection is triggered. Default is 100MB.
Setup via:
>>> periodic_gc = PeriodicGC()
>>> client.register_plugin(periodic_gc)
"""

def __init__(
self,
freq: datetime.timedelta = datetime.timedelta(milliseconds=1),
tresh: int = parse_bytes("100 MB"),
) -> None:
"""
Parameters:
freq: Frequency of garbage collection in seconds. Default is 1ms.
tresh: Threshold memory in bytes. If the process memory exceeds this value, garbage collection is triggered. Default is 100MB.
"""
self.freq = freq
self.tresh = tresh

def setup(self, worker) -> None:
"""
Set up the periodic callback for garbage collection on the worker node.
Parameters
----------
worker : distributed.worker.Worker
The worker node on which to set up the periodic callback.
"""
pc = PeriodicCallback(self._gc_collect, self.freq)
worker.periodic_callbacks["coffea_casa_gc_collect"] = pc
self.worker = worker

def _gc_collect(self) -> None:
"""
Trigger garbage collection if the process memory exceeds the threshold.
"""
if self.worker.monitor.get_process_memory() >= self.tresh:
gc.collect()

0 comments on commit acb073b

Please sign in to comment.