[SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone #19607
@@ -34,8 +34,9 @@
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.readwriter import DataFrameReader
 from pyspark.sql.streaming import DataStreamReader
-from pyspark.sql.types import Row, DataType, StringType, StructType, _make_type_verifier, \
-    _infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
+from pyspark.sql.types import Row, DataType, StringType, StructType, TimestampType, \
+    _make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter, \
+    _parse_datatype_string
 from pyspark.sql.utils import install_exception_handler

 __all__ = ["SparkSession"]
@@ -444,11 +445,29 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None

-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
         Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
         :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            pdf = pdf.copy()
+                            copied = True
+                        pdf[field.name] = s
+            else:
+                for column, series in pdf.iteritems():
+                    s = _check_series_convert_timestamps_tz_local(pdf[column], timezone)
+                    if not copied and s is not pdf[column]:
+                        pdf = pdf.copy()
+                        copied = True
+                    pdf[column] = s

         # Convert pandas.DataFrame to list of numpy records
         np_records = pdf.to_records(index=False)

Review comment (on `_convert_from_pandas`): Just an idea, not blocking this PR. We probably have enough code now to move the Pandas/Arrow logic into a separate Python file or class.

Reply: Thanks, I agree, but I'll leave those as they are in this PR.

Review comment (on the `pdf.copy()` call): Would you mind if I ask why we copy here? A comment explaining it would be helpful. To be clear, is it to prevent the original pandas DataFrame from being updated?

Reply: Yes, it's to prevent the original one from being updated.
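The private helper `_check_series_convert_timestamps_tz_local` lives in `pyspark.sql.types` and its body is not part of this diff. As a rough sketch only (assuming plain pandas semantics, not the actual implementation), the behavior the caller relies on could look like this; returning the input series unchanged when no conversion is needed is exactly what makes the `s is not pdf[column]` identity check above work:

```python
import pandas as pd
from pandas.api.types import is_datetime64tz_dtype

def check_series_convert_timestamps_tz_local(s, timezone):
    """Hypothetical sketch: make a tz-aware series naive in the given timezone."""
    if is_datetime64tz_dtype(s.dtype):
        # Convert to the target timezone, then drop the tz info so the values
        # become wall-clock times in that zone.
        return s.dt.tz_convert(timezone).dt.tz_localize(None)
    # No-op: return the same object so callers can detect "nothing changed".
    return s

s = pd.Series(pd.date_range("2017-11-01", periods=2, tz="UTC"))
print(check_series_convert_timestamps_tz_local(s, "US/Pacific"))
```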
@@ -462,7 +481,7 @@ def _convert_from_pandas(self, pdf):
         # Convert list of numpy records to python lists
         return [r.tolist() for r in np_records]

-    def _create_from_pandas_with_arrow(self, pdf, schema):
+    def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         """
         Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
@@ -488,7 +507,8 @@ def _create_from_pandas_with_arrow(self, pdf, schema):
         pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))

         # Create Arrow record batches
-        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
+        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
+                                 timezone)
                    for pdf_slice in pdf_slices]

         # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
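For context, `step` in the slicing expression above is computed earlier in the method from the SparkContext's default parallelism; each slice then becomes one Arrow record batch. A standalone illustration of the slicing itself (the `num_slices` parameter here is a stand-in for that parallelism value):

```python
import pandas as pd

def slice_frame(pdf, num_slices):
    # Ceiling division: every row lands in exactly one slice.
    step = -(-len(pdf) // num_slices)
    return (pdf[start:start + step] for start in range(0, len(pdf), step))

pdf = pd.DataFrame({"x": range(10)})
print([len(s) for s in slice_frame(pdf, 3)])  # [4, 4, 2]
```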
@@ -606,6 +626,11 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            if self.conf.get("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
+                    == "true":
+                timezone = self.conf.get("spark.sql.session.timeZone")
+            else:
+                timezone = None

             # If no schema supplied by user then get the names of columns only
             if schema is None:

Review comment (on the new config check): This feels like a weird config... I think it's acceptable to introduce a behavior change as part of a bug fix, like the type-inference bug we fixed when converting a pandas DataFrame to a PySpark DataFrame.

Reply: Do you mean we don't need the config and can just fix the behavior? cc @gatorsmile
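To make the new code path concrete, here is a hedged usage sketch (assumes an active `SparkSession` named `spark`; the timezone and timestamp values are purely illustrative):

```python
import pandas as pd

# With respectSessionTimeZone enabled, timestamps from pandas are interpreted
# in the session timezone instead of the JVM/system local timezone.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")

pdf = pd.DataFrame({"ts": [pd.Timestamp("2017-11-01 00:00:00")]})
df = spark.createDataFrame(pdf)
df.show()  # rendered in the session timezone
```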
@@ -614,11 +639,11 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
                     and len(data) > 0:
                 try:
-                    return self._create_from_pandas_with_arrow(data, schema)
+                    return self._create_from_pandas_with_arrow(data, schema, timezone)
                 except Exception as e:
                     warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
                     # Fallback to create DataFrame without arrow if raise some exception
-                    data = self._convert_from_pandas(data)
+                    data = self._convert_from_pandas(data, schema, timezone)

         if isinstance(schema, StructType):
             verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
Review comment: Add a TODO for the nested timestamp field?

Reply: Sure, I'll add it.
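For context on that TODO: the new conversion loop only inspects top-level schema fields, so a timestamp nested inside a struct is not adjusted. An illustrative (hypothetical) pair of schemas:

```python
from pyspark.sql.types import StructType, StructField, TimestampType

# Top-level timestamp: matched by isinstance(field.dataType, TimestampType)
# in the loop above, so it gets the timezone conversion.
flat = StructType([StructField("ts", TimestampType())])

# Nested timestamp: the top-level field is a StructType, not a TimestampType,
# so the inner "ts" is skipped -- hence the suggested TODO.
nested = StructType([
    StructField("event", StructType([StructField("ts", TimestampType())])),
])
```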