diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 2ed155cd8bbbb..b9f1c43daff49 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -674,7 +674,7 @@ by a key. In Scala, these operations are automatically available on RDDs containing [Tuple2](http://www.scala-lang.org/api/{{site.SCALA_VERSION}}/index.html#scala.Tuple2) objects (the built-in tuples in the language, created by simply writing `(a, b)`), as long as you -`import org.apache.spark.SparkContext._` in your program to enable Spark's implicit +import `org.apache.spark.SparkContext._` in your program to enable Spark's implicit conversions. The key-value pair operations are available in the [PairRDDFunctions](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) class, which automatically wraps around an RDD of tuples if you import the conversions. @@ -688,7 +688,7 @@ val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b) {% endhighlight %} -We could also use `counts.sortByKey()`, for example, to sort the pairs by word, and finally +We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally `counts.collect()` to bring them back to the driver program as an array of objects. @@ -716,11 +716,11 @@ many times each line of text occurs in a file: {% highlight scala %} JavaRDD lines = sc.textFile("data.txt"); -JavaPairRDD pairs = lines.map(s -> new Tuple2(s, 1)); +JavaPairRDD pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD counts = pairs.reduceByKey((a, b) -> a + b); {% endhighlight %} -We could also use `counts.sortByKey()`, for example, to sort the pairs by word, and finally +We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally `counts.collect()` to bring them back to the driver program as an array of objects. @@ -745,8 +745,8 @@ pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b) {% endhighlight %} -We could also use `counts.sortByKey()`, for example, to sort the pairs by word, and finally -`counts.collect()` to bring them back to the driver program as an array of objects. +We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally +`counts.collect()` to bring them back to the driver program as a list of objects. @@ -755,7 +755,15 @@ We could also use `counts.sortByKey()`, for example, to sort the pairs by word, ### Transformations -The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD) for details): +The following table lists some of the common transformations supported by Spark. Refer to the +RDD API doc +([Scala](api/scala/index.html#org.apache.spark.rdd.RDD), + [Java](api/java/org/apache/spark/api/java/JavaRDD.html), + [Python](api/python/pyspark.rdd.RDD-class.html)) +and pair RDD functions doc +([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), + [Java](api/java/org/apache/spark/api/java/JavaPairRDD.html)) +for details. @@ -774,12 +782,12 @@ The following tables list the transformations and actions currently supported (s + Iterator<T> => Iterator<U> when running on an RDD of type T. @@ -790,18 +798,23 @@ The following tables list the transformations and actions currently supported (s + + + + - @@ -814,22 +827,47 @@ The following tables list the transformations and actions currently supported (s - + - + + + + + + + + + + + + +
<tr><th>Transformation</th><th>Meaning</th></tr>
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type - Iterator[T] => Iterator[U] when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of - the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. + the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
-Note: If you are grouping in order to perform an aggregation (such as a sum or - average) over each key, using reduceByKey or combineByKey will yield much better - performance. -
-Note: By default, if the RDD already has a partitioner, the task number is decided by the partition number of the partitioner, or else relies on the value of spark.default.parallelism if the property is set , otherwise depends on the partition number of the RDD. You can pass an optional numTasks argument to set a different number of tasks. +
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
+ Note: If you are grouping in order to perform an aggregation (such as a sum or + average) over each key, using reduceByKey or combineByKey will yield much better + performance. +
+ Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. + You can pass an optional numTasks argument to set a different number of tasks.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. + Outer joins are also supported through leftOuterJoin and rightOuterJoin. +
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called groupWith. When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the + process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently + after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. + This always shuffles all data over the network.
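To make a couple of the transformations above concrete, here is a minimal Scala sketch, assuming the same `SparkContext` `sc` and `data.txt` file used in the earlier word-count examples; the variable names and the partition count are illustrative only. It contrasts `groupByKey` with the generally faster `reduceByKey` recommended in the note, and shows `repartition`:

{% highlight scala %}
import org.apache.spark.SparkContext._   // enables the pair RDD operations used below

val words = sc.textFile("data.txt").flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))

// groupByKey moves every value for a key across the network before the values are summed...
val countsViaGroup = pairs.groupByKey().mapValues(values => values.sum)

// ...whereas reduceByKey pre-aggregates within each partition, which is usually much faster.
val countsViaReduce = pairs.reduceByKey((a, b) => a + b)

// repartition reshuffles the result into a different number of partitions.
val rebalanced = countsViaReduce.repartition(4)
{% endhighlight %}

Either version produces the same (word, count) pairs; the difference is only in how much data is shuffled across the network.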
-A complete list of transformations is available in the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD). - ### Actions +The following table lists some of the common actions supported by Spark. Refer to the +RDD API doc +([Scala](api/scala/index.html#org.apache.spark.rdd.RDD), + [Java](api/java/org/apache/spark/api/java/JavaRDD.html), + [Python](api/python/pyspark.rdd.RDD-class.html)) +and pair RDD functions doc +([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), + [Java](api/java/org/apache/spark/api/java/JavaPairRDD.html)) +for details. +
<tr><th>Action</th><th>Meaning</th></tr>
takeSample(withReplacement, num, seed) Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also + available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using + SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key. Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)
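As a rough illustration of several of the actions listed above, here is a hedged Scala sketch that reuses the word-count pattern from earlier in the guide; `sc` and `data.txt` come from the previous examples, while the output path and the number 10 are arbitrary choices for illustration.

{% highlight scala %}
import org.apache.spark.SparkContext._   // pair RDD operations such as reduceByKey and countByKey

val pairs = sc.textFile("data.txt").flatMap(line => line.split(" ")).map(word => (word, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

// Actions trigger the actual computation and return results to the driver (or write them out).
val totalWords = counts.map(pair => pair._2).reduce((a, b) => a + b)      // sum of all counts
val mostFrequent = counts.takeOrdered(10)(Ordering.by[(String, Int), Int](pair => -pair._2))
val occurrences = pairs.countByKey()   // map of element counts per key, returned to the driver

// saveAsTextFile writes one text file per partition under the given (illustrative) directory.
counts.saveAsTextFile("wordcount-output")
{% endhighlight %}

Keep in mind that actions such as `collect()`, `takeOrdered()` and `countByKey()` bring their results back to the driver, so they are best used on reasonably small result sets.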
-A complete list of actions is available in the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD). - ## RDD Persistence One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory -across operations. When you persist an RDD, each node stores any slices of it that it computes in +across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows -future actions to be much faster (often by more than 10x). Caching is a key tool for building -iterative algorithms with Spark and for interactive use from the interpreter. +future actions to be much faster (often by more than 10x). Caching is a key tool for +iterative algorithms and fast interactive use. You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time -it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- +it is computed in an action, it will be kept in memory on the nodes. Spark's cache is fault-tolerant -- if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. In addition, each persisted RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in [Tachyon](http://tachyon-project.org/). -These levels are chosen by passing a +These levels are set by passing a `StorageLevel` object ([Scala](api/scala/index.html#org.apache.spark.storage.StorageLevel), [Java](api/java/org/apache/spark/storage/StorageLevel.html), [Python](api/python/pyspark.storagelevel.StorageLevel-class.html)) to `persist()`. The `cache()` method is a shorthand for using the default storage level, -which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of -available storage levels is: +which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The full set of +storage levels is: @@ -942,9 +988,9 @@ available storage levels is:
<tr><th>Storage Level</th><th>Meaning</th></tr>
-**Note:** In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level. +**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.* -Spark also automatically persists intermediate results in shuffle operatons (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` if they plan to re-use an RDD iteratively. +Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it. ### Which Storage Level to Choose? @@ -958,7 +1004,7 @@ efficiency. We recommend going through the following process to select one: make the objects much more space-efficient, but still reasonably fast to access. * Don't spill to disk unless the functions that computed your datasets are expensive, or they filter -a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from +a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk. * Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve @@ -972,6 +1018,12 @@ mode has several advantages: * It significantly reduces garbage collection costs. * Cached data is not lost if individual executors crash. +### Removing Data + +Spark automatically monitors cache usage on each node and drops out old data partitions in a +least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for +it to fall out of the cache, use the `RDD.unpersist()` method. + # Shared Variables Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a @@ -1044,7 +1096,7 @@ MapReduce) or sums. Spark natively supports accumulators of numeric types, and p can add support for new types. An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks -running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala / Python). +running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method. @@ -1200,10 +1252,21 @@ cluster mode. The cluster location will be found based on HADOOP_CONF_DIR. # Where to Go from Here You can see some [example Spark programs](http://spark.apache.org/examples.html) on the Spark website. -In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. 
You can run them using by passing the class name to the `bin/run-example` script included in Spark; for example: +In addition, Spark includes several samples in the `examples` directory +([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples), + [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples), + [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python)). +Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what was changed to make the program run on a cluster. +You can run Java and Scala examples by passing the class name to Spark's `bin/run-example` script; for instance: ./bin/run-example SparkPi -For help on optimizing your program, the [configuration](configuration.html) and +For Python examples, use `spark-submit` instead: + + ./bin/spark-submit examples/src/main/python/pi.py + +For help on optimizing your programs, the [configuration](configuration.html) and [tuning](tuning.html) guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. +For help on deploying, the [cluster mode overview](cluster-overview.html) describes the components involved +in distributed operation and supported cluster managers.