-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-3988][SQL] add public API for date type #2901
Changes from 4 commits
8d7dd22
1d74448
444f100
f760d8e
5670626
c51a24d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,7 +49,7 @@ | |
|
||
|
||
__all__ = [ | ||
"StringType", "BinaryType", "BooleanType", "TimestampType", "DecimalType", | ||
"StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", "DecimalType", | ||
"DoubleType", "FloatType", "ByteType", "IntegerType", "LongType", | ||
"ShortType", "ArrayType", "MapType", "StructField", "StructType", | ||
"SQLContext", "HiveContext", "SchemaRDD", "Row"] | ||
|
@@ -132,6 +132,14 @@ class BooleanType(PrimitiveType): | |
""" | ||
|
||
|
||
class DateType(PrimitiveType): | ||
|
||
"""Spark SQL DateType | ||
|
||
The data type representing datetime.date values. | ||
""" | ||
|
||
|
||
class TimestampType(PrimitiveType): | ||
|
||
"""Spark SQL TimestampType | ||
|
@@ -438,7 +446,7 @@ def _parse_datatype_json_value(json_value): | |
return _all_complex_types[json_value["type"]].fromJson(json_value) | ||
|
||
|
||
# Mapping Python types to Spark SQL DateType | ||
# Mapping Python types to Spark SQL DataType | ||
_type_mappings = { | ||
bool: BooleanType, | ||
int: IntegerType, | ||
|
@@ -448,8 +456,8 @@ def _parse_datatype_json_value(json_value): | |
unicode: StringType, | ||
bytearray: BinaryType, | ||
decimal.Decimal: DecimalType, | ||
datetime.date: DateType, | ||
datetime.datetime: TimestampType, | ||
datetime.date: TimestampType, | ||
datetime.time: TimestampType, | ||
} | ||
|
||
|
@@ -656,10 +664,10 @@ def _infer_schema_type(obj, dataType): | |
""" | ||
Fill the dataType with types infered from obj | ||
|
||
>>> schema = _parse_schema_abstract("a b c") | ||
>>> row = (1, 1.0, "str") | ||
>>> schema = _parse_schema_abstract("a b c d") | ||
>>> row = (1, 1.0, "str", datetime.date(2014, 10, 10)) | ||
>>> _infer_schema_type(row, schema) | ||
StructType...IntegerType...DoubleType...StringType... | ||
StructType...IntegerType...DoubleType...StringType...DateType... | ||
>>> row = [[1], {"key": (1, 2.0)}] | ||
>>> schema = _parse_schema_abstract("a[] b{c d}") | ||
>>> _infer_schema_type(row, schema) | ||
|
@@ -703,6 +711,7 @@ def _infer_schema_type(obj, dataType): | |
DecimalType: (decimal.Decimal,), | ||
StringType: (str, unicode), | ||
BinaryType: (bytearray,), | ||
DateType: (datetime.date,), | ||
TimestampType: (datetime.datetime,), | ||
ArrayType: (list, tuple, array), | ||
MapType: (dict,), | ||
|
@@ -740,7 +749,7 @@ def _verify_type(obj, dataType): | |
|
||
# subclass of them can not be deserialized in JVM | ||
if type(obj) not in _acceptable_types[_type]: | ||
raise TypeError("%s can not accept abject in type %s" | ||
raise TypeError("%s can not accept object in type %s" | ||
% (dataType, type(obj))) | ||
|
||
if isinstance(dataType, ArrayType): | ||
|
@@ -767,7 +776,7 @@ def _restore_object(dataType, obj): | |
""" Restore object during unpickling. """ | ||
# use id(dataType) as key to speed up lookup in dict | ||
# Because of batched pickling, dataType will be the | ||
# same object in mose cases. | ||
# same object in most cases. | ||
k = id(dataType) | ||
cls = _cached_cls.get(k) | ||
if cls is None: | ||
|
@@ -1065,7 +1074,9 @@ def applySchema(self, rdd, schema): | |
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')] | ||
|
||
>>> from datetime import datetime | ||
>>> from datetime import date | ||
>>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0, | ||
... date(2010, 1, 1), | ||
... datetime(2010, 1, 1, 1, 1, 1), | ||
... {"a": 1}, (2,), [1, 2, 3], None)]) | ||
>>> schema = StructType([ | ||
|
@@ -1075,6 +1086,7 @@ def applySchema(self, rdd, schema): | |
... StructField("short2", ShortType(), False), | ||
... StructField("int", IntegerType(), False), | ||
... StructField("float", FloatType(), False), | ||
... StructField("date", DateType(), False), | ||
... StructField("time", TimestampType(), False), | ||
... StructField("map", | ||
... MapType(StringType(), IntegerType(), False), False), | ||
|
@@ -1084,10 +1096,11 @@ def applySchema(self, rdd, schema): | |
... StructField("null", DoubleType(), True)]) | ||
>>> srdd = sqlCtx.applySchema(rdd, schema) | ||
>>> results = srdd.map( | ||
... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int, x.float, x.time, | ||
... x.map["a"], x.struct.b, x.list, x.null)) | ||
>>> results.collect()[0] | ||
(127, -128, -32768, 32767, 2147483647, 1.0, ...(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None) | ||
... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int, x.float, x.date, | ||
... x.time, x.map["a"], x.struct.b, x.list, x.null)) | ||
>>> results.collect()[0] # doctest: +NORMALIZE_WHITESPACE | ||
(127, -128, -32768, 32767, 2147483647, 1.0, datetime.datetime(2010, 1, 1, 0, 0), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davies because of using pyrolite, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see. After the data was deserialized in Python, we need to some data coversions, so we can convert datetime to date if DataType is DateType. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so should the convert in python side or scala side, which one would you prefer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can only do it in Python side. |
||
datetime.datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None) | ||
|
||
>>> srdd.registerTempTable("table2") | ||
>>> sqlCtx.sql( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ package org.apache.spark.sql.json | |
import scala.collection.Map | ||
import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper} | ||
import scala.math.BigDecimal | ||
import java.sql.Timestamp | ||
import java.sql.{Date, Timestamp} | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException | ||
import com.fasterxml.jackson.databind.ObjectMapper | ||
|
@@ -372,13 +372,20 @@ private[sql] object JsonRDD extends Logging { | |
} | ||
} | ||
|
||
private def toDate(value: Any): Date = { | ||
value match { | ||
// only support string as date | ||
case value: java.lang.String => Date.valueOf(value) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you add support for more date formats? At least ISO8601; see some discussion here: http://stackoverflow.com/questions/10286204/the-right-json-date-format We would like to have jsonRDD() as robust as possible. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, could we guess the DateType given a String in date format? We should also do this for TimestampType; it would be great if you could fix both in this PR. |
||
} | ||
} | ||
|
||
private def toTimestamp(value: Any): Timestamp = { | ||
value match { | ||
case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong) | ||
case value: java.lang.Long => new Timestamp(value) | ||
case value: java.lang.String => Timestamp.valueOf(value) | ||
} | ||
} | ||
case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong) | ||
case value: java.lang.Long => new Timestamp(value) | ||
case value: java.lang.String => Timestamp.valueOf(value) | ||
} | ||
} | ||
|
||
private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={ | ||
if (value == null) { | ||
|
@@ -396,6 +403,7 @@ private[sql] object JsonRDD extends Logging { | |
case ArrayType(elementType, _) => | ||
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) | ||
case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct) | ||
case DateType => toDate(value) | ||
case TimestampType => toTimestamp(value) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: these two lines can be combined.