Move clear_old_streams into StreamsCache
samuller committed Nov 4, 2023
1 parent 5c82869 commit fc1e2db
Showing 3 changed files with 51 additions and 49 deletions.
53 changes: 7 additions & 46 deletions log_rate_limit/log_rate_limit.py
@@ -98,7 +98,9 @@ def __init__(
             once per minute.
         expire_offset_sec
             This offset is the number of seconds after the log rate limit has been reached for a specific stream
-            before any info we store about the stream will expire. Default is set to 15 minutes.
+            before any info we store about the stream will expire. Due to the fact that we only check at specific
+            time intervals, this expiry offset could be delayed by as long as the `expire_check_sec` parameter as well.
+            Default is set to 15 minutes.
         expire_msg
             The message used to summarise logs that were expired.
         stream_id_max_len
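
Note on how these timing parameters interact: because expiry is only checked once per `expire_check_sec` interval, a stream's cached info can outlive its rate-limit window by up to `expire_offset_sec + expire_check_sec`. Below is a minimal configuration sketch; only `expire_offset_sec`, `expire_check_sec` and `expire_msg` appear in this diff, while the filter class name `StreamRateLimitFilter` and its `period_sec` rate parameter are assumed from this project's README.

import logging

from log_rate_limit import StreamRateLimitFilter

logger = logging.getLogger(__name__)
# Allow one log per stream per minute; keep per-stream info for 15 minutes
# after the limit lapses. Expiry is only checked once a minute, so cleanup
# can lag by up to expire_check_sec beyond expire_offset_sec.
logger.addFilter(
    StreamRateLimitFilter(
        period_sec=60,
        expire_offset_sec=15 * 60,
        expire_check_sec=60,
    )
)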
@@ -134,7 +136,7 @@ def __init__(
         self._expire_msg = expire_msg
         self._stream_id_max_len = stream_id_max_len
 
-        # Global coutner of when next to check expired streams.
+        # Global counter of when next to check expired streams.
         self._next_expire_check_time: Optional[float] = None
         # All data kept in memory for each stream.
         if redis_url is not None:
@@ -240,7 +242,7 @@ def _check_expiry(self, expire_offset_sec: float, expire_msg: str) -> str:
         srl_expire_note = ""
         if self._next_expire_check_time is None or current_time > self._next_expire_check_time:
             # Check and clear from memory any stream data that has expired.
-            srl_expire_note = self.clear_old_streams(expire_offset_sec, expire_msg=expire_msg)
+            srl_expire_note = self._streams.clear_old(expire_offset_sec, expire_msg=expire_msg)
             self._next_expire_check_time = current_time + self._expire_check_sec
         return srl_expire_note

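The `_check_expiry` hunk above is an instance of a general throttling pattern: an expensive scan runs at most once per check interval, and the first call always runs. A stand-alone sketch of that pattern under assumed names (not part of this library):

import time
from typing import Callable, Optional


class ThrottledTask:
    """Run a given task at most once every `interval_sec` seconds."""

    def __init__(self, interval_sec: float, task: Callable[[], None]) -> None:
        self._interval_sec = interval_sec
        self._task = task
        self._next_run_time: Optional[float] = None

    def maybe_run(self) -> None:
        now = time.time()
        # First call always runs; afterwards only once the interval elapses.
        if self._next_run_time is None or now > self._next_run_time:
            self._task()
            self._next_run_time = now + self._interval_sec


cleanup = ThrottledTask(60.0, lambda: print("expensive expiry scan"))
cleanup.maybe_run()  # runs immediately
cleanup.maybe_run()  # skipped until 60 seconds have passed
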
@@ -283,6 +285,7 @@ def filter(self, record: logging.LogRecord) -> bool:
         stream_id = self._limit_stream_id_length(stream_id)
 
         # Run expiry checks before accessing any fields from the current stream.
+        # TODO: with a Redis cache, streams could get expired by other processes at any possible time.
         srl_expire_note = self._check_expiry(expire_offset_sec, expire_msg)
 
         # Add any extra attributes we might add to record as this allows user's own log formatting to use it (if
@@ -296,6 +299,7 @@ def filter(self, record: logging.LogRecord) -> bool:
             return True
 
         # Fetch stream that we'll use for this log message.
+        # TODO: with Redis cache, the value read here could immediately expire after being fetched (or any other time).
         stream = self._streams[stream_id]
 
         # Inner function to prevent code duplication.
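
Both TODO comments describe the same hazard: with a shared Redis cache, `self._streams[stream_id]` returns a snapshot that another process may expire or mutate right after the read, so read-modify-write updates such as `stream.skipped_log_count += 1` can race. One conventional mitigation, sketched here with redis-py's real `hincrby` call but a hypothetical key layout that this library does not necessarily use, is to make the counter update atomic on the server:

import redis

r = redis.Redis()


def count_skipped(stream_id: str) -> int:
    # HINCRBY is atomic in Redis, so concurrent processes can increment the
    # same counter without losing updates, even if the stream's other fields
    # are expired or rewritten in between.
    return r.hincrby(f"stream:{stream_id}", "skipped_log_count", 1)
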
@@ -336,49 +340,6 @@ def prep_to_allow_msg() -> None:
             stream.skipped_log_count += 1
         return self._skip_log_or_add_note(srl_expire_note, record)
 
-    def clear_old_streams(
-        self, expire_time_sec: Optional[float] = None, current_time: Optional[float] = None, expire_msg: str = ""
-    ) -> str:
-        """Clear old stream IDs to free up memory (in case of many large stream ID strings).
-
-        Parameters
-        ----------
-        expire_time_sec
-            Only clear out streams that haven't been reset in this period of time (in seconds). This is an amount of
-            time added after rate-limiting no longer applies anyway. Highly recommended to be a positive number.
-            Default is the expire offset configured at initialisation.
-        current_time
-            Optional parameter that can be used to call this function for different points in time.
-        expire_msg
-            Message format of logs used to report expired streams.
-        """
-        if expire_time_sec is None:
-            expire_time_sec = self._expire_offset_sec
-        if current_time is None:
-            current_time = time.time()
-
-        # We build a list of keys to remove first since the alternative of looping through the dictionary and
-        # removing keys will require that we iterate through a copy of the dictionary's keys which is exactly
-        # the part which might be using excessive memory.
-        keys_to_remove = []
-        for stream_id in self._streams.keys():
-            next_valid_time = self._streams[stream_id].next_valid_time
-            # Daylight savings or other time changes might break or trigger this check during the transition period?
-            if (next_valid_time + expire_time_sec) < current_time:
-                keys_to_remove.append(stream_id)
-
-        expire_note = ""
-        for stream_id in keys_to_remove:
-            skip_count = self._streams[stream_id].skipped_log_count
-            # If any logs were skipped, then we log it before clearing the cache.
-            if skip_count > 0:
-                added_msg = expire_msg.format(numskip=skip_count, stream_id=stream_id, expire_time_sec=expire_time_sec)
-                expire_note += f"\n{added_msg}"
-            # Remove keys
-            del self._streams[stream_id]
-
-        return expire_note
-
     def _key_size(self) -> int:
         """Get an estimate of the number of keys stored in memory."""
         return len(self._streams)
43 changes: 42 additions & 1 deletion log_rate_limit/streams.py
@@ -3,13 +3,15 @@
 # https://stackoverflow.com/questions/44640479/type-annotation-for-classmethod-returning-instance
 from __future__ import annotations
 
-import redis
 import time
 import hashlib
 import dataclasses
 from dataclasses import dataclass
 from collections import defaultdict
 from typing import Dict, Optional, Protocol, Set, Any
+
+import redis
 
 
 # Type for possible values of a stream_id.
 StreamID = Optional[str]
@@ -57,6 +59,45 @@ def keys(self) -> Any:  # Set[StreamID]:
         """Get an iterator of all streams cached under this prefix."""
         ...
 
+    def clear_old(self, expire_time_sec: float, current_time: Optional[float] = None, expire_msg: str = "") -> str:
+        """Clear old stream IDs to free up memory (in case of many large stream ID strings).
+
+        Parameters
+        ----------
+        expire_time_sec
+            Only clear out streams that haven't been reset in this period of time (in seconds). This is an amount of
+            time added after rate-limiting no longer applies anyway. Highly recommended to be a positive number.
+        current_time
+            Optional parameter that can be used to call this function for different points in time.
+        expire_msg
+            Message format of logs used to report expired streams.
+        """
+        if current_time is None:
+            current_time = time.time()
+
+        # We build a list of keys to remove first since the alternative of looping through the dictionary and
+        # removing keys will require that we iterate through a copy of the dictionary's keys which is exactly
+        # the part which might be using excessive memory.
+        keys_to_remove = []
+        for stream_id in self.keys():
+            next_valid_time = self[stream_id].next_valid_time
+            # Daylight savings or other time changes might break or trigger this check during the transition period?
+            if (next_valid_time + expire_time_sec) < current_time:
+                keys_to_remove.append(stream_id)
+
+        expire_note = ""
+        for stream_id in keys_to_remove:
+            skip_count = self[stream_id].skipped_log_count
+            # If any logs were skipped, then we log it before clearing the cache.
+            if skip_count > 0:
+                added_msg = expire_msg.format(numskip=skip_count, stream_id=stream_id, expire_time_sec=expire_time_sec)
+                expire_note += f"\n{added_msg}"
+            # Remove keys
+            del self[stream_id]
+
+        return expire_note
+
 
 class StreamsCacheDict(defaultdict, StreamsCache):  # type: ignore
     """An implementation of StreamsCache that uses a DefaultDict to store the stream info in-memory of the process."""
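Design note on the new home of this logic: `StreamsCache` is a `typing.Protocol`, and a protocol may carry a concrete default method. Classes that explicitly inherit from the protocol, as `StreamsCacheDict` does here, pick up that implementation for free, while the method body relies only on the protocol's own interface (`keys()`, indexing and `del`). A minimal stand-alone sketch of that pattern with hypothetical classes:

from typing import Protocol


class SizedStore(Protocol):
    def keys(self): ...

    def size(self) -> int:
        # Concrete default method: any explicit subclass inherits it, and it
        # is written purely against the protocol's own interface.
        return len(list(self.keys()))


class DictStore(dict, SizedStore):
    pass  # dict already provides keys(), so the inherited size() just works


store = DictStore(a=1, b=2)
assert store.size() == 2
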
4 changes: 2 additions & 2 deletions tests/test_log_rate_limit.py
@@ -380,7 +380,7 @@ def test_manual_clear(request):
     _log.info("Line 1")
     assert srlf._key_size() == 1
     # Test manually clearing when using default values.
-    srlf.clear_old_streams()
+    srlf._streams.clear_old(0)
     assert srlf._key_size() == 0


@@ -396,7 +396,7 @@ def test_clear_with_custom_time(request):
     assert srlf._key_size() == 0
     _log.info("Line 1")
     assert srlf._key_size() == 1
-    srlf.clear_old_streams(3600, time.time() + 3600 + 1)
+    srlf._streams.clear_old(3600, time.time() + 3600 + 1)
     assert srlf._key_size() == 0


