SPARK-1699: Make the Python API relatively independent, becoming a subproject #631

Closed · wants to merge 1 commit
20 changes: 15 additions & 5 deletions assembly/pom.xml
@@ -84,11 +84,6 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.1</version>
</dependency>
</dependencies>

<build>
@@ -173,6 +168,21 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>python</id>
<dependencies>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>python-api_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
47 changes: 0 additions & 47 deletions core/pom.xml
@@ -244,11 +244,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>pyrolite</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -294,48 +289,6 @@
</environmentVariables>
</configuration>
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
22 changes: 2 additions & 20 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,7 +26,6 @@ import akka.actor._
import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.ConnectionManager
@@ -67,15 +66,14 @@ class SparkEnv (
// A mapping of thread ID to amount of memory used for shuffle in bytes
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
val closeables = mutable.ListBuffer[java.io.Closeable]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
closeables.toList.foreach(_.close())
httpFileServer.stop()
mapOutputTracker.stop()
shuffleFetcher.stop()
@@ -89,22 +87,6 @@
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()
}

private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}

private[spark]
def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers(key).stop()
}
}
}

object SparkEnv extends Logging {
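The SparkEnv change above swaps the Python-specific worker bookkeeping for a generic `closeables` list that `stop()` drains. A minimal sketch of how the new python-api module could plug into that hook is shown below; the `PythonWorkerManager` name and the registration point are illustrative assumptions, not code from this PR, and only `PythonWorkerFactory` and `SparkEnv.closeables` come from the diff itself.

```scala
// Hypothetical helper inside the python-api module (names are assumptions).
// It keeps the worker-factory cache that used to live in SparkEnv and
// registers itself with SparkEnv.closeables so stop() tears it down.
import scala.collection.mutable

import org.apache.spark.SparkEnv
import org.apache.spark.api.python.PythonWorkerFactory

class PythonWorkerManager(env: SparkEnv) extends java.io.Closeable {
  private val workers =
    mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

  // Same role as the removed SparkEnv.createPythonWorker
  def create(pythonExec: String, envVars: Map[String, String]): java.net.Socket =
    synchronized {
      workers.getOrElseUpdate((pythonExec, envVars),
        new PythonWorkerFactory(pythonExec, envVars)).create()
    }

  // Invoked by SparkEnv.stop() through the closeables list
  override def close(): Unit = synchronized {
    workers.values.foreach(_.stop())
    workers.clear()
  }

  // Register for cleanup when SparkEnv shuts down
  env.closeables += this
}
```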
40 changes: 29 additions & 11 deletions make-distribution.sh
@@ -55,6 +55,7 @@ SPARK_HADOOP_VERSION=1.0.4
SPARK_YARN=false
SPARK_HIVE=false
SPARK_TACHYON=false
SPARK_PYTHON=true
MAKE_TGZ=false
NAME=none

@@ -105,6 +106,12 @@ else
echo "YARN disabled"
fi

if [ "$SPARK_PYTHON" == "true" ]; then
echo "Python enabled"
else
echo "Python disabled"
fi

if [ "$SPARK_TACHYON" == "true" ]; then
echo "Tachyon Enabled"
else
@@ -122,22 +129,31 @@ else
MAYBE_HIVE=""
fi

if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then
MAYBE_HADOOP023="-Phadoop-0.23"
else
MAYBE_HADOOP023=""
fi

if [ "$SPARK_YARN" == "true" ]; then
if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then
mvn clean package -DskipTests -Pyarn-alpha -Dhadoop.version=$SPARK_HADOOP_VERSION \
-Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE -Phadoop-0.23
if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." || "$SPARK_HADOOP_VERSION" =~ "2.0." ]]; then
MAYBE_YARN="-Pyarn-alpha -Dyarn.version=$SPARK_HADOOP_VERSION"
else
mvn clean package -DskipTests -Pyarn -Dhadoop.version=$SPARK_HADOOP_VERSION \
-Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE
MAYBE_YARN="-Pyarn -Dyarn.version=$SPARK_HADOOP_VERSION"
fi
else
if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then
mvn clean package -Phadoop-0.23 -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE
else
mvn clean package -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE
fi
MAYBE_YARN=""
fi

if [ "$SPARK_PYTHON" == "true" ]; then
MAYBE_PYTHON="-Ppython"
else
MAYBE_PYTHON=""
fi

mvn clean package -Dhadoop.version=$SPARK_HADOOP_VERSION \
-DskipTests $MAYBE_HIVE $MAYBE_HADOOP023 $MAYBE_YARN $MAYBE_PYTHON

# Make directories
rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/lib"
@@ -152,9 +168,11 @@ mkdir "$DISTDIR"/conf
cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
cp "$FWDIR"/conf/slaves "$DISTDIR"/conf
cp -r "$FWDIR/bin" "$DISTDIR"
cp -r "$FWDIR/python" "$DISTDIR"
cp -r "$FWDIR/sbin" "$DISTDIR"

if [ "$SPARK_PYTHON" == "true" ]; then
cp -r "$FWDIR/python" "$DISTDIR"
fi

# Download and copy in tachyon, if requested
if [ "$SPARK_TACHYON" == "true" ]; then
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,11 +19,8 @@ package org.apache.spark.mllib.recommendation

import org.jblas._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.api.python.PythonMLLibAPI


/**
@@ -68,20 +65,6 @@ class MatrixFactorizationModel(
}
}

/**
* :: DeveloperApi ::
* Predict the rating of many users for many products.
* This is a Java stub for python predictAll()
*
* @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
* @return JavaRDD of serialized Rating objects.
*/
def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
val pythonAPI = new PythonMLLibAPI()
val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
}

// TODO: Figure out what other good bulk prediction methods would look like.
// Probably want a way to get the top users for a product or vice-versa.
}
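With the byte-array `predict` stub removed from mllib, the Python serialization glue presumably moves into the new python-api module. Below is a hedged sketch of what such a wrapper could look like; the object name, method name, and placement are assumptions, while the conversion logic and the `PythonMLLibAPI.unpackTuple`/`serializeRating` calls are taken verbatim from the stub deleted above.

```scala
// Illustrative wrapper for the python-api module (name and location are assumptions).
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.api.python.PythonMLLibAPI
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

object MatrixFactorizationModelWrapper {
  /** Predict ratings for many (user, product) pairs, serialized for the Python side. */
  def predictAll(model: MatrixFactorizationModel,
                 usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
    val pythonAPI = new PythonMLLibAPI()
    // Deserialize (user, product) tuples, run the Scala model, re-serialize the Ratings
    val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
    JavaRDD.fromRDD(model.predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)))
  }
}
```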
8 changes: 6 additions & 2 deletions pom.xml
@@ -94,7 +94,6 @@
<module>streaming</module>
<module>sql/catalyst</module>
<module>sql/core</module>
<module>sql/hive</module>
<module>repl</module>
<module>assembly</module>
<module>external/twitter</module>
@@ -103,6 +102,8 @@
<module>external/zeromq</module>
<module>external/mqtt</module>
<module>examples</module>
<module>sql/hive</module>
<module>python-api</module>
</modules>

<properties>
@@ -739,6 +740,10 @@
<filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
<argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
<stderr/>
<environmentVariables>
<SPARK_HOME>${session.executionRootDirectory}</SPARK_HOME>
<SPARK_TESTING>1</SPARK_TESTING>
</environmentVariables>
</configuration>
<executions>
<execution>
@@ -952,6 +957,5 @@
</dependency>
</dependencies>
</profile>

</profiles>
</project>