[SPARK-23938][SQL] Add map_zip_with function #22017
Test build #94325 has finished for PR 22017 at commit
}
}

private def getMapType(expr: Expression) = expr.dataType match {
I'd like you to use the same util method. I suggested introducing `object HigherOrderFunction` at #21986 (comment).
nullable2: Boolean,
dt3: DataType,
nullable3: Boolean,
f: (Expression, Expression, Expression) => Expression): Expression = {
nit: indent
case BinaryType => false
case _: AtomicType => true
case _ => false
@transient protected lazy val elementTypeSupportEquals = {
nit: I think we can avoid the braces
val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
val values = new GenericArrayData(new Array[Any](keys.numElements()))
keys.foreach(keyType, (idx: Int, key: Any) => {
  val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering)
This approach is very inefficient: its computational complexity is O(N^2) in the size of the bigger map. I think we can implement something more efficient here and also avoid the changes for the code refactoring. I'd propose to also get the index at which each key is found in each map, so that we can access the values by index. That way the overall complexity would be O(N).
Thanks for mentioning this! I'm not happy with the current complexity either. I assumed that the implementation of maps would change into something with O(1) element access in the future. By then, the complexity would be O(N) for types supporting equals as well, and we would save a portion of duplicated code.
If you think that maps will remain like this for a long time, I really like your suggestion with indexes.
@ueshin What's your view on that?
I think there is no plan to have a different map implementation and anyway there is a lot of code which depends on having the array based version of MapData. Regarding the duplicated code, to be honest, I think that avoiding the refactoring introduced by that would also make this PR cleaner...
Ok, I will change it. Thanks a lot!
}

test("map_zip_with function - invalid")
{
nit: please move the brace to the end of the previous line (there are other places where this should be done, please update them too)
Test build #94362 has finished for PR 22017 at commit
abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
object GetMapValueUtil
{
nit: the brace should be on the previous line.
override def functions: Seq[Expression] = function :: Nil

override def nullable: Boolean = left.nullable || right.nullable
`left.nullable && right.nullable`? Because if one side is an empty map, NULL will be passed as the value for each key on the other side.
The `nullable` flag is rather related to the cases when the whole map is `null`. The case that you are referring to is handled by the `valueContainsNull` flag of `MapType` (see line 496).
# Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
Test build #94394 has finished for PR 22017 at commit
Test build #94393 has finished for PR 22017 at commit
}

private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
  val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
Why `Array[Option[Int]]` instead of `(Option[Int], Option[Int])`? Moreover, I can't understand why we need this at all. As we have the `HashMap`, we can just add the indexes there and return it as an array.
If we changed it to `(Option[Int], Option[Int])`, wouldn't we need two similar `i` loops instead of one?
My motivation for also using the `ArrayBuffer` is to preserve the order of keys. A random order would break map comparison in tests. Maybe you will come up with an idea for how to compare maps in tests better :-)
If we changed it to (Option[Int], Option[Int]), wouldn't we need two similar i loops instead of one?
I really don't think so, it would be the same as now I think
well, maybe we can fix map comparison in tests... :)
I really don't think so, it would be the same as now I think
Let's assume that `indexes` are tuples for now. `indexes(z).isEmpty` could be replaced with `indexes.productElement(z).isEmpty`, but how to replace `indexes(z) = Some(i)`? Since a tuple is immutable, I don't see how to replace the i-th element with the `copy` function. Maybe we could implement a dedicated class to hold indexes, but is it worth doing that?
since the HashMap is mutable, you can just: hashMap += key -> newTuple
The array-based solution is indeed 20% faster according to my benchmark, but I think it is not critical, as I ran the benchmark performing the operation 1,000,000 times and the absolute difference was 2 ms. So I prefer the cleaner solution (that is, using tuples). @ueshin what do you think?
I generally prefer the cleaner solution, but actually I'd prefer the previous approach in this case for 2 reasons:
- We shouldn't ignore a 20% performance difference.
- I'm not sure we can modify the comparison of `MapType` in `ExpressionEvalHelper` here. We might need another PR to make sure the modification is valid.
We still need to add comments explaining what the arrays are for and why, though.
@mgaido91 Are you comfortable with reverting back to the previous version?
@mn-mikke we can use `LinkedHashMap` in order to preserve key order.
Like this idea, thanks!
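The idea settled on in this thread (one pass over each key array, recording where every key occurs, in a map that keeps insertion order) might be sketched as follows. This is only an illustration under stated assumptions: `KeyIndexSketch` and `keysWithIndexes` are hypothetical names, plain `IndexedSeq`s stand in for Spark's `ArrayData`, and tuples are used for the index pair even though the thread also considers arrays.

```scala
import scala.collection.mutable

object KeyIndexSketch {
  // Hypothetical helper: for every distinct key, record the position at which it
  // first occurs in keys1 and in keys2 (None if the key is absent on that side).
  def keysWithIndexes[K](
      keys1: IndexedSeq[K],
      keys2: IndexedSeq[K]): mutable.LinkedHashMap[K, (Option[Int], Option[Int])] = {
    // LinkedHashMap iterates in insertion order, so the key order is deterministic.
    val indexes = new mutable.LinkedHashMap[K, (Option[Int], Option[Int])]
    var i = 0
    while (i < keys1.length) {
      val key = keys1(i)
      // Keep only the first occurrence of a duplicated key.
      if (!indexes.contains(key)) {
        indexes.put(key, (Some(i), None))
      }
      i += 1
    }
    var j = 0
    while (j < keys2.length) {
      val key = keys2(j)
      indexes.get(key) match {
        // Tuples are immutable, so the whole entry is replaced with a new tuple.
        case Some((left, None)) => indexes.put(key, (left, Some(j)))
        // The key already has an index for the second array: keep the first one.
        case Some(_) =>
        case None => indexes.put(key, (None, Some(j)))
      }
      j += 1
    }
    indexes
  }
}
```

With hashable keys each lookup is expected constant time, so the whole pass is linear in the total number of keys, and values can afterwards be read from each map's value array by index.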
private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
  val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
  val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
  val keys = Array(keys1, keys2)
maybe better to do something like `for (arr <- Seq(keys1, keys2))`?
j += 1
}
if (!found) {
  assertSizeOfArrayBuffer(arrayBuffer.size)
shall we check this only once at the end in order to avoid the overhead at each iteration?
The purpose of this line is to avoid an `OutOfMemoryError` when the max array size is exceeded and to throw something more accurate instead. Maybe I'm missing something, but wouldn't we break it if we checked this only once at the end? The max size could be exceeded in any iteration.
I see, because you are using an ArrayBuffer....makes sense, thanks
arrayBuffer
}

private def getValue(valueData: ArrayData, eType: DataType, index: Option[Int]) = index match {
Do we really need this? It can be `index.map(valueData.get(_, eType)).getOrElse(null)`, and we are using it only in one place (twice, but in the same place)...
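As a rough illustration of the inline form suggested here, the snippet below applies the same `Option` idiom to a plain array. It is a sketch only: `OptionalIndexSketch` and `valueAt` are hypothetical names, and an `Array[Any]` stands in for Spark's `ArrayData` (whose `get` also takes the element type).

```scala
object OptionalIndexSketch {
  // Returns the element at the given index, or null when no index is available.
  def valueAt(values: Array[Any], index: Option[Int]): Any =
    index.map(values(_)).getOrElse(null)
}
```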
Test build #94458 has finished for PR 22017 at commit
@transient lazy val functionForEval: Expression = functionsForEval.head

@transient lazy val (keyType, leftValueType, _) =
`keyType` should be `TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType)`?
You are right, thanks!
WDYT about introducing a coercion rule handling different key types? It might be handy for cases like (`IntType`, `LongType`).
Sounds good. Thanks!
var i = 0
while (i < keys1.numElements) {
  val key = keys1.get(i, keyType)
  if(!hashMap.contains(key)) hashMap.put(key, (Some(i), None))
nit: let's use brackets:
if (!hashMap.contains(key)) {
  hashMap.put(key, (Some(i), None))
}
if (unsafeRow != expectedRow) {
val field = StructField("field", expression.dataType)
val dataType = StructType(field :: field :: Nil)
if (!checkResult(unsafeRow, expectedRow, dataType)) {
What's this for?
`UnsafeRow`s are compared based on equality of backing arrays. This approach doesn't work well when ignoring order in the unsafe representation of maps.
This reverts commit 6aeaaa8
…indexes represented as arrays.
Test build #94508 has finished for PR 22017 at commit
Test build #94526 has finished for PR 22017 at commit
LGTM.
LGTM too, apart from one question
HigherOrderFunction.mapKeyValueArgumentType(right.dataType)

@transient lazy val keyType =
  TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
why do we need this? We are enforcing that the two maps have the same key type, can't we just get one?
Even though there is a coercion rule for unification of key types, the key types may differ in nullability flags if they are complex. In theory, we could use `==` and `findTightestCommonType` in the coercion rule, since there is no codegen to be optimized for `null` checks. But unfortunately, `bind` gets called once before the execution of coercion rules, so `findTightestCommonType` is important for setting up a correct input type for the lambda function.
Maybe we could play with the order of analysis rules, but I'm not sure about all the consequences. @ueshin could you shed some light on analysis rules ordering?
Yeah, the current analysis rules order might cause some problem. Let me think about it for a while.
I submitted a PR, #22075, to fix the analysis rules order so that argument types are fixed before `bind`.
Btw, we should use `findCommonTypeDifferentOnlyInNullFlags` for this because after the type coercion, the two key types can differ only in nullability. Sorry for confusing you.
IMHO, if `checkInputDataTypes` was executed before `bind`, `findTightestCommonType` could play the same role. But yeah, `findCommonTypeDifferentOnlyInNullFlags` will be semantically more accurate. Thanks!
After #22075, the `checkArgumentDataType()` introduced in it will be executed before `bind`, so the key types should be "sameType" and we will be able to use `findCommonTypeDifferentOnlyInNullFlags`. We still need `checkInputDataTypes` to be executed after `bind` to check that the whole data types are valid.
Oh, I see. We also need to check the output data type of lambda functions for expressions like `ArrayFilter`.
Good catch! Yeah, we need it.
/**
 * Similar to [[findTightestCommonType]] but with string promotion.
 */
def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
Why except Decimals?
If we have maps with decimals of different precision as keys, `Cast` will fail in the analysis phase since it can't cast a key to nullable (potential loss of precision). IMHO, the type mismatch exception from this function will be more accurate. WDYT?
Ah, I see, good catch! But it led me to another issue. We can't choose types that can possibly be null as a map key. Instead of adding the method, how about modifying `findTypeForComplex` to something like:
private def findTypeForComplex(
    t1: DataType,
    t2: DataType,
    findTypeFunc: (DataType, DataType) => Option[DataType]): Option[DataType] = (t1, t2) match {
  ...
  case (MapType(kt1, vt1, valueContainsNull1), MapType(kt2, vt2, valueContainsNull2)) =>
    findTypeFunc(kt1, kt2)
      .filter(kt => !Cast.forceNullable(kt1, kt) && !Cast.forceNullable(kt2, kt))
      .flatMap { kt =>
        findTypeFunc(vt1, vt2).map { vt =>
          MapType(kt, vt, valueContainsNull1 || valueContainsNull2)
        }
      }
  ...
}
We might need to have another PR to discuss this.
On second thought, do we really need those? It seems like the current coercion rules don't contain casts that can possibly produce null?
OK, I see that this is a matter of `findTypeForComplex`. I'll submit another PR later. Maybe we can go back to `findWiderTypeForTwo` in `TypeCoercion` and `findCommonTypeDifferentOnlyInNullFlag` for `keyType`.
I submitted a pr #22086.
Thanks for both your PRs! I will submit changes once they get in.
Test build #94570 has finished for PR 22017 at commit
retest this please
Test build #94585 has finished for PR 22017 at commit
retest this please
Test build #94685 has finished for PR 22017 at commit
…NullFlags + coercion rule refactoring
Test build #94712 has finished for PR 22017 at commit
LGTM.
case class MapZipWith(left: Expression, right: Expression, function: Expression)
  extends HigherOrderFunction with CodegenFallback {

  @transient lazy val functionForEval: Expression = functionsForEval.head
nit: shall we use `def` here to follow the comment #21954 (comment)?
}

// Nothing to check since the data type of the lambda function can be anything.
override def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
I'd call `checkArgumentDataTypes()` here again.
Row(null)))
}

test("map_zip_with function - map of complex types") {
nit: `non-primitive` instead of `complex`?
LGTM
@transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType

@transient lazy val keyType =
  TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get
Shouldn't the null flag be false for both of them?
If `leftKeyType` is `ArrayType(IntegerType, false)` and `rightKeyType` is `ArrayType(IntegerType, true)`, for instance, the coercion rule is not executed because `leftKeyType.sameType(rightKeyType) == true`.
An array with nulls seems to be a valid key:
scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show()
+---------------------------------------+
|map(array(1, 2, CAST(NULL AS INT)), 12)|
+---------------------------------------+
| [[1, 2,] -> 12]|
+---------------------------------------+
I see, thanks
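To make the point about null flags concrete, here is a toy model of merging two types that are identical except for nullability. It is a sketch under loud assumptions: `DType`, `IntT`, `StringT`, and `ArrayT` are hypothetical stand-ins for Spark's `DataType` hierarchy, and the function only imitates the behaviour the thread attributes to `findCommonTypeDifferentOnlyInNullFlags`; it is not Spark's implementation.

```scala
object NullFlagSketch {
  // Toy stand-ins for Spark's data types (hypothetical, for illustration only).
  sealed trait DType
  case object IntT extends DType
  case object StringT extends DType
  final case class ArrayT(elementType: DType, containsNull: Boolean) extends DType

  // Returns a common type only when the two types are equal up to null flags.
  // A flag in the result is true if it is true on either side.
  def commonTypeDifferentOnlyInNullFlags(t1: DType, t2: DType): Option[DType] =
    (t1, t2) match {
      case (ArrayT(e1, n1), ArrayT(e2, n2)) =>
        commonTypeDifferentOnlyInNullFlags(e1, e2).map(ArrayT(_, n1 || n2))
      case _ if t1 == t2 => Some(t1)
      case _ => None
    }
}
```

In this model, merging an array of non-null integers with an array of nullable integers yields an array of nullable integers, while two genuinely different types yield `None`.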
Test build #94737 has finished for PR 22017 at commit
Thanks! merging to master.
…should be true.

## What changes were proposed in this pull request?

This is a follow-up pr of #22017 which added `map_zip_with` function. In the test, when creating a lambda function, we use the `valueContainsNull` values for the nullabilities of the value arguments, but we should've used `true` as the same as `bind` method because the values might be `null` if the keys don't match.

## How was this patch tested?

Added small tests and existing tests.

Closes #22126 from ueshin/issues/SPARK-23938/fix_tests.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
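The behaviour this follow-up relies on (a value argument may be null whenever a key exists in only one of the maps) can be modelled in plain Scala. This is a hedged sketch rather than Spark code: `MapZipWithSketch` and `mapZipWith` are hypothetical names, `None` plays the role of SQL NULL, and the choice to return `None` when either input map is NULL is an assumption consistent with the `nullable` discussion in the review.

```scala
object MapZipWithSketch {
  // Plain-Scala model of the semantics: None stands for SQL NULL.
  def mapZipWith[K, V1, V2, R](
      left: Option[Map[K, V1]],
      right: Option[Map[K, V2]])(
      f: (K, Option[V1], Option[V2]) => R): Option[Map[K, R]] =
    for {
      m1 <- left  // a NULL input map makes the whole result NULL
      m2 <- right
    } yield {
      // Every key from either map appears once; a missing side is passed as None.
      (m1.keySet ++ m2.keySet).iterator.map { k =>
        k -> f(k, m1.get(k), m2.get(k))
      }.toMap
    }
}
```

Because a key present on only one side always produces a `None` argument, the lambda's value parameters have to be treated as nullable regardless of whether the input maps themselves contain null values.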
## What changes were proposed in this pull request?

- Revert [SPARK-23935][SQL] Adding map_entries function: #21236
- Revert [SPARK-23937][SQL] Add map_filter SQL function: #21986
- Revert [SPARK-23940][SQL] Add transform_values SQL function: #22045
- Revert [SPARK-23939][SQL] Add transform_keys function: #22013
- Revert [SPARK-23938][SQL] Add map_zip_with function: #22017
- Revert the changes of map_entries in [SPARK-24331][SPARKR][SQL] Adding arrays_overlap, array_repeat, map_entries to SparkR: #21434

## How was this patch tested?

The existing tests.

Closes #22827 from gatorsmile/revertMap2.4.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This PR adds a new SQL function called `map_zip_with`. It merges the two given maps into a single map by applying function to the pair of values with the same key.
How was this patch tested?
Added new tests into: