
[SPARK-23935][SQL] Adding map_entries function #21236

Closed

Conversation

mn-mikke
Contributor

@mn-mikke mn-mikke commented May 4, 2018

What changes were proposed in this pull request?

This PR adds the map_entries function, which returns an unordered array of all entries in the given map.
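As an illustration only (plain Java, not the Spark implementation; the class and method names here are hypothetical), the semantics can be sketched as turning each key/value pair of a map into a two-field entry:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapEntriesSketch {
    // Hypothetical sketch of map_entries semantics: each key/value pair of the
    // map becomes one (key, value) entry; the order of entries is unspecified.
    static List<int[]> mapEntries(Map<Integer, Integer> m) {
        List<int[]> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : m.entrySet()) {
            out.add(new int[] {e.getKey(), e.getValue()});
        }
        return out;
    }

    public static void main(String[] args) {
        for (int[] entry : mapEntries(Map.of(1, 5, 2, 6))) {
            System.out.println(entry[0] + " -> " + entry[1]);
        }
    }
}
```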

How was this patch tested?

New tests added into:

  • CollectionExpressionSuite
  • DataFrameFunctionsSuite

CodeGen examples

Primitive types

val df = Seq(Map(1 -> 5, 2 -> 6)).toDF("m")
df.filter('m.isNotNull).select(map_entries('m)).debugCodegen

Result:

/* 042 */         boolean project_isNull_0 = false;
/* 043 */
/* 044 */         ArrayData project_value_0 = null;
/* 045 */
/* 046 */         final int project_numElements_0 = inputadapter_value_0.numElements();
/* 047 */         final ArrayData project_keys_0 = inputadapter_value_0.keyArray();
/* 048 */         final ArrayData project_values_0 = inputadapter_value_0.valueArray();
/* 049 */
/* 050 */         final long project_size_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 051 */           project_numElements_0,
/* 052 */           32);
/* 053 */         if (project_size_0 > 2147483632) {
/* 054 */           final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 055 */           for (int z = 0; z < project_numElements_0; z++) {
/* 056 */             project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{project_keys_0.getInt(z), project_values_0.getInt(z)});
/* 057 */           }
/* 058 */           project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
/* 059 */
/* 060 */         } else {
/* 061 */           final byte[] project_arrayBytes_0 = new byte[(int)project_size_0];
/* 062 */           UnsafeArrayData project_unsafeArrayData_0 = new UnsafeArrayData();
/* 063 */           Platform.putLong(project_arrayBytes_0, 16, project_numElements_0);
/* 064 */           project_unsafeArrayData_0.pointTo(project_arrayBytes_0, 16, (int)project_size_0);
/* 065 */
/* 066 */           final int project_structsOffset_0 = UnsafeArrayData.calculateHeaderPortionInBytes(project_numElements_0) + project_numElements_0 * 8;
/* 067 */           UnsafeRow project_unsafeRow_0 = new UnsafeRow(2);
/* 068 */           for (int z = 0; z < project_numElements_0; z++) {
/* 069 */             long offset = project_structsOffset_0 + z * 24L;
/* 070 */             project_unsafeArrayData_0.setLong(z, (offset << 32) + 24L);
/* 071 */             project_unsafeRow_0.pointTo(project_arrayBytes_0, 16 + offset, 24);
/* 072 */             project_unsafeRow_0.setInt(0, project_keys_0.getInt(z));
/* 073 */             project_unsafeRow_0.setInt(1, project_values_0.getInt(z));
/* 074 */           }
/* 075 */           project_value_0 = project_unsafeArrayData_0;
/* 076 */
/* 077 */         }
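A side note on the loop above: each struct element is recorded in the array header as a single long whose upper 32 bits hold the element's byte offset and whose lower 32 bits hold its length in bytes (24 here), as in the generated `(offset << 32) + 24L`. A minimal sketch of that encoding (hedged; this mirrors but does not reuse Spark's UnsafeArrayData internals):

```java
public class OffsetAndSizeSketch {
    // Pack a byte offset (upper 32 bits) and a length in bytes (lower 32 bits)
    // into one long, like the generated (offset << 32) + 24L above.
    static long pack(long offset, long size) {
        return (offset << 32) | size;
    }

    // Recover the offset from the upper 32 bits.
    static int offsetOf(long packed) {
        return (int) (packed >>> 32);
    }

    // Recover the length from the lower 32 bits.
    static int sizeOf(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        long packed = pack(40, 24);
        System.out.println(offsetOf(packed)); // 40
        System.out.println(sizeOf(packed));   // 24
    }
}
```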

Non-primitive types

val df = Seq(Map("a" -> "foo", "b" -> null)).toDF("m")
df.filter('m.isNotNull).select(map_entries('m)).debugCodegen

Result:

/* 042 */         boolean project_isNull_0 = false;
/* 043 */
/* 044 */         ArrayData project_value_0 = null;
/* 045 */
/* 046 */         final int project_numElements_0 = inputadapter_value_0.numElements();
/* 047 */         final ArrayData project_keys_0 = inputadapter_value_0.keyArray();
/* 048 */         final ArrayData project_values_0 = inputadapter_value_0.valueArray();
/* 049 */
/* 050 */         final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 051 */         for (int z = 0; z < project_numElements_0; z++) {
/* 052 */           project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{project_keys_0.getUTF8String(z), project_values_0.getUTF8String(z)});
/* 053 */         }
/* 054 */         project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);

@mn-mikke
Contributor Author

mn-mikke commented May 4, 2018

cc @ueshin @gatorsmile

@gatorsmile
Member

ok to test

@SparkQA

SparkQA commented May 4, 2018

Test build #90216 has finished for PR 21236 at commit 086e223.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInputTypes

@SparkQA

SparkQA commented May 5, 2018

Test build #90221 has finished for PR 21236 at commit b9e2409.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

s"""
|final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
Member

We can calculate structSize beforehand and inline it?

s"""
|final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
|final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
|final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
Member

We can move this into else-clause?

mn-mikke added 2 commits May 7, 2018 15:15
…p_entries-to-master

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@SparkQA

SparkQA commented May 7, 2018

Test build #90318 has finished for PR 21236 at commit d05ad9b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 8, 2018

Test build #90367 has finished for PR 21236 at commit 6aa90ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@ueshin ueshin left a comment

LGTM except for one question.


val baseOffset = Platform.BYTE_ARRAY_OFFSET
val longSize = LongType.defaultSize
val structSize = UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2
Member

I'm wondering whether it is right to use longSize here?
I know the value is 8, the same as the word size, but the meaning feels different?
cc @gatorsmile @cloud-fan

Contributor Author

@ueshin Really good question. I'm eager to learn the true purpose of the DataType.defaultSize function. Currently, it's used in this sense in more places (e.g. GenArrayData.genCodeToCreateArrayData and CodeGenerator.createUnsafeArray).

What about using Long.BYTES from Java 8 instead?

Member

IMHO, 8 is the better choice, since this value is not related to the element size of a long.
Ideally, it would be best to define a new constant for it.

Contributor Author

@kiszk Thanks for your suggestion, but it seems to me that LongType.defaultSize could be used in this case. The purpose of defaultSize doesn't appear to be limited to calculating estimated data sizes for statistics: GenerateUnsafeProjection.writeArrayToBuffer, InterpretedUnsafeProjection.getElementSize and other parts utilize defaultSize in the same way.

Member

This is not for the element size of arrays. I agree with @kiszk to use 8.
Maybe we need to add a constant to represent the word size in UnsafeRow or somewhere in the future pr.

Contributor Author

Oh OK, I misunderstood the comments. Thanks guys!
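For context on the numbers in this thread: for a struct of two int fields, the null-tracking bitset rounds up to one 8-byte word, and each fixed-length field occupies one 8-byte word, giving the 24-byte struct size that appears as 24L in the generated code. A hedged sketch of that arithmetic (bitSetWidthInBytes here is a stand-in mirroring what UnsafeRow.calculateBitSetWidthInBytes computes):

```java
public class StructSizeSketch {
    // Stand-in for UnsafeRow.calculateBitSetWidthInBytes: one 8-byte word per
    // 64 fields, rounded up (an assumption based on the usual bitset layout).
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    public static void main(String[] args) {
        // 8 is the word size the reviewers suggest using instead of
        // LongType.defaultSize (same value, different meaning).
        int wordSize = 8;
        int structSize = bitSetWidthInBytes(2) + wordSize * 2;
        System.out.println(structSize); // 24
    }
}
```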

| $unsafeArrayData.pointTo($data, $baseOffset, (int)$byteArraySize);
| UnsafeRow $unsafeRow = new UnsafeRow(2);
| for (int z = 0; z < $numElements; z++) {
| long offset = $structsOffset + z * $structSize;
Member

nit: $structSize -> ${structSize}L

@SparkQA

SparkQA commented May 14, 2018

Test build #90557 has finished for PR 21236 at commit 56ff20a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.



s"$values.isNullAt(z) ? null : (Object)${getValue(values)}"
} else {
getValue(values)
}
Member

nit: indent

s"""
|final long $byteArraySize = $calculateArraySize($numElements, ${longSize + structSize});
|if ($byteArraySize > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
| ${genCodeForAnyElements(ctx, keys, values, arrayData, numElements)}
Member

Hmm, should we use this idiom for other array functions? WDYT?

Contributor Author

For now, I've separated out the logic so that I can leverage it for the map_from_entries function. Moreover, I think it should be possible to replace UnsafeArrayData.createUnsafeArray with this logic, but I will do that in a different PR.
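For reference, the guard quoted above compares the computed byte-array size against ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (Integer.MAX_VALUE - 15 = 2147483632, the literal visible in the generated code). A simplified sketch of that decision (the size formula here is an approximation for illustration, not Spark's exact calculateSizeOfUnderlyingByteArray):

```java
public class SizeGuardSketch {
    // Mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15.
    static final long MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; // 2147483632

    // Approximate stand-in for the byte-array size: one 8-byte header word,
    // plus one 8-byte offset-and-size word and the payload per element.
    static long byteArraySize(int numElements, int perElementBytes) {
        return 8L + (long) numElements * (8L + perElementBytes);
    }

    // Fall back to the generic (object-based) array when the unsafe
    // fixed-layout byte array would exceed the maximum allowed length.
    static boolean useGenericFallback(int numElements, int perElementBytes) {
        return byteArraySize(numElements, perElementBytes) > MAX_ROUNDED_ARRAY_LENGTH;
    }

    public static void main(String[] args) {
        System.out.println(useGenericFallback(2, 24));                 // small map
        System.out.println(useGenericFallback(Integer.MAX_VALUE, 24)); // huge map
    }
}
```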

@SparkQA

SparkQA commented May 14, 2018

Test build #90596 has finished for PR 21236 at commit 1bd0d5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 17, 2018

Test build #90720 has finished for PR 21236 at commit baa61e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

@ueshin, @kiszk Thank you for the valuable comments! Do you have any more?

@ueshin
Member

ueshin commented May 21, 2018

I'll retrigger the build just to check again.

@ueshin
Member

ueshin commented May 21, 2018

Jenkins, retest this please.

@SparkQA

SparkQA commented May 21, 2018

Test build #90880 has finished for PR 21236 at commit baa61e5.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

retest this please

@SparkQA

SparkQA commented May 21, 2018

Test build #90881 has finished for PR 21236 at commit baa61e5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented May 21, 2018

Jenkins, retest this please.

@mn-mikke
Contributor Author

retest this please

@SparkQA

SparkQA commented May 21, 2018

Test build #90887 has finished for PR 21236 at commit baa61e5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 21, 2018

Test build #90888 has finished for PR 21236 at commit baa61e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented May 21, 2018

Thanks! merging to master.

@asfgit asfgit closed this in a6e883f May 21, 2018
asfgit pushed a commit that referenced this pull request Jul 17, 2018
… collection expressions.

## What changes were proposed in this pull request?

The PR tries to avoid serialization of private fields of already added collection functions and follows up on comments in [SPARK-23922](#21028) and [SPARK-23935](#21236)

## How was this patch tested?

Run tests from:
- CollectionExpressionSuite.scala
- DataFrameFunctionsSuite.scala

Author: Marek Novotny <[email protected]>

Closes #21352 from mn-mikke/SPARK-24305.
@@ -98,6 +98,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
if (expected.isNaN) result.isNaN else expected == result
case (result: Float, expected: Float) =>
if (expected.isNaN) result.isNaN else expected == result
case (result: UnsafeRow, expected: GenericInternalRow) =>
Member

@mn-mikke I was just looking over compiler warnings, and noticed it claims this case is never triggered. I think it's because it would always first match the (InternalRow, InternalRow) case above. Should it go before that then?

Contributor Author

Hi @srowen,
The (InternalRow, InternalRow) case was introduced later in #21838 and covers the logic of the UnsafeRow case, so we can just remove the unreachable piece of code.

Member

Roger that, looks like Wenchen just did so. Thanks!

asfgit pushed a commit that referenced this pull request Oct 25, 2018
## What changes were proposed in this pull request?

- Revert [SPARK-23935][SQL] Adding map_entries function: #21236
- Revert [SPARK-23937][SQL] Add map_filter SQL function: #21986
- Revert [SPARK-23940][SQL] Add transform_values SQL function: #22045
- Revert [SPARK-23939][SQL] Add transform_keys function: #22013
- Revert [SPARK-23938][SQL] Add map_zip_with function: #22017
- Revert the changes of map_entries in [SPARK-24331][SPARKR][SQL] Adding arrays_overlap, array_repeat, map_entries to SparkR: #21434

## How was this patch tested?
The existing tests.

Closes #22827 from gatorsmile/revertMap2.4.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>