
Commit

Added SparkSubmitTask and deprecated SparkJob, Spark1xJob and PySpark1xJob
Thierry Jossermoz committed Mar 7, 2015
1 parent fed8dd5 commit e3cf777
Showing 6 changed files with 637 additions and 234 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -5,6 +5,7 @@ client.cfg
hadoop_test.py
minicluster.py
mrrunner.py
pig_property_file

packages.tar

@@ -70,7 +71,7 @@ coverage.xml
*.log

# Sphinx documentation
docs/_build/
doc/_build/

# PyBuilder
target/
79 changes: 72 additions & 7 deletions doc/configuration.rst
@@ -446,7 +446,78 @@ worker-disconnect-delay
[spark]
-------

Parameters controlling the running of Spark jobs
Parameters controlling the default execution of ``SparkSubmitTask``; a sample configuration follows the parameter list below:

.. deprecated:: 1.1.0
   ``SparkJob``, ``Spark1xJob`` and ``PySpark1xJob`` are deprecated. Please use ``SparkSubmitTask``; a migration sketch follows.

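The migration from the deprecated classes is mechanical, and this commit's rewrite of ``examples/spark_als.py`` (further down) shows it on a real task. As a minimal sketch, with ``my-assembly.jar`` and ``com.example.MyApp`` as placeholder values::

    from luigi.contrib.spark import SparkJob, SparkSubmitTask

    # Deprecated style: everything supplied through method overrides.
    class OldALSJob(SparkJob):
        def jar(self):
            return 'my-assembly.jar'

        def job_class(self):
            return 'com.example.MyApp'

        def job_args(self):
            # The master was typically passed as the first main arg.
            return ['yarn-standalone', self.input().path, self.output().path]

    # SparkSubmitTask style: the same information as class attributes; the
    # master now comes from the [spark] configuration section instead of
    # the job args. (requires()/input()/output() omitted for brevity.)
    class NewALSTask(SparkSubmitTask):
        app = 'my-assembly.jar'
        entry_class = 'com.example.MyApp'

        def app_options(self):
            return [self.input().path, self.output().path]
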
spark-submit
    Command to run in order to submit Spark jobs. Default: spark-submit

master
    Master URL to use for spark-submit. Example: local[*]. Default: Spark default (prior to 1.1.0: yarn-client)

deploy-mode
    Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster"). Default: Spark default

jars
    Comma-separated list of local jars to include on the driver and executor classpaths. Default: Spark default

py-files
    Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. Default: Spark default

files
    Comma-separated list of files to be placed in the working directory of each executor. Default: Spark default

conf
    Arbitrary Spark configuration properties in the form Prop=Value|Prop2=Value2. Default: Spark default

properties-file
    Path to a file from which to load extra properties. Default: Spark default

driver-memory
    Memory for driver (e.g. 1000M, 2G). Default: Spark default

driver-java-options
    Extra Java options to pass to the driver. Default: Spark default

driver-library-path
    Extra library path entries to pass to the driver. Default: Spark default

driver-class-path
    Extra class path entries to pass to the driver. Default: Spark default

executor-memory
    Memory per executor (e.g. 1000M, 2G). Default: Spark default

*Configuration for Spark submit jobs on Spark standalone with cluster deploy mode only:*

driver-cores
    Cores for driver. Default: Spark default

supervise
    If given, restarts the driver on failure. Default: Spark default

*Configuration for Spark submit jobs on Spark standalone and Mesos only:*

total-executor-cores
    Total cores for all executors. Default: Spark default

*Configuration for Spark submit jobs on YARN only:*

executor-cores
    Number of cores per executor. Default: Spark default

queue
    The YARN queue to submit to. Default: Spark default

num-executors
    Number of executors to launch. Default: Spark default

archives
    Comma-separated list of archives to be extracted into the working directory of each executor. Default: Spark default

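For instance, a ``client.cfg`` along the following lines sets defaults for every ``SparkSubmitTask`` (a minimal sketch; the paths, host name and property values are illustrative, not shipped defaults)::

    [spark]
    spark-submit: /usr/local/spark/bin/spark-submit
    master: spark://spark.example.org:7077
    deploy-mode: client
    executor-memory: 2g
    conf: spark.eventLog.enabled=true|spark.ui.port=4050
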
*Parameters controlling the execution of SparkJob jobs (deprecated):*

spark-jar
    Location of the spark jar. Sets SPARK_JAR environment variable when
@@ -460,12 +531,6 @@ hadoop-conf-dir
spark-class
    Location of script to invoke. Example: /usr/share/spark/spark-class

spark-submit
    Command to run in order to submit spark jobs. Default: spark-submit

spark-master
    Master url to use for spark-submit. Example: local[*]. Default: yarn-client


[task_history]
--------------
Expand Down
72 changes: 72 additions & 0 deletions examples/pyspark_wc.py
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask


class PySparkWordCount(SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data in :py:meth:`wordcount.input` (a file in S3) and
    writes the result into its :py:meth:`wordcount.output` target (a file in S3).

    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.

    Example luigi configuration::

        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: spark://spark.example.org:7077
        deploy-mode: client
    """
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target('s3n://bucket.example.org/wordcount.output')


"""
// Corresponding example Spark Job, running Word count with Spark's Python API
// This file would have to be saved into wordcount.py
import sys
from pyspark import SparkContext
if __name__ == "__main__":
sc = SparkContext()
sc.textFile(sys.argv[1]) \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.saveAsTextFile(sys.argv[2])
"""
45 changes: 23 additions & 22 deletions examples/spark_als.py
@@ -20,7 +20,7 @@
import luigi
import luigi.format
import luigi.hdfs
from luigi.contrib.spark import SparkJob
from luigi.contrib.spark import SparkSubmitTask


class UserItemMatrix(luigi.Task):
@@ -57,19 +57,33 @@ def output(self):
        return luigi.hdfs.HdfsTarget('data-matrix', format=luigi.format.Gzip)


class SparkALS(SparkJob):
class SparkALS(SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.spark.SparkJob` task
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data returned by :py:meth:`~.UserItemMatrix.output` and
    writes the result into its :py:meth:`~.SparkALS.output` target (a file in HDFS).

    This class uses :py:meth:`luigi.contrib.spark.SparkJob.run`.
    """
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.

    Example luigi configuration::

        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: yarn-client
    """
    data_size = luigi.IntParameter(default=1000)
    spark_workers = '100'
    spark_master_memory = '2g'
    spark_worker_memory = '3g'

    driver_memory = '2g'
    executor_memory = '3g'
    num_executors = luigi.IntParameter(default=100)

    app = 'my-spark-assembly.jar'
    entry_class = 'com.spotify.spark.ImplicitALS'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def requires(self):
        """
@@ -81,18 +95,6 @@ def requires(self):
"""
return UserItemMatrix(self.data_size)

def jar(self):
# Jar containing job_class.
return 'my-spark-assembly.jar'

def job_class(self):
# The name of the Spark job object.
return 'com.spotify.spark.ImplicitALS'

def job_args(self):
# These are passed to the Spark main args in the defined order.
return ['yarn-standalone', self.input().path, self.output().path]

def output(self):
"""
Returns the target output for this task.
@@ -102,8 +104,7 @@ def output(self):
        :rtype: object (:py:class:`~luigi.target.Target`)
        """
        # The corresponding Spark job outputs as GZip format.
        return luigi.hdfs.HdfsTarget(
            '%s/als-output/*' % self.item_type, format=luigi.format.Gzip)
        return luigi.hdfs.HdfsTarget('%s/als-output/*' % self.item_type, format=luigi.format.Gzip)


'''
