From b2066da3274e5128a37d767e1951d9e3e08ae111 Mon Sep 17 00:00:00 2001
From: Maxime Beauchemin
Date: Fri, 28 Jul 2017 04:01:23 +0000
Subject: [PATCH] [bugfix] fix merge conflict that broke Hive support

---
 superset/config.py          |  6 +++++-
 superset/db_engine_specs.py | 35 +++++++++++++++++++++++------------
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/superset/config.py b/superset/config.py
index 2c27415bf2983..47126f8bd4f40 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -308,8 +308,12 @@ class CeleryConfig(object):
 # configuration. These blueprints will get integrated in the app
 BLUEPRINTS = []
 
-try:
+# Provide a callable that receives a tracking_url and returns another
+# URL. This is used to translate internal Hadoop job tracker URL
+# into a proxied one
+TRACKING_URL_TRANSFORMER = lambda x: x
+try:
     if CONFIG_PATH_ENV_VAR in os.environ:
         # Explicitly import config module that is not in pythonpath; useful
         # for case where app is being executed via pex.
         imp.load_source('superset_config', os.environ[CONFIG_PATH_ENV_VAR])
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index 848145845ebcf..b43e94e911f86 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -31,8 +31,9 @@
 
 from superset.utils import SupersetTemplateException
 from superset.utils import QueryStatus
-from superset import utils
-from superset import cache_util
+from superset import conf, cache_util, utils
+
+tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')
 
 Grain = namedtuple('Grain', 'name label function')
 
@@ -683,7 +684,7 @@ def adjust_database_uri(cls, uri, selected_schema=None):
     @classmethod
     def progress(cls, log_lines):
         total_jobs = 1  # assuming there's at least 1 job
-        current_job = None
+        current_job = 1
         stages = {}
         for line in log_lines:
             match = cls.jobs_stats_r.match(line)
@@ -692,6 +693,7 @@ def progress(cls, log_lines):
             match = cls.launching_job_r.match(line)
             if match:
                 current_job = int(match.groupdict()['job_number'])
+                total_jobs = int(match.groupdict()['max_jobs']) or 1
                 stages = {}
             match = cls.stage_progress_r.match(line)
             if match:
@@ -701,10 +703,9 @@ def progress(cls, log_lines):
                 stages[stage_number] = (map_progress + reduce_progress) / 2
         logging.info(
             "Progress detail: {}, "
-            "total jobs: {}".format(stages, total_jobs))
+            "current job {}, "
+            "total jobs: {}".format(stages, current_job, total_jobs))
 
-        if not total_jobs or not current_job:
-            return 0
         stage_progress = sum(
             stages.values()) / len(stages.values()) if stages else 0
 
@@ -731,18 +732,16 @@ def handle_cursor(cls, cursor, query, session):
         polled = cursor.poll()
         last_log_line = 0
         tracking_url = None
+        job_id = None
         while polled.operationState in unfinished_states:
             query = session.query(type(query)).filter_by(id=query.id).one()
             if query.status == QueryStatus.STOPPED:
                 cursor.cancel()
                 break
 
-            resp = cursor.fetch_logs()
-            if resp and resp.log:
-                log = resp.log or ''
-                log_lines = resp.log.splitlines()
-                logging.info("\n".join(log_lines[last_log_line:]))
-                last_log_line = len(log_lines) - 1
+            log = cursor.fetch_logs() or ''
+            if log:
+                log_lines = log.splitlines()
                 progress = cls.progress(log_lines)
                 logging.info("Progress total: {}".format(progress))
                 needs_commit = False
@@ -754,8 +753,20 @@ def handle_cursor(cls, cursor, query, session):
                 if tracking_url:
                     logging.info(
                         "Found the tracking url: {}".format(tracking_url))
+                    tracking_url = tracking_url_trans(tracking_url)
+                    logging.info(
+                        "Transformation applied: {}".format(tracking_url))
                     query.tracking_url = tracking_url
+                    job_id = tracking_url.split('/')[-2]
+                    logging.info("Job id: {}".format(job_id))
                     needs_commit = True
+            if job_id and len(log_lines) > last_log_line:
+                # Wait for job id before logging things out
+                # this allows for prefixing all log lines and becoming
+                # searchable in something like Kibana
+                for l in log_lines[last_log_line:]:
+                    logging.info("[{}] {}".format(job_id, l))
+                last_log_line = len(log_lines)
             if needs_commit:
                 session.commit()
             time.sleep(5)