[SPARK-22966][PYTHON][SQL] Python UDFs with returnType=StringType should treat return values of datetime.date or datetime.datetime as unconvertible #20163
Conversation
python/pyspark/sql/udf.py
# because the format of the string should be different, depending on the type of the input
# object. So for those two specific types we eagerly convert them to string here, where the
# Python type information is still intact.
if returnType == StringType():
This is to handle when a Python UDF returns a date or datetime but marks the return type as string?
I have a question: why do we need to handle this type conversion? If we expect a correct string format, isn't it more reasonable to convert the date/datetime to strings in the UDF, instead of adding this conversion implicitly?
LGTM. cc @ueshin @icexelloss: is this behavior consistent with pandas UDFs?
I think Scalar and Grouped Map UDFs expect a pandas Series of datetime64[ns] (the native pandas timestamp type) instead of a pandas Series of datetime.date or datetime.datetime objects. I don't think it's necessary for pandas UDFs to work with a pandas Series of datetime.date or datetime.datetime objects, as the standard timestamp type in pandas is datetime64[ns].
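(For reference, a minimal sketch of the row-at-a-time mismatch this PR is about: a UDF that declares returnType "string" but actually returns a datetime.date. `spark` is an assumed active SparkSession; the pre-fix output is the incorrect value described in the PR description rather than the expected ISO string.)

```python
# Sketch of the mismatch under discussion: the UDF declares returnType "string"
# but actually returns a datetime.date. Assumes an active SparkSession `spark`.
import datetime
from pyspark.sql.functions import udf

as_string = udf(lambda _: datetime.date(2018, 1, 1), "string")
spark.range(1).select(as_string("id")).show()
# Before this PR the shown value is not the expected "2018-01-01"; after this PR
# such a mismatched return value is treated as unconvertible (i.e. null).
```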
@@ -120,10 +121,18 @@ object EvaluatePython {
      case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)

      case (c: Int, DateType) => c
BTW, as a side note, I think we can make the converter for the type and then reuse it.
Of course, separate change obviously.
python/pyspark/sql/udf.py
def coerce_to_str(v):
    import datetime
    if type(v) == datetime.date or type(v) == datetime.datetime:
        return str(v)
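(Purely illustrative of how a helper like `coerce_to_str` above would be applied around the user function before the result is pickled back to the JVM. `wrap_string_udf` is a hypothetical name, and this eager-coercion approach was ultimately dropped in this PR in favor of treating the mismatched value as unconvertible, i.e. null.)

```python
# Illustration only: applying a coerce_to_str-style helper around a user
# function. This is not the final approach taken by the PR.
import datetime

def coerce_to_str(v):
    # Eagerly stringify date/datetime while the Python type info is intact.
    if type(v) in (datetime.date, datetime.datetime):
        return str(v)
    return v

def wrap_string_udf(user_func):  # hypothetical wrapper name
    return lambda *args: coerce_to_str(user_func(*args))

wrapped = wrap_string_udf(lambda: datetime.date(2018, 1, 1))
print(wrapped())  # '2018-01-01'
```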
I think it's weird that we have a cast here alone ... Can't we register a custom Pyrolite unpickler? Does that make things more complicated?
Wait ... isn't this because we failed to call ... ?
Test build #85709 has finished for PR 20163 at commit ...
The problem here seems to be in how the return value is converted before being pickled back to the JVM: see spark/python/pyspark/sql/types.py, lines 170 to 171 and lines 173 to 175 (at 1c9f95c), which will be called via spark/python/pyspark/worker.py, lines 70 to 74 (at 64817c4).

If the return type does not need the conversion (see spark/python/pyspark/sql/types.py, lines 141 to 145 and lines 76 to 82 at 1c9f95c), then here (spark/python/pyspark/worker.py, lines 70 to 74 at 64817c4) we will send the return values as they are, without conversion, which ends up with the incorrect result reported here.

For the fix on the Python side, something like:

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 146e673ae97..37137e02c08 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -144,6 +144,17 @@ class StringType(AtomicType):
     __metaclass__ = DataTypeSingleton
 
+    def needConversion(self):
+        return True
+
+    def toInternal(self, v):
+        if v is not None:
+            return str(v)
+
+    def fromInternal(self, v):
+        if v is not None:
+            return str(v)
+

but then this will bring a performance regression because every return value would go through the extra conversion. I am less sure if this is something we should allow. Can we simply document this, saying the return value should match the declared return type? Please correct me if I missed anything.
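(A rough, Spark-free illustration of the performance concern above, assuming the regression comes from the extra per-value call; not a real benchmark of Spark itself.)

```python
# Rough illustration of the concern: if StringType needed conversion, every
# returned value would pay an extra per-row call (toInternal/str), even when
# it is already a str. Not a benchmark of Spark; just the shape of the cost.
import timeit

values = ["abc"] * 100000
no_conversion = timeit.timeit(lambda: [v for v in values], number=10)
with_conversion = timeit.timeit(lambda: [str(v) for v in values], number=10)
print(no_conversion, with_conversion)
```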
I ran some experiments:
This works correctly: ...
Result: ...
The change that the PR proposes seems to be coercing Python ...
Thanks for all of your comments, @HyukjinKwon and @icexelloss! All of what you guys mentioned is correct. Sorry for the mess, I actually got myself confused...

Both @HyukjinKwon and @icexelloss correctly pointed out that the bug only happens when the declared returnType is StringType but the actual return value is a datetime.date or datetime.datetime.

A note on how I got here: the reason why my current PR (incorrectly) contained the cases for ... But then I realized I also needed to handle the case where ..., and that I have to tell apart ...

To address a point from @icexelloss: compare CPython's date formatting

static PyObject *
date_isoformat(PyDateTime_Date *self)
{
    return PyUnicode_FromFormat("%04d-%02d-%02d",
                                GET_YEAR(self), GET_MONTH(self), GET_DAY(self));
}

and Spark's

// `SimpleDateFormat` is not thread-safe.
private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
  override def initialValue(): SimpleDateFormat = {
    new SimpleDateFormat("yyyy-MM-dd", Locale.US)
  }
}
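(A quick plain-Python check, outside of Spark, that the two formats quoted above line up; illustrative only.)

```python
# Plain-Python check that str() on date/datetime yields ISO-style text, which
# for dates matches the "yyyy-MM-dd" pattern used on the Spark side.
import datetime

print(str(datetime.date(2018, 1, 9)))                 # 2018-01-09
print(str(datetime.datetime(2018, 1, 9, 12, 30, 0)))  # 2018-01-09 12:30:00
```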
Hey @rednaxelafx, that's fine. We all make mistakes and I usually think it's generally better than not trying. I also made a mistake the first time above. It was easier to debug this by reading your comments and all the details in the PR description. Thank you.
So, a few options might be ...
@ueshin @icexelloss @cloud-fan @rednaxelafx, which one would you prefer? To me, I like 1. the most. If the perf diff is trivial, 2. is also fine. If 3. works fine, I think I am also fine with it.
}

/**
 * Returns SQLTimestamp from java.util.Calendar (microseconds since epoch)
(Matching the comment of fromJavaCalendarForDate.)
nit: Returns the number of microseconds since epoch from java.util.Calendar.
@@ -120,10 +121,18 @@ object EvaluatePython {
      case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)

      case (c: Int, DateType) => c
      // Pyrolite will unpickle a Python datetime.date to a java.util.Calendar
      case (c: Calendar, DateType) => DateTimeUtils.fromJavaCalendarForDate(c)
so we will never hit this?
seems what we need is a case (c: Calendar, StringType) => ...
I think he did this on the Python side because here we don't know if the Calendar is from datetime.date or datetime.datetime.
@cloud-fan, actually I have a similar question too - #20163 (comment). I tend to agree with it and I think we should disallow this and document it. Just want to check if you feel strongly about this. If we need to support this, I believe the ways are 2. or 3. in #20163 (comment).
The current behavior looks weird; we should either throw an exception and ask users to give a correct return type, or fix it via proposal 2.
@@ -120,10 +121,18 @@ object EvaluatePython {
      case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)

      case (c: Int, DateType) => c
      // Pyrolite will unpickle a Python datetime.date to a java.util.Calendar
      case (c: Calendar, DateType) => DateTimeUtils.fromJavaCalendarForDate(c)
How about we return null in this case? Other cases seem to also return null if the value fails to be converted:
>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x, "double")
>>> spark.range(1).select(f("id")).show()
+------------+
|<lambda>(id)|
+------------+
| null|
+------------+
Seems we can do it like:
case StringType => (obj: Any) => nullSafeConvert(obj) {
  case c: Calendar => null
  case _ => UTF8String.fromString(obj.toString)
}
Yea, it's consistent with other un-convertible cases, but StringType is the default return type; I'm afraid many users may hit this and get confused.
Right. Let's go ahead with 2. then. I am fine if it's done as an exception for practical purposes. Maybe we could add an `if isinstance(.., basestring)` check and return directly as a shortcut. I haven't checked the perf diff but I think we can do it easily via a profile as I mentioned above.
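(A minimal sketch of the suggested shortcut; `to_str_internal` is a hypothetical helper name, and the Python 2/3 handling of basestring is spelled out explicitly.)

```python
# Sketch of the suggested shortcut: skip the coercion when the value is
# already a string. `to_str_internal` is a hypothetical helper name.
import datetime
import sys

# On Python 2, str and unicode both need the shortcut; on Python 3 only str.
string_types = (str, unicode) if sys.version_info[0] < 3 else (str,)  # noqa: F821

def to_str_internal(v):
    if isinstance(v, string_types):
        return v  # shortcut: already a string, no extra conversion
    if v is not None:
        return str(v)

print(to_str_internal(u"abc"), to_str_internal(datetime.date(2018, 1, 9)))
```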
WDYT about ^ @ueshin?
Yeah, 2. should work for StringType.
I'd also like to add some documentation like 1. so that users are careful about the return type. I've found that udfs return null and pandas_udfs throw some exception in most cases when the return type is mismatched.
Of course we can try to make the behavior differences between udf and pandas_udf as close as possible in the future, but I think it is best-effort for a mismatched return type.
My 2 cents:

- I am +0 for 2. I think having type coercion for the str type but not for other types can be confusing to users. Realistically, I think any udf user would hit the null result for a mismatched type anyway (I have hit it so many times..) and will learn that null means type mismatch. Even if we make the behavior for str a bit friendlier, they will likely hit the issue with other types anyway. I don't think "the default returnType for udf is str" is a strong reason for "having special type coercion for str"; they seem orthogonal. I prefer that we keep the type mismatch behavior consistent for str types vs other types (return null) and document this more clearly.
- I find returning null for type mismatch an unintuitive behavior of row-at-a-time udf and prefer not to replicate it in pandas udf.
I investigated the behavior differences between ...
Probably we should consider catching and setting nulls in pandas_udf if possible, to match the behaviour with udf ...
Given the above discussion, do we have consensus on all of the following: ...
I believe we all agree on the first point. The second point above is in line with @icexelloss's opinion, which I tend to agree with in terms of API semantic consistency. It might not be as user-friendly as option 2 from @HyukjinKwon, but it's less magic and more consistent. I tend to find that more consistency leads to fewer surprises. If we have consensus then I'll update the JIRA ticket and this PR to reflect that.
SGTM
One more SGTM
+1
1 similar comment
+1
force-pushed from ca026d3 to 4c7bcc1
Test build #86000 has finished for PR 20163 at commit ...
…uld treat return values of datetime.date or datetime.datetime as unconvertible

Add conversion to PySpark to mark Python UDFs that declared returnType=StringType() but actually returned a datetime.date or datetime.datetime as unconvertible, i.e. converting it to null. Also added a new unit test to pyspark/sql/tests.py to reflect the current semantics of Python UDFs returning a value of a type mismatched with the declared returnType.
force-pushed from 4c7bcc1 to d307cee
Test build #86003 has finished for PR 20163 at commit ...
jenkins retest this please
Test build #86023 has finished for PR 20163 at commit ...
@@ -144,6 +145,7 @@ object EvaluatePython {
      }

      case StringType => (obj: Any) => nullSafeConvert(obj) {
        case _: Calendar => null
        case _ => UTF8String.fromString(obj.toString)
shall we blacklist more types? e.g. if a udf returns a decimal and marks the return type as string type, is it a mismatch?
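(A quick probe for that question; the output depends on how Pyrolite maps decimal.Decimal (to java.math.BigDecimal) and on that object's toString, so this only shows how to check, not what the answer is. Assumes an active SparkSession `spark`.)

```python
# Probe for the question above: a UDF returning decimal.Decimal while
# declaring "string" as the return type. Assumes an active SparkSession.
from decimal import Decimal
from pyspark.sql.functions import udf

f = udf(lambda _: Decimal("1.5"), "string")
spark.range(1).select(f("id")).show()
```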
I was pondering that yesterday, too... somehow I have this feeling that no matter which direction we take, there's no good answer to type mismatch situations.
Let's say we blacklist more types: should we document the list so that it's clear what will definitely NOT work?
For perfectness, I think we should check all the types (https://github.com/irmen/Pyrolite):
PYTHON ----> JAVA
------ ----
None null
bool boolean
int int
long long or BigInteger (depending on size)
string String
unicode String
complex net.razorvine.pickle.objects.ComplexNumber
datetime.date java.util.Calendar
datetime.datetime java.util.Calendar
datetime.time net.razorvine.pickle.objects.Time
datetime.timedelta net.razorvine.pickle.objects.TimeDelta
float double (float isn't used)
array.array array of appropriate primitive type (char, int, short, long, float, double)
list java.util.List<Object>
tuple Object[]
set java.util.Set
dict java.util.Map
bytes byte[]
bytearray byte[]
decimal BigDecimal
custom class Map<String, Object> (dict with class attributes including its name in "__class__")
Pyro4.core.URI net.razorvine.pyro.PyroURI
Pyro4.core.Proxy net.razorvine.pyro.PyroProxy
Pyro4.errors.* net.razorvine.pyro.PyroException
Pyro4.utils.flame.FlameBuiltin net.razorvine.pyro.FlameBuiltin
Pyro4.utils.flame.FlameModule net.razorvine.pyro.FlameModule
Pyro4.utils.flame.RemoteInteractiveConsole net.razorvine.pyro.FlameRemoteConsole
and then check if the string conversion looks reasonably consistent via obj.toString. If not, we add it to the blacklist.
Another possibility is to whitelist String, but then I guess this is rather a radical change.
check if the string conversion looks reasonably consistent by obj.toString. If not, we add it in the blacklist.
hmm, this seems weird, as the type mismatch now is defined by the Pyrolite object's toString behavior...
So, for now .. I think it's fine as a small fix as is ... We are going to document that the return type and return value should be matched anyway ..
So, expected return values will be (including dict, list, tuple and array):
spark/python/pyspark/sql/types.py, lines 928 to 946 in 3e40eb3:

# Mapping Python types to Spark SQL DataType
_type_mappings = {
    type(None): NullType,
    bool: BooleanType,
    int: LongType,
    float: DoubleType,
    str: StringType,
    bytearray: BinaryType,
    decimal.Decimal: DecimalType,
    datetime.date: DateType,
    datetime.datetime: TimestampType,
    datetime.time: TimestampType,
}

if sys.version < "3":
    _type_mappings.update({
        unicode: StringType,
        long: LongType,
    })
Seems we can also check if the string conversion looks reasonable and then blacklist net.razorvine.pickle.objects.Time if not ...
How does this sound to you, @cloud-fan and @rednaxelafx?
BTW, seems there is another hole when we actually do the internal conversion with unexpected types:
>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x, "date")
>>> spark.range(1).select(f("id")).show()
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "./python/pyspark/worker.py", line 229, in main
process()
File "./python/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "./python/pyspark/worker.py", line 149, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "./python/pyspark/worker.py", line 72, in <lambda>
return lambda *a: toInternal(f(*a))
File "/.../pyspark/sql/types.py", line 175, in toInternal
return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'int' object has no attribute 'toordinal'
another hole
>>> from pyspark.sql.functions import udf, struct
>>> f = udf(lambda x: x, "string")
>>> spark.range(1).select(f(struct("id"))).show()
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:86)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:85)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
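(The first traceback above can be reproduced without running a query, since it comes from DateType.toInternal calling toordinal() on the returned value.)

```python
# The first "hole" above reproduced directly: DateType.toInternal assumes a
# datetime.date-like value and calls toordinal() on it, so an int blows up.
from pyspark.sql.types import DateType

DateType().toInternal(1)  # AttributeError: 'int' object has no attribute 'toordinal'
```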
I think there is no perfect solution .. I think #20163 (comment) sounds good enough as a fix for this issue for now ..
@cloud-fan, how about something like this then?
case StringType => (obj: Any) => nullSafeConvert(obj) {
  // Shortcut for string conversion
  case c: String => UTF8String.fromString(c)
  // Here, we return null for 'array', 'tuple', 'dict', 'list', 'datetime.datetime',
  // 'datetime.date' and 'datetime.time' because those string conversions are
  // not quite consistent with SQL string representation of data.
  case _: java.util.Calendar | _: net.razorvine.pickle.objects.Time |
       _: java.util.List[_] | _: java.util.Map[_, _] =>
    null
  case c if c.getClass.isArray => null
  // Here, we keep the string conversion fall back for compatibility.
  // TODO: We should revisit this and rewrite the type conversion logic in Spark 3.x.
  case c => UTF8String.fromString(c.toString)
}
My few tests:
datetime.time:
from pyspark.sql.functions import udf
from datetime import time
f = udf(lambda x: time(0, 0), "string")
spark.range(1).select(f("id")).show()
+--------------------+
| <lambda>(id)|
+--------------------+
|Time: 0 hours, 0 ...|
+--------------------+
array:
from pyspark.sql.functions import udf
import array
f = udf(lambda x: array.array("c", "aaa"), "string")
spark.range(1).select(f("id")).show()
+------------+
|<lambda>(id)|
+------------+
| [C@11618d9e|
+------------+
tuple:
from pyspark.sql.functions import udf
f = udf(lambda x: (x,), "string")
spark.range(1).select(f("id")).show()
+--------------------+
| <lambda>(id)|
+--------------------+
|[Ljava.lang.Objec...|
+--------------------+
list:
from pyspark.sql.functions import udf
from datetime import datetime
f = udf(lambda x: [datetime(1990, 1, 1)], "string")
spark.range(1).select(f("id")).show()
+--------------------+
| <lambda>(id)|
+--------------------+
|[java.util.Gregor...|
+--------------------+
dict:
from pyspark.sql.functions import udf
from datetime import datetime
f = udf(lambda x: {1: datetime(1990, 1, 1)}, "string")
spark.range(1).select(f("id")).show()
+--------------------+
| <lambda>(id)|
+--------------------+
|{1=java.util.Greg...|
+--------------------+
looks good
btw, the array case seems a bit weird?
…hon data and SQL types in normal UDFs ### What changes were proposed in this pull request? We are facing some problems about type conversions between Python data and SQL types in UDFs (Pandas UDFs as well). It's even difficult to identify the problems (see #20163 and #22610). This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them. ```python import sys import array import datetime from decimal import Decimal from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql.functions import udf if sys.version >= '3': long = int data = [ None, True, 1, long(1), "a", u"a", datetime.date(1970, 1, 1), datetime.datetime(1970, 1, 1, 0, 0), 1.0, array.array("i", [1]), [1], (1,), bytearray([65, 66, 67]), Decimal(1), {"a": 1}, Row(kwargs=1), Row("namedtuple")(1), ] types = [ BooleanType(), ByteType(), ShortType(), IntegerType(), LongType(), StringType(), DateType(), TimestampType(), FloatType(), DoubleType(), ArrayType(IntegerType()), BinaryType(), DecimalType(10, 0), MapType(StringType(), IntegerType()), StructType([StructField("_1", IntegerType())]), ] df = spark.range(1) results = [] count = 0 total = len(types) * len(data) spark.sparkContext.setLogLevel("FATAL") for t in types: result = [] for v in data: try: row = df.select(udf(lambda: v, t)()).first() ret_str = repr(row[0]) except Exception: ret_str = "X" result.append(ret_str) progress = "SQL Type: [%s]\n Python Value: [%s(%s)]\n Result Python Value: [%s]" % ( t.simpleString(), str(v), type(v).__name__, ret_str) count += 1 print("%s/%s:\n %s" % (count, total, progress)) results.append([t.simpleString()] + list(map(str, result))) schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data)) strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False) print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n")))) ``` This table was generated under Python 2 but the code above is Python 3 compatible as well. ## How was this patch tested? Manually tested and lint check. Closes #22655 from HyukjinKwon/SPARK-25666. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
…das data and SQL types in Pandas UDFs ## What changes were proposed in this pull request? We are facing some problems about type conversions between Pandas data and SQL types in Pandas UDFs. It's even difficult to identify the problems (see #20163 and #22610). This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them. Table can be generated via the codes below: ```python from pyspark.sql.types import * from pyspark.sql.functions import pandas_udf columns = [ ('none', 'object(NoneType)'), ('bool', 'bool'), ('int8', 'int8'), ('int16', 'int16'), ('int32', 'int32'), ('int64', 'int64'), ('uint8', 'uint8'), ('uint16', 'uint16'), ('uint32', 'uint32'), ('uint64', 'uint64'), ('float64', 'float16'), ('float64', 'float32'), ('float64', 'float64'), ('date', 'datetime64[ns]'), ('tz_aware_dates', 'datetime64[ns, US/Eastern]'), ('string', 'object(string)'), ('decimal', 'object(Decimal)'), ('array', 'object(array[int32])'), ('float128', 'float128'), ('complex64', 'complex64'), ('complex128', 'complex128'), ('category', 'category'), ('tdeltas', 'timedelta64[ns]'), ] def create_dataframe(): import pandas as pd import numpy as np import decimal pdf = pd.DataFrame({ 'none': [None, None], 'bool': [True, False], 'int8': np.arange(1, 3).astype('int8'), 'int16': np.arange(1, 3).astype('int16'), 'int32': np.arange(1, 3).astype('int32'), 'int64': np.arange(1, 3).astype('int64'), 'uint8': np.arange(1, 3).astype('uint8'), 'uint16': np.arange(1, 3).astype('uint16'), 'uint32': np.arange(1, 3).astype('uint32'), 'uint64': np.arange(1, 3).astype('uint64'), 'float16': np.arange(1, 3).astype('float16'), 'float32': np.arange(1, 3).astype('float32'), 'float64': np.arange(1, 3).astype('float64'), 'float128': np.arange(1, 3).astype('float128'), 'complex64': np.arange(1, 3).astype('complex64'), 'complex128': np.arange(1, 3).astype('complex128'), 'string': list('ab'), 'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]), 'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]), 'date': pd.date_range('19700101', periods=2).values, 'category': pd.Series(list("AB")).astype('category')}) pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]] pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern') return pdf types = [ BooleanType(), ByteType(), ShortType(), IntegerType(), LongType(), FloatType(), DoubleType(), DateType(), TimestampType(), StringType(), DecimalType(10, 0), ArrayType(IntegerType()), MapType(StringType(), IntegerType()), StructType([StructField("_1", IntegerType())]), BinaryType(), ] df = spark.range(2).repartition(1) results = [] count = 0 total = len(types) * len(columns) values = [] spark.sparkContext.setLogLevel("FATAL") for t in types: result = [] for column, pandas_t in columns: v = create_dataframe()[column][0] values.append(v) try: row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first() ret_str = repr(row[0]) except Exception: ret_str = "X" result.append(ret_str) progress = "SQL Type: [%s]\n Pandas Value(Type): %s(%s)]\n Result Python Value: [%s]" % ( t.simpleString(), v, pandas_t, ret_str) count += 1 print("%s/%s:\n %s" % (count, total, progress)) results.append([t.simpleString()] + list(map(str, result))) schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns))) strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False) 
print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n")))) ``` This code is compatible with both Python 2 and 3 but the table was generated under Python 2. ## How was this patch tested? Manually tested and lint check. Closes #22795 from HyukjinKwon/SPARK-25798. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: Bryan Cutler <[email protected]>
Let's leave this closed.
What changes were proposed in this pull request?
Perform appropriate conversions for results coming from Python UDFs that return datetime.date or datetime.datetime.

Before this PR, Pyrolite would unpickle both datetime.date and datetime.datetime into a java.util.Calendar, which Spark SQL doesn't understand, which then leads to incorrect results. An example of such an incorrect result is: ...

After this PR, the same query above would give correct results: ...
An explicit non-goal of this PR is to change the behavior of timezone awareness or timezone settings of datetime.datetime objects collected from a DataFrame.

Currently PySpark always returns such datetime.datetime objects as timezone-unaware (naive) ones that respect Python's current local timezone (#19607 changed the default behavior for Pandas support but not for plain collect()). This PR does not change that behavior.

How was this patch tested?
Added some unit tests to pyspark.sql.tests for such UDFs, so that the following cases are covered:

- datetime.date -> StringType
- datetime.date -> DateType
- datetime.datetime -> StringType
- datetime.datetime -> TimestampType
- datetime.datetime with non-default timezone
- datetime.datetime with null timezone (naive datetime)
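(A sketch, not the actual test added to pyspark/sql/tests.py, of one of the matched-type cases listed above; assumes an active SparkSession `spark`.)

```python
# Sketch of one matched-type case from the list above: datetime.date with a
# declared DateType return type round-trips as a date value. Assumes an
# active SparkSession `spark`; not the actual test code from this PR.
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

date_udf = udf(lambda _: datetime.date(2017, 10, 31), DateType())
row = spark.range(1).select(date_udf("id").alias("d")).first()
assert row.d == datetime.date(2017, 10, 31)
```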