Skip to content

Commit

Permalink
add timezone support for DateType
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Jun 11, 2015
1 parent 99d9d9c commit 44d8497
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
7 changes: 5 additions & 2 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,17 @@ def test_filter_with_datetime(self):
self.assertEqual(0, df.filter(df.time > time).count())

def test_time_with_timezone(self):
day = datetime.date.today()
now = datetime.datetime.now()
ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
# class in __main__ is not serializable
from pyspark.sql.tests import UTC
utc = UTC()
utcnow = datetime.datetime.fromtimestamp(ts, utc)
df = self.sqlCtx.createDataFrame([(now, utcnow)])
now1, utcnow1 = df.first()
df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
day1, now1, utcnow1 = df.first()
# Pyrolite serializes java.sql.Date as datetime; this will be fixed in a new version
self.assertEqual(day1.date(), day)
# Pyrolite does not support microseconds, so the error should be
# less than 1 millisecond
self.assertTrue(now - now1 < datetime.timedelta(0.001))
Expand Down
23 changes: 15 additions & 8 deletions python/pyspark/sql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
_need_python_to_sql_conversion(dataType.valueType)
elif isinstance(dataType, UserDefinedType):
return True
elif isinstance(dataType, TimestampType):
elif isinstance(dataType, (DateType, TimestampType)):
return True
else:
return False


# Ordinal (as returned by ``toordinal()``) of 1970-01-01. The DateType
# converter subtracts this from a date's ordinal to produce a day offset
# relative to 1970-01-01.
EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()


def _python_to_sql_converter(dataType):
"""
Returns a converter that converts a Python object into a SQL datum for the given type.
Expand Down Expand Up @@ -698,26 +701,30 @@ def converter(obj):
return tuple(c(d.get(n)) for n, c in zip(names, converters))
else:
return tuple(c(v) for c, v in zip(converters, obj))
else:
elif obj is not None:
raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
return converter
elif isinstance(dataType, ArrayType):
element_converter = _python_to_sql_converter(dataType.elementType)
return lambda a: [element_converter(v) for v in a]
return lambda a: a and [element_converter(v) for v in a]
elif isinstance(dataType, MapType):
key_converter = _python_to_sql_converter(dataType.keyType)
value_converter = _python_to_sql_converter(dataType.valueType)
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])

elif isinstance(dataType, UserDefinedType):
return lambda obj: dataType.serialize(obj)
return lambda obj: obj and dataType.serialize(obj)

elif isinstance(dataType, DateType):
return lambda d: d and d.toordinal() - EPOCH_ORDINAL

elif isinstance(dataType, TimestampType):

def to_posix_timstamp(dt):
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
return int(seconds * 1e7 + dt.microsecond * 10)
if dt:
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
return int(seconds * 1e7 + dt.microsecond * 10)
return to_posix_timstamp

else:
Expand Down

0 comments on commit 44d8497

Please sign in to comment.