Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(SIP-39): Async query support for charts #11499

Merged
merged 50 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
67f7d5f
Generate JWT in Flask app
robdiciuccio Oct 21, 2020
1e8c039
Refactor chart data API query logic, add JWT validation and async worker
robdiciuccio Oct 22, 2020
a50750b
Add redis stream implementation, refactoring
robdiciuccio Oct 27, 2020
64fbfae
Add chart data cache endpoint, refactor QueryContext caching
robdiciuccio Oct 30, 2020
219ce77
Merge branch master
robdiciuccio Nov 10, 2020
e377b31
Typing, linting, refactoring
robdiciuccio Nov 10, 2020
f7ac5b6
pytest fixes and openapi schema update
robdiciuccio Nov 11, 2020
3a16283
Merge branch master
robdiciuccio Nov 18, 2020
6805180
Enforce caching be configured for async query init
robdiciuccio Nov 19, 2020
d5eef4f
Async query processing for explore_json endpoint
robdiciuccio Nov 19, 2020
467c6bb
Add /api/v1/async_event endpoint
robdiciuccio Nov 20, 2020
f867cd5
Async frontend for dashboards [WIP]
robdiciuccio Nov 24, 2020
8111ea6
Chart async error message support, refactoring
robdiciuccio Nov 30, 2020
3b41f16
Abstract asyncEvent middleware
robdiciuccio Nov 30, 2020
49b6b52
Async chart loading for Explore
robdiciuccio Nov 30, 2020
0e5c09e
Merge branch master
robdiciuccio Nov 30, 2020
e2dc30e
Pylint fixes
robdiciuccio Nov 30, 2020
276c84e
asyncEvent middleware -> TypeScript, JS linting
robdiciuccio Dec 1, 2020
4566c01
Chart data API: enforce forced_cache, add tests
robdiciuccio Dec 2, 2020
9eba0c4
Add tests for explore_json endpoints
robdiciuccio Dec 3, 2020
d2e4529
Add test for chart data cache enpoint (no login)
robdiciuccio Dec 3, 2020
0bd7d67
Consolidate set_and_log_cache and add STORE_CACHE_KEYS_IN_METADATA_DB…
robdiciuccio Dec 3, 2020
4441459
Add tests for tasks/async_queries and address PR comments
robdiciuccio Dec 3, 2020
5896799
Bypass non-JSON result formats for async queries
robdiciuccio Dec 3, 2020
5999bf3
Add tests for redux middleware
robdiciuccio Dec 4, 2020
17215a3
Merge branch master
robdiciuccio Dec 4, 2020
27e4548
Remove debug statement
robdiciuccio Dec 4, 2020
119cae7
Skip force_cached if no queryObj
robdiciuccio Dec 4, 2020
9666408
SunburstViz: don't modify self.form_data
robdiciuccio Dec 4, 2020
d2ef464
Merge branch 'rd/async-queries-mvp' of github.com:preset-io/incubator…
robdiciuccio Dec 4, 2020
a8607c0
Merge branch master
robdiciuccio Dec 7, 2020
e40eb45
Fix failing annotation test
robdiciuccio Dec 7, 2020
f491789
Resolve merge/lint issues
robdiciuccio Dec 7, 2020
f01740e
Reduce polling delay
robdiciuccio Dec 8, 2020
9e02017
Merge branch 'master' into rd/async-queries-mvp
robdiciuccio Dec 8, 2020
838c526
Fix new getClientErrorObject reference
robdiciuccio Dec 8, 2020
f0de265
Fix flakey unit tests
robdiciuccio Dec 8, 2020
066504f
/api/v1/async_event: increment redis stream ID, add tests
robdiciuccio Dec 9, 2020
d6c8a1d
PR feedback: refactoring, configuration
robdiciuccio Dec 9, 2020
fc5753a
Merge branch master
robdiciuccio Dec 9, 2020
088a49c
Fixup: remove debugging
robdiciuccio Dec 9, 2020
c9b871e
Fix typescript errors due to redux upgrade
robdiciuccio Dec 10, 2020
89925a5
Update UPDATING.md
robdiciuccio Dec 10, 2020
0ad7234
Fix failing py tests
robdiciuccio Dec 10, 2020
c72b4c6
asyncEvent_spec.js -> asyncEvent_spec.ts
robdiciuccio Dec 10, 2020
1fb7489
Refactor flakey Python 3.7 mock assertions
robdiciuccio Dec 10, 2020
024da76
Fix another shared state issue in Py tests
robdiciuccio Dec 10, 2020
887754b
Use 'sub' claim in JWT for user_id
robdiciuccio Dec 11, 2020
601ec51
Refactor async middleware config
robdiciuccio Dec 11, 2020
df673cf
Fixup: restore FeatureFlag boolean type
robdiciuccio Dec 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ combine_as_imports = true
include_trailing_comma = true
line_length = 88
known_first_party = superset
known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,humanize,isodate,jinja2,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,polyline,prison,pyarrow,pyhive,pytest,pytz,retry,selenium,setuptools,simplejson,slack,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,polyline,prison,pyarrow,pyhive,pytest,pytz,redis,retry,selenium,setuptools,simplejson,slack,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
multi_line_output = 3
order_by_type = false

Expand Down
7 changes: 7 additions & 0 deletions superset/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
_event_logger,
APP_DIR,
appbuilder,
async_query_manager,
cache_manager,
celery_app,
csrf,
Expand Down Expand Up @@ -485,6 +486,7 @@ def init_app_in_ctx(self) -> None:
self.configure_url_map_converters()
self.configure_data_sources()
self.configure_auth_provider()
self.configure_async_queries()

# Hook that provides administrators a handle on the Flask APP
# after initialization
Expand Down Expand Up @@ -639,6 +641,11 @@ def configure_wtf(self) -> None:
for ex in csrf_exempt_list:
csrf.exempt(ex)

def configure_async_queries(self) -> None:
if feature_flag_manager.is_feature_enabled("GLOBAL_ASYNC_QUERIES"):
logger.info("*************** Init async queries")
async_query_manager.init_app(self.flask_app)

def register_blueprints(self) -> None:
for bp in self.config["BLUEPRINTS"]:
try:
Expand Down
136 changes: 118 additions & 18 deletions superset/charts/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@
from superset import is_feature_enabled, thumbnail_cache
from superset.charts.commands.bulk_delete import BulkDeleteChartCommand
from superset.charts.commands.create import CreateChartCommand
from superset.charts.commands.data import ChartDataCommand
from superset.charts.commands.delete import DeleteChartCommand
from superset.charts.commands.exceptions import (
ChartBulkDeleteFailedError,
ChartCreateFailedError,
ChartDataCacheLoadError,
ChartDataQueryFailedError,
ChartDataValidationError,
ChartDeleteFailedError,
ChartForbiddenError,
ChartInvalidError,
Expand All @@ -48,7 +52,6 @@
from superset.charts.filters import ChartAllTextFilter, ChartFavoriteFilter, ChartFilter
from superset.charts.schemas import (
CHART_SCHEMAS,
ChartDataQueryContextSchema,
ChartPostSchema,
ChartPutSchema,
get_delete_ids_schema,
Expand All @@ -59,9 +62,10 @@
)
from superset.constants import RouteMethod
from superset.exceptions import SupersetSecurityException
from superset.extensions import event_logger
from superset.extensions import async_query_manager, event_logger
from superset.models.slice import Slice
from superset.tasks.thumbnails import cache_chart_thumbnail
from superset.utils.async_query_manager import AsyncQueryTokenException
from superset.utils.core import ChartDataResultFormat, json_int_dttm_ser
from superset.utils.screenshots import ChartScreenshot
from superset.utils.urls import get_url_path
Expand All @@ -87,6 +91,7 @@ class ChartRestApi(BaseSupersetModelRestApi):
RouteMethod.RELATED,
"bulk_delete", # not using RouteMethod since locally defined
"data",
"data_from_cache",
"viz_types",
}
class_permission_name = "SliceModelView"
Expand Down Expand Up @@ -463,8 +468,12 @@ def data(self) -> Response:
application/json:
schema:
$ref: "#/components/schemas/ChartDataResponseSchema"
202:
$ref: '#/components/responses/202'
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
500:
$ref: '#/components/responses/500'
"""
Expand All @@ -475,41 +484,132 @@ def data(self) -> Response:
json_body = json.loads(request.form["form_data"])
else:
return self.response_400(message="Request is not JSON")

try:
query_context = ChartDataQueryContextSchema().load(json_body)
except KeyError:
return self.response_400(message="Request is incorrect")
except ValidationError as error:
return self.response_400(
message=_("Request is incorrect: %(error)s", error=error.messages)
command = ChartDataCommand()
command.validate(json_body)
except ChartDataValidationError as exc:
return self.response_400(message=exc.message)
except SupersetSecurityException:
return self.response_401()

if is_feature_enabled("GLOBAL_ASYNC_QUERIES"):
try:
command.validate_request(request)
except AsyncQueryTokenException:
return self.response_401()

result = command.run_async()
return self.response(202, **result)

# TODO: DRY
try:
result = command.run()
except ChartDataQueryFailedError as exc:
return self.response_400(message=exc.message)

result_format = result["query_context"].result_format
response = self.response_400(
message=f"Unsupported result_format: {result_format}"
)

if result_format == ChartDataResultFormat.CSV:
# return the first result
data = result["queries"][0]["data"]
response = CsvResponse(
data,
status=200,
headers=generate_download_headers("csv"),
mimetype="application/csv",
)

if result_format == ChartDataResultFormat.JSON:
response_data = simplejson.dumps(
{"result": result["queries"]},
default=json_int_dttm_ser,
ignore_nan=True,
)
resp = make_response(response_data, 200)
resp.headers["Content-Type"] = "application/json; charset=utf-8"
response = resp

return response

@expose("/data/<cache_key>", methods=["GET"])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New endpoint to fetch cached data based on a cache key passed to the FE via the async event payload.

@event_logger.log_this
@protect()
@safe
@statsd_metrics
def data_from_cache(self, cache_key: str) -> Response:
"""
Takes a query context cache key returns payload
data response for the given query.
---
get:
description: >-
Takes a query context constructed in the client and returns payload data
response for the given query.
parameters:
- in: path
schema:
type: string
name: cache_key
responses:
200:
description: Query result
content:
application/json:
schema:
$ref: "#/components/schemas/ChartDataResponseSchema"
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
command = ChartDataCommand()
try:
query_context.raise_for_access()
except SupersetSecurityException:
cached_data = command.load_query_context_from_cache(cache_key)
command.validate(cached_data)
except ChartDataCacheLoadError:
return self.response_404()
except ChartDataValidationError as exc:
return self.response_400(message=exc.message)
except SupersetSecurityException as exc:
logger.info(exc)
return self.response_401()
payload = query_context.get_payload()
for query in payload:
if query.get("error"):
return self.response_400(message=f"Error: {query['error']}")
result_format = query_context.result_format

# TODO: DRY
try:
result = command.run()
except ChartDataCacheLoadError as exc:
return self.response_400(message=exc.message)
except ChartDataQueryFailedError as exc:
return self.response_400(message=exc.message)

result_format = result["query_context"].result_format
response = self.response_400(
message=f"Unsupported result_format: {result_format}"
)

if result_format == ChartDataResultFormat.CSV:
# return the first result
result = payload[0]["data"]
data = result["queries"][0]["data"]
response = CsvResponse(
result,
data,
status=200,
headers=generate_download_headers("csv"),
mimetype="application/csv",
)

if result_format == ChartDataResultFormat.JSON:
response_data = simplejson.dumps(
{"result": payload}, default=json_int_dttm_ser, ignore_nan=True
{"result": result["queries"]},
default=json_int_dttm_ser,
ignore_nan=True,
)
resp = make_response(response_data, 200)
resp.headers["Content-Type"] = "application/json; charset=utf-8"
Expand Down
100 changes: 100 additions & 0 deletions superset/charts/commands/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Any, Dict, Optional

from marshmallow import ValidationError

from superset import cache
from superset.charts.commands.exceptions import (
ChartDataCacheLoadError,
ChartDataQueryFailedError,
ChartDataValidationError,
)
from superset.charts.schemas import ChartDataQueryContextSchema
from superset.commands.base import BaseCommand
from superset.common.query_context import QueryContext
from superset.exceptions import CacheLoadError
from superset.extensions import async_query_manager
from superset.tasks.async_queries import load_chart_data_into_cache

logger = logging.getLogger(__name__)


class ChartDataCommand(BaseCommand):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primary command module for running chart queries (sync or async) via new chart/data API.

def __init__(self):
self._form_data = None
self._query_context: Optional[QueryContext] = None
self._async_channel_id = None

def run(self, **kwargs):
# caching is handled in query_context.get_df_payload (also evals `force` property)
cache_query_context = kwargs["cache"] if "cache" in kwargs else False
force_cached = kwargs["force_cached"] if "force_cached" in kwargs else False
try:
payload = self._query_context.get_payload(
cache_query_context=cache_query_context, force_cached=force_cached
)
except CacheLoadError:
raise ChartDataCacheLoadError()

for query in payload["queries"]:
if query.get("error"):
raise ChartDataQueryFailedError(f"Error: {query['error']}")

return_value = {
"query_context": self._query_context,
"queries": payload["queries"],
}
if cache_query_context:
return_value.update(cache_key=payload["cache_key"])

return return_value

def run_async(self):
# TODO: confirm cache backend is configured
job_metadata = async_query_manager.init_job(self._async_channel_id)
load_chart_data_into_cache.delay(job_metadata, self._form_data)

return job_metadata

def set_query_context(self, form_data: Dict) -> None:
self._form_data = form_data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest doing this "init" logic in the ctor, as these set_xxx methods may be missed, and adding extra functionality down the road will require all usages of this CMD to be updated to call these

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryContext was created via the constructor originally, but varying use cases required breaking it out into it's own method.

try:
self._query_context = ChartDataQueryContextSchema().load(self._form_data)
except KeyError:
raise ChartDataValidationError("Request is incorrect")
except ValidationError as error:
raise ChartDataValidationError(
"Request is incorrect: %(error)s", error=error.messages
)

def validate(self, form_data: Dict) -> None:
self.set_query_context(form_data)
self._query_context.raise_for_access()

def validate_request(self, request: Dict):
jwt_data = async_query_manager.parse_jwt_from_request(request)
self._async_channel_id = jwt_data["channel"]

def load_query_context_from_cache(self, cache_key: str) -> Dict[str, Any]:
if cache_key and cache:
cache_value = cache.get(cache_key)
if cache_value:
return cache_value["data"]
else:
raise ChartDataCacheLoadError("Cached data not found")
12 changes: 12 additions & 0 deletions superset/charts/commands/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,15 @@ class ChartForbiddenError(ForbiddenError):

class ChartBulkDeleteFailedError(CreateFailedError):
message = _("Charts could not be deleted.")


class ChartDataValidationError(CommandException):
pass


class ChartDataQueryFailedError(CommandException):
pass


class ChartDataCacheLoadError(CommandException):
pass
Loading