-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement cache invalidation api #10761
Changes from all commits
49f9ffb
9e90958
2877170
90f2a1d
b87299f
8d31609
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
import logging | ||
|
||
from flask import request, Response | ||
from flask_appbuilder import expose | ||
from flask_appbuilder.api import safe | ||
from flask_appbuilder.models.sqla.interface import SQLAInterface | ||
from flask_appbuilder.security.decorators import protect | ||
from marshmallow.exceptions import ValidationError | ||
from sqlalchemy.exc import SQLAlchemyError | ||
|
||
from superset.cachekeys.schemas import CacheInvalidationRequestSchema | ||
from superset.connectors.connector_registry import ConnectorRegistry | ||
from superset.extensions import cache_manager, db, event_logger | ||
from superset.models.cache import CacheKey | ||
from superset.views.base_api import BaseSupersetModelRestApi, statsd_metrics | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class CacheRestApi(BaseSupersetModelRestApi): | ||
datamodel = SQLAInterface(CacheKey) | ||
resource_name = "cachekey" | ||
allow_browser_login = True | ||
class_permission_name = "CacheRestApi" | ||
include_route_methods = { | ||
"invalidate", | ||
} | ||
|
||
openapi_spec_component_schemas = (CacheInvalidationRequestSchema,) | ||
|
||
@expose("/invalidate", methods=["POST"]) | ||
@event_logger.log_this | ||
@protect() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this API is not actually protected---it's probably missing a
Is this by design? cc @bkyryliuk |
||
@safe | ||
@statsd_metrics | ||
def invalidate(self) -> Response: | ||
""" | ||
Takes a list of datasources, finds the associated cache records and | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Side note (outside the scope of this PR), we'll be renaming datasource to dataset. I think it's mostly done in the UI layer, but has yet to be done in the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch, I can do it in the separate PR if that works for you, would rename it in API & CacheKey model for consistency. |
||
invalidates them and removes the database records | ||
|
||
--- | ||
post: | ||
description: >- | ||
Takes a list of datasources, finds the associated cache records and | ||
invalidates them and removes the database records | ||
requestBody: | ||
description: >- | ||
A list of datasources uuid or the tuples of database and datasource names | ||
required: true | ||
content: | ||
application/json: | ||
schema: | ||
$ref: "#/components/schemas/CacheInvalidationRequestSchema" | ||
responses: | ||
201: | ||
description: cache was successfully invalidated | ||
400: | ||
$ref: '#/components/responses/400' | ||
500: | ||
$ref: '#/components/responses/500' | ||
""" | ||
try: | ||
datasources = CacheInvalidationRequestSchema().load(request.json) | ||
except KeyError: | ||
return self.response_400(message="Request is incorrect") | ||
except ValidationError as error: | ||
return self.response_400(message=str(error)) | ||
datasource_uids = set(datasources.get("datasource_uids", [])) | ||
for ds in datasources.get("datasources", []): | ||
ds_obj = ConnectorRegistry.get_datasource_by_name( | ||
session=db.session, | ||
datasource_type=ds.get("datasource_type"), | ||
datasource_name=ds.get("datasource_name"), | ||
schema=ds.get("schema"), | ||
database_name=ds.get("database_name"), | ||
) | ||
if ds_obj: | ||
datasource_uids.add(ds_obj.uid) | ||
|
||
cache_key_objs = ( | ||
db.session.query(CacheKey) | ||
.filter(CacheKey.datasource_uid.in_(datasource_uids)) | ||
.all() | ||
) | ||
cache_keys = [c.cache_key for c in cache_key_objs] | ||
if cache_key_objs: | ||
all_keys_deleted = cache_manager.cache.delete_many(*cache_keys) | ||
|
||
if not all_keys_deleted: | ||
# expected behavior as keys may expire and cache is not a | ||
# persistent storage | ||
logger.info( | ||
"Some of the cache keys were not deleted in the list %s", cache_keys | ||
) | ||
|
||
try: | ||
delete_stmt = CacheKey.__table__.delete().where( # pylint: disable=no-member | ||
CacheKey.cache_key.in_(cache_keys) | ||
) | ||
db.session.execute(delete_stmt) | ||
db.session.commit() | ||
except SQLAlchemyError as ex: # pragma: no cover | ||
logger.error(ex) | ||
db.session.rollback() | ||
return self.response_500(str(ex)) | ||
db.session.commit() | ||
return self.response(201) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
# RISON/JSON schemas for query parameters | ||
from marshmallow import fields, Schema, validate | ||
|
||
from superset.charts.schemas import ( | ||
datasource_name_description, | ||
datasource_type_description, | ||
datasource_uid_description, | ||
) | ||
|
||
|
||
class Datasource(Schema): | ||
database_name = fields.String(description="Datasource name",) | ||
datasource_name = fields.String(description=datasource_name_description,) | ||
schema = fields.String(description="Datasource schema",) | ||
datasource_type = fields.String( | ||
description=datasource_type_description, | ||
validate=validate.OneOf(choices=("druid", "table", "view")), | ||
required=True, | ||
) | ||
|
||
|
||
class CacheInvalidationRequestSchema(Schema): | ||
datasource_uids = fields.List( | ||
fields.String(), description=datasource_uid_description, | ||
) | ||
datasources = fields.List( | ||
fields.Nested(Datasource), | ||
description="A list of the data source and database names", | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
from unittest.mock import Mock, patch | ||
|
||
import pandas as pd | ||
import pytest | ||
from flask import Response | ||
from flask_appbuilder.security.sqla import models as ab_models | ||
from flask_testing import TestCase | ||
|
@@ -42,6 +43,7 @@ | |
from superset.views.base_api import BaseSupersetModelRestApi | ||
|
||
FAKE_DB_NAME = "fake_db_100" | ||
test_client = app.test_client() | ||
|
||
|
||
def login(client: Any, username: str = "admin", password: str = "general"): | ||
|
@@ -69,6 +71,39 @@ def get_resp( | |
return resp.data.decode("utf-8") | ||
|
||
|
||
def post_assert_metric( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the same but on a different place? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes same code, moved it into this function for reuse in the pytests |
||
client: Any, uri: str, data: Dict[str, Any], func_name: str | ||
) -> Response: | ||
""" | ||
Simple client post with an extra assertion for statsd metrics | ||
|
||
:param client: test client for superset api requests | ||
:param uri: The URI to use for the HTTP POST | ||
:param data: The JSON data payload to be posted | ||
:param func_name: The function name that the HTTP POST triggers | ||
for the statsd metric assertion | ||
:return: HTTP Response | ||
""" | ||
with patch.object( | ||
BaseSupersetModelRestApi, "incr_stats", return_value=None | ||
) as mock_method: | ||
rv = client.post(uri, json=data) | ||
if 200 <= rv.status_code < 400: | ||
mock_method.assert_called_once_with("success", func_name) | ||
else: | ||
mock_method.assert_called_once_with("error", func_name) | ||
return rv | ||
|
||
|
||
@pytest.fixture | ||
def logged_in_admin(): | ||
"""Fixture with app context and logged in admin user.""" | ||
with app.app_context(): | ||
login(test_client, username="admin") | ||
yield | ||
test_client.get("/logout/", follow_redirects=True) | ||
|
||
|
||
class SupersetTestCase(TestCase): | ||
|
||
default_schema_backend_map = { | ||
|
@@ -84,6 +119,15 @@ class SupersetTestCase(TestCase): | |
def create_app(self): | ||
return app | ||
|
||
@staticmethod | ||
def get_birth_names_dataset(): | ||
example_db = get_example_database() | ||
return ( | ||
db.session.query(SqlaTable) | ||
.filter_by(database=example_db, table_name="birth_names") | ||
.one() | ||
) | ||
|
||
@staticmethod | ||
def create_user_with_roles(username: str, roles: List[str]): | ||
user_to_create = security_manager.find_user(username) | ||
|
@@ -422,24 +466,7 @@ def delete_assert_metric(self, uri: str, func_name: str) -> Response: | |
def post_assert_metric( | ||
self, uri: str, data: Dict[str, Any], func_name: str | ||
) -> Response: | ||
""" | ||
Simple client post with an extra assertion for statsd metrics | ||
|
||
:param uri: The URI to use for the HTTP POST | ||
:param data: The JSON data payload to be posted | ||
:param func_name: The function name that the HTTP POST triggers | ||
for the statsd metric assertion | ||
:return: HTTP Response | ||
""" | ||
with patch.object( | ||
BaseSupersetModelRestApi, "incr_stats", return_value=None | ||
) as mock_method: | ||
rv = self.client.post(uri, json=data) | ||
if 200 <= rv.status_code < 400: | ||
mock_method.assert_called_once_with("success", func_name) | ||
else: | ||
mock_method.assert_called_once_with("error", func_name) | ||
return rv | ||
return post_assert_metric(self.client, uri, data, func_name) | ||
|
||
def put_assert_metric( | ||
self, uri: str, data: Dict[str, Any], func_name: str | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the route
/api/v1/cachekey/invalidate
is not ideal. I'm struggling to find something good thoughThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, I had the same struggles, definitely open to suggestions.
/api/v1/cache/invalidate could be slightly better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
DELETE /api/v1/cachekey/<cachekey>
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be too much struggle to fetch cache keys though the API and then issue deletes for those.
I think we could expose GET / DELETE later once those use cases will arise