Merge branch 'master' into upload_stream
squito committed Jun 26, 2018
2 parents fd62f61 + e07aee2 commit 58d52b9
Showing 238 changed files with 9,890 additions and 2,425 deletions.
13 changes: 7 additions & 6 deletions LICENSE
@@ -237,23 +237,24 @@ The text of each license is also included at licenses/LICENSE-[project].txt.

(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
+ (BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
(BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
(BSD) JLine (jline:jline:0.9.94 - http://jline.sourceforge.net)
(BSD) JLine (jline:jline:2.14.3 - https://github.com/jline/jline2)
(BSD) ParaNamer Core (com.thoughtworks.paranamer:paranamer:2.3 - http://paranamer.codehaus.org/paranamer)
(BSD) ParaNamer Core (com.thoughtworks.paranamer:paranamer:2.6 - http://paranamer.codehaus.org/paranamer)
(BSD 3 Clause) Scala (http://www.scala-lang.org/download/#License)
(Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala)
- (BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.8 - http://www.scala-lang.org/)
- (BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.8 - http://www.scala-lang.org/)
- (BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.8 - http://www.scala-lang.org/)
- (BSD-like) Scala Library (org.scala-lang:scala-library:2.11.8 - http://www.scala-lang.org/)
- (BSD-like) Scalap (org.scala-lang:scalap:2.11.8 - http://www.scala-lang.org/)
+ (BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.12 - http://www.scala-lang.org/)
+ (BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.12 - http://www.scala-lang.org/)
+ (BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.12 - http://www.scala-lang.org/)
+ (BSD-like) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/)
+ (BSD-like) Scalap (org.scala-lang:scalap:2.11.12 - http://www.scala-lang.org/)
(BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org)
(BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org)
(BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org)
10 changes: 7 additions & 3 deletions bin/docker-image-tool.sh
@@ -70,17 +70,18 @@ function build {
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}

docker build "${BUILD_ARGS[@]}" \
docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$BASEDOCKERFILE" .

docker build "${BINDING_BUILD_ARGS[@]}" \
docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
}

function push {
docker push "$(image_ref spark)"
docker push "$(image_ref spark-py)"
}

function usage {
@@ -99,6 +100,7 @@ Options:
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
+ -n Build docker image with --no-cache
Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
@@ -127,14 +129,16 @@ REPO=
TAG=
BASEDOCKERFILE=
PYDOCKERFILE=
- while getopts f:mr:t: option
+ NOCACHEARG=
+ while getopts f:mr:t:n option
do
case "${option}"
in
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
+ n) NOCACHEARG="--no-cache";;
m)
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
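The new -n option simply threads $NOCACHEARG into both docker build invocations above. For example, a hypothetical invocation like ./bin/docker-image-tool.sh -r docker.io/myrepo -t dev -n build would rebuild both the JVM and Python images without reusing any cached layers.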
2 changes: 1 addition & 1 deletion build/mvn
@@ -154,4 +154,4 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

# Last, call the `mvn` command as usual
- ${MVN_BIN} -DzincPort=${ZINC_PORT} "$@"
+ "${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
@@ -137,30 +137,15 @@ protected void deallocate() {
}

private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
- ByteBuffer buffer = buf.nioBuffer();
- int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
- target.write(buffer) : writeNioBuffer(target, buffer);
+ // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
+ // for the case that the passed-in buffer has too many components.
+ int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
+ ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
+ int written = target.write(buffer);
buf.skipBytes(written);
return written;
}

- private int writeNioBuffer(
- WritableByteChannel writeCh,
- ByteBuffer buf) throws IOException {
- int originalLimit = buf.limit();
- int ret = 0;
-
- try {
- int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
- buf.limit(buf.position() + ioSize);
- ret = writeCh.write(buf);
- } finally {
- buf.limit(originalLimit);
- }
-
- return ret;
- }

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
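The rewrite of copyByteBuf addresses a pathological case: materializing an NIO view of a composite Netty buffer with thousands of backing components was disproportionately expensive, and it was done on the whole remaining buffer on every call. Capping the requested sub-region to NIO_BUFFER_LIMIT before calling nioBuffer keeps each call cheap, and it also subsumes the old writeNioBuffer limit-juggling. A minimal Scala sketch of the resulting copy loop as a caller would drive it (the helper name and the 256 KiB limit are assumptions):

import java.nio.channels.WritableByteChannel
import io.netty.buffer.ByteBuf

val NIO_BUFFER_LIMIT = 256 * 1024 // assumed value of the class constant

def writeFully(buf: ByteBuf, target: WritableByteChannel): Unit = {
  while (buf.isReadable) {
    // expose at most NIO_BUFFER_LIMIT bytes per call, cheap even when
    // the underlying buffer has many components
    val length = math.min(buf.readableBytes, NIO_BUFFER_LIMIT)
    val buffer = buf.nioBuffer(buf.readerIndex, length)
    // the channel may accept fewer bytes than offered; advance the
    // reader index by the number actually written and loop
    buf.skipBytes(target.write(buffer))
  }
}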
@@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
// must be stored in the same memory page.
// (8 byte key length) (key) (value) (8 byte pointer to next value)
int uaoSize = UnsafeAlignedOffset.getUaoSize();
- final long recordLength = (2 * uaoSize) + klen + vlen + 8;
+ final long recordLength = (2L * uaoSize) + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + uaoSize)) {
return false;
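The 2L is an overflow guard, not a style fix: uaoSize, klen and vlen are ints, so with a plain 2 the whole right-hand side is evaluated in 32-bit arithmetic and only then widened to long. A self-contained Scala illustration, with values chosen to force the wrap:

val uaoSize = 8
val klen = Int.MaxValue - 10 // pathological, but klen is an int and may be huge
val vlen = 16
val wrong: Long = (2 * uaoSize) + klen + vlen + 8  // int math wraps negative first
val right: Long = (2L * uaoSize) + klen + vlen + 8 // long math throughout
assert(wrong < 0 && right == 2147483677L)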
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext

+ import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.util.ssl.SslContextFactory

import org.apache.spark.internal.Logging
@@ -163,11 +164,16 @@ private[spark] object SSLOptions extends Logging {
* missing in SparkConf, the corresponding setting is used from the default configuration.
*
* @param conf Spark configuration object where the settings are collected from
+ * @param hadoopConf Hadoop configuration to get settings
* @param ns the namespace name
* @param defaults the default configuration
* @return [[org.apache.spark.SSLOptions]] object
*/
- def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
+ def parse(
+ conf: SparkConf,
+ hadoopConf: Configuration,
+ ns: String,
+ defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

val port = conf.getWithSubstitution(s"$ns.port").map(_.toInt)
@@ -179,9 +185,11 @@
.orElse(defaults.flatMap(_.keyStore))

val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
.orElse(defaults.flatMap(_.keyStorePassword))

val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
@@ -194,6 +202,7 @@
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
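With the hadoopConf fallback in place, each SSL password resolves in order: the substituted SparkConf value, then Hadoop's credential provider API, then the inherited defaults. Configuration.getPassword consults any providers named in hadoop.security.credential.provider.path before falling back to the configuration itself, so a keystore password can now live in a JCEKS store instead of spark-defaults.conf, e.g. hadoop credential create spark.ssl.keyStorePassword -provider jceks://file/tmp/secrets.jceks (a hypothetical provider path). A minimal Scala sketch of why the lookups are wrapped the way they are:

import org.apache.hadoop.conf.Configuration

// getPassword returns char[], or null when neither a provider nor the
// config holds the alias, hence the Option(...).map(new String(_)) above
def lookupPassword(hadoopConf: Configuration, key: String): Option[String] =
  Option(hadoopConf.getPassword(key)).map(new String(_))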
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import java.nio.charset.StandardCharsets.UTF_8
- import javax.net.ssl._

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

+ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
@@ -111,11 +111,14 @@ private[spark] class SecurityManager(
)
}

+ private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
private val defaultSSLOptions =
SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)

def getSSLOptions(module: String): SSLOptions = {
val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
val opts =
SSLOptions.parse(sparkConf, hadoopConf, s"spark.ssl.$module", Some(defaultSSLOptions))
logDebug(s"Created SSL options for $module: $opts")
opts
}
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1306,11 +1306,12 @@ class SparkContext(config: SparkConf) extends Logging {

/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
- val partitioners = rdds.flatMap(_.partitioner).toSet
- if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
- new PartitionerAwareUnionRDD(this, rdds)
+ val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty)
+ val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSet
+ if (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
+ new PartitionerAwareUnionRDD(this, nonEmptyRdds)
} else {
- new UnionRDD(this, rdds)
+ new UnionRDD(this, nonEmptyRdds)
}
}

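The motivation for the filter: an empty RDD has no partitions and no partitioner, so a single empty input used to push the whole union onto the slower UnionRDD path, which drops the partitioner. A sketch of the behavior this enables, assuming a live SparkContext sc:

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(4)
val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(p)
val b = sc.emptyRDD[(Int, String)]
// b contributes no partitions, so it is filtered out up front and the
// partitioner-aware path keeps p
val u = sc.union(Seq(a, b))
assert(u.partitioner.contains(p))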
@@ -40,13 +40,15 @@ private[spark] object PythonEvalType {
val SQL_SCALAR_PANDAS_UDF = 200
val SQL_GROUPED_MAP_PANDAS_UDF = 201
val SQL_GROUPED_AGG_PANDAS_UDF = 202
+ val SQL_WINDOW_AGG_PANDAS_UDF = 203

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
+ case SQL_WINDOW_AGG_PANDAS_UDF => "SQL_WINDOW_AGG_PANDAS_UDF"
}
}

@@ -124,7 +124,7 @@ class HistoryServer(

attachHandler(ApiRootResource.getServletHandler(this))

- attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+ addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)

val contextHandler = new ServletContextHandler
contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
@@ -152,7 +152,6 @@
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
handlers.synchronized {
ui.getHandlers.foreach(attachHandler)
- addFilters(ui.getHandlers, conf)
}
}

@@ -43,7 +43,7 @@ class MasterWebUI(
val masterPage = new MasterPage(this)
attachPage(new ApplicationPage(this))
attachPage(masterPage)
- attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
+ addStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
@@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
// check if attempting another run
keepTrying = supervise && exitCode != 0 && !killed
if (keepTrying) {
- if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
+ if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
waitSeconds = 1
}
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
@@ -47,7 +47,7 @@ class WorkerWebUI(
val logPage = new LogPage(this)
attachPage(logPage)
attachPage(new WorkerPage(this))
- attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
+ addStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE)
attachHandler(createServletHandler("/log",
(request: HttpServletRequest) => logPage.renderLog(request),
worker.securityMgr,
@@ -552,4 +552,11 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("1h")

+ private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
+ ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
+ .internal()
+ .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
+ .intConf
+ .checkValue(v => v > 0, "The value should be a positive integer.")
+ .createWithDefault(2000)
}
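This exposes a threshold that was previously hard-coded at 2000 in MapStatus: once a shuffle map task writes more output partitions than this, Spark stops tracking exact per-partition sizes and switches to HighlyCompressedMapStatus, which keeps an average size plus a bitmap of empty blocks. A hedged usage example for a job whose map statuses strain the driver (the value is illustrative):

import org.apache.spark.SparkConf

// lower the switch-over point; checkValue rejects non-positive values
val conf = new SparkConf()
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "500")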
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
// Try to write all RDD partitions as a Hadoop OutputFormat.
try {
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+ // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+ // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+ val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber

executeTask(
context = context,
config = config,
jobTrackerId = jobTrackerId,
commitJobId = commitJobId,
sparkPartitionId = context.partitionId,
- sparkAttemptNumber = context.attemptNumber,
+ sparkAttemptNumber = attemptId,
committer = committer,
iterator = iter)
})
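The bit-packing gives every (stage attempt, task attempt) pair a distinct Hadoop attempt ID: the stage attempt occupies the bits above 16, the task attempt the low 16 bits. Previously a task retried within a stage and a task from a retried stage could present the same attempt number to the output committer and race on commits. A worked Scala example (valid as long as, per the comment, neither counter exceeds Short.MaxValue):

def packedAttemptId(stageAttempt: Int, taskAttempt: Int): Int =
  (stageAttempt << 16) | taskAttempt

assert(packedAttemptId(0, 5) == 5)
assert(packedAttemptId(2, 3) == 131075) // 2 * 65536 + 3
// the scheme relies on attempts fitting in 16 bits; beyond that, IDs collide
assert(packedAttemptId(1, 0) == packedAttemptId(0, 65536))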
@@ -69,9 +69,9 @@ object SparkHadoopMapRedUtil extends Logging {

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val taskAttemptNumber = TaskContext.get().attemptNumber()
- val stageId = TaskContext.get().stageId()
- val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
+ val ctx = TaskContext.get()
+ val canCommit = outputCommitCoordinator.canCommit(ctx.stageId(), ctx.stageAttemptNumber(),
+ splitId, ctx.attemptNumber())

if (canCommit) {
performCommit()
@@ -81,7 +81,7 @@
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
+ throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
- numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+ numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
}
}

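Besides the overflow guard (4L forces the cap to be computed in long arithmetic), the scan-growth heuristic deserves a worked example: after scanning 4 partitions that yielded 100 of 1000 requested results, the estimate 1.5 * 1000 * 4 / 100 - 4 = 56 additional partitions is trimmed by the 4x cap to 16. In Scala:

val num = 1000         // results requested
val partsScanned = 4   // partitions scanned so far
val resultsSoFar = 100 // results found so far
var numPartsToTry: Long =
  Math.max(1, (1.5 * num * partsScanned / resultsSoFar).toInt - partsScanned)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L) // grow at most 4x
assert(numPartsToTry == 16)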
@@ -371,7 +371,7 @@

}

- private[scheduler] object BlacklistTracker extends Logging {
+ private[spark] object BlacklistTracker extends Logging {

private val DEFAULT_TIMEOUT = "1h"
