Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
[DCOS-38138] Update Spark CLI for shell-escape fix (#388) (#401)
Browse files Browse the repository at this point in the history
Backport of #388 and #405

This commit makes it possible to pass multi-argument configuration strings in the spark-cli.
  • Loading branch information
samvantran authored Sep 18, 2018
1 parent 08af2a0 commit 1cb4cd6
Show file tree
Hide file tree
Showing 15 changed files with 789 additions and 78 deletions.
2 changes: 1 addition & 1 deletion cli/dcos-spark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func handleCommands(app *kingpin.Application) {

run := app.Command("run", "Submit a job to the Spark Mesos Dispatcher").Action(cmd.runSubmit)
run.Flag("submit-args", fmt.Sprintf("Arguments matching what would be sent to 'spark-submit': %s",
sparkSubmitHelp())).Required().PlaceHolder("ARGS").StringVar(&cmd.submitArgs)
sparkSubmitHelp())).Required().PlaceHolder("\"ARGS\"").StringVar(&cmd.submitArgs)
// TODO this should be moved to submit args
run.Flag("docker-image", "Docker image to run the job within").
Default("").
Expand Down
149 changes: 81 additions & 68 deletions cli/dcos-spark/submit_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/mesosphere/dcos-commons/cli/client"
"github.com/mesosphere/dcos-commons/cli/config"
"gopkg.in/alecthomas/kingpin.v3-unstable"
"log"
"net/url"
"os"
"regexp"
"strings"

"github.com/mattn/go-shellwords"
"github.com/mesosphere/dcos-commons/cli/client"
"github.com/mesosphere/dcos-commons/cli/config"
"gopkg.in/alecthomas/kingpin.v3-unstable"
)

var keyWhitespaceValPattern = regexp.MustCompile("(.+)\\s+(.+)")
Expand Down Expand Up @@ -146,8 +148,10 @@ Args:
StringVar(&args.mainClass) // note: spark-submit can autodetect, but only for file://local.jar
submit.Flag("properties-file", "Path to file containing whitespace-separated Spark property defaults.").
PlaceHolder("PATH").ExistingFileVar(&args.propertiesFile)
submit.Flag("conf", "Custom Spark configuration properties.").
PlaceHolder("PROP=VALUE").StringMapVar(&args.properties)
submit.Flag("conf", "Custom Spark configuration properties. "+
"If submitting properties with multiple values, "+
"wrap in single quotes e.g. --conf prop='val1 val2'").
PlaceHolder("prop=value").StringMapVar(&args.properties)
submit.Flag("kerberos-principal", "Principal to be used to login to KDC.").
PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal)
submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers").
Expand Down Expand Up @@ -280,75 +284,84 @@ func parseApplicationFile(args *sparkArgs) error {
return nil
}

func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {

// collapse two or more spaces to one.
argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
// we use Kingpin to parse CLI commands and options
// spark-submit by convention uses '--arg val' while kingpin only supports --arg=val
// transformSubmitArgs turns the former into the latter
func transformSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
// HACK: spark-submit uses '--arg val' by convention, while kingpin only supports '--arg=val'.
// translate the former into the latter for kingpin to parse.
args := strings.Split(argsCleaned, " ")
argsEquals := make([]string, 0)
appFlags := make([]string, 0)
i := 0
ARGLOOP:
for i < len(args) {
arg := args[i]
if !strings.HasPrefix(arg, "-") {
// looks like we've exited the flags entirely, and are now at the jar and/or args.
// any arguments without a dash at the front should've been joined to preceding keys.
// flush the rest and exit.
for i < len(args) {
arg = args[i]
// if we have a --flag going to the application we need to take the arg (flag) and the value ONLY
// if it's not of the format --flag=val which scopt allows
if strings.HasPrefix(arg, "-") {
appFlags = append(appFlags, arg)
if strings.Contains(arg, "=") || (i+1) >= len(args) {
i += 1
} else {
// if there's a value with this flag, add it
if !strings.HasPrefix(args[i+1], "-") {
appFlags = append(appFlags, args[i+1])
i += 1
}
i += 1
}
} else {
argsEquals = append(argsEquals, arg)
i += 1
}
argsStr = strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsStr, " "))
// collapse two or more spaces to one
argsStr = collapseSpacesPattern.ReplaceAllString(argsStr, " ")
// parse argsStr into []string args maintaining shell escaped sequences
args, err := shellwords.Parse(argsStr)
if err != nil {
log.Fatalf("Could not parse string args correctly. Error: %v", err)
}
sparkArgs, appArgs := make([]string, 0), make([]string, 0)
LOOP:
for i := 0; i < len(args); {
current := strings.TrimSpace(args[i])
switch {
// The main assumption with --submit-args is that all spark-submit flags come before the spark jar URL
// if current is a spark jar/app, we've processed all flags
case isSparkApp(current):
sparkArgs = append(sparkArgs, args[i])
appArgs = append(appArgs, args[i+1:]...)
break LOOP
case strings.HasPrefix(current, "--"):
if isBoolFlag(boolVals, current) {
sparkArgs = append(sparkArgs, current)
i++
continue LOOP
}
break
}
// join this arg to the next arg if...:
// 1. we're not at the last arg in the array
// 2. we start with "--"
// 3. we don't already contain "=" (already joined)
// 4. we aren't a boolean value (no val to join)
if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") {
// check for boolean:
for _, boolVal := range boolVals {
if boolVal.flagName == arg[2:] {
argsEquals = append(argsEquals, arg)
i += 1
continue ARGLOOP
}
if strings.Contains(current, "=") {
// already in the form arg=val, no merge required
sparkArgs = append(sparkArgs, current)
i++
continue LOOP
}
// merge this --key against the following val to get --key=val
argsEquals = append(argsEquals, arg+"="+args[i+1])
// otherwise, merge current with next into form arg=val; eg --driver-memory=512m
next := args[i+1]
sparkArgs = append(sparkArgs, current+"="+next)
i += 2
} else {
// already joined or at the end, pass through:
argsEquals = append(argsEquals, arg)
i += 1
default:
// if not a flag or jar, current is a continuation of the last arg and should not have been split
// eg extraJavaOptions="-Dparam1 -Dparam2" was parsed as [extraJavaOptions, -Dparam1, -Dparam2]
combined := sparkArgs[len(sparkArgs)-1] + " " + current
sparkArgs = append(sparkArgs[:len(sparkArgs)-1], combined)
i++
}
}
client.PrintVerbose("Translated spark-submit arguments: '%s'", argsEquals)
client.PrintVerbose("Translated application arguments: '%s'", appFlags)
if config.Verbose {
client.PrintVerbose("Translated spark-submit arguments: '%s'", strings.Join(sparkArgs, ", "))
client.PrintVerbose("Translated application arguments: '%s'", strings.Join(appArgs, ", "))
}
return sparkArgs, appArgs
}

return argsEquals, appFlags
// acceptedSparkAppExtensions lists the file extensions that mark an
// argument as the Spark application artifact itself (rather than a flag).
var acceptedSparkAppExtensions = []string{
	".jar",
	".py",
	".R",
}

// isSparkApp reports whether str names a runnable Spark application
// artifact, i.e. whether it ends with one of the accepted extensions
// (.jar, .py or .R).
func isSparkApp(str string) bool {
	for i := range acceptedSparkAppExtensions {
		if strings.HasSuffix(str, acceptedSparkAppExtensions[i]) {
			return true
		}
	}
	return false
}

// isBoolFlag reports whether str (a flag token such as "--supervise")
// names one of the known boolean spark-submit flags in boolVals.
// Boolean flags take no value, so the caller must not merge the
// following token into them.
func isBoolFlag(boolVals []*sparkVal, str string) bool {
	// Use TrimPrefix instead of str[2:] so a token shorter than two
	// bytes cannot trigger a slice-bounds panic; for the "--"-prefixed
	// tokens callers actually pass, the result is identical.
	flagName := strings.TrimPrefix(str, "--")
	for _, boolVal := range boolVals {
		if boolVal.flagName == flagName {
			return true
		}
	}
	return false
}

func getValsFromPropertiesFile(path string) map[string]string {
Expand Down Expand Up @@ -416,7 +429,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
// then map flags
submit, args := sparkSubmitArgSetup() // setup
// convert and get application flags, add them to the args passed to the spark app
submitArgs, appFlags := cleanUpSubmitArgs(cmd.submitArgs, args.boolVals)
submitArgs, appFlags := transformSubmitArgs(cmd.submitArgs, args.boolVals)
args.appArgs = append(args.appArgs, appFlags...)
_, err := submit.Parse(submitArgs)

Expand Down
53 changes: 49 additions & 4 deletions cli/dcos-spark/submit_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"os"
"testing"
Expand Down Expand Up @@ -37,10 +38,10 @@ func TestCliTestSuite(t *testing.T) {
}

// test spaces
func (suite *CliTestSuite) TestCleanUpSubmitArgs() {
func (suite *CliTestSuite) TestTransformSubmitArgs() {
_, args := sparkSubmitArgSetup()
inputArgs := "--conf spark.app.name=kerberosStreaming --conf spark.cores.max=8"
submitArgs, _ := cleanUpSubmitArgs(inputArgs, args.boolVals)
inputArgs := "--conf spark.app.name=kerberosStreaming --conf spark.cores.max=8 main.jar 100"
submitArgs, _ := transformSubmitArgs(inputArgs, args.boolVals)
if "--conf=spark.app.name=kerberosStreaming" != submitArgs[0] {
suite.T().Errorf("Failed to reduce spaces while cleaning submit args.")
}
Expand All @@ -50,12 +51,54 @@ func (suite *CliTestSuite) TestCleanUpSubmitArgs() {
}
}

// TestTransformSubmitArgsWithMultipleValues verifies that a quoted,
// space-separated --conf value is kept together as a single
// spark-submit argument after transformation.
// (Renamed from "...Mulitple..." to fix the spelling; test methods have
// no callers, so the rename is safe.)
func (suite *CliTestSuite) TestTransformSubmitArgsWithMultipleValues() {
	_, args := sparkSubmitArgSetup()
	inputArgs := "--conf spark.driver.extraJavaOptions='-XX:+PrintGC -Dparam1=val1 -Dparam2=val2' main.py 100"
	expected := "--conf=spark.driver.extraJavaOptions=-XX:+PrintGC -Dparam1=val1 -Dparam2=val2"
	actual, _ := transformSubmitArgs(inputArgs, args.boolVals)
	assert.Equal(suite.T(), expected, actual[0])
}

// TestTransformSubmitArgsWithSpecialCharacters checks that escaped
// spaces, embedded quotes, punctuation, and shell variables inside a
// quoted --conf value survive transformation untouched.
func (suite *CliTestSuite) TestTransformSubmitArgsWithSpecialCharacters() {
	_, parsedArgs := sparkSubmitArgSetup()
	input := "--conf spark.driver.extraJavaOptions='-Dparam1=\"val 1?\" -Dparam2=val\\ 2! -Dmulti.dot.param3=\"val\\ 3\" -Dpath=$PATH' main.py 100"
	want := "--conf=spark.driver.extraJavaOptions=-Dparam1=\"val 1?\" -Dparam2=val\\ 2! -Dmulti.dot.param3=\"val\\ 3\" -Dpath=$PATH"
	got, _ := transformSubmitArgs(input, parsedArgs.boolVals)
	assert.Equal(suite.T(), want, got[0])
}

// TestTransformSubmitArgsConfsAlreadyHasEquals ensures a flag already
// written in --conf=key=value form passes through without an extra
// key/value merge step.
func (suite *CliTestSuite) TestTransformSubmitArgsConfsAlreadyHasEquals() {
	_, parsedArgs := sparkSubmitArgSetup()
	input := "--conf=spark.driver.extraJavaOptions='-Dparam1=val1' main.py 100"
	want := "--conf=spark.driver.extraJavaOptions=-Dparam1=val1"
	got, _ := transformSubmitArgs(input, parsedArgs.boolVals)
	assert.Equal(suite.T(), want, got[0])
}

// TestTransformSubmitArgsMultilines verifies that shell-style escaped
// newlines ("\" at end of line) inside --submit-args are treated as
// plain whitespace, so flags spread across several lines parse normally.
// Note: "100" lands in the application args, not sparkArgs, because it
// follows the main.py application file.
func (suite *CliTestSuite) TestTransformSubmitArgsMultilines() {
	_, args := sparkSubmitArgSetup()
	inputArgs := `--conf spark.driver.extraJavaOptions='-XX:+PrintGC -XX:+PrintGCTimeStamps' \
--supervise --driver-memory 1g \
main.py 100`
	expected := []string{"--conf=spark.driver.extraJavaOptions=-XX:+PrintGC -XX:+PrintGCTimeStamps", "--supervise", "--driver-memory=1g", "main.py"}
	actual, _ := transformSubmitArgs(inputArgs, args.boolVals)
	assert.Equal(suite.T(), expected, actual)
}

// TestIsSparkApp exercises Spark-application detection by file
// extension: .jar, .py and .R are accepted; anything else is not.
func (suite *CliTestSuite) TestIsSparkApp() {
	t := suite.T()
	for _, accepted := range []string{"mainApp.jar", "pythonFile.py", "file.R"} {
		assert.True(t, isSparkApp(accepted))
	}
	for _, rejected := range []string{"app.c", "randomFlag"} {
		assert.False(t, isSparkApp(rejected))
	}
}

// test scopts pattern for app args when have full submit args
func (suite *CliTestSuite) TestScoptAppArgs() {
_, args := sparkSubmitArgSetup()
inputArgs := `--driver-cores 1 --conf spark.cores.max=1 --driver-memory 512M
--class org.apache.spark.examples.SparkPi http://spark-example.jar --input1 value1 --input2 value2`
submitArgs, appFlags := cleanUpSubmitArgs(inputArgs, args.boolVals)
submitArgs, appFlags := transformSubmitArgs(inputArgs, args.boolVals)

if "--input1" != appFlags[0] {
suite.T().Errorf("Failed to parse app args.")
Expand Down Expand Up @@ -91,6 +134,7 @@ func (suite *CliTestSuite) TestPayloadSimple() {
"--driver-cores %s "+
"--conf spark.cores.max=%s "+
"--driver-memory %s "+
"--conf spark.driver.extraJavaOptions='-XX:+PrintGC -Dparam1=val1 -Dparam2=val2' "+
"--class %s "+
"%s --input1 value1 --input2 value2", driverCores, maxCores, driverMemory, mainClass, appJar)

Expand Down Expand Up @@ -124,6 +168,7 @@ func (suite *CliTestSuite) TestPayloadSimple() {
"spark.submit.deployMode": "cluster",
"spark.mesos.driver.labels": fmt.Sprintf("DCOS_SPACE:%s", marathonAppId),
"spark.driver.memory": driverMemory,
"spark.driver.extraJavaOptions": "-XX:+PrintGC -Dparam1=val1 -Dparam2=val2",
"spark.jars": appJar,
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions cli/dcos-spark/vendor/github.com/mattn/go-shellwords/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions cli/dcos-spark/vendor/github.com/mattn/go-shellwords/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1cb4cd6

Please sign in to comment.