Merge branch 'master' of github.com:apache/spark into event-log-tests
Conflicts:
	core/src/main/scala/org/apache/spark/util/Utils.scala
andrewor14 committed May 1, 2014
2 parents c3afcea + 55100da commit 2883837
Showing 24 changed files with 134 additions and 46 deletions.
1 change: 1 addition & 0 deletions bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
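Both launcher scripts now put the bundled Py4J source zip on the PYTHONPATH instead of assuming a separately installed py4j package; Python's zipimport machinery can import packages directly from a zip archive. A minimal sketch of that resolution (not part of this commit; the Spark location is a placeholder path):

    import sys

    # Same effect as the PYTHONPATH entry added above: zipimport serves the
    # py4j package straight from inside the archive.
    sys.path.insert(0, "/path/to/spark/python/lib/py4j-0.8.1-src.zip")

    import py4j
    print(py4j.__file__)  # ends in py4j-0.8.1-src.zip/py4j/__init__.py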
42 changes: 42 additions & 0 deletions core/pom.xml
@@ -294,6 +294,48 @@
</environmentVariables>
</configuration>
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
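The new exec-maven-plugin step unzips the Py4J sources into python/build, and the added resource blocks bundle both pyspark/*.py and py4j/*.py into the core jar. A jar is an ordinary zip archive, so a Spark jar placed on the PYTHONPATH can itself serve those Python sources, which is what the YARN changes further down rely on. A rough, hedged illustration with a placeholder jar path:

    import sys
    import zipfile

    jar = "/path/to/spark-assembly.jar"  # placeholder; any jar built with these resources

    # The bundled .py files sit under pyspark/ and py4j/ at the root of the archive.
    names = zipfile.ZipFile(jar).namelist()
    print([n for n in names if n.startswith(("pyspark/", "py4j/"))][:5])

    # Putting the jar on sys.path (or PYTHONPATH) lets zipimport load them.
    sys.path.insert(0, jar)
    import pyspark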
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()

// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()

// Redirect the stderr to ours
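Both the worker and the daemon are now started with python -m, so the interpreter locates pyspark.worker / pyspark.daemon through whatever PYTHONPATH the caller supplies in envVars, rather than through a hard-coded $SPARK_HOME/python/... script path. A small Python sketch of the same launch pattern (illustrative only; it mirrors the Scala ProcessBuilder code with a placeholder path, and a real worker expects to speak to the JVM over a socket):

    import os
    import subprocess

    env = dict(os.environ)
    # The caller decides where the pyspark package lives: a source checkout,
    # the bundled zip, or the Spark jar itself.
    env["PYTHONPATH"] = "/path/to/spark/python" + os.pathsep + env.get("PYTHONPATH", "")

    # -m resolves the module on PYTHONPATH instead of pointing at worker.py.
    subprocess.Popen(["python", "-m", "pyspark.worker"], env=env)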
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import org.apache.spark.util.{IntParam, MemoryParam, Utils}

/**
* Command-line parser for the master.
* Command-line parser for the worker.
*/
private[spark] class WorkerArguments(args: Array[String]) {
var host = Utils.localHostName()
@@ -106,7 +106,6 @@ private[spark] class SecurityMessage() extends Logging {
* @return BufferMessage
*/
def toBufferMessage: BufferMessage = {
val startTime = System.currentTimeMillis
val buffers = new ArrayBuffer[ByteBuffer]()

// 4 bytes for the length of the connectionId
@@ -61,7 +61,6 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
} else if (outputsMerged == 0) {
new HashMap[T, BoundedDouble]
} else {
val p = outputsMerged.toDouble / totalOutputs
val studentTCacher = new StudentTCacher(confidence)
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.entrySet.iterator()
@@ -118,11 +118,9 @@ object BlockFetcherIterator {
})
bytesInFlight += req.size
val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
val fetchStart = System.currentTimeMillis()
val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
future.onSuccess {
case Some(message) => {
val fetchDone = System.currentTimeMillis()
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
for (blockMessage <- blockMessageArray) {
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -31,7 +31,6 @@ import scala.reflect.ClassTag
import scala.util.Try

import com.google.common.io.Files
import org.apache.commons.lang.SystemUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
@@ -50,7 +49,7 @@ private[spark] object Utils extends Logging {
val random = new Random()

def sparkBin(sparkHome: String, which: String): File = {
val suffix = if (SystemUtils.IS_OS_WINDOWS) ".cmd" else ""
val suffix = if (isWindows) ".cmd" else ""
new File(sparkHome + File.separator + "bin", which + suffix)
}

@@ -614,7 +613,7 @@
*/
def isSymlink(file: File): Boolean = {
if (file == null) throw new NullPointerException("File must not be null")
if (SystemUtils.IS_OS_WINDOWS) return false
if (isWindows) return false
val fileInCanonicalDir = if (file.getParent() == null) {
file
} else {
@@ -1018,7 +1017,7 @@
throw new IOException("Destination must be relative")
}
var cmdSuffix = ""
val linkCmd = if (SystemUtils.IS_OS_WINDOWS) {
val linkCmd = if (isWindows) {
// refer to http://technet.microsoft.com/en-us/library/cc771254.aspx
cmdSuffix = " /s /e /k /h /y /i"
"cmd /c xcopy "
@@ -1071,10 +1070,17 @@
new Path(path)
}

/**
* Return true if this is Windows.
*/
def isWindows = {
Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
}

/**
* Indicates whether Spark is currently running unit tests.
*/
private[spark] def isTesting = {
def isTesting = {
sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}
}
@@ -91,7 +91,7 @@ private[spark] object XORShiftRandom {
val xorRand = new XORShiftRandom(seed)

// this is just to warm up the JIT - we're not timing anything
timeIt(1e6.toInt) {
timeIt(million) {
javaRand.nextInt()
xorRand.nextInt()
}
3 changes: 3 additions & 0 deletions docs/python-programming-guide.md
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.

# Running PySpark on YARN

To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".

# Interactive Use

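As the new guide section says, running against YARN only changes the master: export MASTER=yarn-client before invoking bin/pyspark. Equivalently (a hedged sketch, not from this commit; the app name and job are placeholders), the master can be passed when constructing the context in a standalone script:

    from pyspark import SparkContext

    # "yarn-client" here plays the same role as MASTER=yarn-client in the shell.
    sc = SparkContext("yarn-client", "ExampleYarnApp")
    print(sc.parallelize(range(100)).sum())
    sc.stop()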
@@ -75,7 +75,6 @@ public String toString() {

public static Tuple3<String, String, String> extractKey(String line) {
Matcher m = apacheLogRegex.matcher(line);
List<String> key = Collections.emptyList();
if (m.find()) {
String ip = m.group(1);
String user = m.group(3);
@@ -85,7 +85,7 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
}
});

long oldCount = 0;
long oldCount;
long nextCount = tc.count();
do {
oldCount = nextCount;
@@ -23,7 +23,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
@@ -472,13 +472,15 @@ class ALS private (
// Compute the XtX and Xy values for each user by adding products it rated in each product
// block
for (productBlock <- 0 until numBlocks) {
for (p <- 0 until blockFactors(productBlock).length) {
var p = 0
while (p < blockFactors(productBlock).length) {
val x = wrapDoubleArray(blockFactors(productBlock)(p))
tempXtX.fill(0.0)
dspr(1.0, x, tempXtX)
val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
for (i <- 0 until us.length) {
if (implicitPrefs) {
if (implicitPrefs) {
var i = 0
while (i < us.length) {
// Extension to the original paper to handle rs(i) < 0. confidence is a function
// of |rs(i)| instead so that it is never negative:
val confidence = 1 + alpha * abs(rs(i))
@@ -489,11 +491,17 @@
if (rs(i) > 0) {
SimpleBlas.axpy(confidence, x, userXy(us(i)))
}
} else {
i += 1
}
} else {
var i = 0
while (i < us.length) {
userXtX(us(i)).addi(tempXtX)
SimpleBlas.axpy(rs(i), x, userXy(us(i)))
i += 1
}
}
p += 1
}
}

@@ -502,7 +510,11 @@
// Compute the full XtX matrix from the lower-triangular part we got above
fillFullMatrix(userXtX(index), fullXtX)
// Add regularization
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
var i = 0
while (i < rank) {
fullXtX.data(i * rank + i) += lambda
i += 1
}
// Solve the resulting matrix, which is symmetric and positive-definite
if (implicitPrefs) {
Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
3 changes: 3 additions & 0 deletions python/.gitignore
@@ -1,2 +1,5 @@
*.pyc
docs/
pyspark.egg-info
build/
dist/
1 change: 0 additions & 1 deletion python/lib/PY4J_VERSION.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/pyspark/__init__.py
@@ -49,13 +49,6 @@
Main entry point for accessing data stored in Apache Hive..
"""



import sys
import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))


from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
29 changes: 27 additions & 2 deletions python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

set_env_vars_for_yarn()

def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
return gateway

def set_env_vars_for_yarn():
# Add the spark jar, which includes the pyspark files, to the python path
env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
if "PYTHONPATH" in env_map:
env_map["PYTHONPATH"] += ":spark.jar"
else:
env_map["PYTHONPATH"] = "spark.jar"

os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())

def parse_env(env_str):
# Turns a comma-separated of env settings into a dict that maps env vars to
# their values.
env = {}
for var_str in env_str.split(","):
parts = var_str.split("=")
if len(parts) == 2:
env[parts[0]] = parts[1]
elif len(var_str) > 0:
print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
sys.exit(1)

return env
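set_env_vars_for_yarn() appends the Spark jar to any PYTHONPATH already carried in SPARK_YARN_USER_ENV, which is a comma-separated list of KEY=VALUE pairs. A standalone sketch of that string handling (it re-traces parse_env rather than importing the module; the input value is made up):

    before = "PYTHONPATH=/opt/extra,FOO=bar"   # SPARK_YARN_USER_ENV format

    env_map = dict(pair.split("=", 1) for pair in before.split(",") if pair)
    if "PYTHONPATH" in env_map:
        env_map["PYTHONPATH"] += ":spark.jar"
    else:
        env_map["PYTHONPATH"] = "spark.jar"

    after = ",".join(k + "=" + v for k, v in env_map.items())
    print(after)  # e.g. PYTHONPATH=/opt/extra:spark.jar,FOO=bar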
6 changes: 3 additions & 3 deletions python/pyspark/rdd.py
@@ -537,8 +537,8 @@ def pipe(self, command, env={}):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
['1', '2', '3']
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
['1', '2', '', '3']
"""
def func(iterator):
pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
@@ -547,7 +547,7 @@ def pipe_objs(out):
out.write(str(obj).rstrip('\n') + '\n')
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
return (x.rstrip('\n') for x in pipe.stdout)
return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
return self.mapPartitions(func)

def foreach(self, f):
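The generator returned by func() now pulls lines with iter(pipe.stdout.readline, ''), which hands each line back as soon as the child writes it; in Python 2, iterating the file object directly goes through a read-ahead buffer and can hold lines back until the buffer fills or the process exits. A minimal sketch of the pattern outside Spark (assumes a Unix cat on the PATH):

    from subprocess import PIPE, Popen

    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE, universal_newlines=True)
    proc.stdin.write("1\n2\n3\n")
    proc.stdin.close()

    # readline() returns '' only at EOF, so the sentinel form of iter() yields
    # each line as it becomes available, mirroring the RDD.pipe() change.
    for line in iter(proc.stdout.readline, ""):
        print(line.rstrip("\n"))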
4 changes: 3 additions & 1 deletion python/pyspark/tests.py
@@ -30,10 +30,12 @@

from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int


SPARK_HOME = os.environ["SPARK_HOME"]


class PySparkTestCase(unittest.TestCase):

def setUp(self):
3 changes: 3 additions & 0 deletions sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
export SPARK_PREFIX=`dirname "$this"`/..
export SPARK_HOME=${SPARK_PREFIX}
export SPARK_CONF_DIR="$SPARK_HOME/conf"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH