Skip to content


[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operati…
Browse files Browse the repository at this point in the history

... that do not change schema

Author: Kan Zhang <[email protected]>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
  • Loading branch information
kanzhang authored and pwendell committed May 7, 2014
1 parent 3eb53bd commit 967635a
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 22 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
@transient var name: String = null

/** Assign a name to this RDD */
def setName(_name: String): RDD[T] = {
def setName(_name: String): this.type = {
name = _name
Expand All @@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet..
def persist(newLevel: StorageLevel): RDD[T] = {
def persist(newLevel: StorageLevel): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException(
Expand All @@ -152,18 +152,18 @@ abstract class RDD[T: ClassTag](

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
def cache(): this.type = persist()

* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
def unpersist(blocking: Boolean = true): RDD[T] = {
def unpersist(blocking: Boolean = true): this.type = {
logInfo("Removing RDD " + id + " from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
Expand Down
10 changes: 2 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](

override def collect(): Array[Edge[ED]] =

override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
override def persist(newLevel: StorageLevel): this.type = {

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): EdgeRDD[ED] = persist()

override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
override def unpersist(blocking: Boolean = true): this.type = {
Expand Down
10 changes: 2 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
override protected def getPreferredLocations(s: Partition): Seq[String] =

override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
override def persist(newLevel: StorageLevel): this.type = {

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): VertexRDD[VD] = persist()

override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
override def unpersist(blocking: Boolean = true): this.type = {
Expand Down
2 changes: 2 additions & 0 deletions project/MimaBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ object MimaBuild {
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
excludeSparkClass("graphx.EdgeRDD") ++
excludeSparkClass("graphx.VertexRDD") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
Expand Down
29 changes: 29 additions & 0 deletions python/pyspark/
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,35 @@ def getCheckpointFile(self):
return None

def coalesce(self, numPartitions, shuffle=False):
rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
return SchemaRDD(rdd, self.sql_ctx)

def distinct(self):
rdd = self._jschema_rdd.distinct()
return SchemaRDD(rdd, self.sql_ctx)

def intersection(self, other):
if (other.__class__ is SchemaRDD):
rdd = self._jschema_rdd.intersection(other._jschema_rdd)
return SchemaRDD(rdd, self.sql_ctx)
raise ValueError("Can only intersect with another SchemaRDD")

def repartition(self, numPartitions):
rdd = self._jschema_rdd.repartition(numPartitions)
return SchemaRDD(rdd, self.sql_ctx)

def subtract(self, other, numPartitions=None):
if (other.__class__ is SchemaRDD):
if numPartitions is None:
rdd = self._jschema_rdd.subtract(other._jschema_rdd)
rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
return SchemaRDD(rdd, self.sql_ctx)
raise ValueError("Can only subtract another SchemaRDD")

def _test():
import doctest
from pyspark.context import SparkContext
Expand Down
67 changes: 66 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ package org.apache.spark.sql

import net.razorvine.pickle.Pickler

import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import java.util.{Map => JMap}

Expand Down Expand Up @@ -296,6 +298,13 @@ class SchemaRDD(
def toSchemaRDD = this

* Returns this RDD as a JavaSchemaRDD.
* @group schema
def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)

private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] =
this.mapPartitions { iter =>
Expand All @@ -314,4 +323,60 @@ class SchemaRDD(

* Creates SchemaRDD by applying own schema to derived RDD. Typically used to wrap return value
* of base RDD functions that do not change schema.
* @param rdd RDD derived from this one and has same schema
* @group schema
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))

// =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================

// Transformations (return a new RDD)

override def coalesce(numPartitions: Int, shuffle: Boolean = false)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.coalesce(numPartitions, shuffle)(ord))

override def distinct(): SchemaRDD =

override def distinct(numPartitions: Int)
(implicit ord: Ordering[Row] = null): SchemaRDD =

override def filter(f: Row => Boolean): SchemaRDD =

override def intersection(other: RDD[Row]): SchemaRDD =

override def intersection(other: RDD[Row], partitioner: Partitioner)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.intersection(other, partitioner)(ord))

override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
applySchema(super.intersection(other, numPartitions))

override def repartition(numPartitions: Int)
(implicit ord: Ordering[Row] = null): SchemaRDD =

override def subtract(other: RDD[Row]): SchemaRDD =

override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
applySchema(super.subtract(other, numPartitions))

override def subtract(other: RDD[Row], p: Partitioner)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.subtract(other, p)(ord))
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@


import org.apache.spark.Partitioner
import{JavaRDDLike, JavaRDD}
import{Function => JFunction}
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.rdd.RDD

* An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
Expand All @@ -45,4 +48,141 @@ class JavaSchemaRDD(
override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)

val rdd = Row(_))

override def toString: String = baseSchemaRDD.toString

// =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================

// Common RDD functions

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaSchemaRDD = {

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): JavaSchemaRDD = {

* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet..
def persist(newLevel: StorageLevel): JavaSchemaRDD = {

* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
def unpersist(blocking: Boolean = true): JavaSchemaRDD = {

/** Assign a name to this RDD */
def setName(name: String): JavaSchemaRDD = {

// Transformations (return a new RDD)

* Return a new RDD that is reduced into `numPartitions` partitions.
def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD

* Return a new RDD containing the distinct elements in this RDD.
def distinct(): JavaSchemaRDD =

* Return a new RDD containing the distinct elements in this RDD.
def distinct(numPartitions: Int): JavaSchemaRDD =

* Return a new RDD containing only the elements that satisfy a predicate.
def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
baseSchemaRDD.filter(x => Row(x)).booleanValue()).toJavaSchemaRDD

* Return the intersection of this RDD and another one. The output will not contain any
* duplicate elements, even if the input RDDs did.
* Note that this method performs a shuffle internally.
def intersection(other: JavaSchemaRDD): JavaSchemaRDD =

* Return the intersection of this RDD and another one. The output will not contain any
* duplicate elements, even if the input RDDs did.
* Note that this method performs a shuffle internally.
* @param partitioner Partitioner to use for the resulting RDD
def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD

* Return the intersection of this RDD and another one. The output will not contain any
* duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster
* Note that this method performs a shuffle internally.
* @param numPartitions How many partitions to use in the resulting RDD
def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD

* Return a new RDD that has exactly `numPartitions` partitions.
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
def repartition(numPartitions: Int): JavaSchemaRDD =

* Return an RDD with the elements from `this` that are not in `other`.
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
def subtract(other: JavaSchemaRDD): JavaSchemaRDD =

* Return an RDD with the elements from `this` that are not in `other`.
def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD

* Return an RDD with the elements from `this` that are not in `other`.
def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD

0 comments on commit 967635a

Please sign in to comment.