[SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-14231

Currently, the JSON data source supports inferring `DecimalType` for big numbers, and its `floatAsBigDecimal` option reads floating-point values as `DecimalType`.

However, Spark's `DecimalType` has two restrictions:

1. The precision cannot be greater than 38 (`DecimalType.MAX_PRECISION`).
2. The scale cannot be greater than the precision.

Currently, neither restriction is handled during schema inference (see the sketch below).
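
To make the restrictions concrete, here is a minimal sketch of what happens when they are violated (a hypothetical `spark-shell` snippet; the exception message is the same one shown in the **Before** trace below):

```scala
import scala.util.Try
import org.apache.spark.sql.types.DecimalType

// Restriction 2: the JSON value 0.01 parses with precision 1 and scale 2,
// so the naively inferred DecimalType(1, 2) cannot even be constructed.
Try(DecimalType(1, 2))
// => Failure(org.apache.spark.sql.AnalysisException:
//      Decimal scale (2) cannot be greater than precision (1).)

// Restriction 1: a value with more than DecimalType.MAX_PRECISION (38)
// significant digits cannot be represented by any DecimalType at all.
```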

This PR handles these cases by inferring such values as `DoubleType`. Also, the option was renamed from `floatAsBigDecimal` to `prefersDecimal`, as suggested [here](https://issues.apache.org/jira/browse/SPARK-14231?focusedCommentId=15215579&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15215579).

So, the code below:

```scala
def doubleRecords: RDD[String] =
  sqlContext.sparkContext.parallelize(
    s"""{"a": 1${"0" * 38}, "b": 0.01}""" ::
    s"""{"a": 2${"0" * 38}, "b": 0.02}""" :: Nil)

val jsonDF = sqlContext.read
  .option("prefersDecimal", "true")
  .json(doubleRecords)
jsonDF.printSchema()
```

produces the following:

- **Before**

```scala
org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).;
	at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:44)
	at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:144)
	at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:108)
	at
...
```

- **After**

```scala
root
 |-- a: double (nullable = true)
 |-- b: double (nullable = true)
```

## How was this patch tested?

Unit tests were added, and `./dev/run_tests` was run for coding style checks.

Author: hyukjinkwon <[email protected]>

Closes #12030 from HyukjinKwon/SPARK-14231.
HyukjinKwon authored and davies committed Apr 3, 2016
1 parent 7be4620 commit 2262a93
Showing 6 changed files with 71 additions and 14 deletions.

python/pyspark/sql/readwriter.py (2 additions, 2 deletions)

```diff
@@ -152,8 +152,8 @@ def json(self, path, schema=None):
         You can set the following JSON-specific options to deal with non-standard JSON files:
             * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
                 type
-            * ``floatAsBigDecimal`` (default ``false``): infers all floating-point values as a decimal \
-                type
+            * ``prefersDecimal`` (default ``false``): infers all floating-point values as a decimal \
+                type. If the values do not fit in decimal, then it infers them as doubles.
             * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
             * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
             * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
```

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala (2 additions, 2 deletions)

```diff
@@ -315,8 +315,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    *
    * You can set the following JSON-specific options to deal with non-standard JSON files:
    * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
-   * <li>`floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal
-   * type</li>
+   * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
+   * type. If the values do not fit in decimal, then it infers them as doubles.</li>
    * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
    * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
    * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala (11 additions, 6 deletions)

```diff
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-
 private[sql] object InferSchema {
 
   /**
@@ -135,14 +134,20 @@ private[sql] object InferSchema {
         // when we see a Java BigInteger, we use DecimalType.
         case BIG_INTEGER | BIG_DECIMAL =>
           val v = parser.getDecimalValue
-          DecimalType(v.precision(), v.scale())
-        case FLOAT | DOUBLE =>
-          if (configOptions.floatAsBigDecimal) {
-            val v = parser.getDecimalValue
-            DecimalType(v.precision(), v.scale())
+          if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+            DecimalType(Math.max(v.precision(), v.scale()), v.scale())
+          } else {
+            DoubleType
+          }
+        case FLOAT | DOUBLE if configOptions.prefersDecimal =>
+          val v = parser.getDecimalValue
+          if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+            DecimalType(Math.max(v.precision(), v.scale()), v.scale())
           } else {
             DoubleType
           }
+        case FLOAT | DOUBLE =>
+          DoubleType
       }
 
       case VALUE_TRUE | VALUE_FALSE => BooleanType
```
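
Distilled from the diff above, the new rule can be read as this standalone sketch (`inferDecimalOrDouble` is an illustrative name, not a function in the patch):

```scala
import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType}

// Widen the precision so it covers the scale (0.01 has precision 1, scale 2,
// so it becomes DecimalType(2, 2)); once the widened precision exceeds
// DecimalType.MAX_PRECISION (38), fall back to DoubleType instead of failing.
def inferDecimalOrDouble(v: java.math.BigDecimal): DataType = {
  val widenedPrecision = Math.max(v.precision(), v.scale())
  if (widenedPrecision <= DecimalType.MAX_PRECISION) {
    DecimalType(widenedPrecision, v.scale())
  } else {
    DoubleType
  }
}

inferDecimalOrDouble(new java.math.BigDecimal("0.01"))         // DecimalType(2, 2)
inferDecimalOrDouble(new java.math.BigDecimal("1" + "0" * 38)) // DoubleType (precision 39)
```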

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala (2 additions, 2 deletions)

```diff
@@ -35,8 +35,8 @@ private[sql] class JSONOptions(
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
   val primitivesAsString =
     parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
-  val floatAsBigDecimal =
-    parameters.get("floatAsBigDecimal").map(_.toBoolean).getOrElse(false)
+  val prefersDecimal =
+    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
   val allowComments =
     parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
   val allowUnquotedFieldNames =
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala (46 additions, 2 deletions)

```diff
@@ -745,8 +745,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
-  test("Loading a JSON dataset floatAsBigDecimal returns schema with float types as BigDecimal") {
-    val jsonDF = sqlContext.read.option("floatAsBigDecimal", "true").json(primitiveFieldAndType)
+  test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
+    val jsonDF = sqlContext.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -773,6 +773,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
+  test("Infer big integers correctly even when it does not fit in decimal") {
+    val jsonDF = sqlContext.read
+      .json(bigIntegerRecords)
+
+    // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+    // it will be a decimal as `92233720368547758070`.
+    val expectedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(20, 0), true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070")))
+  }
+
+  test("Infer floating-point values correctly even when it does not fit in decimal") {
+    val jsonDF = sqlContext.read
+      .option("prefersDecimal", "true")
+      .json(floatingValueRecords)
+
+    // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+    // it will be a decimal as `0.01` by having a precision equal to the scale.
+    val expectedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(2, 2), true):: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+
+    val mergedJsonDF = sqlContext.read
+      .option("prefersDecimal", "true")
+      .json(floatingValueRecords ++ bigIntegerRecords)
+
+    val expectedMergedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(22, 2), true):: Nil)
+
+    assert(expectedMergedSchema === mergedJsonDF.schema)
+    checkAnswer(
+      mergedJsonDF,
+      Row(1.0E-39D, BigDecimal(0.01)) ::
+      Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
+    )
+  }
+
   test("Loading a JSON dataset from a text file with SQL") {
     val dir = Utils.createTempDir()
     dir.delete()
```
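
A note on the `DecimalType(22, 2)` expectation in the merged-schema test: when two decimal types are merged, the widened type keeps the larger scale and the larger count of integer digits. A sketch of that arithmetic (`widenDecimals` is illustrative, not Spark's actual type-coercion helper):

```scala
// Merging DecimalType(2, 2) (from 0.01) with DecimalType(20, 0)
// (from 92233720368547758070): keep max scale and max integer digits.
def widenDecimals(p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) = {
  val scale = math.max(s1, s2)                    // 2
  val integerDigits = math.max(p1 - s1, p2 - s2)  // 20
  (integerDigits + scale, scale)                  // (22, 2)
}

widenDecimals(2, 2, 20, 0)  // (22, 2), matching the expected merged schema
```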

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala (8 additions)

```diff
@@ -214,6 +214,14 @@ private[json] trait TestJsonData {
       """{"a": {"b": 1}}""" ::
       """{"a": []}""" :: Nil)
 
+  def floatingValueRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+
+  def bigIntegerRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
```
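
For readability, a quick sketch of what those interpolated fixtures expand to (the parsed values line up with the `checkAnswer` expectations in `JsonSuite` above):

```scala
val floating = s"""{"a": 0.${"0" * 38}1, "b": 0.01}"""
// {"a": 0.<38 zeros>1, "b": 0.01}
// "a" is 1.0E-39: its scale (39) exceeds the 38-digit cap, so it becomes a double.

val bigInt = s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}"""
// {"a": 1<38 zeros>, "b": 92233720368547758070}
// "a" is 1.0E38 with 39 digits (falls back to double); "b" has 20 digits and
// still fits as DecimalType(20, 0).
```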
