Added SparkSubmitTask and deprecated SparkJob, Spark1xJob and PySpark1xJob #812

Merged
merged 1 commit on Mar 10, 2015
3 changes: 2 additions & 1 deletion .gitignore
@@ -5,6 +5,7 @@ client.cfg
hadoop_test.py
minicluster.py
mrrunner.py
pig_property_file

packages.tar

@@ -70,7 +71,7 @@ coverage.xml
*.log

# Sphinx documentation
docs/_build/
doc/_build/

# PyBuilder
target/
79 changes: 72 additions & 7 deletions doc/configuration.rst
@@ -446,7 +446,78 @@ worker-disconnect-delay
[spark]
-------

Parameters controlling the running of Spark jobs
Parameters controlling the default execution of ``SparkSubmitTask`` (an example configuration is sketched after this list):

.. deprecated:: 1.1.1
   ``SparkJob``, ``Spark1xJob`` and ``PySpark1xJob`` are deprecated. Please use ``SparkSubmitTask``.

spark-submit
    Command to run in order to submit Spark jobs. Default: spark-submit

master
    Master URL to use for spark-submit. Example: local[*]. Default: Spark default (prior to 1.1.0: yarn-client)

deploy-mode
    Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster"). Default: Spark default

jars
    Comma-separated list of local jars to include on the driver and executor classpaths. Default: Spark default

py-files
    Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. Default: Spark default

files
    Comma-separated list of files to be placed in the working directory of each executor. Default: Spark default

conf
    Arbitrary Spark configuration properties in the form Prop=Value|Prop2=Value2. Default: Spark default

properties-file
    Path to a file from which to load extra properties. Default: Spark default

driver-memory
    Memory for the driver (e.g. 1000M, 2G). Default: Spark default

driver-java-options
    Extra Java options to pass to the driver. Default: Spark default

driver-library-path
    Extra library path entries to pass to the driver. Default: Spark default

driver-class-path
    Extra class path entries to pass to the driver. Default: Spark default

executor-memory
    Memory per executor (e.g. 1000M, 2G). Default: Spark default

*Configuration for Spark submit jobs on Spark standalone with cluster deploy mode only:*

driver-cores
    Cores for the driver. Default: Spark default

supervise
    If given, restarts the driver on failure. Default: Spark default

*Configuration for Spark submit jobs on Spark standalone and Mesos only:*

total-executor-cores
    Total cores for all executors. Default: Spark default

*Configuration for Spark submit jobs on YARN only:*

executor-cores
    Number of cores per executor. Default: Spark default

queue
    The YARN queue to submit to. Default: Spark default

num-executors
    Number of executors to launch. Default: Spark default

archives
    Comma-separated list of archives to be extracted into the working directory of each executor. Default: Spark default
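
Putting these together, a possible ``client.cfg`` snippet for ``SparkSubmitTask`` (the paths and values below are illustrative, not defaults)::

    [spark]
    spark-submit: /usr/local/spark/bin/spark-submit
    master: spark://spark.example.org:7077
    deploy-mode: client
    driver-memory: 2g
    executor-memory: 3g
    total-executor-cores: 100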

*Parameters controlling the execution of ``SparkJob`` tasks (deprecated):*

spark-jar
    Location of the spark jar. Sets SPARK_JAR environment variable when
@@ -460,12 +531,6 @@ hadoop-conf-dir
spark-class
    Location of script to invoke. Example: /usr/share/spark/spark-class

spark-submit
    Command to run in order to submit spark jobs. Default: spark-submit

spark-master
    Master url to use for spark-submit. Example: local[*]. Default: yarn-client


[task_history]
--------------
72 changes: 72 additions & 0 deletions examples/pyspark_wc.py
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask


class PySparkWordCount(SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data in :py:meth:`wordcount.input` (a file in S3) and
    writes the result into its :py:meth:`wordcount.output` target (a file in S3).
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.
    Example luigi configuration::

        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: spark://spark.example.org:7077
        deploy-mode: client
    """
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target('s3n://bucket.example.org/wordcount.output')


"""
// Corresponding example Spark Job, running Word count with Spark's Python API
// This file would have to be saved into wordcount.py
import sys
from pyspark import SparkContext
if __name__ == "__main__":
sc = SparkContext()
sc.textFile(sys.argv[1]) \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.saveAsTextFile(sys.argv[2])
"""
45 changes: 23 additions & 22 deletions examples/spark_als.py
@@ -20,7 +20,7 @@
import luigi
import luigi.format
import luigi.hdfs
from luigi.contrib.spark import SparkJob
from luigi.contrib.spark import SparkSubmitTask


class UserItemMatrix(luigi.Task):
@@ -57,19 +57,33 @@ def output(self):
        return luigi.hdfs.HdfsTarget('data-matrix', format=luigi.format.Gzip)


class SparkALS(SparkJob):
class SparkALS(SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.spark.SparkJob` task
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data returned by :py:meth:`~/.UserItemMatrix.output` and
    writes the result into its :py:meth:`~.SparkALS.output` target (a file in HDFS).
    This class uses :py:meth:`luigi.contrib.spark.SparkJob.run`.
    """
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.
    Example luigi configuration::

        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: yarn-client
    """
    data_size = luigi.IntParameter(default=1000)
    spark_workers = '100'
    spark_master_memory = '2g'
    spark_worker_memory = '3g'

    driver_memory = '2g'
    executor_memory = '3g'
    num_executors = luigi.IntParameter(default=100)

    app = 'my-spark-assembly.jar'
    entry_class = 'com.spotify.spark.ImplicitALS'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def requires(self):
        """
@@ -81,18 +95,6 @@ def requires(self):
"""
return UserItemMatrix(self.data_size)

    def jar(self):
        # Jar containing job_class.
        return 'my-spark-assembly.jar'

    def job_class(self):
        # The name of the Spark job object.
        return 'com.spotify.spark.ImplicitALS'

    def job_args(self):
        # These are passed to the Spark main args in the defined order.
        return ['yarn-standalone', self.input().path, self.output().path]

    def output(self):
        """
        Returns the target output for this task.
@@ -102,8 +104,7 @@
        :rtype: object (:py:class:`~luigi.target.Target`)
        """
        # The corresponding Spark job outputs as GZip format.
        return luigi.hdfs.HdfsTarget(
            '%s/als-output/*' % self.item_type, format=luigi.format.Gzip)
        return luigi.hdfs.HdfsTarget('%s/als-output/*' % self.item_type, format=luigi.format.Gzip)


'''