Skip to content

Commit

Permalink
[SPARK-23314][PYTHON] Add ambiguous=False when localizing tz-naive ti…
Browse files Browse the repository at this point in the history
…mestamps in Arrow codepath to deal with dst

## What changes were proposed in this pull request?
When tz_localize a tz-naive timetamp, pandas will throw exception if the timestamp is during daylight saving time period, e.g., `2015-11-01 01:30:00`. This PR fixes this issue by setting `ambiguous=False` when calling tz_localize, which is the same default behavior of pytz.

## How was this patch tested?
Add `test_timestamp_dst`

Author: Li Jin <[email protected]>

Closes #20537 from icexelloss/SPARK-23314.
  • Loading branch information
icexelloss authored and HyukjinKwon committed Feb 11, 2018
1 parent 0783876 commit a34fce1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 3 deletions.
39 changes: 39 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3670,6 +3670,21 @@ def test_createDataFrame_with_int_col_names(self):
self.assertEqual(pdf_col_names, df.columns)
self.assertEqual(pdf_col_names, df_arrow.columns)

# Regression test for SPARK-23314
def test_timestamp_dst(self):
import pandas as pd
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
dt = [datetime.datetime(2015, 11, 1, 0, 30),
datetime.datetime(2015, 11, 1, 1, 30),
datetime.datetime(2015, 11, 1, 2, 30)]
pdf = pd.DataFrame({'time': dt})

df_from_python = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
df_from_pandas = self.spark.createDataFrame(pdf)

self.assertPandasEqual(pdf, df_from_python.toPandas())
self.assertPandasEqual(pdf, df_from_pandas.toPandas())


@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
Expand Down Expand Up @@ -4311,6 +4326,18 @@ def test_register_vectorized_udf_basic(self):
self.assertEquals(expected.collect(), res1.collect())
self.assertEquals(expected.collect(), res2.collect())

# Regression test for SPARK-23314
def test_timestamp_dst(self):
from pyspark.sql.functions import pandas_udf
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
dt = [datetime.datetime(2015, 11, 1, 0, 30),
datetime.datetime(2015, 11, 1, 1, 30),
datetime.datetime(2015, 11, 1, 2, 30)]
df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
foo_udf = pandas_udf(lambda x: x, 'timestamp')
result = df.withColumn('time', foo_udf(df.time))
self.assertEquals(df.collect(), result.collect())


@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
Expand Down Expand Up @@ -4482,6 +4509,18 @@ def test_unsupported_types(self):
with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
df.groupby('id').apply(f).collect()

# Regression test for SPARK-23314
def test_timestamp_dst(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
dt = [datetime.datetime(2015, 11, 1, 0, 30),
datetime.datetime(2015, 11, 1, 1, 30),
datetime.datetime(2015, 11, 1, 2, 30)]
df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
foo_udf = pandas_udf(lambda pdf: pdf, 'time timestamp', PandasUDFType.GROUPED_MAP)
result = df.groupby('time').apply(foo_udf).sort('time')
self.assertPandasEqual(df.toPandas(), result.toPandas())


@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
Expand Down
37 changes: 34 additions & 3 deletions python/pyspark/sql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1759,8 +1759,38 @@ def _check_series_convert_timestamps_internal(s, timezone):
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
if is_datetime64_dtype(s.dtype):
# When tz_localize a tz-naive timestamp, the result is ambiguous if the tz-naive
# timestamp is during the hour when the clock is adjusted backward during due to
# daylight saving time (dst).
# E.g., for America/New_York, the clock is adjusted backward on 2015-11-01 2:00 to
# 2015-11-01 1:00 from dst-time to standard time, and therefore, when tz_localize
# a tz-naive timestamp 2015-11-01 1:30 with America/New_York timezone, it can be either
# dst time (2015-01-01 1:30-0400) or standard time (2015-11-01 1:30-0500).
#
# Here we explicit choose to use standard time. This matches the default behavior of
# pytz.
#
# Here are some code to help understand this behavior:
# >>> import datetime
# >>> import pandas as pd
# >>> import pytz
# >>>
# >>> t = datetime.datetime(2015, 11, 1, 1, 30)
# >>> ts = pd.Series([t])
# >>> tz = pytz.timezone('America/New_York')
# >>>
# >>> ts.dt.tz_localize(tz, ambiguous=True)
# 0 2015-11-01 01:30:00-04:00
# dtype: datetime64[ns, America/New_York]
# >>>
# >>> ts.dt.tz_localize(tz, ambiguous=False)
# 0 2015-11-01 01:30:00-05:00
# dtype: datetime64[ns, America/New_York]
# >>>
# >>> str(tz.localize(t))
# '2015-11-01 01:30:00-05:00'
tz = timezone or _get_local_timezone()
return s.dt.tz_localize(tz).dt.tz_convert('UTC')
return s.dt.tz_localize(tz, ambiguous=False).dt.tz_convert('UTC')
elif is_datetime64tz_dtype(s.dtype):
return s.dt.tz_convert('UTC')
else:
Expand Down Expand Up @@ -1788,8 +1818,9 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone):
return s.dt.tz_convert(to_tz).dt.tz_localize(None)
elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
# `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
return s.apply(lambda ts: ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None)
if ts is not pd.NaT else pd.NaT)
return s.apply(
lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
if ts is not pd.NaT else pd.NaT)
else:
return s

Expand Down

0 comments on commit a34fce1

Please sign in to comment.