Skip to content

Commit

Permalink
Handle Double.Nan in json
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Nov 29, 2016
1 parent 27e3b73 commit f6d60df
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,13 @@ trait ProgressReporter extends Logging {

val sourceProgress = sources.map { source =>
val numRecords = executionStats.inputRows.getOrElse(source, 0L)
val inputRecordsPerSecond = if (inputTimeSec > 0) {
numRecords / inputTimeSec
} else {
Double.MaxValue
}
val processedRecordsPerSecond = if (processingTimeSec > 0) {
numRecords / processingTimeSec
} else {
Double.MaxValue
}
new SourceProgress(
description = source.toString,
startOffset = committedOffsets.get(source).map(_.json).orNull,
endOffset = availableOffsets.get(source).map(_.json).orNull,
numInputRows = numRecords,
inputRowsPerSecond = inputRecordsPerSecond,
processedRowsPerSecond = processedRecordsPerSecond
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,16 @@ class SourceProgress protected[sql](
override def toString: String = prettyJson

private[sql] def jsonValue: JValue = {
def safeDoubleToJValue(value: Double): JValue = {
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}

("description" -> JString(description)) ~
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> JDouble(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> JDouble(processedRowsPerSecond))
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
}

private def tryParse(json: String) = try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,16 @@ class StreamingQueryProgress private[sql](
override def toString: String = prettyJson

private[sql] def jsonValue: JValue = {
def safeDoubleToJValue(value: Double): JValue = {
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}

("id" -> JString(id.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JInt(timestamp)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> JDouble(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> JDouble(processedRowsPerSecond)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
("durationMs" -> durationMs
.asScala
.map { case (k, v) => k -> JInt(v.toLong): JObject }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class StreamingQueryProgressSuite extends SparkFunSuite {
| "timestamp" : 1,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
| "processedRowsPerSecond" : 1.7976931348623157E308,
| "durationMs" : {
| "total" : 0
| },
Expand All @@ -54,17 +53,16 @@ class StreamingQueryProgressSuite extends SparkFunSuite {
| "startOffset" : 123,
| "endOffset" : 456,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
| "processedRowsPerSecond" : 1.7976931348623157E308
| "inputRowsPerSecond" : 10.0
| } ]
|}
""".stripMargin.trim)
assert(parse(json) === testProgress.jsonValue)
assert(compact(parse(json)) === testProgress.json)

}

test("json") {
assert(parse(testProgress.json) === testProgress.jsonValue)
assert(compact(parse(testProgress.json)) === testProgress.json)
}

test("toString") {
Expand All @@ -88,7 +86,7 @@ object StreamingQueryProgressSuite {
endOffset = "456",
numInputRows = 678,
inputRowsPerSecond = 10.0,
processedRowsPerSecond = Double.MaxValue
processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
)
)
)
Expand Down

0 comments on commit f6d60df

Please sign in to comment.