Skip to content

Commit

Permalink
Include server protocol version on mismatch
Browse files Browse the repository at this point in the history
So that a future implementation of client can retry with the
server version.
  • Loading branch information
Andrew Or committed Feb 5, 2015
1 parent 09f873a commit 9fee16f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ private[spark] class ErrorServlet extends StandaloneRestServlet {
protected override def service(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
val path = request.getPathInfo
val parts = path.stripPrefix("/").split("/").toSeq
var versionMismatch = false
var msg =
parts match {
case Nil =>
Expand All @@ -391,15 +391,22 @@ private[spark] class ErrorServlet extends StandaloneRestServlet {
"Missing an action: please specify one of /create, /kill, or /status."
case unknownVersion :: _ =>
// http://host:port/unknown-version/*
// Use a special response code in case the client wants to retry with a different version
response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
versionMismatch = true
s"Unknown protocol version '$unknownVersion'."
case _ =>
// never reached
s"Malformed path $path."
}
msg += s" Please submit requests through http://[host]:[port]/$expectedVersion/submissions/..."
val error = handleError(msg)
// If there is a version mismatch, include the highest protocol version that
// this server supports in case the client wants to retry with our version
if (versionMismatch) {
error.protocolVersion = expectedVersion
response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
} else {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
}
sendResponse(error, response)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.Try
import com.fasterxml.jackson.annotation._
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
Expand Down Expand Up @@ -136,8 +136,9 @@ private[spark] abstract class SubmitRestProtocolMessage {
private[spark] object SubmitRestProtocolMessage {
private val packagePrefix = this.getClass.getPackage.getName
private val mapper = new ObjectMapper()
.registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.enable(SerializationFeature.INDENT_OUTPUT)
.registerModule(DefaultScalaModule)

/**
* Parse the value of the action field from the given JSON.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ private[spark] abstract class SubmitRestProtocolResponse extends SubmitRestProto
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(serverSparkVersion, "serverSparkVersion")
assertFieldIsSet(success, "success")
assertFieldIsBoolean(success, "success")
}
}
Expand All @@ -36,6 +35,10 @@ private[spark] abstract class SubmitRestProtocolResponse extends SubmitRestProto
*/
private[spark] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(success, "success")
}
}

/**
Expand All @@ -46,6 +49,7 @@ private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse {
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(submissionId, "submissionId")
assertFieldIsSet(success, "success")
}
}

Expand All @@ -61,20 +65,17 @@ private[spark] class SubmissionStatusResponse extends SubmitRestProtocolResponse
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(submissionId, "submissionId")
assertFieldIsSet(success, "success")
}
}

/**
* An error response message used in the REST application submission protocol.
*/
private[spark] class ErrorResponse extends SubmitRestProtocolResponse {

// request was unsuccessful
success = "false"

var protocolVersion: String = null
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(message, "message")
assert(!success.toBoolean, s"The 'success' field must be false in $messageType.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ class SubmitRestProtocolSuite extends FunSuite {
|{
| "action" : "ErrorResponse",
| "message" : "Field not found in submit request: X",
| "serverSparkVersion" : "1.2.3",
| "success": "false"
| "serverSparkVersion" : "1.2.3"
|}
""".stripMargin

Expand Down

0 comments on commit 9fee16f

Please sign in to comment.