Add more API functions
mateiz committed May 28, 2014
1 parent 6a030a9 commit e11a0da
Showing 1 changed file with 100 additions and 37 deletions.
137 changes: 100 additions & 37 deletions docs/programming-guide.md
@@ -674,7 +674,7 @@ by a key.
In Scala, these operations are automatically available on RDDs containing
[Tuple2](http://www.scala-lang.org/api/{{site.SCALA_VERSION}}/index.html#scala.Tuple2) objects
(the built-in tuples in the language, created by simply writing `(a, b)`), as long as you
import `org.apache.spark.SparkContext._` in your program to enable Spark's implicit
conversions. The key-value pair operations are available in the
[PairRDDFunctions](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) class,
which automatically wraps around an RDD of tuples if you import the conversions.
@@ -688,7 +688,7 @@ val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
{% endhighlight %}

We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
`counts.collect()` to bring them back to the driver program as an array of objects.
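
For example, a minimal sketch (assuming the `counts` RDD defined above) that combines these two calls:

{% highlight scala %}
// Sort the (line, count) pairs by key and fetch them to the driver.
val sorted = counts.sortByKey().collect()
sorted.foreach(println)
{% endhighlight %}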

</div>
@@ -716,11 +716,11 @@ many times each line of text occurs in a file:

{% highlight java %}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
{% endhighlight %}

We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
`counts.collect()` to bring them back to the driver program as an array of objects.


@@ -745,8 +745,8 @@ pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
{% endhighlight %}

We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
`counts.collect()` to bring them back to the driver program as a list of objects.

</div>

@@ -755,7 +755,15 @@ We could also use `counts.sortByKey()`, for example, to sort the pairs by word,

### Transformations

The following table lists some of the common transformations supported by Spark. Refer to the
RDD API doc
([Scala](api/scala/index.html#org.apache.spark.rdd.RDD),
[Java](api/java/org/apache/spark/api/java/JavaRDD.html),
[Python](api/python/pyspark.rdd.RDD-class.html))
and pair RDD functions doc
([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),
[Java](api/java/org/apache/spark/api/java/JavaPairRDD.html))
for details.

<table class="table">
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
@@ -774,12 +782,12 @@ The following tables list the transformations and actions currently supported (s
<tr>
<td> <b>mapPartitions</b>(<i>func</i>) </td>
<td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
</tr>
<tr>
<td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
<td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
the partition, so <i>func</i> must be of type (Int, Iterator&lt;T&gt;) => Iterator&lt;U&gt; when running on an RDD of type T.
</td>
</tr>
<tr>
@@ -790,18 +798,23 @@ The following tables list the transformations and actions currently supported (s
<td> <b>union</b>(<i>otherDataset</i>) </td>
<td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td>
</tr>
<tr>
<td> <b>intersection</b>(<i>otherDataset</i>) </td>
<td> Return a new RDD that contains the intersection of elements in the source dataset and the argument. </td>
</tr>
<tr>
<td> <b>distinct</b>([<i>numTasks</i>]) </td>
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
</tr>
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
performance.
<br />
<b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD.
You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
</td>
</tr>
<tr>
@@ -814,22 +827,47 @@ The following tables list the transformations and actions currently supported (s
</tr>
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
</td>
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable&lt;V&gt;, Iterable&lt;W&gt;) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
<td> <b>cartesian</b>(<i>otherDataset</i>) </td>
<td> When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). </td>
</tr>
<tr>
<td> <b>pipe</b>(<i>command</i>, <i>[envVars]</i>) </td>
<td> Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the
process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
</tr>
<tr>
<td> <b>coalesce</b>(<i>numPartitions</i>) </td>
<td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
after filtering down a large dataset. </td>
</tr>
<tr>
<td> <b>repartition</b>(<i>numPartitions</i>) </td>
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network. </td>
</tr>
</table>
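
For example, the following sketch chains several of the transformations above on a small illustrative RDD. It assumes an existing `SparkContext` named `sc`; the variable names are purely for illustration.

{% highlight scala %}
import org.apache.spark.SparkContext._   // enables the pair RDD functions used below

val nums = sc.parallelize(1 to 10, 4)            // RDD[Int] split into 4 partitions
val doubled = nums.map(_ * 2)                    // map
val evens = doubled.filter(_ % 4 == 0)           // filter
val merged = evens.union(nums).distinct()        // union followed by distinct
val pairs = merged.map(x => (x % 3, x))          // build (key, value) pairs
val sums = pairs.reduceByKey(_ + _)              // combine the values for each key
val fewer = sums.coalesce(2)                     // shrink the result to 2 partitions
{% endhighlight %}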


### Actions

The following table lists some of the common actions supported by Spark. Refer to the
RDD API doc
([Scala](api/scala/index.html#org.apache.spark.rdd.RDD),
[Java](api/java/org/apache/spark/api/java/JavaRDD.html),
[Python](api/python/pyspark.rdd.RDD-class.html))
and pair RDD functions doc
([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),
[Java](api/java/org/apache/spark/api/java/JavaPairRDD.html))
for details.

<table class="table">
<tr><th>Action</th><th>Meaning</th></tr>
<tr>
@@ -856,49 +894,57 @@ A complete list of transformations is available in the [RDD API doc](api/scala/i
<td> <b>takeSample</b>(<i>withReplacement</i>, <i>num</i>, <i>seed</i>) </td>
<td> Return an array with a random sample of <i>num</i> elements of the dataset, with or without replacement, using the given random number generator seed. </td>
</tr>
<tr>
<td> <b>takeOrdered</b>(<i>n</i>, <i>[ordering]</i>) </td>
<td> Return the first <i>n</i> elements of the RDD using either their natural order or a custom comparator. </td>
</tr>
<tr>
<td> <b>saveAsTextFile</b>(<i>path</i>) </td>
<td> Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. </td>
</tr>
<tr>
<td> <b>saveAsSequenceFile</b>(<i>path</i>) <br /> (Java and Scala) </td>
<td> Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also
available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). </td>
</tr>
<tr>
<td> <b>saveAsObjectFile</b>(<i>path</i>) <br /> (Java and Scala) </td>
<td> Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using
<code>SparkContext.objectFile()</code>. </td>
</tr>
<tr>
<td> <b>countByKey</b>() </td>
<td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
</tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
<td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems. </td>
</tr>
</table>
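
For example, a short sketch exercising a few of these actions. It assumes an existing `SparkContext` named `sc`, and the output directory is hypothetical.

{% highlight scala %}
import org.apache.spark.SparkContext._   // enables countByKey on pair RDDs

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val smallest = data.takeOrdered(2)           // first 2 elements in natural key order
val perKey = data.countByKey()               // e.g. Map(a -> 2, b -> 1)
data.saveAsTextFile("/tmp/spark-output")     // hypothetical output directory
data.foreach(pair => println(pair))          // side effects run on the executors, not the driver
{% endhighlight %}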


## RDD Persistence

One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
across operations. When you persist an RDD, each node stores any partitions of it that it computes in
memory and reuses them in other actions on that dataset (or datasets derived from it). This allows
future actions to be much faster (often by more than 10x). Caching is a key tool for
iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time
it is computed in an action, it will be kept in memory on the nodes. Spark's cache is fault-tolerant --
if any partition of an RDD is lost, it will automatically be recomputed using the transformations
that originally created it.
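
For example, a minimal sketch (assuming a `SparkContext` named `sc` and an input file `data.txt`) that caches an RDD before reusing it across two actions:

{% highlight scala %}
val lines = sc.textFile("data.txt")
val errors = lines.filter(_.contains("ERROR"))
errors.cache()                    // shorthand for persisting at the default MEMORY_ONLY level
val numErrors = errors.count()    // first action: computes `errors` and caches its partitions
val firstTen = errors.take(10)    // later actions reuse the cached data
{% endhighlight %}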

In addition, each persisted RDD can be stored using a different *storage level*, allowing you, for example,
to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space),
replicate it across nodes, or store it off-heap in [Tachyon](http://tachyon-project.org/).
These levels are set by passing a
`StorageLevel` object ([Scala](api/scala/index.html#org.apache.spark.storage.StorageLevel),
[Java](api/java/org/apache/spark/storage/StorageLevel.html),
[Python](api/python/pyspark.storagelevel.StorageLevel-class.html))
to `persist()`. The `cache()` method is a shorthand for using the default storage level,
which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The full set of
storage levels is:

<table class="table">
<tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
@@ -942,9 +988,9 @@ available storage levels is:
</tr>
</table>

**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.*
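
For example, a sketch of requesting a non-default level (assuming a `SparkContext` named `sc`):

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

val input = sc.textFile("data.txt")
input.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk if needed
{% endhighlight %}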

Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.

### Which Storage Level to Choose?

@@ -958,7 +1004,7 @@ efficiency. We recommend going through the following process to select one:
make the objects much more space-efficient, but still reasonably fast to access.

* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from
disk.

* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve
@@ -972,6 +1018,12 @@ mode has several advantages:
* It significantly reduces garbage collection costs.
* Cached data is not lost if individual executors crash.
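
For example, a sketch of selecting this mode (assuming the experimental `OFF_HEAP` level is available in your Spark build and a Tachyon instance is running):

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000)
rdd.persist(StorageLevel.OFF_HEAP)   // store serialized partitions in Tachyon rather than on the JVM heap
{% endhighlight %}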

### Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a
least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for
it to fall out of the cache, use the `RDD.unpersist()` method.
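
For example, a one-line sketch (reusing the cached `errors` RDD from the persistence sketch above):

{% highlight scala %}
errors.unpersist()   // drop the cached partitions of `errors` across the cluster
{% endhighlight %}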

# Shared Variables

Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a
@@ -1044,7 +1096,7 @@ MapReduce) or sums. Spark natively supports accumulators of numeric types, and p
can add support for new types.

An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala and Python).
However, they cannot read its value.
Only the driver program can read the accumulator's value, using its `value` method.
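
For example, a minimal sketch of this pattern (assuming an existing `SparkContext` named `sc`):

{% highlight scala %}
val accum = sc.accumulator(0)                                // created on the driver
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)   // tasks add to it
println(accum.value)                                         // 10; readable only on the driver
{% endhighlight %}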

@@ -1200,10 +1252,21 @@ cluster mode. The cluster location will be found based on HADOOP_CONF_DIR.
# Where to Go from Here

You can see some [example Spark programs](http://spark.apache.org/examples.html) on the Spark website.
In addition, Spark includes several samples in the `examples` directory
([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples),
[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples),
[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python)).
Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what was changed to make the program run on a cluster.
You can run Java and Scala examples by passing the class name to Spark's `bin/run-example` script; for instance:

./bin/run-example SparkPi

For Python examples, use `spark-submit` instead:

./bin/spark-submit examples/src/main/python/pi.py

For help on optimizing your programs, the [configuration](configuration.html) and
[tuning](tuning.html) guides provide information on best practices. They are especially important for
making sure that your data is stored in memory in an efficient format.
For help on deploying, the [cluster mode overview](cluster-overview.html) describes the components involved
in distributed operation and supported cluster managers.
