
Commit

Merge branch 'master' into SPARK-3741
Conflicts:
	core/src/main/scala/org/apache/spark/network/nio/Connection.scala
	core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
zsxwing committed Oct 9, 2014
2 parents 764aec5 + 35afdfd commit 0b8a61c
Showing 276 changed files with 7,063 additions and 2,631 deletions.
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -349,5 +349,15 @@
</plugins>
</build>
</profile>
<profile>
<id>kinesis-asl</id>
<dependencies>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${commons.httpclient.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
2 changes: 1 addition & 1 deletion bagel/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
8 changes: 7 additions & 1 deletion bin/compute-classpath.cmd
@@ -36,7 +36,13 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%

if not "x%SPARK_CONF_DIR%"=="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
) else (
set CLASSPATH=%CLASSPATH%;%FWDIR%conf
)

if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
8 changes: 7 additions & 1 deletion bin/compute-classpath.sh
@@ -27,8 +27,14 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

. "$FWDIR"/bin/load-spark-env.sh

CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"

# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
if [ -n "$SPARK_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
else
CLASSPATH="$CLASSPATH:$FWDIR/conf"
fi

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

24 changes: 12 additions & 12 deletions bin/pyspark
@@ -52,10 +52,20 @@ fi

# Figure out which Python executable to use
if [[ -z "$PYSPARK_PYTHON" ]]; then
PYSPARK_PYTHON="python"
if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON="ipython"
else
PYSPARK_PYTHON="python"
fi
fi
export PYSPARK_PYTHON

if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -64,11 +74,6 @@ export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
export PYTHONSTARTUP="$FWDIR/python/pyspark/shell.py"

# If IPython options are specified, assume user wants to run IPython
if [[ -n "$IPYTHON_OPTS" ]]; then
IPYTHON=1
fi

# Build up arguments list manually to preserve quotes and backslashes.
# We export Spark submit arguments as an environment variable because shell.py must run as a
# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
@@ -106,10 +111,5 @@ if [[ "$1" =~ \.py$ ]]; then
else
# PySpark shell requires special handling downstream
export PYSPARK_SHELL=1
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" ]]; then
exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON"
fi
exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
fi
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -33,7 +33,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
)
if [%FOUND_JAR%] == [0] (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
echo You need to build Spark before running this program.
goto exit
)
:skip_build_test
2 changes: 1 addition & 1 deletion bin/run-example2.cmd
@@ -52,7 +52,7 @@ if exist "%FWDIR%RELEASE" (
)
if "x%SPARK_EXAMPLES_JAR%"=="x" (
echo Failed to find Spark examples assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
echo You need to build Spark before running this program.
goto exit
)

2 changes: 1 addition & 1 deletion bin/spark-class
@@ -146,7 +146,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
echo "You need to build spark before running $1." 1>&2
echo "You need to build Spark before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
2 changes: 1 addition & 1 deletion bin/spark-class2.cmd
@@ -104,7 +104,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
)
if "%FOUND_JAR%"=="0" (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
echo You need to build Spark before running this program.
goto exit
)
:skip_build_test
12 changes: 1 addition & 11 deletions bin/spark-sql
@@ -24,7 +24,6 @@
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
CLASS_NOT_FOUND_EXIT_STATUS=101

# Figure out where Spark is installed
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
@@ -53,13 +52,4 @@ source "$FWDIR"/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

"$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
exit_status=$?

if [[ exit_status -eq CLASS_NOT_FOUND_EXIT_STATUS ]]; then
echo
echo "Failed to load Spark SQL CLI main class $CLASS."
echo "You need to build Spark with -Phive."
fi

exit $exit_status
exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
2 changes: 1 addition & 1 deletion bin/utils.sh
@@ -17,7 +17,7 @@
# limitations under the License.
#

# Gather all all spark-submit options into SUBMISSION_OPTS
# Gather all spark-submit options into SUBMISSION_OPTS
function gatherSparkSubmitOpts() {

if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then
11 changes: 11 additions & 0 deletions core/pom.xml
@@ -322,6 +322,17 @@
</tasks>
</configuration>
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>${basedir}/../python/build</directory>
</fileset>
</filesets>
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
5 changes: 5 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -51,6 +51,11 @@ table.sortable thead {
cursor: pointer;
}

table.sortable td {
word-wrap: break-word;
max-width: 600px;
}

.progress {
margin-bottom: 0px; position: relative
}
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -83,6 +83,15 @@
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)

/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
def jobIds: Seq[Int]

}


@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
}

/** Get the corresponding job id for this action. */
def jobId = jobWaiter.jobId
def jobIds = Seq(jobWaiter.jobId)
}


@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// is cancelled before the action was even run (and thus we have no thread to interrupt).
@volatile private var _cancelled: Boolean = false

@volatile private var jobs: Seq[Int] = Nil

// A promise used to signal the future.
private val p = promise[T]()

@@ -219,6 +229,8 @@
}
}

this.jobs = jobs ++ job.jobIds

// Wait for the job to complete. If the action is cancelled (with an interrupt),
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@
override def isCompleted: Boolean = p.isCompleted

override def value: Option[Try[T]] = p.future.value

def jobIds = jobs

}
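
The jobIds method added above gives callers a snapshot of the Spark jobs an asynchronous action has submitted so far. A minimal usage sketch, assuming a local SparkContext; the object name JobIdsSketch is just for illustration, and countAsync() comes from the AsyncRDDActions implicits in org.apache.spark.SparkContext._:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // brings countAsync() into scope via AsyncRDDActions

object JobIdsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("jobIds-sketch").setMaster("local[2]"))
    // countAsync() returns a FutureAction; jobIds is a snapshot of the jobs run so far,
    // so multi-job actions (e.g. takeAsync) may report a growing list between calls.
    val action = sc.parallelize(1 to 1000, numSlices = 4).countAsync()
    val total = action.get() // block until the action finishes
    println(s"Counted $total elements via job(s) ${action.jobIds.mkString(", ")}")
    sc.stop()
  }
}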
29 changes: 1 addition & 28 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}

private[spark] object MapOutputTracker {
private val LOG_BASE = 1.1

// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
(status.location, status.getSizeForBlock(reduceId))
}
}
}

/**
* Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
* We do this by encoding the log base 1.1 of the size as an integer, which can support
* sizes up to 35 GB with at most 10% error.
*/
def compressSize(size: Long): Byte = {
if (size == 0) {
0
} else if (size <= 1L) {
1
} else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
}
}

/**
* Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
*/
def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) {
0
} else {
math.pow(LOG_BASE, compressedSize & 0xFF).toLong
}
}
}
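
For reference, the deleted helpers above encoded each map output size as the rounded-up log base 1.1 of the size, which fits in one byte and covers sizes up to about 35 GB with at most 10% error; the new code reads sizes through MapStatus.getSizeForBlock instead. A standalone sketch that copies the removed logic to show the round-trip behavior (the object name SizeEncodingSketch is just for illustration):

object SizeEncodingSketch {
  private val LOG_BASE = 1.1

  // Encode a size as ceil(log base 1.1 of the size), clamped to one unsigned byte (copied from the removed code).
  def compressSize(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

  // Reverse operation: raise 1.1 to the stored exponent.
  def decompressSize(compressedSize: Byte): Long = {
    if (compressedSize == 0) 0 else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
  }

  def main(args: Array[String]): Unit = {
    val size = 10L * 1024 * 1024 // a 10 MiB map output
    val roundTrip = decompressSize(compressSize(size))
    // The encoding only ever rounds a size up to the next power of 1.1, so the
    // reported size overestimates the true size by at most about 10%.
    println(s"$size bytes -> byte ${compressSize(size) & 0xFF} -> ~$roundTrip bytes")
  }
}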
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -103,10 +103,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
* If its acting as a client and trying to send a message to another ConnectionManager,
* it blocks the thread calling sendMessage until the SASL negotiation has occurred.
* The ConnectionManager tracks all the sendingConnections using the ConnectionId
* and waits for the response from the server and does the handshake.
* and waits for the response from the server and does the handshake before sending
* the real message.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work