-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(SIP-39): Async query support for charts #11499
Changes from 4 commits
67f7d5f
1e8c039
a50750b
64fbfae
219ce77
e377b31
f7ac5b6
3a16283
6805180
d5eef4f
467c6bb
f867cd5
8111ea6
3b41f16
49b6b52
0e5c09e
e2dc30e
276c84e
4566c01
9eba0c4
d2e4529
0bd7d67
4441459
5896799
5999bf3
17215a3
27e4548
119cae7
9666408
d2ef464
a8607c0
e40eb45
f491789
f01740e
9e02017
838c526
f0de265
066504f
d6c8a1d
fc5753a
088a49c
c9b871e
89925a5
0ad7234
c72b4c6
1fb7489
024da76
887754b
601ec51
df673cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
import logging | ||
from typing import Any, Dict, Optional | ||
|
||
from marshmallow import ValidationError | ||
|
||
from superset import cache | ||
from superset.charts.commands.exceptions import ( | ||
ChartDataCacheLoadError, | ||
ChartDataQueryFailedError, | ||
ChartDataValidationError, | ||
) | ||
from superset.charts.schemas import ChartDataQueryContextSchema | ||
from superset.commands.base import BaseCommand | ||
from superset.common.query_context import QueryContext | ||
from superset.exceptions import CacheLoadError | ||
from superset.extensions import async_query_manager | ||
from superset.tasks.async_queries import load_chart_data_into_cache | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ChartDataCommand(BaseCommand): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Primary command module for running chart queries (sync or async) via new |
||
def __init__(self): | ||
self._form_data = None | ||
self._query_context: Optional[QueryContext] = None | ||
self._async_channel_id = None | ||
|
||
def run(self, **kwargs): | ||
# caching is handled in query_context.get_df_payload (also evals `force` property) | ||
cache_query_context = kwargs["cache"] if "cache" in kwargs else False | ||
force_cached = kwargs["force_cached"] if "force_cached" in kwargs else False | ||
try: | ||
payload = self._query_context.get_payload( | ||
cache_query_context=cache_query_context, force_cached=force_cached | ||
) | ||
except CacheLoadError: | ||
raise ChartDataCacheLoadError() | ||
|
||
for query in payload["queries"]: | ||
if query.get("error"): | ||
raise ChartDataQueryFailedError(f"Error: {query['error']}") | ||
|
||
return_value = { | ||
"query_context": self._query_context, | ||
"queries": payload["queries"], | ||
} | ||
if cache_query_context: | ||
return_value.update(cache_key=payload["cache_key"]) | ||
|
||
return return_value | ||
|
||
def run_async(self): | ||
# TODO: confirm cache backend is configured | ||
job_metadata = async_query_manager.init_job(self._async_channel_id) | ||
load_chart_data_into_cache.delay(job_metadata, self._form_data) | ||
|
||
return job_metadata | ||
|
||
def set_query_context(self, form_data: Dict) -> None: | ||
self._form_data = form_data | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest doing this "init" logic in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
try: | ||
self._query_context = ChartDataQueryContextSchema().load(self._form_data) | ||
except KeyError: | ||
raise ChartDataValidationError("Request is incorrect") | ||
except ValidationError as error: | ||
raise ChartDataValidationError( | ||
"Request is incorrect: %(error)s", error=error.messages | ||
) | ||
|
||
def validate(self, form_data: Dict) -> None: | ||
self.set_query_context(form_data) | ||
self._query_context.raise_for_access() | ||
|
||
def validate_request(self, request: Dict): | ||
jwt_data = async_query_manager.parse_jwt_from_request(request) | ||
self._async_channel_id = jwt_data["channel"] | ||
|
||
def load_query_context_from_cache(self, cache_key: str) -> Dict[str, Any]: | ||
if cache_key and cache: | ||
cache_value = cache.get(cache_key) | ||
if cache_value: | ||
return cache_value["data"] | ||
else: | ||
raise ChartDataCacheLoadError("Cached data not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New endpoint to fetch cached data based on a cache key passed to the FE via the async event payload.