Commit

Merge branch 'master' of github.com:apache/spark into them-rdd-memories
andrewor14 committed Jun 21, 2014
2 parents df47265 + 0a432d6 commit 7de5ef9
Showing 22 changed files with 693 additions and 584 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -244,6 +244,11 @@
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
@@ -43,8 +43,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

def rdd: RDD[T]

/** Set of partitions in this RDD. */
@deprecated("Use partitions() instead.", "1.1.0")
def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)

/** Set of partitions in this RDD. */
def partitions: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)

/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
def context: SparkContext = rdd.context
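
The JavaRDDLike hunk above deprecates splits() in favor of a new partitions() accessor returning the same java.util.List[Partition]. A minimal Scala sketch of the caller-side migration, assuming a local SparkContext (the wrapper setup here is illustrative, not Spark test code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaRDD

object PartitionsRenameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("partitions-sketch"))
    try {
      // Wrap a Scala RDD in the Java API wrapper to exercise JavaRDDLike.
      val javaRdd: JavaRDD[Int] = JavaRDD.fromRDD(sc.parallelize(1 to 10, numSlices = 4))

      // New accessor added by this commit.
      println(javaRdd.partitions.size())   // expected: 4

      // Old accessor still compiles but now carries @deprecated("Use partitions() instead.", "1.1.0").
      // println(javaRdd.splits.size())
    } finally {
      sc.stop()
    }
  }
}
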
7 changes: 4 additions & 3 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,6 +18,7 @@
package org.apache.spark;

import java.io.*;
import java.net.URI;
import java.util.*;

import scala.Tuple2;
@@ -741,7 +742,7 @@ public void persist() {
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
TaskContext context = new TaskContext(0, 0, 0, false, new TaskMetrics());
Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
}

@Test
@@ -768,7 +769,7 @@ public void textFiles() throws IOException {
}

@Test
public void wholeTextFiles() throws IOException {
public void wholeTextFiles() throws Exception {
byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");

@@ -784,7 +785,7 @@ public void wholeTextFiles() throws IOException {
List<Tuple2<String, String>> result = readRDD.collect();

for (Tuple2<String, String> res : result) {
Assert.assertEquals(res._2(), container.get(res._1()));
Assert.assertEquals(res._2(), container.get(new URI(res._1()).getPath()));
}
}

1 change: 1 addition & 0 deletions dev/mima
@@ -18,6 +18,7 @@
#

set -o pipefail
set -e

# Go to the Spark project root directory
FWDIR="$(cd `dirname $0`/..; pwd)"
7 changes: 7 additions & 0 deletions pom.xml
@@ -468,6 +468,13 @@
<version>3.1</version>
<scope>test</scope>
</dependency>
<!-- Needed by cglib which is needed by easymock. -->
<dependency>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
<version>3.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
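
Both POMs (and SparkBuild.scala below) add asm 3.3.1 in test scope because, as the comment says, EasyMock's class mocking goes through cglib, and cglib generates subclasses with asm at runtime. A hedged Scala sketch of the kind of test code that exercises that chain — mocking a concrete class rather than an interface (Greeter and its values are made up for illustration, not taken from Spark's tests):

import org.easymock.EasyMock

// Mocking a concrete class (not an interface) forces EasyMock to
// synthesize a subclass via cglib, which rewrites bytecode with asm.
class Greeter {
  def greet(name: String): String = s"hello, $name"
}

object EasyMockClassMockSketch {
  def main(args: Array[String]): Unit = {
    val mock = EasyMock.createMock(classOf[Greeter])
    EasyMock.expect(mock.greet("spark")).andReturn("mocked")
    EasyMock.replay(mock)

    assert(mock.greet("spark") == "mocked")
    EasyMock.verify(mock)
  }
}
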
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
@@ -36,6 +36,8 @@ object MimaExcludes {
case v if v.startsWith("1.1") =>
Seq(MimaBuild.excludeSparkPackage("graphx")) ++
Seq(
// Adding new method to JavaRDDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
// them, and use the following to tell Mima to not care about them.
10 changes: 6 additions & 4 deletions project/SparkBuild.scala
@@ -293,7 +293,9 @@ object SparkBuild extends Build {
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymockclassextension" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.9.0" % "test",
"junit" % "junit" % "4.10" % "test"
"junit" % "junit" % "4.10" % "test",
// Needed by cglib which is needed by easymock.
"asm" % "asm" % "3.3.1" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -461,7 +463,7 @@

def toolsSettings = sharedSettings ++ Seq(
name := "spark-tools",
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v ),
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v),
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
) ++ assemblySettings ++ extraAssemblySettings

@@ -630,9 +632,9 @@
scalaVersion := "2.10.4",
retrieveManaged := true,
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
"spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
"spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
"spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
"spark-core").map(sparkPreviousArtifact(_).get intransitive())
)

2 changes: 1 addition & 1 deletion python/pyspark/context.py
@@ -704,7 +704,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
[0, 1, 16, 25]
"""
if partitions == None:
partitions = range(rdd._jrdd.splits().size())
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

# Implementation note: This is implemented as a mapPartitions followed
4 changes: 2 additions & 2 deletions python/pyspark/rdd.py
@@ -321,7 +321,7 @@ def getNumPartitions(self):
>>> rdd.getNumPartitions()
2
"""
return self._jrdd.splits().size()
return self._jrdd.partitions().size()

def filter(self, f):
"""
@@ -922,7 +922,7 @@ def take(self, num):
[91, 92, 93]
"""
items = []
totalParts = self._jrdd.splits().size()
totalParts = self._jrdd.partitions().size()
partsScanned = 0

while len(items) < num and partsScanned < totalParts:
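
The take() hunk above keeps the same scan loop and only renames the partition lookup: it asks the JVM-side RDD for the partition count and keeps collecting until it has num items or runs out of partitions. A simplified, standalone Scala sketch of that loop shape over in-memory "partitions" (illustration only; the real PySpark code launches Spark jobs and grows the number of partitions scanned per pass):

object TakeLoopSketch {
  // Each inner Seq stands in for one partition's data.
  def take[T](partitions: Seq[Seq[T]], num: Int): Seq[T] = {
    var items = Vector.empty[T]
    val totalParts = partitions.size     // analogous to _jrdd.partitions().size()
    var partsScanned = 0

    while (items.size < num && partsScanned < totalParts) {
      // The real implementation runs a job over a batch of partitions here.
      items = items ++ partitions(partsScanned).take(num - items.size)
      partsScanned += 1
    }
    items
  }

  def main(args: Array[String]): Unit = {
    println(take(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6)), num = 3))  // Vector(1, 2, 3)
  }
}
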
@@ -221,7 +221,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
}

protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext
val sparkContext: SparkContext = self.sparkContext

val sqlContext: SQLContext = self

def numPartitions = self.numShufflePartitions

@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.SQLContext

/**
* :: DeveloperApi ::
@@ -41,7 +42,7 @@ case class Aggregate(
partial: Boolean,
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: SparkPlan)(@transient sc: SparkContext)
child: SparkPlan)(@transient sqlContext: SQLContext)
extends UnaryNode with NoBind {

override def requiredChildDistribution =
@@ -55,7 +56,7 @@
}
}

override def otherCopyArgs = sc :: Nil
override def otherCopyArgs = sqlContext :: Nil

// HACK: Generators don't correctly preserve their output through serializations so we grab
// out child's output attributes statically here.
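
The recurring change in the SQL execution operators (Aggregate above; Union, Limit and TakeOrdered below) swaps the curried (@transient sc: SparkContext) constructor argument for (@transient sqlContext: SQLContext) and returns it from otherCopyArgs. The curried parameter list matters: in Scala, second-parameter-list arguments of a case class are not product elements, so tree copies would drop them unless they are handed back explicitly, and otherCopyArgs is the hook Catalyst's copy machinery uses for that. A standalone Scala sketch of just the language-level behavior (plain case class, no Spark types; names are illustrative):

// Curried case-class parameters are not part of the product, so
// productIterator ignores them and copy() requires them again.
case class LimitNode(limit: Int, child: String)(val context: String) {
  def describe: String = s"LimitNode($limit, $child) built with context=$context"
}

object CurriedArgsSketch {
  def main(args: Array[String]): Unit = {
    val node = LimitNode(10, "scan")("ctx-A")
    println(node.productArity)   // 2 -- only limit and child
    println(node.describe)

    // copy() defaults the first parameter list, but the curried one
    // must be supplied again, mirroring what otherCopyArgs provides.
    val copied = node.copy(limit = 5)("ctx-B")
    println(copied.describe)
  }
}
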
@@ -40,7 +40,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// no predicate can be evaluated by matching hash keys
case logical.Join(left, right, LeftSemi, condition) =>
execution.LeftSemiJoinBNL(
planLater(left), planLater(right), condition)(sparkContext) :: Nil
planLater(left), planLater(right), condition)(sqlContext) :: Nil
case _ => Nil
}
}
@@ -103,7 +103,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
partial = true,
groupingExpressions,
partialComputation,
planLater(child))(sparkContext))(sparkContext) :: Nil
planLater(child))(sqlContext))(sqlContext) :: Nil
} else {
Nil
}
@@ -115,7 +115,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, joinType, condition) =>
execution.BroadcastNestedLoopJoin(
planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil
planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil
case _ => Nil
}
}
@@ -143,7 +143,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object TakeOrdered extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
execution.TakeOrdered(limit, order, planLater(child))(sparkContext) :: Nil
execution.TakeOrdered(limit, order, planLater(child))(sqlContext) :: Nil
case _ => Nil
}
}
@@ -155,9 +155,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
// Note: overwrite=false because otherwise the metadata we just created will be deleted
InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil
InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
InsertIntoParquetTable(table, planLater(child), overwrite)(sqlContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
@@ -186,7 +186,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
projectList,
filters,
prunePushedDownFilters,
ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
ParquetTableScan(_, relation, filters)(sqlContext)) :: Nil

case _ => Nil
}
@@ -211,7 +211,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Distinct(child) =>
execution.Aggregate(
partial = false, child.output, child.output, planLater(child))(sparkContext) :: Nil
partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
case logical.Sort(sortExprs, child) =>
// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
execution.Sort(sortExprs, global = true, planLater(child)):: Nil
@@ -224,7 +224,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child))(sparkContext) :: Nil
execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
@@ -233,9 +233,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))
execution.ExistingRdd(output, dataAsRdd) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.Limit(limit, planLater(child))(sparkContext) :: Nil
execution.Limit(limit, planLater(child))(sqlContext) :: Nil
case Unions(unionChildren) =>
execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.NoRelation =>
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
@@ -70,12 +71,12 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child:
* :: DeveloperApi ::
*/
@DeveloperApi
case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan {
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output = children.head.output
override def execute() = sc.union(children.map(_.execute()))
override def execute() = sqlContext.sparkContext.union(children.map(_.execute()))

override def otherCopyArgs = sc :: Nil
override def otherCopyArgs = sqlContext :: Nil
}

/**
@@ -87,11 +88,12 @@ case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends
* data to a single partition to compute the global limit.
*/
@DeveloperApi
case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
extends UnaryNode {
// TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
// partition local limit -> exchange into one partition -> partition local limit again

override def otherCopyArgs = sc :: Nil
override def otherCopyArgs = sqlContext :: Nil

override def output = child.output

@@ -117,8 +119,8 @@ case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) exte
*/
@DeveloperApi
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
(@transient sc: SparkContext) extends UnaryNode {
override def otherCopyArgs = sc :: Nil
(@transient sqlContext: SQLContext) extends UnaryNode {
override def otherCopyArgs = sqlContext :: Nil

override def output = child.output

@@ -129,7 +131,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
override def execute() = sc.makeRDD(executeCollect(), 1)
override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}

/**