[SPARK-23931][SQL] Adds arrays_zip function to sparksql #21045

Closed. Wants to merge 36 commits.

Changes from 35 commits (36 commits total)
7bf45dd
Adds zip function to sparksql
DylanGuedes Apr 11, 2018
99848fe
Changes zip construction
DylanGuedes Apr 13, 2018
27b0bc2
Changes tests and uses builtin namespace in pyspark
DylanGuedes Apr 13, 2018
93826b6
fixes examples string and uses struct instead of arrays
DylanGuedes Apr 26, 2018
a7e29f6
working pyspark zip_lists
DylanGuedes May 11, 2018
7130fec
Fixes java version when arrays have different lengths
DylanGuedes May 11, 2018
d552216
remove unused variables
DylanGuedes May 11, 2018
1fecef4
rename zip_lists to zip
DylanGuedes May 11, 2018
f71151a
adds expression tests and uses strip margin syntax
DylanGuedes May 12, 2018
6b4bc94
Adds variable number of inputs to zip function
DylanGuedes May 15, 2018
1549928
uses foldleft instead of while for iterating
DylanGuedes May 15, 2018
9f7bba1
rewritten some notation
DylanGuedes May 16, 2018
3ba2b4f
fix dogencode generation
DylanGuedes May 17, 2018
3a59201
Adds new tests, uses lazy val and split calls
DylanGuedes May 17, 2018
6462fa8
uses splitFunction
DylanGuedes May 17, 2018
8b1eb7c
move arraytypes to private member
DylanGuedes May 18, 2018
2bfba80
adds binary and array of array tests
DylanGuedes May 18, 2018
c3b062c
uses stored array types names
DylanGuedes May 18, 2018
d9b95c4
split input function using ctxsplitexpression
DylanGuedes May 18, 2018
26bbf66
uses splitexpression for inputs
DylanGuedes May 19, 2018
d9ad04d
Refactor cases, add new tests with empty seq, check size of array
DylanGuedes May 22, 2018
f29ee1c
Check empty seq as input
DylanGuedes May 22, 2018
c58d09c
Uses switch instead of if
DylanGuedes May 23, 2018
38fa996
refactor switch and else methods
DylanGuedes May 23, 2018
5b3066b
uses if instead of switch
DylanGuedes May 30, 2018
759a4d4
Not using storedarrtype anymore
DylanGuedes Jun 4, 2018
68e69db
split between empty and nonempty codegen
DylanGuedes Jun 4, 2018
12b3835
remove ternary if
DylanGuedes Jun 4, 2018
643cb9b
Fixes null values evaluation and adds back tests
DylanGuedes Jun 4, 2018
5876082
move to else
DylanGuedes Jun 4, 2018
0223960
remove unused lines
DylanGuedes Jun 4, 2018
2b88387
use zip alias
DylanGuedes Jun 5, 2018
bbc20ee
using same docs for all apis
DylanGuedes Jun 8, 2018
8d3a838
adds transient to method
DylanGuedes Jun 8, 2018
d8f3dea
rename zip function to arrays_zip
DylanGuedes Jun 10, 2018
3d68ea9
adds pretty_name for arrays_zip
DylanGuedes Jun 11, 2018
17 changes: 17 additions & 0 deletions python/pyspark/sql/functions.py
@@ -2394,6 +2394,23 @@ def array_repeat(col, count):
return Column(sc._jvm.functions.array_repeat(_to_java_column(col), count))


@since(2.4)
def arrays_zip(*cols):
"""
Collection function: Returns a merged array of structs in which the N-th struct contains all
N-th values of input arrays.

:param cols: columns of arrays to be merged.

>>> from pyspark.sql.functions import arrays_zip
>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
>>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
[Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
"""
sc = SparkContext._active_spark_context
return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))
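The docstring above can be read as a pure-Python model of the documented semantics (an illustrative sketch only, not the Spark implementation; `arrays_zip_model` is a hypothetical helper name): a null input array yields a null result, and shorter inputs are padded with nulls up to the longest array.

```python
def arrays_zip_model(*arrays):
    """Pure-Python model of arrays_zip semantics (not the Spark code).

    Returns None if any input array is None; otherwise pads every array
    with None up to the longest input and zips element-wise into tuples.
    """
    if any(a is None for a in arrays):
        return None
    longest = max((len(a) for a in arrays), default=0)
    return [
        tuple(a[i] if i < len(a) else None for a in arrays)
        for i in range(longest)
    ]
```

For example, `arrays_zip_model([1, 2, 3], [2, 3, 4])` yields `[(1, 2), (2, 3), (3, 4)]`, matching the doctest above.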


# ---------------------------- User Defined Function ----------------------------------

class PandasUDFType(object):
@@ -423,6 +423,7 @@ object FunctionRegistry {
expression[Size]("size"),
expression[Slice]("slice"),
expression[Size]("cardinality"),
expression[ArraysZip]("arrays_zip"),
expression[SortArray]("sort_array"),
expression[ArrayMin]("array_min"),
expression[ArrayMax]("array_max"),
@@ -128,6 +128,170 @@ case class MapKeys(child: Expression)
override def prettyName: String = "map_keys"
}

@ExpressionDescription(
usage = """
_FUNC_(a1, a2, ...) - Returns a merged array of structs in which the N-th struct contains all
N-th values of input arrays.
""",
examples = """
Examples:
> SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
[[1, 2], [2, 3], [3, 4]]
> SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
[[1, 2, 3], [2, 3, 4]]
""",
since = "2.4.0")
case class ArraysZip(children: Seq[Expression]) extends Expression with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType)

override def dataType: DataType = ArrayType(mountSchema)
mn-mikke (Contributor), Jul 16, 2018:

Just a quick follow up question... Under what circumstances can the output array contain null elements? Shouldn't the output dataType be ArrayType(mountSchema, false)?

DylanGuedes (Contributor, author):

In the first test case I zipped Seq(9001, 9002, 9003, null) with Seq(null, 1L, null, 4L, 11L) and expected the result to be Seq(Seq(9001, null), Seq(9002, 1L), ..., Seq(null, 11L)), for instance.

I tried to define the nullability (does this word exist? haha) of the output at runtime, but I thought that was not possible, since I can't eval every result before defining the schema.

What do you think?

Contributor:

I understand that fields of the nested struct can be null, but can you give me an example of the input that would lead to something like Seq(null, Seq(9002, 1L))?

DylanGuedes (Contributor, author):

Hmmm, you are correct then; I don't think such a scenario could happen (correctly, at least). Does that mean the dataType should always reject null values?

Member:

Yeah, seems like the struct which is the element of the array is not null, so the data type would be ArrayType(mountSchema, containsNull = false).

Contributor:

Ok, I will fix it as a part of #21352.

Contributor:

The struct can be null if any of the input elements is null, IIUC. So probably ArrayType(mountSchema, containsNull = children.exists(_.nullable))?

ueshin (Member), Jul 16, 2018:

In that case, the array itself will be null, and def nullable is already children.exists(_.nullable).

Contributor:

yes, you're right, sorry!


override def nullable: Boolean = children.exists(_.nullable)

private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType])

private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
Member:

Can we have more than one arrayElementTypes?


@transient private lazy val mountSchema: StructType = {
val fields = children.zip(arrayElementTypes).zipWithIndex.map {
case ((expr: NamedExpression, elementType), _) =>
StructField(expr.name, elementType, nullable = true)
case ((_, elementType), idx) =>
StructField(idx.toString, elementType, nullable = true)
}
Member:

Do we need to make List explicitly? How about:

val fields = arrayTypes.zipWithIndex.map { case (arr, idx) =>
  StructField( ... )
}

?

DylanGuedes (Contributor, author):

Nice, thank you.

Member:

How about:

val fields = children.zip(arrayElementTypes).zipWithIndex.map {
  case ((expr: NamedExpression, elementType), _) =>
    StructField(expr.name, elementType, nullable = true)
  case ((_, elementType), idx) =>
    StructField(s"$idx", elementType, nullable = true)
}

?

DylanGuedes (Contributor, author):

Way better, thanks!

StructType(fields)
}
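The field-naming rule in mountSchema above, where a named input keeps its expression name and an unnamed input falls back to its positional index as a string, can be sketched as a small model (an illustrative sketch only, not Spark code; `zip_field_names` is a hypothetical helper, with `None` standing in for an unnamed expression):

```python
def zip_field_names(column_names):
    """Model of ArraysZip struct-field naming: a named input keeps its
    name, an unnamed input (None here) uses its index as the name."""
    return [
        name if name is not None else str(idx)
        for idx, name in enumerate(column_names)
    ]
```

So zipping columns `vals1`, an anonymous expression, and `vals2` would produce struct fields named `vals1`, `1`, and `vals2` under this model.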

@transient lazy val numberOfArrays: Int = children.length

@transient lazy val genericArrayData = classOf[GenericArrayData].getName

def emptyInputGenCode(ev: ExprCode): ExprCode = {
ev.copy(code"""
|${CodeGenerator.javaType(dataType)} ${ev.value} = new $genericArrayData(new Object[0]);
|boolean ${ev.isNull} = false;
""".stripMargin)
}

def nonEmptyInputGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val genericInternalRow = classOf[GenericInternalRow].getName
val arrVals = ctx.freshName("arrVals")
val biggestCardinality = ctx.freshName("biggestCardinality")

val currentRow = ctx.freshName("currentRow")
val j = ctx.freshName("j")
val i = ctx.freshName("i")
val args = ctx.freshName("args")

val evals = children.map(_.genCode(ctx))
val getValuesAndCardinalities = evals.zipWithIndex.map { case (eval, index) =>
s"""
|if ($biggestCardinality != -1) {
| ${eval.code}
| if (!${eval.isNull}) {
| $arrVals[$index] = ${eval.value};
| $biggestCardinality = Math.max($biggestCardinality, ${eval.value}.numElements());
| } else {
| $biggestCardinality = -1;
| }
|}
""".stripMargin
}

val splittedGetValuesAndCardinalities = ctx.splitExpressions(
expressions = getValuesAndCardinalities,
funcName = "getValuesAndCardinalities",
returnType = "int",
makeSplitFunction = body =>
s"""
|$body
|return $biggestCardinality;
""".stripMargin,
foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"),
arguments =
("ArrayData[]", arrVals) ::
("int", biggestCardinality) :: Nil)

val getValueForType = arrayElementTypes.zipWithIndex.map { case (eleType, idx) =>
val g = CodeGenerator.getValue(s"$arrVals[$idx]", eleType, i)
s"""
|if ($i < $arrVals[$idx].numElements() && !$arrVals[$idx].isNullAt($i)) {
| $currentRow[$idx] = $g;
|} else {
| $currentRow[$idx] = null;
|}
""".stripMargin
}

val getValueForTypeSplitted = ctx.splitExpressions(
expressions = getValueForType,
funcName = "extractValue",
arguments =
("int", i) ::
("Object[]", currentRow) ::
("ArrayData[]", arrVals) :: Nil)

val initVariables = s"""
|ArrayData[] $arrVals = new ArrayData[$numberOfArrays];
|int $biggestCardinality = 0;
|${CodeGenerator.javaType(dataType)} ${ev.value} = null;
""".stripMargin

ev.copy(code"""
|$initVariables
|$splittedGetValuesAndCardinalities
|boolean ${ev.isNull} = $biggestCardinality == -1;
|if (!${ev.isNull}) {
| Object[] $args = new Object[$biggestCardinality];
Member:

if (!${ev.isNull}) {
   Object[] $args = ...

We usually don't set a value if the result is null.

| for (int $i = 0; $i < $biggestCardinality; $i ++) {
| Object[] $currentRow = new Object[$numberOfArrays];
| $getValueForTypeSplitted
| $args[$i] = new $genericInternalRow($currentRow);
| }
| ${ev.value} = new $genericArrayData($args);
|}
""".stripMargin)
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
if (numberOfArrays == 0) {
emptyInputGenCode(ev)
} else {
nonEmptyInputGenCode(ctx, ev)
}
}

override def eval(input: InternalRow): Any = {
val inputArrays = children.map(_.eval(input).asInstanceOf[ArrayData])
if (inputArrays.contains(null)) {
null
} else {
val biggestCardinality = if (inputArrays.isEmpty) {
0
} else {
inputArrays.map(_.numElements()).max
}
Member:

Maybe we can compute the biggestCardinality as:

val biggestCardinality = if (inputArrays.isEmpty) {
  0
} else {
  inputArrays.map(_.numElements()).max
}


val result = new Array[InternalRow](biggestCardinality)
val zippedArrs: Seq[(ArrayData, Int)] = inputArrays.zipWithIndex

for (i <- 0 until biggestCardinality) {
val currentLayer: Seq[Object] = zippedArrs.map { case (arr, index) =>
if (i < arr.numElements() && !arr.isNullAt(i)) {
arr.get(i, arrayElementTypes(index))
} else {
null
}
}

result(i) = InternalRow.apply(currentLayer: _*)
}
new GenericArrayData(result)
}
}
Member:

We need override def prettyName: String = "arrays_zip".

DylanGuedes (Contributor, author):

Done!

}

/**
* Returns an unordered array containing the values of the map.
*/
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

@@ -315,6 +316,91 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
Some(Literal.create(null, StringType))), null)
}

test("ArraysZip") {
val literals = Seq(
Literal.create(Seq(9001, 9002, 9003, null), ArrayType(IntegerType)),
Literal.create(Seq(null, 1L, null, 4L, 11L), ArrayType(LongType)),
Literal.create(Seq(-1, -3, 900, null), ArrayType(IntegerType)),
Literal.create(Seq("a", null, "c"), ArrayType(StringType)),
Literal.create(Seq(null, false, true), ArrayType(BooleanType)),
Literal.create(Seq(1.1, null, 1.3, null), ArrayType(DoubleType)),
Literal.create(Seq(), ArrayType(NullType)),
Literal.create(Seq(null), ArrayType(NullType)),
Literal.create(Seq(192.toByte), ArrayType(ByteType)),
Literal.create(
Seq(Seq(1, 2, 3), null, Seq(4, 5), Seq(1, null, 3)), ArrayType(ArrayType(IntegerType))),
Literal.create(Seq(Array[Byte](1.toByte, 5.toByte)), ArrayType(BinaryType))
)

checkEvaluation(ArraysZip(Seq(literals(0), literals(1))),
List(Row(9001, null), Row(9002, 1L), Row(9003, null), Row(null, 4L), Row(null, 11L)))
Member:

nit: why do you use List here and in the following tests?

DylanGuedes (Contributor, author):

I think that at some point I was using Seq, but the tests were not passing because it was being eval'd to WrappedArray[something], while with List it is eval'd as [something].
Should I stick with Seq?


checkEvaluation(ArraysZip(Seq(literals(0), literals(2))),
List(Row(9001, -1), Row(9002, -3), Row(9003, 900), Row(null, null)))

checkEvaluation(ArraysZip(Seq(literals(0), literals(3))),
List(Row(9001, "a"), Row(9002, null), Row(9003, "c"), Row(null, null)))

checkEvaluation(ArraysZip(Seq(literals(0), literals(4))),
List(Row(9001, null), Row(9002, false), Row(9003, true), Row(null, null)))

checkEvaluation(ArraysZip(Seq(literals(0), literals(5))),
List(Row(9001, 1.1), Row(9002, null), Row(9003, 1.3), Row(null, null)))

checkEvaluation(ArraysZip(Seq(literals(0), literals(6))),
List(Row(9001, null), Row(9002, null), Row(9003, null), Row(null, null)))

checkEvaluation(ArraysZip(Seq(literals(0), literals(7))),
List(Row(9001, null), Row(9002, null), Row(9003, null), Row(null, null)))

checkEvaluation(ArraysZip(Seq(literals(0), literals(1), literals(2), literals(3))),
List(
Row(9001, null, -1, "a"),
Row(9002, 1L, -3, null),
Row(9003, null, 900, "c"),
Row(null, 4L, null, null),
Row(null, 11L, null, null)))

checkEvaluation(ArraysZip(Seq(literals(4), literals(5), literals(6), literals(7), literals(8))),
List(
Row(null, 1.1, null, null, 192.toByte),
Row(false, null, null, null, null),
Row(true, 1.3, null, null, null),
Row(null, null, null, null, null)))

checkEvaluation(ArraysZip(Seq(literals(9), literals(0))),
List(
Row(List(1, 2, 3), 9001),
Row(null, 9002),
Row(List(4, 5), 9003),
Row(List(1, null, 3), null)))

checkEvaluation(ArraysZip(Seq(literals(7), literals(10))),
List(Row(null, Array[Byte](1.toByte, 5.toByte))))
Contributor:

Can you add some tests with many input arrays, e.g. 100 or 1000 if needed, in order to force splitExpressions to actually split the generated code, so that we have test coverage for that? You can check in debug mode whether splitExpressions is splitting the code, to be sure that it happens. Thanks.

DylanGuedes (Contributor, author):

I checked, and it looks like the functions are being split correctly.


val longLiteral =
Literal.create((0 to 1000).toSeq, ArrayType(IntegerType))

checkEvaluation(ArraysZip(Seq(literals(0), longLiteral)),
List(Row(9001, 0), Row(9002, 1), Row(9003, 2)) ++
(3 to 1000).map { Row(null, _) }.toList)

val manyLiterals = (0 to 1000).map { _ =>
Literal.create(Seq(1), ArrayType(IntegerType))
}.toSeq

val numbers = List(
Row(Seq(9001) ++ (0 to 1000).map { _ => 1 }.toSeq: _*),
Row(Seq(9002) ++ (0 to 1000).map { _ => null }.toSeq: _*),
Row(Seq(9003) ++ (0 to 1000).map { _ => null }.toSeq: _*),
Row(Seq(null) ++ (0 to 1000).map { _ => null }.toSeq: _*))
checkEvaluation(ArraysZip(Seq(literals(0)) ++ manyLiterals),
List(numbers(0), numbers(1), numbers(2), numbers(3)))

checkEvaluation(ArraysZip(Seq(literals(0), Literal.create(null, ArrayType(IntegerType)))), null)
checkEvaluation(ArraysZip(Seq()), List())
}
Member:

Can you add a case for something like Zip(Seq(null, literals(0))) or Zip(Seq(literals(0), null))?

Member:

Also what if Zip(Seq())?

DylanGuedes (Contributor, author):

I tried to test the case with Seq(null, literals(0)), but it breaks before reaching checkEvalWithCodeGen/withoutCodegen (it looks like iterating over a sequence with a null element explodes). I checked and didn't see any tests that accept Seq[Expression] covering this case (for instance, Concat turns null inputs into Literals with null values).

Should I change something?

DylanGuedes (Contributor, author):

I tried, but it looks like I can't create an empty GenericArrayData. I'm pushing code with the tests covering these scenarios commented out, so maybe someone can give me suggestions while I look for another solution.


test("Array Min") {
checkEvaluation(ArrayMin(Literal.create(Seq(-11, 10, 2), ArrayType(IntegerType))), -11)
checkEvaluation(
8 changes: 8 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3508,6 +3508,14 @@
*/
def map_entries(e: Column): Column = withExpr { MapEntries(e.expr) }

/**
* Returns a merged array of structs in which the N-th struct contains all N-th values of input
* arrays.
* @group collection_funcs
* @since 2.4.0
*/
def arrays_zip(e: Column*): Column = withExpr { ArraysZip(e.map(_.expr)) }

//////////////////////////////////////////////////////////////////////////////////////////////
// Mask functions
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -479,6 +479,53 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
)
}

test("dataframe arrays_zip function") {
val df1 = Seq((Seq(9001, 9002, 9003), Seq(4, 5, 6))).toDF("val1", "val2")
val df2 = Seq((Seq("a", "b"), Seq(true, false), Seq(10, 11))).toDF("val1", "val2", "val3")
val df3 = Seq((Seq("a", "b"), Seq(4, 5, 6))).toDF("val1", "val2")
val df4 = Seq((Seq("a", "b", null), Seq(4L))).toDF("val1", "val2")
val df5 = Seq((Seq(-1), Seq(null), Seq(), Seq(null, null))).toDF("val1", "val2", "val3", "val4")
val df6 = Seq((Seq(192.toByte, 256.toByte), Seq(1.1), Seq(), Seq(null, null)))
.toDF("v1", "v2", "v3", "v4")
val df7 = Seq((Seq(Seq(1, 2, 3), Seq(4, 5)), Seq(1.1, 2.2))).toDF("v1", "v2")
val df8 = Seq((Seq(Array[Byte](1.toByte, 5.toByte)), Seq(null))).toDF("v1", "v2")

val expectedValue1 = Row(Seq(Row(9001, 4), Row(9002, 5), Row(9003, 6)))
checkAnswer(df1.select(arrays_zip($"val1", $"val2")), expectedValue1)
checkAnswer(df1.selectExpr("arrays_zip(val1, val2)"), expectedValue1)

val expectedValue2 = Row(Seq(Row("a", true, 10), Row("b", false, 11)))
checkAnswer(df2.select(arrays_zip($"val1", $"val2", $"val3")), expectedValue2)
checkAnswer(df2.selectExpr("arrays_zip(val1, val2, val3)"), expectedValue2)

val expectedValue3 = Row(Seq(Row("a", 4), Row("b", 5), Row(null, 6)))
checkAnswer(df3.select(arrays_zip($"val1", $"val2")), expectedValue3)
checkAnswer(df3.selectExpr("arrays_zip(val1, val2)"), expectedValue3)

val expectedValue4 = Row(Seq(Row("a", 4L), Row("b", null), Row(null, null)))
checkAnswer(df4.select(arrays_zip($"val1", $"val2")), expectedValue4)
checkAnswer(df4.selectExpr("arrays_zip(val1, val2)"), expectedValue4)

val expectedValue5 = Row(Seq(Row(-1, null, null, null), Row(null, null, null, null)))
checkAnswer(df5.select(arrays_zip($"val1", $"val2", $"val3", $"val4")), expectedValue5)
checkAnswer(df5.selectExpr("arrays_zip(val1, val2, val3, val4)"), expectedValue5)

val expectedValue6 = Row(Seq(
Row(192.toByte, 1.1, null, null), Row(256.toByte, null, null, null)))
checkAnswer(df6.select(arrays_zip($"v1", $"v2", $"v3", $"v4")), expectedValue6)
checkAnswer(df6.selectExpr("arrays_zip(v1, v2, v3, v4)"), expectedValue6)

val expectedValue7 = Row(Seq(
Row(Seq(1, 2, 3), 1.1), Row(Seq(4, 5), 2.2)))
checkAnswer(df7.select(arrays_zip($"v1", $"v2")), expectedValue7)
checkAnswer(df7.selectExpr("arrays_zip(v1, v2)"), expectedValue7)

val expectedValue8 = Row(Seq(
Row(Array[Byte](1.toByte, 5.toByte), null)))
checkAnswer(df8.select(arrays_zip($"v1", $"v2")), expectedValue8)
checkAnswer(df8.selectExpr("arrays_zip(v1, v2)"), expectedValue8)
}

test("map size function") {
val df = Seq(
(Map[Int, Int](1 -> 1, 2 -> 2), "x"),