[SPARK-39760][PYTHON] Support Varchar in PySpark #37173

Closed
wants to merge 6 commits

Conversation

zhengruifeng
Contributor

@zhengruifeng zhengruifeng commented Jul 13, 2022

What changes were proposed in this pull request?

Support Varchar in PySpark

Why are the changes needed?

function parity

Does this PR introduce any user-facing change?

Yes, a new data type is supported.
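
For reference, a minimal sketch of the user-facing API (method names follow the existing PySpark atomic types; the outputs shown in comments are illustrative):

from pyspark.sql.types import StructField, StructType, VarcharType

v = VarcharType(10)  # parameterized by a maximum length
v.simpleString()     # 'varchar(10)'

# Usable anywhere other atomic types are, e.g. in a schema:
schema = StructType([StructField("name", VarcharType(10), True)])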

How was this patch tested?

1. Added a UT.
2. Manually checked against the Scala side:

In [1]: from pyspark.sql.types import *
   ...: from pyspark.sql.functions import *
   ...: 
   ...: df = spark.createDataFrame([(1,), (11,)], ["value"])
   ...: ret = df.select(col("value").cast(VarcharType(10))).collect()
   ...: 
22/07/13 17:17:07 WARN CharVarcharUtils: The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion. Otherwise, you can set spark.sql.legacy.charVarcharAsString to true, so that Spark treat them as string type as same as Spark 3.0 and earlier

In [2]: schema = StructType([StructField("a", IntegerType(), True), (StructField("v", VarcharType(10), True))])
   ...: description = "this a table created via Catalog.createTable()"
   ...: table = spark.catalog.createTable("tab3_via_catalog", schema=schema, description=description)
   ...: table.schema
   ...: 
Out[2]: StructType([StructField('a', IntegerType(), True), StructField('v', StringType(), True)])
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = spark.range(0, 10).selectExpr(" id AS value")
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> val ret = df.select(col("value").cast(VarcharType(10))).collect()
22/07/13 17:28:56 WARN CharVarcharUtils: The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion. Otherwise, you can set spark.sql.legacy.charVarcharAsString to true, so that Spark treat them as string type as same as Spark 3.0 and earlier
ret: Array[org.apache.spark.sql.Row] = Array([0], [1], [2], [3], [4], [5], [6], [7], [8], [9])

scala> 

scala> val schema = StructType(StructField("a", IntegerType, true) :: (StructField("v", VarcharType(10), true) :: Nil))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true),StructField(v,VarcharType(10),true))

scala> val description = "this a table created via Catalog.createTable()"
description: String = this a table created via Catalog.createTable()

scala> val table = spark.catalog.createTable("tab3_via_catalog", source="json", schema=schema, description=description, options=Map.empty[String, String])
table: org.apache.spark.sql.DataFrame = [a: int, v: string]

scala> table.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true),StructField(v,StringType,true))
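
As a side note, the spark.sql.legacy.charVarcharAsString flag mentioned in the warnings above restores the pre-3.1 behavior. A hedged sketch, back in the PySpark session from In [1] above (effect as described by the warning text, not re-verified here):

spark.conf.set("spark.sql.legacy.charVarcharAsString", "true")
df.select(col("value").cast(VarcharType(10))).printSchema()
# root
#  |-- value: string (nullable = true)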

@zhengruifeng zhengruifeng changed the title [SPARK-39760][PYTHON][WIP] Support Varchar in PySpark [SPARK-39760][PYTHON] Support Varchar in PySpark Jul 14, 2022
@zhengruifeng zhengruifeng marked this pull request as ready for review July 14, 2022 05:15
@zhengruifeng
Contributor Author

gentle ping @HyukjinKwon @cloud-fan

Parameters
----------
length : int
the length limitation. Data writing will fail if the input
Member

nit: indent?
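
The truncated docstring above refers to write-time length enforcement. A hedged SQL-side illustration (the table name is made up, and the exact error message differs by version):

spark.sql("CREATE TABLE varchar_demo (v VARCHAR(3)) USING parquet")
spark.sql("INSERT INTO varchar_demo VALUES ('abc')")   # fits within 3 characters, succeeds
spark.sql("INSERT INTO varchar_demo VALUES ('abcd')")  # exceeds the limit, fails at write time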

@@ -1659,8 +1698,8 @@ def verify_acceptable_types(obj: Any) -> None:
new_msg("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
)

if isinstance(dataType, StringType):
# StringType can work with any types
if isinstance(dataType, StringType) or isinstance(dataType, VarcharType):
Member

nit: isinstance(dataType, (StringType, VarcharType))?

Contributor Author

nice!

@@ -181,6 +182,29 @@ class StringType(AtomicType, metaclass=DataTypeSingleton):
pass


class VarcharType(AtomicType):
Member

metaclass=DataTypeSingleton?

Contributor Author

I tried this first, but it causes an initialization error due to the __call__ in DataTypeSingleton:
a type mixed in with DataTypeSingleton must support a constructor that takes no parameters.

Member

Makes sense! Let me follow up with a parametric DataTypeSingleton then.
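
For illustration, one way a parametric singleton can be built in Python (a hedged sketch, not the actual follow-up implementation; all names here are hypothetical):

class ParametricSingleton(type):
    # Metaclass that caches one instance per (class, constructor-args) pair.
    _instances: dict = {}

    def __call__(cls, *args):
        key = (cls, args)
        if key not in ParametricSingleton._instances:
            ParametricSingleton._instances[key] = super().__call__(*args)
        return ParametricSingleton._instances[key]

class VarcharDemo(metaclass=ParametricSingleton):
    def __init__(self, length: int):
        self.length = length

assert VarcharDemo(10) is VarcharDemo(10)      # same parameter -> same instance
assert VarcharDemo(10) is not VarcharDemo(20)  # different parameter -> distinct instances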


self.assertTrue(v2 is not v1)
self.assertNotEqual(v1, v2)
v3 = VarcharType(10)
self.assertEqual(v1, v3)
Member

Shall we check if v1 is v3?

Member

v1 is v3 should be True after the parametric singleton is introduced. I will adjust that in the follow-up.

@xinrong-meng
Member

LGTM! Thanks!

Member

@HyukjinKwon HyukjinKwon left a comment

We'll have to fix the code for when spark.sql.execution.arrow.pyspark.enabled is enabled, and for Py4J + Python UDFs too. But we can do that separately. See also https://issues.apache.org/jira/browse/SPARK-37275

@zhengruifeng
Contributor Author

Merged to master, thank you all!

@HyukjinKwon I will send a follow-up PR for arrow/py4j/udf, thanks for pointing it out!

@zhengruifeng zhengruifeng deleted the py_add_varchar branch July 18, 2022 07:58
@cloud-fan
Contributor

Late LGTM. Do we want to support CharType in PySpark?

@zhengruifeng
Contributor Author

@cloud-fan I think so; let me add it in the near future.

@HyukjinKwon Btw, I guess we may not need to add extra support for PyArrow or Py4J + Python UDFs, because it seems there is no dedicated class for char/varchar instances in Scala or Python (pandas/NumPy/built-in), and they are treated as string internally.

There is also a warning message in CharVarcharUtils if we try to cast to char/varchar:
"The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion."
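
A hedged sketch of what "treated as string internally" means in practice, consistent with the transcripts above:

df = spark.createDataFrame([(1,)], ["value"])
ret = df.select(col("value").cast(VarcharType(10)).alias("v"))
ret.schema                 # StructType([StructField('v', StringType(), True)])
type(ret.collect()[0][0])  # <class 'str'>: a plain Python string, no varchar wrapper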
