
Commit

Merge branch 'master' of github.com:apache/spark into handle-configs-bash

andrewor14 committed Aug 19, 2014
2 parents 1ea6bbe + cd0720c commit d6488f9
Showing 6 changed files with 45 additions and 50 deletions.

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.broadcast
 
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
-  ObjectInputStream, ObjectOutputStream, OutputStream}
+import java.io._
 
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
   private val broadcastId = BroadcastBlockId(id)
 
-  TorrentBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-  }
+  SparkEnv.get.blockManager.putSingle(
+    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
   @transient private var arrayOfBlocks: Array[TorrentBlock] = null
   @transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     // Store meta-info
     val metaId = BroadcastBlockId(id, "meta")
     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
-    TorrentBroadcast.synchronized {
-      SparkEnv.get.blockManager.putSingle(
-        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    }
+    SparkEnv.get.blockManager.putSingle(
+      metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // Store individual pieces
     for (i <- 0 until totalBlocks) {
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.putSingle(
-          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-      }
+      SparkEnv.get.blockManager.putSingle(
+        pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
   }
 
@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val metaId = BroadcastBlockId(id, "meta")
     var attemptId = 10
     while (attemptId > 0 && totalBlocks == -1) {
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(metaId) match {
-          case Some(x) =>
-            val tInfo = x.asInstanceOf[TorrentInfo]
-            totalBlocks = tInfo.totalBlocks
-            totalBytes = tInfo.totalBytes
-            arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
-            hasBlocks = 0
-
-          case None =>
-            Thread.sleep(500)
-        }
+      SparkEnv.get.blockManager.getSingle(metaId) match {
+        case Some(x) =>
+          val tInfo = x.asInstanceOf[TorrentInfo]
+          totalBlocks = tInfo.totalBlocks
+          totalBytes = tInfo.totalBytes
+          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
+          hasBlocks = 0
+
+        case None =>
+          Thread.sleep(500)
       }
       attemptId -= 1
     }
 
     if (totalBlocks == -1) {
       return false
     }
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
     for (pid <- recvOrder) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(pieceId) match {
-          case Some(x) =>
-            arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
-            hasBlocks += 1
-            SparkEnv.get.blockManager.putSingle(
-              pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      SparkEnv.get.blockManager.getSingle(pieceId) match {
+        case Some(x) =>
+          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
+          hasBlocks += 1
+          SparkEnv.get.blockManager.putSingle(
+            pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+        case None =>
+          throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
       }
     }
 
@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
-    synchronized {
-      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
-    }
+    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
   }
 }
 
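Note (not part of the commit): the putSingle/getSingle paths changed above sit behind the ordinary broadcast API. A minimal driver-side sketch, assuming a Spark 1.x-style local application and the spark.broadcast.factory setting of that era, that exercises those paths and the unpersist call from the last hunk:

// Context-only sketch; the local master and factory config are assumptions for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BroadcastSketch")
      .setMaster("local[2]")
      // select the torrent implementation shown in the diff
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
    val sc = new SparkContext(conf)

    val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))              // driver stores the value
    val resolved = sc.parallelize(Seq(1, 2, 1)).map(i => lookup.value(i))  // tasks read it back
    println(resolved.collect().mkString(", "))                      // a, b, a

    lookup.unpersist(blocking = true)   // ends in TorrentBroadcast.unpersist shown above
    sc.stop()
  }
}
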
17 changes: 13 additions & 4 deletions dev/run-tests
@@ -58,7 +58,7 @@ if [ -n "$AMPLAB_JENKINS" ]; then
   diffs=`git diff --name-only master | grep "^sql/"`
   if [ -n "$diffs" ]; then
     echo "Detected changes in SQL. Will run Hive test suite."
-    export _RUN_SQL_TESTS=true # exported for PySpark tests
+    _RUN_SQL_TESTS=true
   fi
 fi
 
@@ -89,13 +89,22 @@ echo "========================================================================="
 echo "Running Spark unit tests"
 echo "========================================================================="
 
+# Build Spark; we always build with Hive because the PySpark SparkSQL tests need it.
+# echo "q" is needed because sbt on encountering a build file with failure
+# (either resolution or compilation) prompts the user for input either q, r,
+# etc to quit or retry. This echo is there to make it not block.
+BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver "
+echo -e "q\n" | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly | \
+  grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
 # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled:
 if [ -n "$_RUN_SQL_TESTS" ]; then
   SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
 fi
-# echo "q" is needed because sbt on encountering a build file with failure
-# (either resolution or compilation) prompts the user for input either q, r,
-echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \
+# echo "q" is needed because sbt on encountering a build file with failure
+# (either resolution or compilation) prompts the user for input either q, r,
+# etc to quit or retry. This echo is there to make it not block.
+echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS test | \
   grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 
 echo ""
4 changes: 2 additions & 2 deletions docs/mllib-decision-tree.md
@@ -84,8 +84,8 @@ Section 9.2.4 in
 [Elements of Statistical Machine Learning](http://statweb.stanford.edu/~tibs/ElemStatLearn/) for
 details). For example, for a binary classification problem with one categorical feature with three
 categories A, B and C with corresponding proportion of label 1 as 0.2, 0.6 and 0.4, the categorical
-features are ordered as A followed by C followed B or A, B, C. The two split candidates are A \| C, B
-and A , B \| C where \| denotes the split. A similar heuristic is used for multiclass classification
+features are ordered as A followed by C followed B or A, C, B. The two split candidates are A \| C, B
+and A , C \| B where \| denotes the split. A similar heuristic is used for multiclass classification
 when `$2^(M-1)-1$` is greater than the number of bins -- the impurity for each categorical feature value
 is used for ordering.
 
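A small sketch, separate from the commit, that works through the corrected example above: categories are ordered by their proportion of label 1 (A = 0.2, C = 0.4, B = 0.6), and only the M - 1 prefix splits of that ordering are considered as candidates.

// Illustration only; the proportions are the ones quoted in the doc paragraph above.
object CategoricalSplitSketch {
  def main(args: Array[String]): Unit = {
    // proportion of label 1 for each categorical feature value
    val proportionOfLabelOne = Seq("A" -> 0.2, "B" -> 0.6, "C" -> 0.4)

    // order categories by increasing proportion of label 1: A, C, B
    val ordered = proportionOfLabelOne.sortBy(_._2).map(_._1)

    // with M categories, only the M - 1 prefix splits of the ordering are candidates
    val candidates = (1 until ordered.size).map(i => (ordered.take(i), ordered.drop(i)))
    candidates.foreach { case (left, right) =>
      println(left.mkString(", ") + " | " + right.mkString(", "))
    }
    // prints:
    //   A | C, B
    //   A, C | B
  }
}
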
2 changes: 1 addition & 1 deletion python/pyspark/rdd.py
@@ -1812,7 +1812,7 @@ def _jrdd(self):
             self._jrdd_deserializer = NoOpSerializer()
         command = (self.func, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
-        ser = CompressedSerializer(CloudPickleSerializer())
+        ser = CloudPickleSerializer()
         pickled_command = ser.dumps(command)
         broadcast_vars = ListConverter().convert(
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
2 changes: 1 addition & 1 deletion python/pyspark/worker.py
@@ -72,7 +72,7 @@ def main(infile, outfile):
         value = ser._read_with_length(infile)
         _broadcastRegistry[bid] = Broadcast(bid, value)
 
-    command = ser._read_with_length(infile)
+    command = pickleSer._read_with_length(infile)
     (func, deserializer, serializer) = command
     init_time = time.time()
     iterator = deserializer.load_stream(infile)
4 changes: 1 addition & 3 deletions python/run-tests
@@ -59,9 +59,7 @@ $PYSPARK_PYTHON --version
 run_test "pyspark/rdd.py"
 run_test "pyspark/context.py"
 run_test "pyspark/conf.py"
-if [ -n "$_RUN_SQL_TESTS" ]; then
-  run_test "pyspark/sql.py"
-fi
+run_test "pyspark/sql.py"
 # These tests are included in the module-level docs, and so must
 # be handled on a higher level rather than within the python file.
 export PYSPARK_DOC_TEST=1
