Commit c13a2cb
Merge branch 'master' of github.com:apache/spark into submit-driver-extra
andrewor14 committed Aug 7, 2014
2 parents 8e552b7 + 47ccd5e commit c13a2cb
Showing 76 changed files with 1,378 additions and 481 deletions.
29 changes: 7 additions & 22 deletions bin/beeline
@@ -17,29 +17,14 @@
 # limitations under the License.
 #

-# Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+#
+# Shell script for starting BeeLine

-# Find the java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
+# Enter posix mode for bash
+set -o posix

-# Compute classpath using external script
-classpath_output=$($FWDIR/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
-  echo "$classpath_output"
-  exit 1
-else
-  CLASSPATH=$classpath_output
-fi
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"

 CLASS="org.apache.hive.beeline.BeeLine"
-exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
+exec "$FWDIR/bin/spark-class" $CLASS "$@"
66 changes: 62 additions & 4 deletions bin/spark-sql
@@ -23,14 +23,72 @@
 # Enter posix mode for bash
 set -o posix

+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"

-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/spark-sql [options]"
+function usage {
+  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  pattern="usage"
+  pattern+="\|Spark assembly has been built with Hive"
+  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+  pattern+="\|Spark Command: "
+  pattern+="\|--help"
+  pattern+="\|======="
+
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "CLI options:"
+  $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+  arg_number=$1
+  at_least=$2
+
+  if [[ $arg_number -lt $at_least ]]; then
+    usage
+    exit 1
+  fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+  usage
   exit 0
 fi

-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+CLI_ARGS=()
+SUBMISSION_ARGS=()
+
+while (($#)); do
+  case $1 in
+    -d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -e)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=(\"$1\"); shift
+      ;;
+
+    -s | --silent)
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -v | --verbose)
+      # Both SparkSubmit and SparkSQLCLIDriver recognize -v | --verbose
+      CLI_ARGS+=($1)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+
+    *)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+  esac
+done
+
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
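The while/case loop above makes a single pass over "$@", sending options SparkSQLCLIDriver understands to CLI_ARGS, everything else to SUBMISSION_ARGS for spark-submit, and -v/--verbose to both. A minimal Scala sketch of the same routing idea (ArgRouter and its option sets are hypothetical names, not part of the script):

import scala.annotation.tailrec

// Sketch of spark-sql's argument routing: one pass, two buckets.
object ArgRouter {
  // Option lists mirror the script's case arms (a hypothetical subset).
  private val cliTakesValue =
    Set("-d", "--define", "--database", "-f", "-h", "--hiveconf", "--hivevar", "-i", "-p", "-e")
  private val cliFlags = Set("-s", "--silent")
  private val sharedFlags = Set("-v", "--verbose") // recognized by both sides

  @tailrec
  def route(args: List[String], cli: List[String] = Nil,
            submission: List[String] = Nil): (List[String], List[String]) = args match {
    case opt :: value :: rest if cliTakesValue(opt) =>
      route(rest, cli ++ List(opt, value), submission) // CLI option with an argument
    case opt :: rest if cliFlags(opt) =>
      route(rest, cli :+ opt, submission)              // CLI-only flag
    case opt :: rest if sharedFlags(opt) =>
      route(rest, cli :+ opt, submission :+ opt)       // goes to both, like -v | --verbose
    case token :: rest =>
      route(rest, cli, submission :+ token)            // everything else: spark-submit
    case Nil => (cli, submission)
  }

  def main(args: Array[String]): Unit = {
    val (cli, submit) = route(List("--master", "local", "-e", "SELECT 1", "-v"))
    println(s"CLI args:        $cli")    // List(-e, SELECT 1, -v)
    println(s"submission args: $submit") // List(--master, local, -v)
  }
}

Like the script's `*)` branch, a token the CLI does not own is forwarded to spark-submit one token at a time, so "--master local" simply arrives as two consecutive submission tokens.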
@@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
     var inSparkOpts = true
+    val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r

     // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
@@ -322,33 +323,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         verbose = true
         parse(tail)

+      case EQ_SEPARATED_OPT(opt, value) :: tail =>
+        parse(opt :: value :: tail)
+
+      case value :: tail if value.startsWith("-") =>
+        SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+
       case value :: tail =>
-        if (inSparkOpts) {
-          value match {
-            // convert --foo=bar to --foo bar
-            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
-              val parts = v.split("=")
-              parse(Seq(parts(0), parts(1)) ++ tail)
-            case v if v.startsWith("-") =>
-              val errMessage = s"Unrecognized option '$value'."
-              SparkSubmit.printErrorAndExit(errMessage)
-            case v =>
-              primaryResource =
-                if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
-                  Utils.resolveURI(v).toString
-                } else {
-                  v
-                }
-              inSparkOpts = false
-              isPython = SparkSubmit.isPython(v)
-              parse(tail)
-          }
-        } else {
-          if (!value.isEmpty) {
-            childArgs += value
-          }
-          parse(tail)
-        }
+        primaryResource =
+          if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
+            Utils.resolveURI(value).toString
+          } else {
+            value
+          }
+        isPython = SparkSubmit.isPython(value)
+        childArgs ++= tail

       case Nil =>
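The new EQ_SEPARATED_OPT extractor replaces the removed split("=") special case: a "--name=value" token is re-parsed as two tokens before any other case runs, and because the regex captures everything after the first '=', values that themselves contain '=' now survive (the old `split("=").size == 2` check rejected them). A standalone sketch of that normalization (EqSeparatedOpt is a hypothetical wrapper):

// Standalone sketch of the --name=value normalization added above.
object EqSeparatedOpt {
  // Same pattern as the diff: the option name runs up to the first '=',
  // the value is everything after it, embedded '=' included.
  private val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r

  def normalize(arg: String): List[String] = arg match {
    case EQ_SEPARATED_OPT(opt, value) => List(opt, value)
    case other                        => List(other)
  }

  def main(args: Array[String]): Unit = {
    println(normalize("--master=local[4]"))         // List(--master, local[4])
    println(normalize("--conf=spark.ui.port=4041")) // value keeps its '='
    println(normalize("app.jar"))                   // non-option passes through
  }
}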
@@ -190,10 +190,10 @@ class ShuffleWriteMetrics extends Serializable {
   /**
    * Number of bytes written for the shuffle by this task
    */
-  var shuffleBytesWritten: Long = _
+  @volatile var shuffleBytesWritten: Long = _

   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  var shuffleWriteTime: Long = _
+  @volatile var shuffleWriteTime: Long = _
 }
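The only change here is @volatile, which makes writes to the two counters immediately visible to other threads, e.g. a metrics or heartbeat thread polling them while the task's writer thread updates them. A minimal sketch of the visibility pattern (not Spark's class; uses Scala 2.12+ SAM syntax for Runnable):

// Minimal sketch (not Spark's class): @volatile publishes writes across threads.
class WriteMetrics {
  @volatile var bytesWritten: Long = 0L // compiles to a volatile JVM field
}

object VolatileDemo {
  def main(args: Array[String]): Unit = {
    val metrics = new WriteMetrics
    // Scala 2.12+ SAM syntax: the lambda becomes a Runnable.
    val writer = new Thread(() => {
      for (_ <- 1 to 5) {
        // Safe only because there is a single writer; @volatile gives
        // visibility, not atomicity, for read-modify-write updates.
        metrics.bytesWritten += 100
        Thread.sleep(10)
      }
    })
    writer.start()
    writer.join()
    println(s"bytesWritten = ${metrics.bytesWritten}") // 500
  }
}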
@@ -48,7 +48,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
     val security = if (isSecurityNeg) 1 else 0
     if (size == 0 && !gotChunkForSendingOnce) {
       val newChunk = new MessageChunk(
-        new MessageChunkHeader(typ, id, 0, 0, ackId, security, senderAddress), null)
+        new MessageChunkHeader(typ, id, 0, 0, ackId, hasError, security, senderAddress), null)
       gotChunkForSendingOnce = true
       return Some(newChunk)
     }
@@ -66,7 +66,8 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
         }
         buffer.position(buffer.position + newBuffer.remaining)
         val newChunk = new MessageChunk(new MessageChunkHeader(
-          typ, id, size, newBuffer.remaining, ackId, security, senderAddress), newBuffer)
+          typ, id, size, newBuffer.remaining, ackId,
+          hasError, security, senderAddress), newBuffer)
         gotChunkForSendingOnce = true
         return Some(newChunk)
       }
@@ -88,7 +89,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
         val newBuffer = buffer.slice().limit(chunkSize).asInstanceOf[ByteBuffer]
         buffer.position(buffer.position + newBuffer.remaining)
         val newChunk = new MessageChunk(new MessageChunkHeader(
-          typ, id, size, newBuffer.remaining, ackId, security, senderAddress), newBuffer)
+          typ, id, size, newBuffer.remaining, ackId, hasError, security, senderAddress), newBuffer)
         return Some(newChunk)
       }
       None
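All three hunks apply the same mechanical change: the message's hasError flag is now passed into every MessageChunkHeader that BufferMessage constructs, so the error bit travels with each chunk instead of being dropped. A simplified analogue (ChunkHeader and its field types are hypothetical, not Spark's actual header class):

// Simplified analogue: the error flag travels inside every chunk header,
// letting the receiver tell a failure reply apart from a normal message.
case class ChunkHeader(
    typ: Long,
    id: Int,
    totalSize: Int,
    chunkSize: Int,
    ackId: Int,
    hasError: Boolean, // the field these hunks start populating
    security: Int)

object ChunkHeaderDemo {
  def main(args: Array[String]): Unit = {
    val ok = ChunkHeader(typ = 0L, id = 1, totalSize = 1024, chunkSize = 1024,
      ackId = 42, hasError = false, security = 1)
    val failed = ok.copy(hasError = true) // same chunk, now marked as an error
    println(ok)
    println(failed)
  }
}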
(Diff truncated: the remaining changed files are not shown.)