diff --git a/portal/config/config.py b/portal/config/config.py index 0631e4795..d3fecfc07 100644 --- a/portal/config/config.py +++ b/portal/config/config.py @@ -1,8 +1,7 @@ """Configuration""" import os -import redis - +from portal.factories.redis import create_redis from portal.models.role import ROLE SITE_CFG = 'site.cfg' @@ -152,7 +151,7 @@ class BaseConfig(object): REDIS_URL ) - SESSION_REDIS = redis.from_url(SESSION_REDIS_URL) + SESSION_REDIS = create_redis(SESSION_REDIS_URL) UPDATE_PATIENT_TASK_BATCH_SIZE = int( os.environ.get('UPDATE_PATIENT_TASK_BATCH_SIZE', 16) diff --git a/portal/factories/redis.py b/portal/factories/redis.py new file mode 100644 index 000000000..d5debfb57 --- /dev/null +++ b/portal/factories/redis.py @@ -0,0 +1,4 @@ +import redis + +def create_redis(url): + return redis.Redis.from_url(url) diff --git a/portal/models/qb_timeline.py b/portal/models/qb_timeline.py index 25d3b8652..aee9822af 100644 --- a/portal/models/qb_timeline.py +++ b/portal/models/qb_timeline.py @@ -4,7 +4,6 @@ from dateutil.relativedelta import relativedelta from flask import current_app -import redis from redis.exceptions import ConnectionError from sqlalchemy.types import Enum as SQLA_Enum from werkzeug.exceptions import BadRequest @@ -13,6 +12,7 @@ from ..cache import cache, TWO_HOURS from ..database import db from ..date_tools import FHIR_datetime, RelativeDelta +from ..factories.redis import create_redis from ..set_tools import left_center_right from ..timeout_lock import ADHERENCE_DATA_KEY, CacheModeration, TimeoutLock from ..trace import trace @@ -761,12 +761,15 @@ def invalidate_users_QBT(user_id, research_study_id): for ad in adh_data: db.session.delete(ad) - # clear the timeout lock as well, since we need a refresh - # after deletion of the adherence data - cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( - patient_id=user_id, - research_study_id=research_study_id)) - cache_moderation.reset() + if not current_app.config.get("TESTING", False): + # clear the timeout lock as well, since we need a refresh + # after deletion of the adherence data + # otherwise, we experience a deadlock situation where tables can't be dropped + # between test runs, as postgres believes a deadlock condition exists + cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( + patient_id=user_id, + research_study_id=research_study_id)) + cache_moderation.reset() # args have to match order and values - no wild carding avail @@ -1241,8 +1244,7 @@ def __init__(self): # Lookup the configured expiration of the matching cache # container ("DOGPILE_CACHE_REGIONS" -> "assessment_cache_region") if self.redis is None: - self.redis = redis.StrictRedis.from_url( - current_app.config['REDIS_URL']) + self.redis = create_redis(current_app.config['REDIS_URL']) regions = current_app.config['DOGPILE_CACHE_REGIONS'] for region_name, duration in regions: if region_name == self.region_name: diff --git a/portal/tasks.py b/portal/tasks.py index f4ff4eec8..be2317f5b 100644 --- a/portal/tasks.py +++ b/portal/tasks.py @@ -14,13 +14,13 @@ from celery.utils.log import get_task_logger from flask import current_app -import redis from requests import Request, Session from requests.exceptions import RequestException from sqlalchemy import and_ from .database import db from .factories.app import create_app +from .factories.redis import create_redis from .factories.celery import create_celery from .models.communication import Communication from .models.communication_request import queue_outstanding_messages @@ -401,7 +401,7 @@ def token_watchdog(**kwargs): def celery_beat_health_check(**kwargs): """Refreshes self-expiring redis value for /healthcheck of celerybeat""" - rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) return rs.setex( name='last_celery_beat_ping', time=current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'], @@ -414,7 +414,7 @@ def celery_beat_health_check(**kwargs): def celery_beat_health_check_low_priority_queue(**kwargs): """Refreshes self-expiring redis value for /healthcheck of celerybeat""" - rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) return rs.setex( name='last_celery_beat_ping_low_priority_queue', time=10*current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'], diff --git a/portal/timeout_lock.py b/portal/timeout_lock.py index b33782d88..21ab8af39 100644 --- a/portal/timeout_lock.py +++ b/portal/timeout_lock.py @@ -1,8 +1,8 @@ import time from flask import current_app -import redis +from .factories.redis import create_redis class LockTimeout(BaseException): """Exception raised when wait for TimeoutLock exceeds timeout""" @@ -31,8 +31,7 @@ def __init__(self, key, expires=60, timeout=10): self.key = key self.timeout = timeout self.expires = expires - self.redis = redis.StrictRedis.from_url( - current_app.config['REDIS_URL']) + self.redis = create_redis(current_app.config['REDIS_URL']) def __enter__(self): timeout = self.timeout @@ -105,8 +104,7 @@ class CacheModeration(object): def __init__(self, key, timeout=300): self.key = key self.timeout = timeout - self.redis = redis.StrictRedis.from_url( - current_app.config['REDIS_URL']) + self.redis = create_redis(current_app.config['REDIS_URL']) def run_recently(self): """if key has value in redis (i.e. didn't expire) return value""" diff --git a/portal/views/healthcheck.py b/portal/views/healthcheck.py index af96c5db3..be84061a8 100644 --- a/portal/views/healthcheck.py +++ b/portal/views/healthcheck.py @@ -3,12 +3,12 @@ from celery.exceptions import TimeoutError from celery.result import AsyncResult from flask import Blueprint, current_app -import redis from redis.exceptions import ConnectionError from sqlalchemy import text from ..database import db from ..factories.celery import create_celery +from ..factories.redis import create_redis HEALTHCHECK_FAILURE_STATUS_CODE = 200 @@ -23,7 +23,7 @@ def celery_beat_ping(): This allows us to monitor whether celery beat tasks are running """ try: - rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) rs.setex( name='last_celery_beat_ping', time=current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'], @@ -64,7 +64,7 @@ def celery_available(): def celery_beat_available(): """Determines whether celery beat is available""" try: - rs = redis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) # Celery beat feeds scheduled jobs (a la cron) to the respective # job queues (standard and low priority). As a monitor, a job @@ -109,7 +109,7 @@ def redis_available(): # is available. Otherwise we assume # it's not available try: - rs = redis.from_url(current_app.config["REDIS_URL"]) + rs = create_redis(current_app.config["REDIS_URL"]) rs.ping() return True, 'Redis is available.' except Exception as e: diff --git a/tests/test_healthcheck.py b/tests/test_healthcheck.py index 7ace20aa4..66bc368c6 100644 --- a/tests/test_healthcheck.py +++ b/tests/test_healthcheck.py @@ -29,21 +29,21 @@ def test_celery_available_fails_when_celery_ping_fails( results = celery_available() assert results[0] is False - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_celery_beat_available_fails_when_redis_var_none( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.get.return_value = None + create_redis_mock.return_value.get.return_value = None results = celery_beat_available() assert results[0] is False - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_celery_beat_available_succeeds_when_redis_var_set( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.get.return_value = \ + create_redis_mock.return_value.get.return_value = \ str(datetime.now()) results = celery_beat_available() assert results[0] is True @@ -68,21 +68,21 @@ def test_postgresql_available_fails_when_query_exception( results = postgresql_available() assert results[0] is False - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_redis_available_succeeds_when_ping_successful( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.ping.return_value = True + create_redis_mock.return_value.ping.return_value = True results = redis_available() assert results[0] is True - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_redis_available_fails_when_ping_throws_exception( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.ping.side_effect = \ + create_redis_mock.return_value.ping.side_effect = \ redis.ConnectionError() results = redis_available() assert results[0] is False