diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ed419db2df6c2..6058e94d471e9 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -260,10 +260,14 @@ def __init__(self, timezone, safecheck, assign_cols_by_name):
         self._safecheck = safecheck
         self._assign_cols_by_name = assign_cols_by_name

-    def arrow_to_pandas(self, arrow_column, data_type):
-        from pyspark.sql.types import _arrow_column_to_pandas, _check_series_localize_timestamps
+    def arrow_to_pandas(self, arrow_column):
+        from pyspark.sql.types import _check_series_localize_timestamps
+
+        # If the given column is a date type column, creates a series of datetime.date directly
+        # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+        # datetime64[ns] type handling.
+        s = arrow_column.to_pandas(date_as_object=True)

-        s = _arrow_column_to_pandas(arrow_column, data_type)
         s = _check_series_localize_timestamps(s, self._timezone)
         return s

@@ -275,8 +279,6 @@ def _create_batch(self, series):
         :param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
         :return: Arrow RecordBatch
         """
-        import decimal
-        from distutils.version import LooseVersion
         import pandas as pd
         import pyarrow as pa
         from pyspark.sql.types import _check_series_convert_timestamps_internal
@@ -289,24 +291,10 @@ def _create_batch(self, series):
         def create_array(s, t):
             mask = s.isnull()
             # Ensure timestamp series are in expected form for Spark internal representation
-            # TODO: maybe don't need None check anymore as of Arrow 0.9.1
             if t is not None and pa.types.is_timestamp(t):
                 s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone)
                 # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
                 return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
-            elif t is not None and pa.types.is_string(t) and sys.version < '3':
-                # TODO: need decode before converting to Arrow in Python 2
-                # TODO: don't need as of Arrow 0.9.1
-                return pa.Array.from_pandas(s.apply(
-                    lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
-            elif t is not None and pa.types.is_decimal(t) and \
-                    LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-                # TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0.
-                return pa.Array.from_pandas(s.apply(
-                    lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t)
-            elif LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
-                return pa.Array.from_pandas(s, mask=mask, type=t)

             try:
                 array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
@@ -340,12 +328,7 @@ def create_array(s, t):
                               for i, field in enumerate(t)]

                 struct_arrs, struct_names = zip(*arrs_names)
-
-                # TODO: from_arrays args switched for v0.9.0, remove when bump min pyarrow version
-                if LooseVersion(pa.__version__) < LooseVersion("0.9.0"):
-                    arrs.append(pa.StructArray.from_arrays(struct_names, struct_arrs))
-                else:
-                    arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
+                arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
             else:
                 arrs.append(create_array(s, t))

@@ -365,10 +348,8 @@ def load_stream(self, stream):
         """
         batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         import pyarrow as pa
-        from pyspark.sql.types import from_arrow_type
         for batch in batches:
-            yield [self.arrow_to_pandas(c, from_arrow_type(c.type))
-                   for c in pa.Table.from_batches([batch]).itercolumns()]
+            yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]

     def __repr__(self):
         return "ArrowStreamPandasSerializer"
@@ -384,17 +365,17 @@ def __init__(self, timezone, safecheck, assign_cols_by_name, df_for_struct=False
             .__init__(timezone, safecheck, assign_cols_by_name)
         self._df_for_struct = df_for_struct

-    def arrow_to_pandas(self, arrow_column, data_type):
-        from pyspark.sql.types import StructType, \
-            _arrow_column_to_pandas, _check_dataframe_localize_timestamps
+    def arrow_to_pandas(self, arrow_column):
+        import pyarrow.types as types

-        if self._df_for_struct and type(data_type) == StructType:
+        if self._df_for_struct and types.is_struct(arrow_column.type):
             import pandas as pd
-            series = [_arrow_column_to_pandas(column, field.dataType).rename(field.name)
-                      for column, field in zip(arrow_column.flatten(), data_type)]
-            s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone)
+            series = [super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(column)
+                      .rename(field.name)
+                      for column, field in zip(arrow_column.flatten(), arrow_column.type)]
+            s = pd.concat(series, axis=1)
         else:
-            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type)
+            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column)
         return s

     def dump_stream(self, iterator, stream):
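For reference, the date handling that the new arrow_to_pandas relies on can be reproduced outside Spark with a small pyarrow sketch (pyarrow >= 0.12.1 assumed; the column and names below are illustrative and not part of the patch):

import datetime

import pyarrow as pa

# A date32 column holding a value past the datetime64[ns] range (pandas can only
# represent timestamps up to 2262-04-11 at nanosecond precision).
table = pa.Table.from_arrays([pa.array([datetime.date(2262, 4, 12)], type=pa.date32())],
                             names=["d"])

# date_as_object=True yields plain datetime.date objects instead of going through a
# lossy datetime64[ns] intermediate, which is what the serializer needs here.
pdf = table.to_pandas(date_as_object=True)
print(type(pdf["d"][0]))  # expected: <class 'datetime.date'>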
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 659cbc4487d13..f8aeb62a27fb9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2138,13 +2138,15 @@ def toPandas(self):
         # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
         if use_arrow:
             try:
-                from pyspark.sql.types import _arrow_table_to_pandas, \
-                    _check_dataframe_localize_timestamps
+                from pyspark.sql.types import _check_dataframe_localize_timestamps
                 import pyarrow
                 batches = self._collectAsArrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
-                    pdf = _arrow_table_to_pandas(table, self.schema)
+                    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+                    # values, but we should use datetime.date to match the behavior with when
+                    # Arrow optimization is disabled.
+                    pdf = table.to_pandas(date_as_object=True)
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index b11e0f3ff69de..52e90370373f6 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -530,7 +530,6 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
-        from distutils.version import LooseVersion
         from pyspark.serializers import ArrowStreamPandasSerializer
         from pyspark.sql.types import from_arrow_type, to_arrow_type, TimestampType
         from pyspark.sql.utils import require_minimum_pandas_version, \
@@ -544,11 +543,7 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):

         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
-                temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)
-                arrow_schema = temp_batch.schema
-            else:
-                arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
             struct = StructType()
             for name, field in zip(schema, arrow_schema):
                 struct.add(name, from_arrow_type(field.type), nullable=field.nullable)
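A quick standalone sketch of the schema inference now used above (pyarrow >= 0.12.1 assumed; the sample DataFrame is illustrative): Schema.from_pandas inspects the whole pandas DataFrame instead of a RecordBatch built from the first 100 rows.

import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
# Field types are inferred from the pandas dtypes, e.g. a: int64, b: string.
print(arrow_schema)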
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index a45c3fb85601e..22578cbe2e98c 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -46,7 +46,6 @@ class ArrowTests(ReusedSQLTestCase):
     def setUpClass(cls):
         from datetime import date, datetime
         from decimal import Decimal
-        from distutils.version import LooseVersion
         super(ArrowTests, cls).setUpClass()
         cls.warnings_lock = threading.Lock()

@@ -68,23 +67,16 @@ def setUpClass(cls):
             StructField("5_double_t", DoubleType(), True),
             StructField("6_decimal_t", DecimalType(38, 18), True),
             StructField("7_date_t", DateType(), True),
-            StructField("8_timestamp_t", TimestampType(), True)])
+            StructField("8_timestamp_t", TimestampType(), True),
+            StructField("9_binary_t", BinaryType(), True)])
         cls.data = [(u"a", 1, 10, 0.2, 2.0, Decimal("2.0"),
-                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
+                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1), bytearray(b"a")),
                     (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"),
-                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
+                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2), bytearray(b"bb")),
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
-                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3)),
+                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3), bytearray(b"ccc")),
                     (u"d", 4, 40, 1.0, 8.0, Decimal("8.0"),
-                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3))]
-
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion("0.10.0") <= LooseVersion(pa.__version__):
-            cls.schema.add(StructField("9_binary_t", BinaryType(), True))
-            cls.data[0] = cls.data[0] + (bytearray(b"a"),)
-            cls.data[1] = cls.data[1] + (bytearray(b"bb"),)
-            cls.data[2] = cls.data[2] + (bytearray(b"ccc"),)
-            cls.data[3] = cls.data[3] + (bytearray(b"dddd"),)
+                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3), bytearray(b"dddd"))]

     @classmethod
     def tearDownClass(cls):
@@ -123,8 +115,6 @@ def test_toPandas_fallback_enabled(self):
                 assert_frame_equal(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))

     def test_toPandas_fallback_disabled(self):
-        from distutils.version import LooseVersion
-
         schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(None,)], schema=schema)
         with QuietTest(self.sc):
@@ -132,14 +122,6 @@ def test_toPandas_fallback_disabled(self):
                 with self.assertRaisesRegexp(Exception, 'Unsupported type'):
                     df.toPandas()

-        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            schema = StructType([StructField("binary", BinaryType(), True)])
-            df = self.spark.createDataFrame([(None,)], schema=schema)
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(Exception, 'Unsupported type.*BinaryType'):
-                    df.toPandas()
-
     def test_null_conversion(self):
         df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] +
                                              self.data)
@@ -348,20 +330,11 @@ def test_createDataFrame_fallback_enabled(self):
                 self.assertEqual(df.collect(), [Row(a={u'a': 1})])

     def test_createDataFrame_fallback_disabled(self):
-        from distutils.version import LooseVersion
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(TypeError, 'Unsupported type'):
                 self.spark.createDataFrame(
                     pd.DataFrame([[{u'a': 1}]]), "a: map<string,int>")

-        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(TypeError, 'Unsupported type.*BinaryType'):
-                    self.spark.createDataFrame(
-                        pd.DataFrame([[{'a': b'aaa'}]]), "a: binary")
-
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
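The last row above uses date(2262, 4, 12) deliberately: it lies just past the upper bound of pandas' nanosecond-precision datetime64[ns] type, so it only round-trips correctly when dates stay as datetime.date objects. A one-line check (a sketch, not part of the test suite):

import pandas as pd

print(pd.Timestamp.max)  # 2262-04-11 23:47:16.854775807, so 2262-04-12 is out of range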
diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index fd6d4e18ed2ab..72dc05bb02cdc 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -198,10 +198,8 @@ def foofoo(x, y):
         )

     def test_pandas_udf_detect_unsafe_type_conversion(self):
-        from distutils.version import LooseVersion
         import pandas as pd
         import numpy as np
-        import pyarrow as pa

         values = [1.0] * 3
         pdf = pd.DataFrame({'A': values})
@@ -209,15 +207,14 @@ def udf(column):

         @pandas_udf(returnType="int")
         def udf(column):
-            return pd.Series(np.linspace(0, 1, 3))
+            return pd.Series(np.linspace(0, 1, len(column)))

         # Since 0.11.0, PyArrow supports the feature to raise an error for unsafe cast.
-        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
-            with self.sql_conf({
-                    "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Exception thrown when converting pandas.Series"):
-                    df.select(['A']).withColumn('udf', udf('A')).collect()
+        with self.sql_conf({
+                "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
+            with self.assertRaisesRegexp(Exception,
+                                         "Exception thrown when converting pandas.Series"):
+                df.select(['A']).withColumn('udf', udf('A')).collect()

         # Disabling Arrow safe type check.
         with self.sql_conf({
@@ -225,35 +222,24 @@ def udf(column):
             df.select(['A']).withColumn('udf', udf('A')).collect()

     def test_pandas_udf_arrow_overflow(self):
-        from distutils.version import LooseVersion
         import pandas as pd
-        import pyarrow as pa

         df = self.spark.range(0, 1)

         @pandas_udf(returnType="byte")
         def udf(column):
-            return pd.Series([128])
-
-        # Arrow 0.11.0+ allows enabling or disabling safe type check.
-        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
-            # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
-            with self.sql_conf({
-                    "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Exception thrown when converting pandas.Series"):
-                    df.withColumn('udf', udf('id')).collect()
-
-            # Disabling safe type check, let Arrow do the cast anyway.
-            with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
+            return pd.Series([128] * len(column))
+
+        # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
+        with self.sql_conf({
+                "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
+            with self.assertRaisesRegexp(Exception,
+                                         "Exception thrown when converting pandas.Series"):
+                df.withColumn('udf', udf('id')).collect()
-        else:
-            # SQL config `arrowSafeTypeConversion` no matters for older Arrow.
-            # Overflow cast causes an error.
-            with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Integer value out of bounds"):
-                    df.withColumn('udf', udf('id')).collect()
+
+        # Disabling safe type check, let Arrow do the cast anyway.
+        with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
+            df.withColumn('udf', udf('id')).collect()


 if __name__ == "__main__":
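The overflow test above mirrors this pyarrow cast behavior, which can be checked directly (a sketch assuming pyarrow >= 0.12.1; the exact error type and message wording can differ slightly across pyarrow versions):

import pandas as pd
import pyarrow as pa

s = pd.Series([128])  # does not fit into a signed byte

try:
    pa.Array.from_pandas(s, type=pa.int8(), safe=True)   # safe cast: rejected
except pa.ArrowInvalid as e:
    print("unsafe cast rejected:", e)

print(pa.Array.from_pandas(s, type=pa.int8(), safe=False))  # unchecked cast: value wraps around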
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index c8bad99a7705e..1d87c636ab34e 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -21,7 +21,6 @@

 from collections import OrderedDict
 from decimal import Decimal
-from distutils.version import LooseVersion

 from pyspark.sql import Row
 from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
@@ -65,20 +64,17 @@ def test_supported_types(self):
             1, 2, 3, 4, 5,
             1.1, 2.2, Decimal(1.123),
-            [1, 2, 2], True, 'hello'
+            [1, 2, 2], True, 'hello',
+            bytearray([0x01, 0x02])
         ]
         output_fields = [
             ('id', IntegerType()), ('byte', ByteType()), ('short', ShortType()),
             ('int', IntegerType()), ('long', LongType()), ('float', FloatType()),
             ('double', DoubleType()), ('decim', DecimalType(10, 3)),
-            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType())
+            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType()),
+            ('bin', BinaryType())
         ]
-
-        # TODO: Add BinaryType to variables above once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) >= LooseVersion("0.10.0"):
-            values.append(bytearray([0x01, 0x02]))
-            output_fields.append(('bin', BinaryType()))
-
         output_schema = StructType([StructField(*x) for x in output_fields])
         df = self.spark.createDataFrame([values], schema=output_schema)

@@ -95,6 +91,7 @@ def test_supported_types(self):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -112,6 +109,7 @@ def test_supported_types(self):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -130,6 +128,7 @@ def test_supported_types(self):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -291,10 +290,6 @@ def test_unsupported_types(self):
             StructField('struct', StructType([StructField('l', LongType())])),
         ]

-        # TODO: Remove this if-statement once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            unsupported_types.append(StructField('bin', BinaryType()))
-
         for unsupported_type in unsupported_types:
             schema = StructType([StructField('id', LongType(), True), unsupported_type])
             with QuietTest(self.sc):
@@ -466,13 +461,8 @@ def invalid_positional_types(pdf):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-            if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
-                with self.assertRaisesRegexp(Exception, "No cast implemented"):
-                    grouped_df.apply(invalid_positional_types).collect()
-            else:
-                with self.assertRaisesRegexp(Exception, "an integer is required"):
-                    grouped_df.apply(invalid_positional_types).collect()
+            with self.assertRaisesRegexp(Exception, "an integer is required"):
+                grouped_df.apply(invalid_positional_types).collect()

     def test_positional_assignment_conf(self):
         with self.sql_conf({
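The new 'bin' column relies on plain binary support in Arrow, which pyarrow >= 0.10 provides out of the box; a tiny sketch of the round trip (not part of the tests, values are illustrative):

import pyarrow as pa

arr = pa.array([b"a", b"bb", None], type=pa.binary())
print(arr.to_pylist())  # [b'a', b'bb', None], bytes values round-trip unchanged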
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index ebba074fec109..b219624e5801b 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -28,7 +28,6 @@

 from datetime import date, datetime
 from decimal import Decimal
-from distutils.version import LooseVersion

 from pyspark.rdd import PythonEvalType
 from pyspark.sql import Column
@@ -240,19 +239,12 @@ def test_vectorized_udf_datatype_string(self):
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_binary(self):
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(
-                        NotImplementedError,
-                        'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
-                    pandas_udf(lambda x: x, BinaryType())
-        else:
-            data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
-            schema = StructType().add("binary", BinaryType())
-            df = self.spark.createDataFrame(data, schema)
-            str_f = pandas_udf(lambda x: x, BinaryType())
-            res = df.select(str_f(col('binary')))
-            self.assertEquals(df.collect(), res.collect())
+        data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
+        schema = StructType().add("binary", BinaryType())
+        df = self.spark.createDataFrame(data, schema)
+        str_f = pandas_udf(lambda x: x, BinaryType())
+        res = df.select(str_f(col('binary')))
+        self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_array_type(self):
         data = [([1, 2],), ([3, 4],)]
@@ -293,15 +285,7 @@ def func(id):

         struct_f = pandas_udf(lambda x: x, return_type)
         actual = df.select(struct_f(struct(col('id'), col('id').cast('string').alias('str'))))
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                from py4j.protocol import Py4JJavaError
-                with self.assertRaisesRegexp(
-                        Py4JJavaError,
-                        'Unsupported type in conversion from Arrow'):
-                    self.assertEqual(expected, actual.collect())
-        else:
-            self.assertEqual(expected, actual.collect())
+        self.assertEqual(expected, actual.collect())

     def test_vectorized_udf_struct_complex(self):
         df = self.spark.range(10)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 21595e5d63bf5..72c437a499a91 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1581,7 +1581,6 @@ def convert(self, obj, gateway_client):

 def to_arrow_type(dt):
     """ Convert Spark data type to pyarrow type """
-    from distutils.version import LooseVersion
     import pyarrow as pa
     if type(dt) == BooleanType:
         arrow_type = pa.bool_()
@@ -1602,10 +1601,6 @@ def to_arrow_type(dt):
     elif type(dt) == StringType:
         arrow_type = pa.string()
     elif type(dt) == BinaryType:
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion to Arrow: " + str(dt) +
-                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
         arrow_type = pa.binary()
     elif type(dt) == DateType:
         arrow_type = pa.date32()
@@ -1639,8 +1634,6 @@ def to_arrow_schema(schema):

 def from_arrow_type(at):
     """ Convert pyarrow type to Spark data type. """
-    from distutils.version import LooseVersion
-    import pyarrow as pa
     import pyarrow.types as types
     if types.is_boolean(at):
         spark_type = BooleanType()
@@ -1661,10 +1654,6 @@ def from_arrow_type(at):
     elif types.is_string(at):
         spark_type = StringType()
     elif types.is_binary(at):
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
-                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
         spark_type = BinaryType()
     elif types.is_date32(at):
         spark_type = DateType()
@@ -1675,10 +1664,6 @@ def from_arrow_type(at):
             raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
         spark_type = ArrayType(from_arrow_type(at.value_type))
     elif types.is_struct(at):
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
-                            "\nPlease install pyarrow >= 0.10.0 for StructType support.")
         if any(types.is_struct(field.type) for field in at):
             raise TypeError("Nested StructType not supported in conversion from Arrow: " + str(at))
         return StructType(
@@ -1697,54 +1682,6 @@ def from_arrow_schema(arrow_schema):
                        for field in arrow_schema])


-def _arrow_column_to_pandas(column, data_type):
-    """ Convert Arrow Column to pandas Series.
-
-    :param series: pyarrow.lib.Column
-    :param data_type: a Spark data type for the column
-    """
-    import pandas as pd
-    import pyarrow as pa
-    from distutils.version import LooseVersion
-    # If the given column is a date type column, creates a series of datetime.date directly instead
-    # of creating datetime64[ns] as intermediate data to avoid overflow caused by datetime64[ns]
-    # type handling.
-    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-        if type(data_type) == DateType:
-            return pd.Series(column.to_pylist(), name=column.name)
-        else:
-            return column.to_pandas()
-    else:
-        # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of
-        # np.datetime64.
-        return column.to_pandas(date_as_object=True)
-
-
-def _arrow_table_to_pandas(table, schema):
-    """ Convert Arrow Table to pandas DataFrame.
-
-    Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should
-    use datetime.date to match the behavior with when Arrow optimization is disabled.
-
-    :param table: pyarrow.lib.Table
-    :param schema: a Spark schema of the pyarrow.lib.Table
-    """
-    import pandas as pd
-    import pyarrow as pa
-    from distutils.version import LooseVersion
-    # If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11
-    # or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as
-    # intermediate data.
-    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-        if any(type(field.dataType) == DateType for field in schema):
-            return pd.concat([_arrow_column_to_pandas(column, field.dataType)
-                              for column, field in zip(table.itercolumns(), schema)], axis=1)
-        else:
-            return table.to_pandas()
-    else:
-        return table.to_pandas(date_as_object=True)
-
-
 def _get_local_timezone():
     """ Get local timezone using pytz with environment variable, or dateutil.

diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index bdb3a1467f1d8..709d3a0642616 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -136,7 +136,7 @@ def require_minimum_pyarrow_version():
     """ Raise ImportError if minimum version of pyarrow is not installed
     """
     # TODO(HyukjinKwon): Relocate and deduplicate the version specification.
-    minimum_pyarrow_version = "0.8.0"
+    minimum_pyarrow_version = "0.12.1"

     from distutils.version import LooseVersion
     try:
diff --git a/python/setup.py b/python/setup.py
index 3c129c9251b1f..e769bf52e7ebb 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -102,10 +102,11 @@ def _supports_symlinks():
           file=sys.stderr)
     sys.exit(-1)

-# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py and
-# ./python/run-tests.py. In case of Arrow, you should also check ./pom.xml.
+# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py
+# For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
+# binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
 _minimum_pandas_version = "0.19.2"
-_minimum_pyarrow_version = "0.8.0"
+_minimum_pyarrow_version = "0.12.1"

 try:
     # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts
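The new floor is enforced by require_minimum_pyarrow_version; condensed, the check is just a LooseVersion comparison like the sketch below (the message wording here is illustrative, not the exact text raised by Spark):

from distutils.version import LooseVersion
import pyarrow

minimum_pyarrow_version = "0.12.1"
if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
    raise ImportError("PyArrow >= %s must be installed; found %s instead."
                      % (minimum_pyarrow_version, pyarrow.__version__))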