New section on basics and function syntax
mateiz committed May 28, 2014
1 parent e38f559 commit 1c81477
Showing 1 changed file with 256 additions and 2 deletions: docs/programming-guide.md
@@ -272,6 +272,13 @@ JavaRDD<Integer> distData = sc.parallelize(data);
Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list.
We describe operations on distributed datasets later on.

**Note:** *In this guide, we'll often use the concise Java 8 lambda syntax to specify Java functions, but
in older versions of Java you can implement the interfaces in the
[org.apache.spark.api.java.function](api/java/org/apache/spark/api/java/function/package-summary.html) package.
For example, for the `reduce` above, we could create a
[Function2](api/java/org/apache/spark/api/java/function/Function2.html) that adds two numbers.
We describe [writing functions in Java](#java-functions) in more detail below.*

</div>

<div data-lang="python" markdown="1">
@@ -307,7 +314,7 @@ scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
{% endhighlight %}

Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(s => s.length).reduce((a, b) => a + b)`.

Some notes on reading files with Spark:

@@ -398,10 +405,257 @@ All transformations in Spark are <i>lazy</i>, in that they do not compute their

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
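
For example, here is a small sketch (Scala) of persisting an RDD, assuming the constants in `org.apache.spark.storage.StorageLevel`; `cache()` is shorthand for in-memory-only persistence:

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

val input = sc.textFile("data.txt")
// Keep the RDD deserialized in memory; equivalent to input.cache()
input.persist(StorageLevel.MEMORY_ONLY)
// Or spill partitions that don't fit in memory to disk and keep two replicas:
// input.persist(StorageLevel.MEMORY_AND_DISK_2)
{% endhighlight %}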

### Basics

<div class="codetabs">

<div data-lang="scala" markdown="1">

To illustrate RDD basics, consider the simple program below:

{% highlight scala %}
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
{% endhighlight %}

The first line defines a base RDD from an external file. This dataset is not loaded in memory or
otherwise acted on: `lines` is merely a pointer to the file.
The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths`
is *not* immediately computed, due to laziness.
Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks
to run on separate machines, and each machine runs both its part of the map and a local reduction,
returning only its answer to the driver program.
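
One way to see this laziness directly (a small sketch, not part of the example above; `loggedLengths` is a hypothetical variant of `lineLengths`) is to add a side effect to the `map` and note when it actually runs:

{% highlight scala %}
// Nothing is printed when the map is defined -- Spark only records the transformation.
val loggedLengths = lines.map { s => println("mapping: " + s); s.length }
// The println runs only when an action forces the computation.
// (In local mode the output appears on the driver's console; on a cluster it
// goes to the executors' stdout.)
val total = loggedLengths.reduce((a, b) => a + b)
{% endhighlight %}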

If we also wanted to use `lineLengths` again later, we could add:

{% highlight scala %}
lineLengths.persist()
{% endhighlight %}

which would cause it to be saved in memory after the first time it is computed.
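
For example (a sketch with a second, hypothetical action added), only the first action pays the cost of reading the file and computing the lengths; a later action is served from the cache:

{% highlight scala %}
lineLengths.persist()
lineLengths.reduce((a, b) => a + b)  // first action: computes the lengths and caches them
lineLengths.count()                  // later action: served from the cache, no re-read of data.txt
{% endhighlight %}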

<h4 id="scala-functions">Passing Functions in Scala</h4>

Spark's API relies heavily on passing functions in the driver program to run on the cluster.
There are two recommended ways to do this:

* [Anonymous function syntax](http://docs.scala-lang.org/tutorials/tour/anonymous-function-syntax.html),
which can be used for short pieces of code.
* Static methods in a global singleton object. For example, you can define `object MyFunctions` and then
pass `MyFunctions.func1`, as follows:

{% highlight scala %}
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
{% endhighlight %}

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to
a singleton object), this requires sending the object that contains that method along with it.
For example, consider:

{% highlight scala %}
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
{% endhighlight %}

Here, if we create a `new MyClass` and call `doStuff` on it, the `map` inside there references the
`func1` method *of that `MyClass` instance*, so the whole object needs to be sent to the cluster. It is
similar to writing `rdd.map(x => this.func1(x))`.

In a similar way, accessing fields of the outer object will reference the whole object:

{% highlight scala %}
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
{% endhighlight %}

is equivalent to writing `rdd.map(x => this.field + x)`, which references all of `this`. To avoid this
issue, the simplest way is to copy `field` into a local variable instead of accessing it externally:

{% highlight scala %}
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

To illustrate RDD basics, consider the simple program below:

{% highlight java %}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
{% endhighlight %}

The first line defines a base RDD from an external file. This dataset is not loaded in memory or
otherwise acted on: `lines` is merely a pointer to the file.
The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths`
is *not* immediately computed, due to laziness.
Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks
to run on separate machines, and each machine runs both its part of the map and a local reduction,
returning only its answer to the driver program.

If we also wanted to use `lineLengths` again later, we could add:

{% highlight java %}
lineLengths.persist();
{% endhighlight %}

which would cause it to be saved in memory after the first time it is computed.

<h4 id="java-functions">Passing Functions in Java</h4>

Spark's API relies heavily on passing functions in the driver program to run on the cluster.
In Java, functions are represented by classes implementing the interfaces in the
[org.apache.spark.api.java.function](api/java/org/apache/spark/api/java/function/package-summary.html) package.
There are two ways to create such functions:

* Implement the Function interfaces in your own class, either as an anonymous inner class or a named one,
and pass an instance of it to Spark.
* In Java 8, use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
to concisely define an implementation.

While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs
in long-form. For example, we could have written our code above as follows:

{% highlight java %}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
{% endhighlight %}

Or, if writing the functions inline is unwieldy:

{% highlight java %}
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
{% endhighlight %}

Note that anonymous inner classes in Java can also access variables in the enclosing scope as long
as they are marked `final`. Spark will ship copies of these variables to each worker node as it does
for other languages.

</div>

<div data-lang="python" markdown="1">

To illustrate RDD basics, consider the simple program below:

{% highlight python %}
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
{% endhighlight %}

The first line defines a base RDD from an external file. This dataset is not loaded in memory or
otherwise acted on: `lines` is merely a pointer to the file.
The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths`
is *not* immediately computed, due to laziness.
Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks
to run on separate machines, and each machine runs both its part of the map and a local reduction,
returning only its answer to the driver program.

If we also wanted to use `lineLengths` again later, we could add:

{% highlight python %}
lineLengths.persist()
{% endhighlight %}

which would cause it to be saved in memory after the first time it is computed.

<h4 id="python-functions">Passing Functions in Python</h4>

Spark's API relies heavily on passing functions in the driver program to run on the cluster.
There are three recommended ways to do this:

* [Lambda expressions](https://docs.python.org/2/tutorial/controlflow.html#lambda-expressions),
for simple functions that can be written as an expression. (Lambdas do not support multi-statement
functions or statements that do not return a value.)
* Local `def`s inside the function calling into Spark, for longer code.
* Top-level functions in a module.

For example, to pass a longer function than can be supported using a `lambda`, consider
the code below:

{% highlight python %}
"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
{% endhighlight %}

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to
a singleton object), this requires sending the object that contains that method along with it.
For example, consider:

{% highlight python %}
class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)
{% endhighlight %}

Here, if we create a new `MyClass` and call `doStuff` on it, the `map` inside there references the
`func` method *of that `MyClass` instance*, so the whole object needs to be sent to the cluster.

In a similar way, accessing fields of the outer object will reference the whole object:

{% highlight python %}
class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)
{% endhighlight %}

To avoid this issue, the simplest way is to copy `field` into a local variable instead
of accessing it externally:

{% highlight python %}
def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)
{% endhighlight %}

</div>

</div>

### Working with Key-Value Pairs
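
While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. As a minimal sketch (Scala, assuming the standard pair-RDD operations such as `reduceByKey`), counting how many times each line occurs in a file looks like this:

{% highlight scala %}
val lines = sc.textFile("data.txt")
// Turn each line into a (line, 1) pair, then add up the counts per distinct line.
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
{% endhighlight %}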

### Transformations

The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD) for details):

<table class="table">
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
<tr>
