
[SPARK-16043][SQL] Prepare GenericArrayData implementation specialized for a primitive array #13758

Closed
wants to merge 75 commits

Conversation

@kiszk (Member) commented Jun 18, 2016

What changes were proposed in this pull request?

This PR addresses a TODO in the GenericArrayData class. The current implementation of GenericArrayData causes boxing/unboxing when the array element type is primitive. Eliminating this boxing/unboxing is worthwhile for both runtime memory footprint and performance.

This PR eliminates boxing/unboxing by enhancing the implementation of GenericArrayData. Since it does not change the current API new GenericArrayData(...), it minimizes the amount of changes.
This PR also optimizes the generated projection code for a primitive-type array. Since a primitive-type array requires no null checks and its data occupies a contiguous region, this PR performs a bulk data copy using Platform.copy, without per-element null checks.

Here are the major improvements (a minimal sketch of items 1-3 follows the list):

  1. Hold the data in a primitive array (previously Object[] was used to hold a primitive array, and boxing happened in the constructor)
  2. A method get<Type>() reads a value directly from the primitive array (previously unboxing happened)
  3. A method to<Type>Array() copies data using System.arraycopy (previously unboxing happened)
  4. A method UnsafeArrayWriter.writePrimitive<PrimitiveType>Array() performs a bulk data copy using Platform.copy
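To make the specialization concrete, here is a minimal standalone sketch of items 1-3 in plain Scala, with a hypothetical class name (the real classes extend Spark's ArrayData and implement its full interface):

```scala
// Sketch only -- not the committed code.
final class IntArrayDataSketch(private val primitiveArray: Array[Int]) {
  // Item 1: the data lives in an Array[Int]; nothing is boxed at construction.
  def numElements(): Int = primitiveArray.length

  // Item 2: read straight from the primitive array -- no Integer unboxing.
  def getInt(ordinal: Int): Int = primitiveArray(ordinal)

  // Item 3: bulk copy via System.arraycopy instead of per-element unboxing.
  def toIntArray(): Array[Int] = {
    val copy = new Array[Int](primitiveArray.length)
    System.arraycopy(primitiveArray, 0, copy, 0, primitiveArray.length)
    copy
  }
}
```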

Here are the performance results of microbenchmarks:

OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Get int primitive array:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Generic                                        277 /  366        605.0           1.7       1.0X
Specialized                                    214 /  251        785.1           1.3       1.3X

Get double primitive array:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Generic                                       1976 / 1991         84.9          11.8       1.0X
Specialized                                    589 / 1050        285.1           3.5       3.4X

Read GenericArrayData Int:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Generic                                        208 /  214       1008.3           1.0       1.0X
Specialized                                    142 /  158       1471.7           0.7       1.5X

Read GenericArrayData Double:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Generic                                        621 /  683        337.7           3.0       1.0X
Specialized                                    265 /  297        790.4           1.3       2.3X

How was this patch tested?

Added unit tests.

@SparkQA commented Jun 18, 2016

Test build #60768 has finished for PR 13758 at commit 17bdfcf.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 18, 2016

Test build #60782 has finished for PR 13758 at commit d4b8f07.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 19, 2016

Test build #60792 has finished for PR 13758 at commit c980f74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -142,3 +164,415 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData {
result
}
}

final class GenericIntArrayData(private val primitiveArray: Array[Int])
Contributor

I think the class hierarchy is a bit off. We should make GenericArrayData abstract and introduce a GenericRefArrayData as a replacement for the current GenericArrayData.

The downside of introducing this many ArrayData classes is that ArrayData operations will become megamorphic.

Member Author

Thank you for your suggestion. It would be good to make GenericArrayData abstract.
As you said, ArrayData call sites become megamorphic with this class hierarchy, but actual use should be monomorphic; making each concrete class final alleviates the runtime overhead.
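To illustrate the direction agreed above, a minimal standalone sketch of the hierarchy, using hypothetical stand-in names (the real base would be Spark's ArrayData):

```scala
// Stand-ins so the sketch compiles on its own -- not the committed code.
abstract class GenericArrayDataSketch {
  def numElements(): Int
}

// Reference-typed replacement for the current GenericArrayData.
final class GenericRefArrayDataSketch(val array: Array[Any]) extends GenericArrayDataSketch {
  def numElements(): Int = array.length
}

// Primitive specialization. Marking leaf classes final keeps hot call sites
// devirtualizable even though the base type now has many implementations.
final class GenericIntArrayDataSketch(val primitiveArray: Array[Int]) extends GenericArrayDataSketch {
  def numElements(): Int = primitiveArray.length
}
```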

@SparkQA commented Jun 21, 2016

Test build #60944 has finished for PR 13758 at commit 2da3cfb.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 21, 2016

Test build #60947 has finished for PR 13758 at commit e10a890.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2016

Test build #61001 has finished for PR 13758 at commit e04ca2c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -23,7 +23,60 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

class GenericArrayData(val array: Array[Any]) extends ArrayData {
object GenericArrayData {
def allocate(seq: Seq[Any]): GenericArrayData = new GenericRefArrayData(seq)
Contributor

Please make all allocate methods return the type they are actually allocating. That will increase the chance that we deal with a monomorphic call site in downstream code.

Member Author

Definitely, you are right
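To illustrate the reviewer's point, a hedged sketch of a factory whose overloads return concrete types (names and signatures are hypothetical; only allocate(seq) appears in the diff above):

```scala
// Stand-in classes so the sketch compiles alone -- not Spark's classes.
final class RefArrayData(val array: Array[Any])
final class IntArrayData(val primitiveArray: Array[Int])

object ArrayDataFactory {
  // Each overload returns the concrete type it allocates rather than a common
  // base, so a caller that statically knows the element type keeps a
  // monomorphic call site.
  def allocate(seq: Seq[Any]): RefArrayData = new RefArrayData(seq.toArray)
  def allocate(primitiveArray: Array[Int]): IntArrayData = new IntArrayData(primitiveArray)
}
```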

@hvanhovell (Contributor)

@kiszk it would be nice to know what the influence of this PR is on performance. Could you perhaps elaborate on this?

@kiszk (Member Author) commented Jun 22, 2016

@hvanhovell, yes, that is a good idea. Actually, I wrote a benchmark program, org.apache.spark.sql.catalyst.util.GenericArrayBenchmark (not committed yet). One issue in my environment is that I cannot run a benchmark program under sql/catalyst.

The following command does not execute my benchmark program...

build/sbt "catalyst/test-only *GenericArrayBenchmark*"

@hvanhovell (Contributor)

Did you model it after MiscBenchmark? https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala

I can take a look if you add it to the PR.

@@ -159,17 +159,17 @@ object CatalystTypeConverters {
override def toCatalystImpl(scalaValue: Any): ArrayData = {
scalaValue match {
case a: Array[_] =>
new GenericArrayData(a.map(elementConverter.toCatalyst))
GenericArrayData.allocate(a.map(elementConverter.toCatalyst))
Contributor

Will these allocate calls create a GenericRefArrayData object?

Member Author

Yes, I believe so for now, since the return type of toCatalyst seems to be Any. Would it be better to prepare specialized code here to make the type concrete?
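To make the question concrete, a hypothetical sketch of what such specialized code could look like, dispatching on the runtime array class (convertElem stands in for elementConverter.toCatalyst; none of this is the committed code):

```scala
// Primitive arrays bypass the element converter (and its boxing) entirely;
// only the reference path pays for the per-element conversion.
def convertElem(v: Any): Any = v // placeholder for elementConverter.toCatalyst

def toCatalystArraySketch(value: Array[_]): Any = value match {
  case ints: Array[Int]       => ints.clone()            // stays Array[Int], no boxing
  case doubles: Array[Double] => doubles.clone()         // stays Array[Double], no boxing
  case other                  => other.map(convertElem)  // generic path, may box
}
```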

@kiszk (Member Author) commented Jun 22, 2016

@hvanhovell, I added a benchmark file (not run yet). I would appreciate it if you have time to look at it.

It is very strange to me that I can run a benchmark program under sql/core (e.g. MiscBenchmark) by using build/sbt "sql/test-only *MiscBenchmark*", while the same does not work under sql/catalyst.

/**
* Benchmark [[GenericArrayData]] for Dense and Sparse with primitive type
*/
object GenericArrayDataBenchmark {
Contributor

This is different from MiscBenchmark, which is a class that extends BenchmarkBase. You probably have to move it to sql/core though.
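For reference, a minimal sketch of a benchmark in that style, assuming Spark's org.apache.spark.util.Benchmark utility of that era (constructor and addCase signatures are from memory, so treat them as approximate):

```scala
import org.apache.spark.util.Benchmark

// Compares boxed vs. primitive int-array reads, loosely modeled on the
// structure of MiscBenchmark.
object GenericArrayDataBenchmarkSketch {
  def main(args: Array[String]): Unit = {
    val count = 1 << 20
    val primitive: Array[Int] = Array.tabulate(count)(identity)
    val boxed: Array[Any] = primitive.map(x => x: Any)

    val benchmark = new Benchmark("Read int array", count)
    benchmark.addCase("Generic (boxed)") { _ =>
      var sum = 0L
      var i = 0
      while (i < count) { sum += boxed(i).asInstanceOf[Int]; i += 1 }
    }
    benchmark.addCase("Specialized (primitive)") { _ =>
      var sum = 0L
      var i = 0
      while (i < count) { sum += primitive(i); i += 1 }
    }
    benchmark.run()
  }
}
```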

@SparkQA commented Jun 22, 2016

Test build #61009 has finished for PR 13758 at commit 133d4c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2016

Test build #61030 has finished for PR 13758 at commit ef84f46.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2016

Test build #61048 has started for PR 13758 at commit 4990282.

@SparkQA commented Jun 22, 2016

Test build #61057 has finished for PR 13758 at commit bf06a0c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 23, 2016

Test build #61090 has finished for PR 13758 at commit 2338c2a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 8, 2016

Test build #68345 has finished for PR 13758 at commit 503dbde.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 8, 2016

Test build #68344 has finished for PR 13758 at commit 20c3d1d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 8, 2016

Test build #68349 has finished for PR 13758 at commit c7ed68f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member Author) commented Nov 8, 2016

@hvanhovell Yes, as you said, the issue with the projection that writes an array was resolved by #15044.
I updated this PR to show another use case: reading an array in a Dataset. What do you think?

@SparkQA commented Nov 8, 2016

Test build #68362 has finished for PR 13758 at commit 6bf54ec.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 8, 2016

Test build #68361 has finished for PR 13758 at commit c82fbf3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

@kiszk, can you update the PR description to explain where the performance benefits of this PR come from? For the given example, df.selectExpr("Array(value + 1.1d, value + 2.2d)").show, can we just apply the same optimization as #15044 to CreateArray?

@kiszk (Member Author) commented Nov 9, 2016

@cloud-fan yes, we could take the same approach as #15044. I have just implemented it in my local environment, and it achieves a similar performance improvement.
I will submit that approach to #13909 within the next few hours, then cc you and @hvanhovell.

@SparkQA commented Nov 9, 2016

Test build #68397 has finished for PR 13758 at commit 7697e5f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented Nov 10, 2016

This PR also optimizes the generated projection code for a primitive-type array. Since a primitive-type array requires no null checks and its data occupies a contiguous region, this PR performs a bulk data copy using Platform.copy, without per-element null checks.

If the input array is a primitive array, it should already be in the unsafe format and we will just copy the binary, so what's the use case for this optimization?

Besides, providing a faster GenericArrayData is not an optimization to me if we don't get an end-to-end performance boost.

@cloud-fan (Contributor)

I'm sorry if the TODO in GenericArrayData misled you, but after #13680 I don't think this TODO makes sense anymore, as GenericArrayData is meant to be slow. All we need to do is make sure we convert an input primitive array directly to an unsafe array, not to a GenericArrayData.

@cloud-fan (Contributor)

shall we close it?

@kiszk (Member Author) commented Dec 26, 2016

I think it is a good time to close this once #13909 is closed.

@viirya (Member) commented Dec 29, 2016

I think we can close this now.

@kiszk (Member Author) commented Dec 29, 2016

The issues this PR addresses have been solved by other approaches (i.e., converting a primitive array directly to an unsafe array).

@kiszk kiszk closed this Dec 29, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?

Waiting for apache#13680 to be merged.

This PR optimizes `SerializeFromObject()` for a primitive array. It is derived from apache#13758 and addresses one of that PR's problems in a simpler way.

The current implementation always generates `GenericArrayData` from `SerializeFromObject()` for any array type in a logical plan. This involves boxing in the `GenericArrayData` constructor when `SerializeFromObject()` has a primitive array.

This PR enables generating `UnsafeArrayData` from `SerializeFromObject()` for a primitive array. This avoids boxing when creating an `ArrayData` instance in the code generated by Catalyst.

This PR also generates `UnsafeArrayData` in the cases of `RowEncoder.serializeFor` and `CatalystTypeConverters.createToCatalystConverter`.
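For illustration, the key conversion the generated code relies on (`UnsafeArrayData.fromPrimitiveArray` is the Spark API named in the generated code below; the snippet itself is just a sanity check, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

// One bulk conversion from int[] to the unsafe format -- no per-element boxing.
val unsafe: UnsafeArrayData = UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3))
assert(unsafe.numElements() == 3)
assert(unsafe.getInt(1) == 2) // direct primitive read
```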

The performance improvement of `SerializeFromObject()` is up to 2.0x:

```
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without this PR
Write an array in Dataset:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            556 /  608         15.1          66.3       1.0X
Double                                        1668 / 1746          5.0         198.8       0.3X

with this PR
Write an array in Dataset:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            352 /  401         23.8          42.0       1.0X
Double                                         821 /  885         10.2          97.9       0.4X
```

Here is an example program of the pattern that occurs in mllib, as described in [SPARK-16070](https://issues.apache.org/jira/browse/SPARK-16070).

```
sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).show
```

Generated code before applying this PR

``` java
/* 039 */   protected void processNext() throws java.io.IOException {
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */       int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 043 */
/* 044 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 045 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 046 */
/* 047 */       boolean mapelements_isNull = false || false;
/* 048 */       int[] mapelements_value = null;
/* 049 */       if (!mapelements_isNull) {
/* 050 */         Object mapelements_funcResult = null;
/* 051 */         mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 052 */         if (mapelements_funcResult == null) {
/* 053 */           mapelements_isNull = true;
/* 054 */         } else {
/* 055 */           mapelements_value = (int[]) mapelements_funcResult;
/* 056 */         }
/* 057 */
/* 058 */       }
/* 059 */       mapelements_isNull = mapelements_value == null;
/* 060 */
/* 061 */       serializefromobject_argIsNulls[0] = mapelements_isNull;
/* 062 */       serializefromobject_argValue = mapelements_value;
/* 063 */
/* 064 */       boolean serializefromobject_isNull = false;
/* 065 */       for (int idx = 0; idx < 1; idx++) {
/* 066 */         if (serializefromobject_argIsNulls[idx]) { serializefromobject_isNull = true; break; }
/* 067 */       }
/* 068 */
/* 069 */       final ArrayData serializefromobject_value = serializefromobject_isNull ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
/* 070 */       serializefromobject_holder.reset();
/* 071 */
/* 072 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 073 */
/* 074 */       if (serializefromobject_isNull) {
/* 075 */         serializefromobject_rowWriter.setNullAt(0);
/* 076 */       } else {
/* 077 */         // Remember the current cursor so that we can calculate how many bytes are
/* 078 */         // written later.
/* 079 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 080 */
/* 081 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 082 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 083 */           // grow the global buffer before writing data.
/* 084 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 085 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 086 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 087 */
/* 088 */         } else {
/* 089 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 090 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 091 */
/* 092 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 093 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 094 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 095 */             } else {
/* 096 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 097 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 098 */             }
/* 099 */           }
/* 100 */         }
/* 101 */
/* 102 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 103 */       }
/* 104 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 105 */       append(serializefromobject_result);
/* 106 */       if (shouldStop()) return;
/* 107 */     }
/* 108 */   }
/* 109 */ }
```

Generated code after applying this PR

``` java
/* 035 */   protected void processNext() throws java.io.IOException {
/* 036 */     while (inputadapter_input.hasNext()) {
/* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */       int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 039 */
/* 040 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 041 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 042 */
/* 043 */       boolean mapelements_isNull = false || false;
/* 044 */       int[] mapelements_value = null;
/* 045 */       if (!mapelements_isNull) {
/* 046 */         Object mapelements_funcResult = null;
/* 047 */         mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 048 */         if (mapelements_funcResult == null) {
/* 049 */           mapelements_isNull = true;
/* 050 */         } else {
/* 051 */           mapelements_value = (int[]) mapelements_funcResult;
/* 052 */         }
/* 053 */
/* 054 */       }
/* 055 */       mapelements_isNull = mapelements_value == null;
/* 056 */
/* 057 */       boolean serializefromobject_isNull = mapelements_isNull;
/* 058 */       final ArrayData serializefromobject_value = serializefromobject_isNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(mapelements_value);
/* 059 */       serializefromobject_isNull = serializefromobject_value == null;
/* 060 */       serializefromobject_holder.reset();
/* 061 */
/* 062 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 063 */
/* 064 */       if (serializefromobject_isNull) {
/* 065 */         serializefromobject_rowWriter.setNullAt(0);
/* 066 */       } else {
/* 067 */         // Remember the current cursor so that we can calculate how many bytes are
/* 068 */         // written later.
/* 069 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 070 */
/* 071 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 072 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 073 */           // grow the global buffer before writing data.
/* 074 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 075 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 076 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 077 */
/* 078 */         } else {
/* 079 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 080 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 081 */
/* 082 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 083 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 084 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 085 */             } else {
/* 086 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 087 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 088 */             }
/* 089 */           }
/* 090 */         }
/* 091 */
/* 092 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 093 */       }
/* 094 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 095 */       append(serializefromobject_result);
/* 096 */       if (shouldStop()) return;
/* 097 */     }
/* 098 */   }
/* 099 */ }
```
## How was this patch tested?

Added tests in `DatasetSuite`, `RowEncoderSuite`, and `CatalystTypeConvertersSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#15044 from kiszk/SPARK-17490.