From e3cf7776feb136e9e7c37d5628514312ee2b8c59 Mon Sep 17 00:00:00 2001 From: Thierry Jossermoz Date: Thu, 5 Mar 2015 00:29:57 +1300 Subject: [PATCH] Added SparkSubmitTask and deprecated SparkJob, Spark1xJob and PySpark1xJob --- .gitignore | 3 +- doc/configuration.rst | 79 +++++- examples/pyspark_wc.py | 72 +++++ examples/spark_als.py | 45 +-- luigi/contrib/spark.py | 467 ++++++++++++++++++++----------- test/{ => contrib}/spark_test.py | 205 +++++++++++--- 6 files changed, 637 insertions(+), 234 deletions(-) create mode 100644 examples/pyspark_wc.py rename test/{ => contrib}/spark_test.py (58%) diff --git a/.gitignore b/.gitignore index 0d47938e13..a4021efed1 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ client.cfg hadoop_test.py minicluster.py mrrunner.py +pig_property_file packages.tar @@ -70,7 +71,7 @@ coverage.xml *.log # Sphinx documentation -docs/_build/ +doc/_build/ # PyBuilder target/ diff --git a/doc/configuration.rst b/doc/configuration.rst index 7f5ed34c3b..d1e9f9f02f 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -446,7 +446,78 @@ worker-disconnect-delay [spark] ------- -Parameters controlling the running of Spark jobs +Parameters controlling the default execution of ``SparkSubmitTask``: + +.. deprecated:: 1.1.0 + ``SparkJob``, ``Spark1xJob`` and ``PySpark1xJob`` are deprecated. Please use ``SparkSubmitTask``. + +spark-submit + Command to run in order to submit spark jobs. Default: spark-submit + +master + Master url to use for spark-submit. Example: local[*]. Default: Spark default (Prior to 1.1.0: yarn-client) + +deploy-mode + Whether to launch the driver programs locally ("client") or on one of the worker machines inside the cluster ("cluster"). Default: Spark default + +jars + Comma-separated list of local jars to include on the driver and executor classpaths. Default: Spark default + +py-files + Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. Default: Spark default + +files + Comma-separated list of files to be placed in the working directory of each executor. Default: Spark default + +conf: + Arbitrary Spark configuration property in the form Prop=Value|Prop2=Value2. Default: Spark default + +properties-file + Path to a file from which to load extra properties. Default: Spark default + +driver-memory + Memory for driver (e.g. 1000M, 2G). Default: Spark default + +driver-java-options + Extra Java options to pass to the driver. Default: Spark default + +driver-library-path + Extra library path entries to pass to the driver. Default: Spark default + +driver-class-path + Extra class path entries to pass to the driver. Default: Spark default + +executor-memory + Memory per executor (e.g. 1000M, 2G). Default: Spark default + +*Configuration for Spark submit jobs on Spark standalone with cluster deploy mode only:* + +driver-cores + Cores for driver. Default: Spark default + +supervise + If given, restarts the driver on failure. Default: Spark default + +*Configuration for Spark submit jobs on Spark standalone and Mesos only:* + +total-executor-cores + Total cores for all executors. Default: Spark default + +*Configuration for Spark submit jobs on YARN only:* + +executor-cores + Number of cores per executor. Default: Spark default + +queue + The YARN queue to submit to. Default: Spark default + +num-executors + Number of executors to launch. Default: Spark default + +archives + Comma separated list of archives to be extracted into the working directory of each executor. 
Default: Spark default + +*Parameters controlling the execution of SparkJob jobs (deprecated):* spark-jar Location of the spark jar. Sets SPARK_JAR environment variable when @@ -460,12 +531,6 @@ hadoop-conf-dir spark-class Location of script to invoke. Example: /usr/share/spark/spark-class -spark-submit - Command to run in order to submit spark jobs. Default: spark-submit - -spark-master - Master url to use for spark-submit. Example: local[*]. Default: yarn-client - [task_history] -------------- diff --git a/examples/pyspark_wc.py b/examples/pyspark_wc.py new file mode 100644 index 0000000000..85fc8ee14b --- /dev/null +++ b/examples/pyspark_wc.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2012-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import luigi +from luigi.s3 import S3Target +from luigi.contrib.spark import SparkSubmitTask + + +class PySparkWordCount(SparkSubmitTask): + """ + This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task + over the target data in :py:meth:`wordcount.input` (a file in S3) and + writes the result into its :py:meth:`wordcount.output` target (a file in S3). + + This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`. + + Example luigi configuration:: + + [spark] + spark-submit: /usr/local/spark/bin/spark-submit + master: spark://spark.example.org:7077 + deploy-mode: client + + """ + driver_memory = '2g' + executor_memory = '3g' + total_executor_cores = luigi.IntParameter(default=100) + + name = "PySpark Word Count" + app = 'wordcount.py' + + def app_options(self): + # These are passed to the Spark main args in the defined order. 
+ return [self.input().path, self.output().path] + + def input(self): + return S3Target("s3n://bucket.example.org/wordcount.input") + + def output(self): + return S3Target('s3n://bucket.example.org/wordcount.output') + + +""" +// Corresponding example Spark Job, running Word count with Spark's Python API +// This file would have to be saved into wordcount.py + +import sys +from pyspark import SparkContext + +if __name__ == "__main__": + + sc = SparkContext() + sc.textFile(sys.argv[1]) \ + .flatMap(lambda line: line.split()) \ + .map(lambda word: (word, 1)) \ + .reduceByKey(lambda a, b: a + b) \ + .saveAsTextFile(sys.argv[2]) + +""" diff --git a/examples/spark_als.py b/examples/spark_als.py index ea3f5cfddc..b2fcda2214 100644 --- a/examples/spark_als.py +++ b/examples/spark_als.py @@ -20,7 +20,7 @@ import luigi import luigi.format import luigi.hdfs -from luigi.contrib.spark import SparkJob +from luigi.contrib.spark import SparkSubmitTask class UserItemMatrix(luigi.Task): @@ -57,19 +57,33 @@ def output(self): return luigi.hdfs.HdfsTarget('data-matrix', format=luigi.format.Gzip) -class SparkALS(SparkJob): +class SparkALS(SparkSubmitTask): """ - This task runs a :py:class:`luigi.contrib.spark.SparkJob` task + This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task over the target data returned by :py:meth:`~/.UserItemMatrix.output` and writes the result into its :py:meth:`~.SparkALS.output` target (a file in HDFS). - This class uses :py:meth:`luigi.contrib.spark.SparkJob.run`. - """ + This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`. + + Example luigi configuration:: + + [spark] + spark-submit: /usr/local/spark/bin/spark-submit + master: yarn-client + """ data_size = luigi.IntParameter(default=1000) - spark_workers = '100' - spark_master_memory = '2g' - spark_worker_memory = '3g' + + driver_memory = '2g' + executor_memory = '3g' + num_executors = luigi.IntParameter(default=100) + + app = 'my-spark-assembly.jar' + entry_class = 'com.spotify.spark.ImplicitALS' + + def app_options(self): + # These are passed to the Spark main args in the defined order. + return [self.input().path, self.output().path] def requires(self): """ @@ -81,18 +95,6 @@ def requires(self): """ return UserItemMatrix(self.data_size) - def jar(self): - # Jar containing job_class. - return 'my-spark-assembly.jar' - - def job_class(self): - # The name of the Spark job object. - return 'com.spotify.spark.ImplicitALS' - - def job_args(self): - # These are passed to the Spark main args in the defined order. - return ['yarn-standalone', self.input().path, self.output().path] - def output(self): """ Returns the target output for this task. @@ -102,8 +104,7 @@ def output(self): :rtype: object (:py:class:`~luigi.target.Target`) """ # The corresponding Spark job outputs as GZip format. 
- return luigi.hdfs.HdfsTarget( - '%s/als-output/*' % self.item_type, format=luigi.format.Gzip) + return luigi.hdfs.HdfsTarget('%s/als-output/*' % self.item_type, format=luigi.format.Gzip) ''' diff --git a/luigi/contrib/spark.py b/luigi/contrib/spark.py index eea408c16a..6868b9e9db 100644 --- a/luigi/contrib/spark.py +++ b/luigi/contrib/spark.py @@ -25,7 +25,8 @@ import sys import tempfile import time - +import warnings +from luigi import six import luigi import luigi.format import luigi.hdfs @@ -33,23 +34,6 @@ logger = logging.getLogger('luigi-interface') -""" -Apache Spark on YARN support - -Example configuration section in client.cfg: - -[spark] -# assembly jar containing spark and dependencies -spark-jar: /usr/share/spark/jars/spark-assembly-0.8.1-incubating-hadoop2.2.0.jar - -# spark script to invoke -spark-class: /usr/share/spark/spark-class - -# directory containing the (client side) configuration files for the hadoop cluster -hadoop-conf-dir: /etc/hadoop/conf - -""" - class SparkRunContext(object): @@ -99,7 +83,247 @@ def __str__(self): return info +class SparkSubmitTask(luigi.Task): + """ + Template task for running a Spark job + + Supports running jobs on Spark local, standalone, Mesos or Yarn + + See http://spark.apache.org/docs/latest/submitting-applications.html + for more information + + """ + + # Application (.jar or .py file) + name = None + entry_class = None + app = None + + def app_options(self): + """ + Subclass this method to map your task parameters to the driver app's arguments + + """ + return [] + + @property + def spark_submit(self): + return configuration.get_config().get('spark', 'spark-submit', 'spark-submit') + + @property + def master(self): + return configuration.get_config().get("spark", "master", None) + + @property + def deploy_mode(self): + return configuration.get_config().get("spark", "deploy-mode", None) + + @property + def jars(self): + return configuration.get_config().get("spark", "jars", None) + + @property + def py_files(self): + return configuration.get_config().get("spark", "py-files", None) + + @property + def files(self): + return configuration.get_config().get("spark", "files", None) + + @property + def conf(self): + return configuration.get_config().get("spark", "conf", None) + + @property + def properties_file(self): + return configuration.get_config().get("spark", "properties-file", None) + + @property + def driver_memory(self): + return configuration.get_config().get("spark", "driver-memory", None) + + @property + def driver_java_options(self): + return configuration.get_config().get("spark", "driver-java-options", None) + + @property + def driver_library_path(self): + return configuration.get_config().get("spark", "driver-library-path", None) + + @property + def driver_class_path(self): + return configuration.get_config().get("spark", "driver-class-path", None) + + @property + def executor_memory(self): + return configuration.get_config().get("spark", "executor-memory", None) + + @property + def driver_cores(self): + return configuration.get_config().get("spark", "driver-cores", None) + + @property + def supervise(self): + return bool(configuration.get_config().get("spark", "supervise", False)) + + @property + def total_executor_cores(self): + return configuration.get_config().get("spark", "total-executor-cores", None) + + @property + def executor_cores(self): + return configuration.get_config().get("spark", "executor-cores", None) + + @property + def queue(self): + return configuration.get_config().get("spark", "queue", None) + + 
@property + def num_executors(self): + return configuration.get_config().get("spark", "num-executors", None) + + @property + def archives(self): + return configuration.get_config().get("spark", "archives", None) + + def spark_heartbeat(self, line, spark_run_context): + pass + + def spark_command(self): + command = [self.spark_submit] + if self.master: + command += ['--master', self.master] + if self.deploy_mode: + command += ['--deploy-mode', self.deploy_mode] + if self.name: + command += ['--name', self.name] + if self.entry_class: + command += ['--class', self.entry_class] + if self.jars: + if isinstance(self.jars, (list, tuple)): + command += ['--jars', ','.join(self.jars)] + elif isinstance(self.jars, six.string_types): + command += ['--jars', self.jars] + if self.py_files: + if isinstance(self.py_files, (list, tuple)): + command += ['--py-files', ','.join(self.py_files)] + elif isinstance(self.py_files, six.string_types): + command += ['--py-files', self.py_files] + if self.files: + if isinstance(self.files, (list, tuple)): + command += ['--files', ','.join(self.files)] + elif isinstance(self.files, six.string_types): + command += ['--files', self.files] + if self.archives: + if isinstance(self.archives, (list, tuple)): + command += ['--archives', ','.join(self.archives)] + elif isinstance(self.archives, six.string_types): + command += ['--archives', self.archives] + if self.conf: + conf = None + if isinstance(self.conf, six.string_types): + conf = dict(map(lambda i: i.split('='), self.conf.split('|'))) + if isinstance(self.conf, dict): + conf = self.conf + if conf: + for prop, value in conf.items(): + command += ['--conf', '{0}="{1}"'.format(prop, value)] + if self.properties_file: + command += ['--properties-file', self.properties_file] + if self.driver_memory: + command += ['--driver-memory', self.driver_memory] + if self.driver_java_options: + command += ['--driver-java-options', self.driver_java_options] + if self.driver_library_path: + command += ['--driver-library-path', self.driver_library_path] + if self.driver_class_path: + command += ['--driver-class-path', self.driver_class_path] + if self.executor_memory: + command += ['--executor-memory', self.executor_memory] + if self.driver_cores: + command += ['--driver-cores', self.driver_cores] + if self.supervise: + command += ['--supervise'] + if self.total_executor_cores: + command += ['--total-executor-cores', self.total_executor_cores] + if self.executor_cores: + command += ['--executor-cores', self.executor_cores] + if self.queue: + command += ['--queue', self.queue] + if self.num_executors: + command += ['--num-executors', self.num_executors] + return command + + def app_command(self): + if not self.app: + raise NotImplementedError("subclass should define an app (.jar or .py file)") + return [self.app] + self.app_options() + + def run(self): + args = map(str, self.spark_command() + self.app_command()) + env = os.environ.copy() + temp_stderr = tempfile.TemporaryFile() + logger.info('Running: %s', ' '.join(args)) + proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=temp_stderr, env=env, close_fds=True) + return_code, final_state, app_id = self.track_progress(proc) + if final_state == 'FAILED': + raise SparkJobError("Spark job failed: see your master's logs for {0}".format(app_id)) + elif return_code != 0: + temp_stderr.seek(0) + errors = "".join((x.decode('utf8') for x in temp_stderr.readlines())) + logger.error(errors) + raise SparkJobError('Spark job failed', err=errors) + + def track_progress(self, proc): + """ + 
The Spark client currently outputs a multiline status to stdout every second while the application is running. + + This instead captures status data and updates a single line of output until the application finishes. + """ + app_id = None + app_status = 'N/A' + url = 'N/A' + final_state = None + start = time.time() + re_app_id = re.compile('application identifier: (\w+)') + re_app_status = re.compile('yarnAppState: (\w+)') + re_url = re.compile('appTrackingUrl: (.+)') + re_final_state = re.compile('distributedFinalState: (\w+)') + with SparkRunContext() as context: + while proc.poll() is None: + s = proc.stdout.readline() + app_id_s = re_app_id.search(s) + if app_id_s: + app_id = app_id_s.group(1) + context.app_id = app_id + app_status_s = re_app_status.search(s) + if app_status_s: + app_status = app_status_s.group(1) + url_s = re_url.search(s) + if url_s: + url = url_s.group(1) + final_state_s = re_final_state.search(s) + if final_state_s: + final_state = final_state_s.group(1) + if not app_id: + logger.info(s.strip()) + else: + t_diff = time.time() - start + elapsed_mins, elapsed_secs = divmod(t_diff, 60) + status = ('[%0d:%02d] Status: %s Tracking: %s' % (elapsed_mins, elapsed_secs, app_status, url)) + sys.stdout.write("\r\x1b[K" + status) + sys.stdout.flush() + self.spark_heartbeat(s, context) + logger.info(proc.communicate()[0]) + return proc.returncode, final_state, app_id + + class SparkJob(luigi.Task): + """ + .. deprecated:: 1.1.0 + Use ``SparkSubmitTask`` instead. + + """ spark_workers = None spark_master_memory = None spark_worker_memory = None @@ -138,6 +362,7 @@ def output(self): raise NotImplementedError("subclass should define HDFS output path") def run(self): + warnings.warn("The use of SparkJob is deprecated. Please use SparkSubmitTask.", stacklevel=2) original_output_path = self.output().path path_no_slash = original_output_path[:-2] if original_output_path.endswith('/*') else original_output_path path_no_slash = original_output_path[:-1] if original_output_path[-1] == '/' else path_no_slash @@ -226,174 +451,84 @@ def track_progress(self, proc): return proc.returncode, final_state, app_id -class Spark1xJob(luigi.Task): +class Spark1xBackwardCompat(SparkSubmitTask): + """ + Adapts SparkSubmitTask interface to (Py)Spark1xJob interface - num_executors = None - driver_memory = None - executor_memory = None - executor_cores = None - deploy_mode = None - queue = None - spark_master = configuration.get_config().get("spark", "spark-master", "yarn-client") + """ + # Old interface + @property + def master(self): + return configuration.get_config().get("spark", "master", "yarn-client") - def jar(self): - raise NotImplementedError("subclass should define jar " - "containing job_class") + def output(self): + raise NotImplementedError("subclass should define an output target") - def dependency_jars(self): - """ - Override to provide a list of dependency jars. 
- """ + def spark_options(self): return [] - def job_class(self): - raise NotImplementedError("subclass should define Spark job_class") - - def spark_options(self): + def dependency_jars(self): return [] def job_args(self): return [] - def output(self): - raise NotImplementedError("subclass should define HDFS output path") + # New interface + @property + def jars(self): + return self.dependency_jars() - def spark_heartbeat(self, line, spark_run_context): - pass + def app_options(self): + return self.job_args() - def run(self): - spark_submit = configuration.get_config().get('spark', 'spark-submit', - 'spark-submit') - options = [ - '--class', self.job_class(), - ] - if self.num_executors is not None: - options += ['--num-executors', self.num_executors] - if self.driver_memory is not None: - options += ['--driver-memory', self.driver_memory] - if self.executor_memory is not None: - options += ['--executor-memory', self.executor_memory] - if self.executor_cores is not None: - options += ['--executor-cores', self.executor_cores] - if self.deploy_mode is not None: - options += ['--deploy-mode', self.deploy_mode] - if self.queue is not None: - options += ['--queue', self.queue] - if self.spark_master is not None: - options += ['--master', self.spark_master] - dependency_jars = self.dependency_jars() - if dependency_jars != []: - options += ['--jars', ','.join(dependency_jars)] - args = [spark_submit] + options + self.spark_options() + \ - [self.jar()] + list(self.job_args()) - args = map(str, args) - env = os.environ.copy() - temp_stderr = tempfile.TemporaryFile() - logger.info('Running: %s', repr(args)) - proc = subprocess.Popen(args, stdout=subprocess.PIPE, - stderr=temp_stderr, env=env, close_fds=True) - return_code, final_state, app_id = self.track_progress(proc) - if final_state == 'FAILED': - raise SparkJobError('Spark job failed: see yarn logs for {0}' - .format(app_id)) - elif return_code != 0: - temp_stderr.seek(0) - errors = "".join((x.decode('utf8') for x in temp_stderr.readlines())) - logger.error(errors) - raise SparkJobError('Spark job failed', err=errors) + def spark_command(self): + return super(Spark1xBackwardCompat, self).spark_command() + self.spark_options() - def track_progress(self, proc): - """ - The Spark client currently outputs a multiline status to stdout every second while the application is running. - This instead captures status data and updates a single line of output until the application finishes. 
- """ - app_id = None - app_status = 'N/A' - url = 'N/A' - final_state = None - start = time.time() - re_app_id = re.compile('application identifier: (\w+)') - re_app_status = re.compile('yarnAppState: (\w+)') - re_url = re.compile('appTrackingUrl: (.+)') - re_final_state = re.compile('distributedFinalState: (\w+)') - with SparkRunContext() as context: - while proc.poll() is None: - s = proc.stdout.readline() - app_id_s = re_app_id.search(s) - if app_id_s: - app_id = app_id_s.group(1) - context.app_id = app_id - app_status_s = re_app_status.search(s) - if app_status_s: - app_status = app_status_s.group(1) - url_s = re_url.search(s) - if url_s: - url = url_s.group(1) - final_state_s = re_final_state.search(s) - if final_state_s: - final_state = final_state_s.group(1) - if not app_id: - logger.info(s.strip()) - else: - t_diff = time.time() - start - elapsed_mins, elapsed_secs = divmod(t_diff, 60) - status = ('[%0d:%02d] Status: %s Tracking: %s' % - (elapsed_mins, elapsed_secs, app_status, url)) - sys.stdout.write("\r\x1b[K" + status) - sys.stdout.flush() - self.spark_heartbeat(s, context) - logger.info(proc.communicate()[0]) - return proc.returncode, final_state, app_id +class Spark1xJob(Spark1xBackwardCompat): + """ + .. deprecated:: 1.1.0 + Use ``SparkSubmitTask`` instead. + """ + # Old interface + def job_class(self): + raise NotImplementedError("subclass should define Spark job_class") -class PySpark1xJob(Spark1xJob): + def jar(self): + raise NotImplementedError("subclass should define jar containing job_class") + + # New interface + @property + def entry_class(self): + return self.job_class() + + @property + def app(self): + return self.jar() + + def run(self): + warnings.warn("The use of Spark1xJob is deprecated. Please use SparkSubmitTask.", stacklevel=2) + return super(Spark1xJob, self).run() - num_executors = None - driver_memory = None - executor_memory = None - executor_cores = None +class PySpark1xJob(Spark1xBackwardCompat): + """ + + .. deprecated:: 1.1.0 + Use ``SparkSubmitTask`` instead. + + """ + + # Old interface def program(self): raise NotImplementedError("subclass should define Spark .py file") - def py_files(self): - """ - Override to provide a list of py files. 
- """ - return [] + # New interface + @property + def app(self): + return self.program() def run(self): - spark_submit = configuration.get_config().get('spark', 'spark-submit', - 'spark-submit') - options = ['--master', self.spark_master] - if self.num_executors is not None: - options += ['--num-executors', self.num_executors] - if self.driver_memory is not None: - options += ['--driver-memory', self.driver_memory] - if self.executor_memory is not None: - options += ['--executor-memory', self.executor_memory] - if self.executor_cores is not None: - options += ['--executor-cores', self.executor_cores] - py_files = self.py_files() - if py_files != []: - options += ['--py-files', ','.join(py_files)] - dependency_jars = self.dependency_jars() - if dependency_jars != []: - options += ['--jars', ','.join(dependency_jars)] - args = [spark_submit] + options + self.spark_options() + \ - [self.program()] + list(self.job_args()) - args = map(str, args) - env = os.environ.copy() - temp_stderr = tempfile.TemporaryFile() - logger.info('Running: %s', repr(args)) - proc = subprocess.Popen(args, stdout=subprocess.PIPE, - stderr=temp_stderr, env=env, close_fds=True) - return_code, final_state, app_id = self.track_progress(proc) - if final_state == 'FAILED': - raise SparkJobError('Spark job failed: see yarn logs for %s', app_id) - elif return_code != 0: - temp_stderr.seek(0) - errors = "".join((x.decode('utf8') for x in temp_stderr.readlines())) - logger.error(errors) - raise SparkJobError('Spark job failed', err=errors) + warnings.warn("The use of PySpark1xJob is deprecated. Please use SparkSubmitTask.", stacklevel=2) + return super(PySpark1xJob, self).run() diff --git a/test/spark_test.py b/test/contrib/spark_test.py similarity index 58% rename from test/spark_test.py rename to test/contrib/spark_test.py index 40b57474dd..3ba39bbf16 100644 --- a/test/spark_test.py +++ b/test/contrib/spark_test.py @@ -25,7 +25,7 @@ import luigi import luigi.hdfs from helpers import with_config -from luigi.contrib.spark import PySpark1xJob, Spark1xJob, SparkJob, SparkJobError +from luigi.contrib.spark import PySpark1xJob, Spark1xJob, SparkJob, SparkJobError, SparkSubmitTask from luigi.mock import MockFile from mock import patch @@ -57,12 +57,6 @@ class SparkTest(unittest.TestCase): sj = 'sj-stub' sc = 'sc-sub' - def setUp(self): - pass - - def tearDown(self): - pass - @with_config({'spark': {'hadoop-conf-dir': hcd, 'yarn-conf-dir': ycd, 'spark-jar': sj, 'spark-class': sc}}) @patch('subprocess.Popen') def test_run(self, mock): @@ -156,19 +150,9 @@ def output(self): class Spark1xTest(unittest.TestCase): - hcd = 'hcd-stub' - ycd = 'ycd-stub' - sj = 'sj-stub' ss = 'ss-stub' - def setUp(self): - pass - - def tearDown(self): - pass - - @with_config({'spark': {'hadoop-conf-dir': hcd, 'yarn-conf-dir': ycd, - 'spark-jar': sj, 'spark-submit': ss}}) + @with_config({'spark': {'spark-submit': ss}}) @patch('subprocess.Popen') def test_run(self, mock): arglist_result = [] @@ -201,14 +185,11 @@ def communicate(self): job = Test1xJob() job.run() self.assertEqual(len(arglist_result), 1) - self.assertEqual(list(arglist_result[0])[0:6], - [self.ss, '--class', job.job_class(), - '--master', 'yarn-client', job.jar()]) + self.assertEqual(list(arglist_result[0])[0:6], [self.ss, '--master', 'yarn-client', '--class', job.job_class(), job.jar()]) finally: luigi.hdfs.HdfsTarget, subprocess.Popen = h, p # restore - @with_config({'spark': {'hadoop-conf-dir': hcd, 'yarn-conf-dir': ycd, - 'spark-jar': sj, 'spark-submit': ss}}) + @with_config({'spark': 
{'spark-submit': ss}}) def test_handle_failed_job(self): def Popen_fake(arglist, stdout=None, stderr=None, env=None, close_fds=True): @@ -261,19 +242,9 @@ def output(self): class PySpark1xTest(unittest.TestCase): - hcd = 'hcd-stub' - ycd = 'ycd-stub' - sj = 'sj-stub' ss = 'ss-stub' - def setUp(self): - pass - - def tearDown(self): - pass - - @with_config({'spark': {'hadoop-conf-dir': hcd, 'yarn-conf-dir': ycd, - 'spark-jar': sj, 'spark-submit': ss}}) + @with_config({'spark': {'spark-submit': ss}}) @patch('subprocess.Popen') def test_run(self, mock): arglist_result = [] @@ -306,13 +277,11 @@ def communicate(self): job = TestPySpark1xJob() job.run() self.assertEqual(len(arglist_result), 1) - self.assertEqual(list(arglist_result[0])[0:6], - [self.ss, '--master', 'yarn-client', job.program()]) + self.assertEqual(list(arglist_result[0])[0:6], [self.ss, '--master', 'yarn-client', job.program()]) finally: luigi.hdfs.HdfsTarget, subprocess.Popen = h, p # restore - @with_config({'spark': {'hadoop-conf-dir': hcd, 'yarn-conf-dir': ycd, - 'spark-jar': sj, 'spark-submit': ss}}) + @with_config({'spark': {'spark-submit': ss}}) def test_handle_failed_job(self): def Popen_fake(arglist, stdout=None, stderr=None, env=None, close_fds=True): @@ -350,3 +319,163 @@ def communicate(self): self.fail("Should have thrown SparkJobError") finally: subprocess.Popen = p + + +class TestSparkSubmitTask(SparkSubmitTask): + deploy_mode = "client" + name = "AppName" + entry_class = "org.test.MyClass" + jars = ["jars/my.jar"] + py_files = ["file1.py", "file2.py"] + files = ["file1", "file2"] + conf = {"Prop": "Value"} + properties_file = "conf/spark-defaults.conf" + driver_memory = "4G" + driver_java_options = "-Xopt" + driver_library_path = "library/path" + driver_class_path = "class/path" + executor_memory = "8G" + driver_cores = 8 + supervise = True + total_executor_cores = 150 + executor_cores = 10 + queue = "queue" + num_executors = 2 + archives = ["archive1", "archive2"] + app = "file" + + def app_options(self): + return ["arg1", "arg2"] + + def output(self): + return luigi.LocalTarget('output') + + +class TestDefaultSparkSubmitTask(SparkSubmitTask): + app = 'test.py' + + def output(self): + return luigi.LocalTarget('output') + + +class SparkSubmitTest(unittest.TestCase): + ss = 'ss-stub' + + @with_config({'spark': {'spark-submit': ss, 'spark-master': "local[*]"}}) + @patch('subprocess.Popen') + def test_run(self, mock): + arglist_result = [] + + def Popen_fake(arglist, stdout=None, stderr=None, env=None, close_fds=True): + arglist_result.append(arglist) + + class P(object): + + def wait(self): + pass + + def poll(self): + return 0 + + def communicate(self): + return 'end' + + p = P() + p.returncode = 0 + p.stderr = StringIO() + p.stdout = StringIO() + return p + + h, p = luigi.hdfs.HdfsTarget, subprocess.Popen + luigi.hdfs.HdfsTarget, subprocess.Popen = MockFile, Popen_fake + try: + MockFile.move = lambda *args, **kwargs: None + job = TestSparkSubmitTask() + job.run() + self.assertEqual(len(arglist_result), 1) + self.assertEqual(list(arglist_result[0]), + ['ss-stub', '--deploy-mode', 'client', '--name', 'AppName', '--class', 'org.test.MyClass', + '--jars', 'jars/my.jar', '--py-files', 'file1.py,file2.py', '--files', 'file1,file2', + '--archives', 'archive1,archive2', '--conf', 'Prop="Value"', '--properties-file', 'conf/spark-defaults.conf', + '--driver-memory', '4G', '--driver-java-options', '-Xopt', '--driver-library-path', 'library/path', + '--driver-class-path', 'class/path', '--executor-memory', '8G', 
'--driver-cores', '8', '--supervise', + '--total-executor-cores', '150', '--executor-cores', '10', '--queue', 'queue', '--num-executors', '2', + 'file', 'arg1', 'arg2']) + finally: + luigi.hdfs.HdfsTarget, subprocess.Popen = h, p # restore + + @with_config({'spark': {'spark-submit': ss}}) + def test_handle_failed_job(self): + def Popen_fake(arglist, stdout=None, stderr=None, env=None, close_fds=True): + class P(object): + + def wait(self): + pass + + def poll(self): + return 1 + + def communicate(self): + return 'end' + + p = P() + p.returncode = 1 + if stdout == subprocess.PIPE: + p.stdout = StringIO('stdout') + else: + stdout.write(b'stdout') + if stderr == subprocess.PIPE: + p.stderr = StringIO('stderr') + else: + stderr.write(b'stderr') + return p + + p = subprocess.Popen + subprocess.Popen = Popen_fake + try: + job = TestSparkSubmitTask() + job.run() + except SparkJobError as e: + self.assertEqual(e.err, 'stderr') + else: + self.fail("Should have thrown SparkJobError") + finally: + subprocess.Popen = p + + @with_config({'spark': {'spark-submit': ss, 'master': 'spark://host:7077', 'conf': 'prop1=val1', 'jars': 'jar1.jar,jar2.jar', + 'files': 'file1,file2', 'py-files': 'file1.py,file2.py', 'archives': 'archive1'}}) + def test_configuration(self): + arglist_result = [] + + def Popen_fake(arglist, stdout=None, stderr=None, env=None, close_fds=True): + arglist_result.append(arglist) + + class P(object): + + def wait(self): + pass + + def poll(self): + return 0 + + def communicate(self): + return 'end' + + p = P() + p.returncode = 0 + p.stderr = StringIO() + p.stdout = StringIO() + return p + + h, p = luigi.hdfs.HdfsTarget, subprocess.Popen + luigi.hdfs.HdfsTarget, subprocess.Popen = MockFile, Popen_fake + try: + MockFile.move = lambda *args, **kwargs: None + job = TestDefaultSparkSubmitTask() + job.run() + self.assertEqual(len(arglist_result), 1) + self.assertEqual(list(arglist_result[0]), ['ss-stub', '--master', 'spark://host:7077', '--jars', 'jar1.jar,jar2.jar', + '--py-files', 'file1.py,file2.py', '--files', 'file1,file2', '--archives', 'archive1', + '--conf', 'prop1="val1"', 'test.py']) + finally: + luigi.hdfs.HdfsTarget, subprocess.Popen = h, p # restore
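
For reference, a minimal end-to-end sketch of the new SparkSubmitTask interface introduced by this patch (illustrative only; the HDFS paths, script name, YARN queue and resource sizes below are placeholder assumptions, and any option left unset on the task class falls back to the [spark] configuration section documented in doc/configuration.rst above):

# Illustrative sketch -- not part of the patch. Paths, the YARN queue name
# and resource sizes are placeholder assumptions; options not set on the
# class fall back to the [spark] configuration section, e.g.:
#
#   [spark]
#   spark-submit: /usr/local/spark/bin/spark-submit
#   master: yarn-client
#   queue: etl
#
import luigi
import luigi.hdfs
from luigi.contrib.spark import SparkSubmitTask


class TopWords(SparkSubmitTask):
    # Values set on the class override the [spark] section for this task only.
    name = 'Top Words'
    app = 'top_words.py'      # PySpark script handed to spark-submit
    executor_memory = '2g'
    num_executors = 4

    def app_options(self):
        # Passed to the application's main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return luigi.hdfs.HdfsTarget('/data/words/input')

    def output(self):
        return luigi.hdfs.HdfsTarget('/data/words/output')


if __name__ == '__main__':
    luigi.run()

Running the task invokes spark-submit with the merged class-level and configuration-level options, then passes the input and output paths as the application's arguments, mirroring the pattern used by examples/pyspark_wc.py and examples/spark_als.py in this patch.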