Skip to content

Commit

Permalink
move all redis session creation to factory (#4365)
Browse files Browse the repository at this point in the history
This started as an attempt to mock redis for consistent testing outside
of normal behavior.

That attempt has failed thus far, but capturing all redis session
initialization behind a single function still seems valuable, for future
work and easier debugging.

This PR also fences the cache-clearing redis communication performed
during a patient's timeline rebuild, so that redis is only contacted when
not under test. Without the fence, tables can't be dropped between test
runs, because postgres believes a deadlock condition exists.

---------

Co-authored-by: Ivan Cvitkovic <[email protected]>
  • Loading branch information
pbugni and ivan-c authored Mar 5, 2024
1 parent dc1751d commit 834a9ae
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 36 deletions.
5 changes: 2 additions & 3 deletions portal/config/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""Configuration"""
import os

import redis

from portal.factories.redis import create_redis
from portal.models.role import ROLE

SITE_CFG = 'site.cfg'
Expand Down Expand Up @@ -152,7 +151,7 @@ class BaseConfig(object):
REDIS_URL
)

SESSION_REDIS = redis.from_url(SESSION_REDIS_URL)
SESSION_REDIS = create_redis(SESSION_REDIS_URL)

UPDATE_PATIENT_TASK_BATCH_SIZE = int(
os.environ.get('UPDATE_PATIENT_TASK_BATCH_SIZE', 16)
Expand Down
4 changes: 4 additions & 0 deletions portal/factories/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import redis

def create_redis(url, **kwargs):
    """Return a redis client connected to *url*.

    Single factory for every redis session the application creates, so
    tests can patch one location and connection setup stays consistent.

    :param url: redis connection URL, e.g. ``redis://localhost:6379/0``
    :param kwargs: optional keyword arguments forwarded verbatim to
        ``redis.Redis.from_url`` (e.g. ``decode_responses=True``)
    :return: a ``redis.Redis`` client instance
    """
    return redis.Redis.from_url(url, **kwargs)
20 changes: 11 additions & 9 deletions portal/models/qb_timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from dateutil.relativedelta import relativedelta
from flask import current_app
import redis
from redis.exceptions import ConnectionError
from sqlalchemy.types import Enum as SQLA_Enum
from werkzeug.exceptions import BadRequest
Expand All @@ -13,6 +12,7 @@
from ..cache import cache, TWO_HOURS
from ..database import db
from ..date_tools import FHIR_datetime, RelativeDelta
from ..factories.redis import create_redis
from ..set_tools import left_center_right
from ..timeout_lock import ADHERENCE_DATA_KEY, CacheModeration, TimeoutLock
from ..trace import trace
Expand Down Expand Up @@ -761,12 +761,15 @@ def invalidate_users_QBT(user_id, research_study_id):
for ad in adh_data:
db.session.delete(ad)

# clear the timeout lock as well, since we need a refresh
# after deletion of the adherence data
cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format(
patient_id=user_id,
research_study_id=research_study_id))
cache_moderation.reset()
if not current_app.config.get("TESTING", False):
# clear the timeout lock as well, since we need a refresh
# after deletion of the adherence data
# otherwise, we experience a deadlock situation where tables can't be dropped
# between test runs, as postgres believes a deadlock condition exists
cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format(
patient_id=user_id,
research_study_id=research_study_id))
cache_moderation.reset()


# args have to match order and values - no wild carding avail
Expand Down Expand Up @@ -1241,8 +1244,7 @@ def __init__(self):
# Lookup the configured expiration of the matching cache
# container ("DOGPILE_CACHE_REGIONS" -> "assessment_cache_region")
if self.redis is None:
self.redis = redis.StrictRedis.from_url(
current_app.config['REDIS_URL'])
self.redis = create_redis(current_app.config['REDIS_URL'])
regions = current_app.config['DOGPILE_CACHE_REGIONS']
for region_name, duration in regions:
if region_name == self.region_name:
Expand Down
6 changes: 3 additions & 3 deletions portal/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

from celery.utils.log import get_task_logger
from flask import current_app
import redis
from requests import Request, Session
from requests.exceptions import RequestException
from sqlalchemy import and_

from .database import db
from .factories.app import create_app
from .factories.redis import create_redis
from .factories.celery import create_celery
from .models.communication import Communication
from .models.communication_request import queue_outstanding_messages
Expand Down Expand Up @@ -401,7 +401,7 @@ def token_watchdog(**kwargs):
def celery_beat_health_check(**kwargs):
"""Refreshes self-expiring redis value for /healthcheck of celerybeat"""

rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL'])
rs = create_redis(current_app.config['REDIS_URL'])
return rs.setex(
name='last_celery_beat_ping',
time=current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'],
Expand All @@ -414,7 +414,7 @@ def celery_beat_health_check(**kwargs):
def celery_beat_health_check_low_priority_queue(**kwargs):
"""Refreshes self-expiring redis value for /healthcheck of celerybeat"""

rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL'])
rs = create_redis(current_app.config['REDIS_URL'])
return rs.setex(
name='last_celery_beat_ping_low_priority_queue',
time=10*current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'],
Expand Down
8 changes: 3 additions & 5 deletions portal/timeout_lock.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import time

from flask import current_app
import redis

from .factories.redis import create_redis

class LockTimeout(BaseException):
"""Exception raised when wait for TimeoutLock exceeds timeout"""
Expand Down Expand Up @@ -31,8 +31,7 @@ def __init__(self, key, expires=60, timeout=10):
self.key = key
self.timeout = timeout
self.expires = expires
self.redis = redis.StrictRedis.from_url(
current_app.config['REDIS_URL'])
self.redis = create_redis(current_app.config['REDIS_URL'])

def __enter__(self):
timeout = self.timeout
Expand Down Expand Up @@ -105,8 +104,7 @@ class CacheModeration(object):
def __init__(self, key, timeout=300):
self.key = key
self.timeout = timeout
self.redis = redis.StrictRedis.from_url(
current_app.config['REDIS_URL'])
self.redis = create_redis(current_app.config['REDIS_URL'])

def run_recently(self):
"""if key has value in redis (i.e. didn't expire) return value"""
Expand Down
8 changes: 4 additions & 4 deletions portal/views/healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from celery.exceptions import TimeoutError
from celery.result import AsyncResult
from flask import Blueprint, current_app
import redis
from redis.exceptions import ConnectionError
from sqlalchemy import text

from ..database import db
from ..factories.celery import create_celery
from ..factories.redis import create_redis

HEALTHCHECK_FAILURE_STATUS_CODE = 200

Expand All @@ -23,7 +23,7 @@ def celery_beat_ping():
This allows us to monitor whether celery beat tasks are running
"""
try:
rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL'])
rs = create_redis(current_app.config['REDIS_URL'])
rs.setex(
name='last_celery_beat_ping',
time=current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'],
Expand Down Expand Up @@ -64,7 +64,7 @@ def celery_available():
def celery_beat_available():
"""Determines whether celery beat is available"""
try:
rs = redis.from_url(current_app.config['REDIS_URL'])
rs = create_redis(current_app.config['REDIS_URL'])

# Celery beat feeds scheduled jobs (a la cron) to the respective
# job queues (standard and low priority). As a monitor, a job
Expand Down Expand Up @@ -109,7 +109,7 @@ def redis_available():
# is available. Otherwise we assume
# it's not available
try:
rs = redis.from_url(current_app.config["REDIS_URL"])
rs = create_redis(current_app.config["REDIS_URL"])
rs.ping()
return True, 'Redis is available.'
except Exception as e:
Expand Down
24 changes: 12 additions & 12 deletions tests/test_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ def test_celery_available_fails_when_celery_ping_fails(
results = celery_available()
assert results[0] is False

@patch('portal.views.healthcheck.redis')
@patch('portal.views.healthcheck.create_redis')
def test_celery_beat_available_fails_when_redis_var_none(
self,
redis_mock
create_redis_mock
):
redis_mock.from_url.return_value.get.return_value = None
create_redis_mock.return_value.get.return_value = None
results = celery_beat_available()
assert results[0] is False

@patch('portal.views.healthcheck.redis')
@patch('portal.views.healthcheck.create_redis')
def test_celery_beat_available_succeeds_when_redis_var_set(
self,
redis_mock
create_redis_mock
):
redis_mock.from_url.return_value.get.return_value = \
create_redis_mock.return_value.get.return_value = \
str(datetime.now())
results = celery_beat_available()
assert results[0] is True
Expand All @@ -68,21 +68,21 @@ def test_postgresql_available_fails_when_query_exception(
results = postgresql_available()
assert results[0] is False

@patch('portal.views.healthcheck.redis')
@patch('portal.views.healthcheck.create_redis')
def test_redis_available_succeeds_when_ping_successful(
self,
redis_mock
create_redis_mock
):
redis_mock.from_url.return_value.ping.return_value = True
create_redis_mock.return_value.ping.return_value = True
results = redis_available()
assert results[0] is True

@patch('portal.views.healthcheck.redis')
@patch('portal.views.healthcheck.create_redis')
def test_redis_available_fails_when_ping_throws_exception(
self,
redis_mock
create_redis_mock
):
redis_mock.from_url.return_value.ping.side_effect = \
create_redis_mock.return_value.ping.side_effect = \
redis.ConnectionError()
results = redis_available()
assert results[0] is False
Expand Down

0 comments on commit 834a9ae

Please sign in to comment.