[SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone #19607

Closed
wants to merge 29 commits
Changes from 25 commits
Commits (29)
4735e59
Add a conf to make Pandas DataFrame respect session local timezone.
ueshin Oct 23, 2017
1f85150
Fix toPandas() behavior.
ueshin Oct 23, 2017
5c08ecf
Modify pandas UDFs to respect session timezone.
ueshin Oct 23, 2017
ee1a1c8
Workaround for old pandas.
ueshin Nov 1, 2017
b1436b8
Don't use is_datetime64tz_dtype for old pandas.
ueshin Nov 1, 2017
6872516
Fix one of the failed tests.
ueshin Nov 1, 2017
1f096bf
Modify check_data udf for debug messages.
ueshin Nov 2, 2017
569bb63
Remove unused method.
ueshin Nov 3, 2017
ce07f39
Modify a test.
ueshin Nov 3, 2017
ba3d6e3
Add debug print, which will be removed later.
ueshin Nov 6, 2017
9101a3a
Fix style.
ueshin Nov 6, 2017
ab13baf
Remove debug prints.
ueshin Nov 8, 2017
4adb073
Modify tests to avoid times within DST.
ueshin Nov 8, 2017
1e0f217
Clean up.
ueshin Nov 8, 2017
d18cd36
Merge branch 'master' into issues/SPARK-22395
ueshin Nov 8, 2017
292678f
Fix the behavior of createDataFrame from pandas DataFrame.
ueshin Nov 8, 2017
f37c067
Merge branch 'master' into issues/SPARK-22395
ueshin Nov 13, 2017
8b1a4d8
Add a test to check the behavior of createDataFrame from pandas DataF…
ueshin Nov 13, 2017
e919ed5
Clarify the usage of Row.
ueshin Nov 13, 2017
9c94f90
Merge branch 'master' into issues/SPARK-22395
ueshin Nov 20, 2017
9cfdde2
Add TODOs for nested timestamp fields.
ueshin Nov 21, 2017
8b1a4a1
Remove workarounds for old Pandas but add some error messages saying …
ueshin Nov 21, 2017
3db2bea
Fix tests.
ueshin Nov 21, 2017
3e23653
Use `_exception_message()` to access error messages.
ueshin Nov 21, 2017
d741171
Fix a test.
ueshin Nov 21, 2017
e240631
Add a description about deprecation of the config.
ueshin Nov 27, 2017
f92eae3
Add migration guide.
ueshin Nov 27, 2017
40a9735
Merge branch 'master' into issues/SPARK-22395
ueshin Nov 27, 2017
9200f38
Address comments.
ueshin Nov 28, 2017
13 changes: 9 additions & 4 deletions python/pyspark/serializers.py
@@ -206,11 +206,12 @@ def __repr__(self):
return "ArrowSerializer"


def _create_batch(series):
def _create_batch(series, timezone):
"""
Create an Arrow record batch from the given pandas.Series or list of Series, with optional type.

:param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
:param timezone: A timezone to respect when handling timestamp values
:return: Arrow RecordBatch
"""

@@ -227,7 +228,7 @@ def _create_batch(series):
def cast_series(s, t):
if type(t) == pa.TimestampType:
# NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680
return _check_series_convert_timestamps_internal(s.fillna(0))\
return _check_series_convert_timestamps_internal(s.fillna(0), timezone)\
.values.astype('datetime64[us]', copy=False)
# NOTE: can not compare None with pyarrow.DataType(), fixed with Arrow >= 0.7.1
elif t is not None and t == pa.date32():
@@ -253,6 +254,10 @@ class ArrowStreamPandasSerializer(Serializer):
Serializes Pandas.Series as Arrow data with Arrow streaming format.
"""

def __init__(self, timezone):
super(ArrowStreamPandasSerializer, self).__init__()
self._timezone = timezone

def dump_stream(self, iterator, stream):
"""
Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
@@ -262,7 +267,7 @@ def dump_stream(self, iterator, stream):
writer = None
try:
for series in iterator:
batch = _create_batch(series)
batch = _create_batch(series, self._timezone)
if writer is None:
write_int(SpecialLengths.START_ARROW_STREAM, stream)
writer = pa.RecordBatchStreamWriter(stream, batch.schema)
@@ -280,7 +285,7 @@ def load_stream(self, stream):
reader = pa.open_stream(stream)
for batch in reader:
# NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
pdf = _check_dataframe_localize_timestamps(batch.to_pandas())
pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
yield [c for _, c in pdf.iteritems()]

def __repr__(self):
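The serializer changes above pass the session time zone into the timestamp helpers so that naive pandas timestamps are normalized before they reach Arrow. Below is a minimal sketch of that idea, using a hypothetical helper name to_utc_for_arrow and assuming a UTC fallback when no session time zone is given; the PR's actual _check_series_convert_timestamps_internal may differ in detail.

```python
import pandas as pd
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype

def to_utc_for_arrow(s, timezone):
    """Normalize a pandas timestamp Series to UTC before Arrow serialization.

    Naive values are interpreted in the given session time zone; tz-aware
    values are simply converted to UTC.
    """
    if is_datetime64tz_dtype(s.dtype):
        return s.dt.tz_convert('UTC')
    if is_datetime64_dtype(s.dtype):
        # Assumption: fall back to UTC when no session time zone is set; the
        # real helper may use the system local time zone instead.
        tz = timezone if timezone is not None else 'UTC'
        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
    return s

# A naive 00:30 in America/Los_Angeles (PDT, UTC-7 on this date) becomes 07:30 UTC.
s = pd.Series(pd.to_datetime(['2017-11-01 00:30:00']))
print(to_utc_for_arrow(s, 'America/Los_Angeles'))
```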
24 changes: 21 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -39,6 +39,7 @@
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.types import IntegralType
from pyspark.sql.types import *
from pyspark.util import _exception_message

__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]

@@ -1881,6 +1882,13 @@ def toPandas(self):
1 5 Bob
"""
import pandas as pd

if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
== "true":
timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
else:
timezone = None

if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
try:
from pyspark.sql.types import _check_dataframe_localize_timestamps
@@ -1889,13 +1897,13 @@
if tables:
table = pyarrow.concat_tables(tables)
pdf = table.to_pandas()
return _check_dataframe_localize_timestamps(pdf)
return _check_dataframe_localize_timestamps(pdf, timezone)
else:
return pd.DataFrame.from_records([], columns=self.columns)
except ImportError as e:
msg = "note: pyarrow must be installed and available on calling Python process " \
"if using spark.sql.execution.arrow.enabled=true"
raise ImportError("%s\n%s" % (e.message, msg))
raise ImportError("%s\n%s" % (_exception_message(e), msg))
else:
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

@@ -1913,7 +1921,17 @@

for f, t in dtype.items():
pdf[f] = pdf[f].astype(t, copy=False)
return pdf

if timezone is None:
return pdf
else:
from pyspark.sql.types import _check_series_convert_timestamps_local_tz
for field in self.schema:
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
if isinstance(field.dataType, TimestampType):
Review comment (Contributor): add a TODO for nested timestamp field?

Reply (Member Author): Sure, I'll add it.

pdf[field.name] = \
_check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
return pdf

def _collectAsArrow(self):
"""
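A usage sketch of how the toPandas() change above is meant to behave from user code; the session setup and values are illustrative, not taken from the PR's tests.

```python
from datetime import datetime
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.session.timeZone", "America/Los_Angeles")
         .config("spark.sql.execution.pandas.respectSessionTimeZone", "true")
         .getOrCreate())

df = spark.createDataFrame([(1, datetime(2017, 11, 1, 0, 30))], ["id", "ts"])
pdf = df.toPandas()
# With respectSessionTimeZone=true, the naive timestamps in `pdf` reflect the
# session time zone (America/Los_Angeles); with "false", they follow the
# system local time zone, as before this change.
print(pdf)
```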
48 changes: 39 additions & 9 deletions python/pyspark/sql/session.py
@@ -34,8 +34,9 @@
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.readwriter import DataFrameReader
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql.types import Row, DataType, StringType, StructType, _make_type_verifier, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
from pyspark.sql.types import Row, DataType, StringType, StructType, TimestampType, \
_make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter, \
_parse_datatype_string
from pyspark.sql.utils import install_exception_handler

__all__ = ["SparkSession"]
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
record_type_list.append((str(col_names[i]), curr_type))
return np.dtype(record_type_list) if has_rec_fix else None

def _convert_from_pandas(self, pdf):
def _convert_from_pandas(self, pdf, schema, timezone):
Review comment (Member): Just an idea, not blocking this PR: we probably have enough code here to move the Pandas/Arrow handling into a separate Python file or class.

Reply (Member Author): Thanks, I agree, but I'll leave those as they are in this PR.

"""
Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
:return list of records
"""
if timezone is not None:
from pyspark.sql.types import _check_series_convert_timestamps_tz_local
copied = False
if isinstance(schema, StructType):
for field in schema:
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
if isinstance(field.dataType, TimestampType):
s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
if not copied and s is not pdf[field.name]:
pdf = pdf.copy()
copied = True
Review comment (Member): Would you mind if I ask why we copy here? Some comments explaining it would be helpful. To be clear, is it to prevent the original Pandas DataFrame from being updated?

Reply (Member Author): Yes, it's to prevent the original one from being updated.
I'll add some comments.

pdf[field.name] = s
else:
for column, series in pdf.iteritems():
s = _check_series_convert_timestamps_tz_local(pdf[column], timezone)
if not copied and s is not pdf[column]:
pdf = pdf.copy()
copied = True
pdf[column] = s

# Convert pandas.DataFrame to list of numpy records
np_records = pdf.to_records(index=False)
@@ -462,15 +482,19 @@ def _convert_from_pandas(self, pdf):
# Convert list of numpy records to python lists
return [r.tolist() for r in np_records]
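The review thread above asks about the pdf.copy() call. The pattern is copy-on-write: the frame is cloned only once, the first time a timestamp column actually changes, so the caller's DataFrame is never mutated in place. A standalone sketch of that pattern follows; the conversion shown (tz-aware to naive in a target zone) is a stand-in, not the exact semantics of _check_series_convert_timestamps_tz_local.

```python
import pandas as pd
from pandas.api.types import is_datetime64tz_dtype

def convert_without_mutating(pdf, timezone):
    copied = False
    for column in pdf.columns:
        s = pdf[column]
        # Stand-in conversion: make tz-aware timestamps naive in `timezone`.
        converted = s.dt.tz_convert(timezone).dt.tz_localize(None) \
            if is_datetime64tz_dtype(s.dtype) else s
        if converted is not s:
            if not copied:
                # Copy-on-write: clone only when a column really changes.
                pdf = pdf.copy()
                copied = True
            pdf[column] = converted
    return pdf

pdf_in = pd.DataFrame({'ts': pd.to_datetime(['2017-11-01 07:30:00']).tz_localize('UTC')})
pdf_out = convert_without_mutating(pdf_in, 'America/Los_Angeles')
print(pdf_out['ts'])  # 2017-11-01 00:30:00, naive
print(pdf_in['ts'])   # still tz-aware UTC; the input frame is untouched
```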

def _create_from_pandas_with_arrow(self, pdf, schema):
def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
"""
Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
data types will be used to coerce the data in Pandas to Arrow conversion.
"""
from pyspark.serializers import ArrowSerializer, _create_batch
from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
from pyspark.sql.types import from_arrow_schema, to_arrow_type, \
_old_pandas_exception_message, TimestampType
try:
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
except ImportError as e:
raise ImportError(_old_pandas_exception_message(e))

# Determine arrow types to coerce data when creating batches
if isinstance(schema, StructType):
@@ -488,7 +512,8 @@ def _create_from_pandas_with_arrow(self, pdf, schema):
pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))

# Create Arrow record batches
batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
timezone)
for pdf_slice in pdf_slices]

# Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
@@ -606,6 +631,11 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
except Exception:
has_pandas = False
if has_pandas and isinstance(data, pandas.DataFrame):
if self.conf.get("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
Review comment (Contributor): I feel this is a weird config... I think it's acceptable to introduce a behavior change during a bug fix, like the type inference bug we fixed when converting a pandas DataFrame to a PySpark DataFrame.

Reply (Member Author): Do you mean we don't need the config and can just fix the behavior? cc @gatorsmile

== "true":
timezone = self.conf.get("spark.sql.session.timeZone")
else:
timezone = None

# If no schema supplied by user then get the names of columns only
if schema is None:
Expand All @@ -614,11 +644,11 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
and len(data) > 0:
try:
return self._create_from_pandas_with_arrow(data, schema)
return self._create_from_pandas_with_arrow(data, schema, timezone)
except Exception as e:
warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
# Fallback to create DataFrame without arrow if raise some exception
data = self._convert_from_pandas(data)
data = self._convert_from_pandas(data, schema, timezone)

if isinstance(schema, StructType):
verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
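Taken together, the config is meant to make createDataFrame(pandas_df) and toPandas() round-trip timestamps consistently under the session time zone. An illustrative end-to-end sketch, not taken from the PR's tests:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.session.timeZone", "America/New_York")
         .config("spark.sql.execution.pandas.respectSessionTimeZone", "true")
         .getOrCreate())

pdf_in = pd.DataFrame({"ts": pd.to_datetime(["2017-11-01 09:00:00"])})
df = spark.createDataFrame(pdf_in)   # naive values read as session-zone times
pdf_out = df.toPandas()              # and rendered back in the session zone

# The wall-clock value should survive the round trip regardless of the
# driver's system local time zone.
assert pdf_out["ts"].iloc[0] == pdf_in["ts"].iloc[0]
```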