Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
villebro committed Mar 19, 2022
1 parent 90a659c commit 58fd5ba
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/docs/installation/cache.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ For example, to use the built-in cache to store chart data, use the following co

```python
DATA_CACHE_CONFIG = {
"CACHE_TYPE": "SupersetCache",
"CACHE_TYPE": "SupersetMetastoreCache",
"CACHE_KEY_PREFIX": "superset_results", # make sure this string is unique to avoid collisions
"CACHE_DEFAULT_TIMEOUT": 86400, # 60 seconds * 60 minutes * 24 hours
}
Expand Down
3 changes: 2 additions & 1 deletion superset/extensions.py → superset/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import json
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import celery
Expand Down Expand Up @@ -108,7 +109,7 @@ def init_app(self, app: Flask) -> None:
app.wsgi_app = SupersetProfiler(app.wsgi_app, self.interval) # type: ignore


APP_DIR = os.path.dirname(__file__)
APP_DIR = os.path.join(os.path.dirname(__file__), os.path.pardir)
appbuilder = AppBuilder(update_perms=False)
async_query_manager = AsyncQueryManager()
cache_manager = CacheManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import KeyType

RESOURCE = "superset_cache"
RESOURCE = "superset_metastore_cache"
KEY_TYPE: KeyType = "uuid"


class SupersetCache(BaseCache):
class SupersetMetastoreCache(BaseCache):
def __init__(self, namespace: UUID, default_timeout: int = 300) -> None:
super().__init__(default_timeout)
self.namespace = namespace
Expand Down Expand Up @@ -58,17 +58,24 @@ def _prune() -> None:

DeleteExpiredKeyValueCommand(resource=RESOURCE).run()

def get_expiry(self, timeout: Optional[int]) -> datetime:
return datetime.now() + timedelta(seconds=timeout or self.default_timeout)
def _get_expiry(self, timeout: Optional[int]) -> Optional[datetime]:
timeout = self._normalize_timeout(timeout)
if timeout is not None and timeout > 0:
return datetime.now() + timedelta(seconds=timeout)
return None

def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete import DeleteKeyValueCommand

DeleteKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
from superset.key_value.commands.upsert import UpsertKeyValueCommand

UpsertKeyValueCommand(
resource=RESOURCE,
key_type=KEY_TYPE,
key=self.get_key(key),
value=value,
expires_on=self._get_expiry(timeout),
).run()
return self.add(key, value, timeout)
return True

def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
Expand All @@ -80,7 +87,7 @@ def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
value=value,
key_type=KEY_TYPE,
key=self.get_key(key),
expires_on=self.get_expiry(timeout),
expires_on=self._get_expiry(timeout),
).run()
self._prune()
return True
Expand Down
1 change: 1 addition & 0 deletions superset/key_value/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
:param resource: the resource (dashboard, chart etc)
:param value: the value to persist in the key-value store
:param key_type: the type of the key to return
:param actor: the user performing the command
:param key: id of entry (autogenerated if undefined)
:param expires_on: entry expiration time
:return: the key associated with the persisted value
Expand Down
9 changes: 6 additions & 3 deletions superset/key_value/commands/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@


class UpdateKeyValueCommand(BaseCommand):
actor: User
actor: Optional[User]
resource: str
value: Any
key: str
Expand All @@ -43,10 +43,10 @@ class UpdateKeyValueCommand(BaseCommand):

def __init__(
self,
actor: User,
resource: str,
key: str,
value: Any,
actor: Optional[User] = None,
key_type: KeyType = "uuid",
expires_on: Optional[datetime] = None,
):
Expand All @@ -56,6 +56,7 @@ def __init__(
:param resource: the resource (dashboard, chart etc)
:param key: the key to update
:param value: the value to persist in the key-value store
:param actor: the user performing the command
:param key_type: the type of the key to update
:param expires_on: entry expiration time
:return: the key associated with the updated value
Expand Down Expand Up @@ -90,7 +91,9 @@ def update(self) -> Optional[str]:
entry.value = pickle.dumps(self.value)
entry.expires_on = self.expires_on
entry.changed_on = datetime.now()
entry.changed_by_fk = None if self.actor.is_anonymous else self.actor.id
entry.changed_by_fk = (
None if self.actor is None or self.actor.is_anonymous else self.actor.id
)
db.session.merge(entry)
db.session.commit()
return extract_key(entry, self.key_type)
Expand Down
109 changes: 109 additions & 0 deletions superset/key_value/commands/upsert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import pickle
from datetime import datetime
from typing import Any, Optional

from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.base import BaseCommand
from superset.key_value.commands.create import CreateKeyValueCommand
from superset.key_value.exceptions import KeyValueUpdateFailedError
from superset.key_value.models import KeyValueEntry
from superset.key_value.types import KeyType
from superset.key_value.utils import extract_key, get_filter

logger = logging.getLogger(__name__)


class UpsertKeyValueCommand(BaseCommand):
actor: Optional[User]
resource: str
value: Any
key: str
key_type: KeyType
expires_on: Optional[datetime]

def __init__(
self,
resource: str,
key: str,
value: Any,
actor: Optional[User] = None,
key_type: KeyType = "uuid",
expires_on: Optional[datetime] = None,
):
"""
Upsert a key value entry
:param resource: the resource (dashboard, chart etc)
:param key: the key to update
:param value: the value to persist in the key-value store
:param key_type: the type of the key to update
:param actor: the user performing the command
:param expires_on: entry expiration time
:return: the key associated with the updated value
"""
self.actor = actor
self.resource = resource
self.key = key
self.value = value
self.key_type = key_type
self.expires_on = expires_on

def run(self) -> Optional[str]:
try:
return self.upsert()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running update command")
raise KeyValueUpdateFailedError() from ex

def validate(self) -> None:
pass

def upsert(self) -> Optional[str]:
filter_ = get_filter(self.resource, self.key, self.key_type)
entry: KeyValueEntry = (
db.session.query(KeyValueEntry)
.filter_by(**filter_)
.autoflush(False)
.first()
)
if entry:
entry.value = pickle.dumps(self.value)
entry.expires_on = self.expires_on
entry.changed_on = datetime.now()
entry.changed_by_fk = (
None if self.actor is None or self.actor.is_anonymous else self.actor.id
)
db.session.merge(entry)
db.session.commit()
return extract_key(entry, self.key_type)
else:
return CreateKeyValueCommand(
resource=self.resource,
value=self.value,
key_type=self.key_type,
actor=self.actor,
key=self.key,
expires_on=self.expires_on,
).run()
4 changes: 2 additions & 2 deletions superset/utils/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _init_cache(
) -> None:
cache_config = app.config[cache_config_key]
cache_type = cache_config.get("CACHE_TYPE")
if required and cache_type in (None, "SupersetCache"):
if required and cache_type in (None, "SupersetMetastoreCache"):
if cache_type is None:
logger.warning(
"Falling back to the built-in cache, that stores data in the "
Expand All @@ -50,7 +50,7 @@ def _init_cache(
cache_key_prefix = cache_config.get("CACHE_KEY_PREFIX", cache_config_key)
cache_config.update(
{
"CACHE_TYPE": "superset.key_value.cache.SupersetCache",
"CACHE_TYPE": "superset.extensions.metastore_cache.SupersetMetastoreCache",
"CACHE_KEY_PREFIX": cache_key_prefix,
}
)
Expand Down
16 changes: 16 additions & 0 deletions tests/integration_tests/extensions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from freezegun import freeze_time

if TYPE_CHECKING:
from superset.key_value.cache import SupersetCache
from superset.extensions.metastore_cache import SupersetMetastoreCache

FIRST_KEY = "foo"
FIRST_KEY_INITIAL_VALUE = {"foo": "bar"}
Expand All @@ -36,15 +36,15 @@


@pytest.fixture
def cache() -> SupersetCache:
from superset.key_value.cache import SupersetCache
def cache() -> SupersetMetastoreCache:
from superset.extensions.metastore_cache import SupersetMetastoreCache

return SupersetCache(
return SupersetMetastoreCache(
namespace=UUID("ee173d1b-ccf3-40aa-941c-985c15224496"), default_timeout=600,
)


def test_caching_flow(app_context: AppContext, cache: SupersetCache) -> None:
def test_caching_flow(app_context: AppContext, cache: SupersetMetastoreCache) -> None:
assert cache.has(FIRST_KEY) is False
assert cache.add(FIRST_KEY, FIRST_KEY_INITIAL_VALUE) is True
assert cache.has(FIRST_KEY) is True
Expand All @@ -62,7 +62,7 @@ def test_caching_flow(app_context: AppContext, cache: SupersetCache) -> None:
assert cache.get(SECOND_KEY) == SECOND_VALUE


def test_expiry(app_context: AppContext, cache: SupersetCache) -> None:
def test_expiry(app_context: AppContext, cache: SupersetMetastoreCache) -> None:
delta = timedelta(days=90)
dttm = datetime(2022, 3, 18, 0, 0, 0)
with freeze_time(dttm):
Expand Down
Loading

0 comments on commit 58fd5ba

Please sign in to comment.