Remove sql.util.package introduced in a previous commit.
yhuai committed Jul 11, 2014
1 parent 0266761 commit 43a45e1
Showing 4 changed files with 57 additions and 210 deletions.
6 changes: 3 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -94,15 +94,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def applySchema[A](rdd: RDD[A],schema: StructType, f: A => Row): SchemaRDD =
-    applySchemaPartitions(rdd, schema, (iter: Iterator[A]) => iter.map(f))
+    applySchemaToPartitions(rdd, schema, (iter: Iterator[A]) => iter.map(f))
 
   /**
    * Creates a [[SchemaRDD]] from an [[RDD]] by applying a schema to this RDD and using a function
    * that will be applied to each partition of the RDD to convert RDD records to [[Row]]s.
    *
    * @group userf
    */
-  def applySchemaPartitions[A](
+  def applySchemaToPartitions[A](
       rdd: RDD[A],
       schema: StructType,
       f: Iterator[A] => Iterator[Row]): SchemaRDD =
@@ -154,7 +154,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
     val schema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
-    applySchemaPartitions(json, schema, JsonRDD.jsonStringToRow(schema, _: Iterator[String]))
+    applySchemaToPartitions(json, schema, JsonRDD.jsonStringToRow(schema, _: Iterator[String]))
   }
 
   /**
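
The renamed applySchemaToPartitions exists so that conversion setup can happen once per partition rather than once per record, with applySchema remaining the record-at-a-time wrapper shown above. A minimal, Spark-free sketch of that difference (the date-parsing example and every name in it are illustrative, not part of this commit):

import java.text.SimpleDateFormat

// Record-at-a-time: the formatter is rebuilt for every element, which is what a
// converter passed to applySchema (A => Row) would do if its setup lived inside it.
def perRecord(lines: Iterator[String]): Iterator[java.util.Date] =
  lines.map(line => new SimpleDateFormat("yyyy-MM-dd").parse(line))

// Partition-at-a-time: the formatter is built once and reused for the whole iterator,
// the pattern that an Iterator[A] => Iterator[Row] converter makes possible.
def perPartition(lines: Iterator[String]): Iterator[java.util.Date] = {
  val format = new SimpleDateFormat("yyyy-MM-dd")
  lines.map(line => format.parse(line))
}
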
66 changes: 44 additions & 22 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.Logging
-import org.apache.spark.sql.util
 
 private[sql] object JsonRDD extends Logging {
 
@@ -271,12 +270,29 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private def toDecimalValue: PartialFunction[Any, BigDecimal] = {
-    def bigIntegerToDecimalValue: PartialFunction[Any, BigDecimal] = {
-      case v: java.math.BigInteger => BigDecimal(v)
-    }
-
-    bigIntegerToDecimalValue orElse util.toDecimalValue
-  }
-
+  private def toLong(value: Any): Long = {
+    value match {
+      case value: java.lang.Integer => value.asInstanceOf[Int].toLong
+      case value: java.lang.Long => value.asInstanceOf[Long]
+    }
+  }
+
+  private def toDouble(value: Any): Double = {
+    value match {
+      case value: java.lang.Integer => value.asInstanceOf[Int].toDouble
+      case value: java.lang.Long => value.asInstanceOf[Long].toDouble
+      case value: java.lang.Double => value.asInstanceOf[Double]
+    }
+  }
+
+  private def toDecimal(value: Any): BigDecimal = {
+    value match {
+      case value: java.lang.Integer => BigDecimal(value)
+      case value: java.lang.Long => BigDecimal(value)
+      case value: java.math.BigInteger => BigDecimal(value)
+      case value: java.lang.Double => BigDecimal(value)
+      case value: java.math.BigDecimal => BigDecimal(value)
+    }
+  }
+
   private def toJsonArrayString(seq: Seq[Any]): String = {
@@ -287,7 +303,7 @@
       element =>
         if (count > 0) builder.append(",")
         count += 1
-        builder.append(toStringValue(element))
+        builder.append(toString(element))
     }
     builder.append("]")
 
@@ -302,31 +318,37 @@
       case (key, value) =>
         if (count > 0) builder.append(",")
         count += 1
-        builder.append(s"""\"${key}\":${toStringValue(value)}""")
+        builder.append(s"""\"${key}\":${toString(value)}""")
     }
     builder.append("}")
 
     builder.toString()
   }
 
-  private def toStringValue: PartialFunction[Any, String] = {
-    def complexValueToStringValue: PartialFunction[Any, String] = {
-      case v: Map[String, Any] => toJsonObjectString(v)
-      case v: Seq[Any] => toJsonArrayString(v)
-    }
-
-    complexValueToStringValue orElse util.toStringValue
+  private def toString(value: Any): String = {
+    value match {
+      case value: Map[String, Any] => toJsonObjectString(value)
+      case value: Seq[Any] => toJsonArrayString(value)
+      case value => Option(value).map(_.toString).orNull
+    }
   }
 
-  private[json] def castToType: PartialFunction[(Any, DataType), Any] = {
-    def jsonSpecificCast: PartialFunction[(Any, DataType), Any] = {
-      case (v, StringType) => toStringValue(v)
-      case (v, DecimalType) => toDecimalValue(v)
-      case (v, ArrayType(elementType)) =>
-        v.asInstanceOf[Seq[Any]].map(castToType(_, elementType))
-    }
-
-    jsonSpecificCast orElse util.castToType
+  private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={
+    if (value == null) {
+      null
+    } else {
+      desiredType match {
+        case ArrayType(elementType) =>
+          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+        case StringType => toString(value)
+        case IntegerType => value.asInstanceOf[IntegerType.JvmType]
+        case LongType => toLong(value)
+        case DoubleType => toDouble(value)
+        case DecimalType => toDecimal(value)
+        case BooleanType => value.asInstanceOf[BooleanType.JvmType]
+        case NullType => null
+      }
+    }
   }
 
   private def asRow(json: Map[String,Any], schema: StructType): Row = {
@@ -348,7 +370,7 @@
       // Other cases
       case (StructField(name, dataType, _), i) =>
         row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          castToType(_, dataType)).getOrElse(null))
+          enforceCorrectType(_, dataType)).getOrElse(null))
     }
 
     row
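
For context on the new converters: a JSON parser returns boxed Java numbers whose runtime class depends on the literal (java.lang.Integer, java.lang.Long, java.lang.Double, java.math.BigInteger), so each target Catalyst type must accept every narrower representation. A standalone sketch of that widening, separate from the Spark code above (the sample literals in the comments are made up):

// Illustration of the widening idea behind toLong/toDouble/toDecimal; not Spark code.
def widenToDouble(parsed: Any): Double = parsed match {
  case i: java.lang.Integer => i.toDouble   // e.g. the JSON literal 2
  case l: java.lang.Long    => l.toDouble   // e.g. the JSON literal 21474836470
  case d: java.lang.Double  => d            // e.g. the JSON literal 1.5
}

def widenToDecimal(parsed: Any): BigDecimal = parsed match {
  case i: java.lang.Integer    => BigDecimal(i)
  case l: java.lang.Long       => BigDecimal(l)
  case b: java.math.BigInteger => BigDecimal(b)
  case d: java.lang.Double     => BigDecimal(d)
  case d: java.math.BigDecimal => BigDecimal(d)
}

The type-promotion test in JsonSuite below exercises this behaviour through enforceCorrectType.
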
175 changes: 0 additions & 175 deletions sql/core/src/main/scala/org/apache/spark/sql/util/package.scala

This file was deleted.
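
The deleted package object is not reproduced here, but the old JsonRDD code above shows the pattern it supported: JSON-specific PartialFunctions chained onto shared conversions with orElse. A generic sketch of that composition style (illustrative values only, not the deleted file's contents):

// PartialFunction chaining of the kind this commit replaces with plain match expressions.
val jsonSpecific: PartialFunction[Any, String] = {
  case m: Map[_, _] => s"object with ${m.size} fields"
  case s: Seq[_]    => s"array with ${s.size} elements"
}
val shared: PartialFunction[Any, String] = {
  case null  => null
  case other => other.toString
}
// orElse tries the JSON-specific cases first and falls back to the shared ones, which is
// how the old toStringValue and castToType delegated to the util package.
val combined: PartialFunction[Any, String] = jsonSpecific orElse shared
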

20 changes: 10 additions & 10 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.json.JsonRDD.{castToType, compatibleType}
+import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.TestSQLContext._
 
@@ -41,19 +41,19 @@ class JsonSuite extends QueryTest {
     }
 
     val intNumber: Int = 2147483647
-    checkTypePromotion(intNumber, castToType(intNumber, IntegerType))
-    checkTypePromotion(intNumber.toLong, castToType(intNumber, LongType))
-    checkTypePromotion(intNumber.toDouble, castToType(intNumber, DoubleType))
-    checkTypePromotion(BigDecimal(intNumber), castToType(intNumber, DecimalType))
+    checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
+    checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType))
+    checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType))
+    checkTypePromotion(BigDecimal(intNumber), enforceCorrectType(intNumber, DecimalType))
 
     val longNumber: Long = 9223372036854775807L
-    checkTypePromotion(longNumber, castToType(longNumber, LongType))
-    checkTypePromotion(longNumber.toDouble, castToType(longNumber, DoubleType))
-    checkTypePromotion(BigDecimal(longNumber), castToType(longNumber, DecimalType))
+    checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
+    checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType))
+    checkTypePromotion(BigDecimal(longNumber), enforceCorrectType(longNumber, DecimalType))
 
     val doubleNumber: Double = 1.7976931348623157E308d
-    checkTypePromotion(doubleNumber.toDouble, castToType(doubleNumber, DoubleType))
-    checkTypePromotion(BigDecimal(doubleNumber), castToType(doubleNumber, DecimalType))
+    checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
+    checkTypePromotion(BigDecimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType))
   }
 
   test("Get compatible type") {
