Skip to content

Commit

Permalink
feat(key-value): add superset metastore cache (apache#19232)
Browse files Browse the repository at this point in the history
  • Loading branch information
villebro authored and michael_hoffman committed Mar 23, 2022
1 parent daba497 commit d603128
Show file tree
Hide file tree
Showing 17 changed files with 540 additions and 45 deletions.
17 changes: 12 additions & 5 deletions docs/docs/installation/cache.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version: 1

## Caching

Superset uses [Flask-Caching](https://flask-caching.readthedocs.io/) for caching purpose. Configuring caching is as easy as providing a custom cache config in your
Superset uses [Flask-Caching](https://flask-caching.readthedocs.io/) for caching purposes. Configuring caching is as easy as providing a custom cache config in your
`superset_config.py` that complies with [the Flask-Caching specifications](https://flask-caching.readthedocs.io/en/latest/#configuring-flask-caching).
Flask-Caching supports various caching backends, including Redis, Memcached, SimpleCache (in-memory), or the
local filesystem. Custom cache backends are also supported. See [here](https://flask-caching.readthedocs.io/en/latest/#custom-cache-backends) for specifics.
Expand All @@ -18,10 +18,17 @@ The following cache configurations can be customized:
- Dashboard filter state (required): `FILTER_STATE_CACHE_CONFIG`.
- Explore chart form data (required): `EXPLORE_FORM_DATA_CACHE_CONFIG`

Please note, that Dashboard and Explore caching is required. When running Superset in debug mode, both Explore and Dashboard caches will default to `SimpleCache`;
However, trying to run Superset in non-debug mode without defining a cache for these will cause the application to fail on startup. When running
superset in single-worker mode, any cache backend is supported. However, when running Superset in on a multi-worker setup, a dedicated cache is required. For this
we recommend using either Redis or Memcached:
Please note, that Dashboard and Explore caching is required. If these caches are undefined, Superset falls back to using a built-in cache that stores data
in the metadata database. While it is recommended to use a dedicated cache, the built-in cache can also be used to cache other data.
For example, to use the built-in cache to store chart data, use the following config:

```python
DATA_CACHE_CONFIG = {
"CACHE_TYPE": "SupersetMetastoreCache",
"CACHE_KEY_PREFIX": "superset_results", # make sure this string is unique to avoid collisions
"CACHE_DEFAULT_TIMEOUT": 86400, # 60 seconds * 60 minutes * 24 hours
}
```

- Redis (recommended): we recommend the [redis](https://pypi.python.org/pypi/redis) Python package
- Memcached: we recommend using [pylibmc](https://pypi.org/project/pylibmc/) client library as
Expand Down
5 changes: 4 additions & 1 deletion superset/dashboards/permalink/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def run(self) -> str:
"state": self.state,
}
return CreateKeyValueCommand(
self.actor, self.resource, value, self.key_type
actor=self.actor,
resource=self.resource,
value=value,
key_type=self.key_type,
).run()
except SQLAlchemyError as ex:
logger.exception("Error running create command")
Expand Down
2 changes: 1 addition & 1 deletion superset/dashboards/permalink/commands/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def run(self) -> Optional[DashboardPermalinkValue]:
self.validate()
try:
command = GetKeyValueCommand(
self.resource, self.key, key_type=self.key_type
resource=self.resource, key=self.key, key_type=self.key_type
)
value: Optional[DashboardPermalinkValue] = command.run()
if value:
Expand Down
5 changes: 4 additions & 1 deletion superset/explore/permalink/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def run(self) -> str:
"state": self.state,
}
command = CreateKeyValueCommand(
self.actor, self.resource, value, self.key_type
actor=self.actor,
resource=self.resource,
value=value,
key_type=self.key_type,
)
return command.run()
except SQLAlchemyError as ex:
Expand Down
2 changes: 1 addition & 1 deletion superset/explore/permalink/commands/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def run(self) -> Optional[ExplorePermalinkValue]:
self.validate()
try:
value: Optional[ExplorePermalinkValue] = GetKeyValueCommand(
self.resource, self.key, key_type=self.key_type
resource=self.resource, key=self.key, key_type=self.key_type
).run()
if value:
chart_id: Optional[int] = value.get("chartId")
Expand Down
3 changes: 2 additions & 1 deletion superset/extensions.py → superset/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import json
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import celery
Expand Down Expand Up @@ -108,7 +109,7 @@ def init_app(self, app: Flask) -> None:
app.wsgi_app = SupersetProfiler(app.wsgi_app, self.interval) # type: ignore


APP_DIR = os.path.dirname(__file__)
APP_DIR = os.path.join(os.path.dirname(__file__), os.path.pardir)
appbuilder = AppBuilder(update_perms=False)
async_query_manager = AsyncQueryManager()
cache_manager = CacheManager()
Expand Down
117 changes: 117 additions & 0 deletions superset/extensions/metastore_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timedelta
from hashlib import md5
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid3

from flask import Flask
from flask_caching import BaseCache

from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import KeyType

RESOURCE = "superset_metastore_cache"
KEY_TYPE: KeyType = "uuid"


class SupersetMetastoreCache(BaseCache):
def __init__(self, namespace: UUID, default_timeout: int = 300) -> None:
super().__init__(default_timeout)
self.namespace = namespace

@classmethod
def factory(
cls, app: Flask, config: Dict[str, Any], args: List[Any], kwargs: Dict[str, Any]
) -> BaseCache:
# base namespace for generating deterministic UUIDs
md5_obj = md5()
seed = config.get("CACHE_KEY_PREFIX", "")
md5_obj.update(seed.encode("utf-8"))
kwargs["namespace"] = UUID(md5_obj.hexdigest())
return cls(*args, **kwargs)

def get_key(self, key: str) -> str:
return str(uuid3(self.namespace, key))

@staticmethod
def _prune() -> None:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete_expired import (
DeleteExpiredKeyValueCommand,
)

DeleteExpiredKeyValueCommand(resource=RESOURCE).run()

def _get_expiry(self, timeout: Optional[int]) -> Optional[datetime]:
timeout = self._normalize_timeout(timeout)
if timeout is not None and timeout > 0:
return datetime.now() + timedelta(seconds=timeout)
return None

def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.upsert import UpsertKeyValueCommand

UpsertKeyValueCommand(
resource=RESOURCE,
key_type=KEY_TYPE,
key=self.get_key(key),
value=value,
expires_on=self._get_expiry(timeout),
).run()
return True

def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.create import CreateKeyValueCommand

try:
CreateKeyValueCommand(
resource=RESOURCE,
value=value,
key_type=KEY_TYPE,
key=self.get_key(key),
expires_on=self._get_expiry(timeout),
).run()
self._prune()
return True
except KeyValueCreateFailedError:
return False

def get(self, key: str) -> Any:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.get import GetKeyValueCommand

return GetKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
).run()

def has(self, key: str) -> bool:
entry = self.get(key)
if entry:
return True
return False

def delete(self, key: str) -> Any:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete import DeleteKeyValueCommand

return DeleteKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
).run()
25 changes: 21 additions & 4 deletions superset/key_value/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pickle
from datetime import datetime
from typing import Any, Optional
from uuid import UUID

from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError
Expand All @@ -33,18 +34,20 @@


class CreateKeyValueCommand(BaseCommand):
actor: User
actor: Optional[User]
resource: str
value: Any
key_type: KeyType
key: Optional[str]
expires_on: Optional[datetime]

def __init__(
self,
actor: User,
resource: str,
value: Any,
key_type: KeyType,
key_type: KeyType = "uuid",
actor: Optional[User] = None,
key: Optional[str] = None,
expires_on: Optional[datetime] = None,
):
"""
Expand All @@ -53,19 +56,23 @@ def __init__(
:param resource: the resource (dashboard, chart etc)
:param value: the value to persist in the key-value store
:param key_type: the type of the key to return
:param actor: the user performing the command
:param key: id of entry (autogenerated if undefined)
:param expires_on: entry expiration time
:return: the key associated with the persisted value
"""
self.resource = resource
self.actor = actor
self.value = value
self.key_type = key_type
self.key = key
self.expires_on = expires_on

def run(self) -> str:
try:
return self.create()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running create command")
raise KeyValueCreateFailedError() from ex

Expand All @@ -77,9 +84,19 @@ def create(self) -> str:
resource=self.resource,
value=pickle.dumps(self.value),
created_on=datetime.now(),
created_by_fk=None if self.actor.is_anonymous else self.actor.id,
created_by_fk=None
if self.actor is None or self.actor.is_anonymous
else self.actor.id,
expires_on=self.expires_on,
)
if self.key is not None:
try:
if self.key_type == "uuid":
entry.uuid = UUID(self.key)
else:
entry.id = int(self.key)
except ValueError as ex:
raise KeyValueCreateFailedError() from ex
db.session.add(entry)
db.session.commit()
return extract_key(entry, self.key_type)
6 changes: 3 additions & 3 deletions superset/key_value/commands/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Optional

from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError
Expand All @@ -30,13 +31,12 @@


class DeleteKeyValueCommand(BaseCommand):
actor: User
key: str
key_type: KeyType
resource: str

def __init__(
self, actor: User, resource: str, key: str, key_type: KeyType = "uuid"
self, resource: str, key: str, key_type: KeyType = "uuid",
):
"""
Delete a key-value pair
Expand All @@ -47,14 +47,14 @@ def __init__(
:return: was the entry deleted or not
"""
self.resource = resource
self.actor = actor
self.key = key
self.key_type = key_type

def run(self) -> bool:
try:
return self.delete()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running delete command")
raise KeyValueDeleteFailedError() from ex

Expand Down
60 changes: 60 additions & 0 deletions superset/key_value/commands/delete_expired.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.base import BaseCommand
from superset.key_value.exceptions import KeyValueDeleteFailedError
from superset.key_value.models import KeyValueEntry

logger = logging.getLogger(__name__)


class DeleteExpiredKeyValueCommand(BaseCommand):
resource: str

def __init__(self, resource: str):
"""
Delete all expired key-value pairs
:param resource: the resource (dashboard, chart etc)
:return: was the entry deleted or not
"""
self.resource = resource

def run(self) -> None:
try:
self.delete_expired()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running delete command")
raise KeyValueDeleteFailedError() from ex

def validate(self) -> None:
pass

@staticmethod
def delete_expired() -> None:
(
db.session.query(KeyValueEntry)
.filter(KeyValueEntry.expires_on <= datetime.now())
.delete()
)
db.session.commit()
Loading

0 comments on commit d603128

Please sign in to comment.