[SPARK-16043][SQL] Prepare GenericArrayData implementation specialized for a primitive array #13758
Conversation
Test build #60768 has finished for PR 13758 at commit
Test build #60782 has finished for PR 13758 at commit
Test build #60792 has finished for PR 13758 at commit
@@ -142,3 +164,415 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData {
     result
   }
 }
+
+final class GenericIntArrayData(private val primitiveArray: Array[Int])
I think the class hierarchy is a bit off. We should make `GenericArrayData` abstract and introduce a `GenericRefArrayData` as a replacement for the current `GenericArrayData`.
The downside of introducing this many `ArrayData` classes is that `ArrayData` operations will be mega-morphic.
Thank you for your suggestion. It would be good to make `GenericArrayData` abstract.
As you said, `ArrayData` will be mega-morphic in the class hierarchy, but actual use should be monomorphic. Making each class `final` alleviates the runtime overhead.
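For reference, a rough sketch of the hierarchy being discussed (a simplified stand-in trait is used in place of the real `ArrayData` interface so the snippet is self-contained; the method bodies are illustrative, not the actual patch):

```scala
// Stand-in for the real ArrayData interface, only to keep the sketch compilable.
trait ArrayDataLike {
  def numElements(): Int
  def getInt(ordinal: Int): Int
}

// Abstract base, as suggested above.
abstract class GenericArrayData extends ArrayDataLike

// Replacement for the current GenericArrayData: holds boxed references.
final class GenericRefArrayData(val array: Array[Any]) extends GenericArrayData {
  override def numElements(): Int = array.length
  override def getInt(ordinal: Int): Int = array(ordinal).asInstanceOf[Int] // unboxes
}

// Primitive-specialized variant: keeps the int[] as is, so getInt does not unbox.
final class GenericIntArrayData(private val primitiveArray: Array[Int]) extends GenericArrayData {
  override def numElements(): Int = primitiveArray.length
  override def getInt(ordinal: Int): Int = primitiveArray(ordinal)
}
```

Marking each concrete class `final` lets the JIT devirtualize calls at sites that only ever see one of them, which is what keeps the hot paths effectively monomorphic.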
Test build #60944 has finished for PR 13758 at commit
Test build #60947 has finished for PR 13758 at commit
Test build #61001 has finished for PR 13758 at commit
@@ -23,7 +23,60 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

-class GenericArrayData(val array: Array[Any]) extends ArrayData {
+object GenericArrayData {
+  def allocate(seq: Seq[Any]): GenericArrayData = new GenericRefArrayData(seq)
Please make all allocate methods return the type they are actually allocating. That will increase the chance that we deal with a monomorphic callsite in further code.
Definitely, you are right.
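A hedged sketch of what overloads with concrete return types could look like (only the `Seq[Any]` variant appears in the diff above; the `Array[Int]`/`Array[Double]` overloads and the `GenericDoubleArrayData` class are assumptions for illustration):

```scala
object GenericArrayData {
  // Each factory returns the concrete class it allocates, so the static type at the
  // callsite is precise and subsequent calls on it can stay monomorphic.
  def allocate(seq: Seq[Any]): GenericRefArrayData = new GenericRefArrayData(seq)
  def allocate(primitiveArray: Array[Int]): GenericIntArrayData = new GenericIntArrayData(primitiveArray)
  def allocate(primitiveArray: Array[Double]): GenericDoubleArrayData = new GenericDoubleArrayData(primitiveArray)
}
```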
@kiszk it would be nice to know what the influence of this PR is on performance. Could you perhaps elaborate on this?
@hvanhovell, yes, it is a good idea. Actually, I wrote a benchmark program. The following command does not execute my benchmark program...
Did you model it after MiscBenchmark? https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala I can take a look if you add it to the PR.
@@ -159,17 +159,17 @@ object CatalystTypeConverters {
   override def toCatalystImpl(scalaValue: Any): ArrayData = {
     scalaValue match {
       case a: Array[_] =>
-        new GenericArrayData(a.map(elementConverter.toCatalyst))
+        GenericArrayData.allocate(a.map(elementConverter.toCatalyst))
These `allocate` calls will create a `GenericRefArrayData` object?
Yes, I feel so for now, since the type of `toCatalyst` seems to be `Any`. Is it better to prepare specialized code to make the type concrete here?
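One possible way to make the type concrete at this callsite is to branch on the runtime element type of the array before going through the element converter. A hedged fragment of `toCatalystImpl`, reusing the names from the diff above (the primitive `allocate` overloads are the assumed ones from the earlier discussion, not the merged API):

```scala
scalaValue match {
  // Primitive arrays could go straight to a specialized ArrayData,
  // skipping the element converter and the boxing it implies.
  case a: Array[Int]    => GenericArrayData.allocate(a)
  case a: Array[Double] => GenericArrayData.allocate(a)
  // Everything else keeps the existing path through elementConverter.
  case a: Array[_]      => GenericArrayData.allocate(a.map(elementConverter.toCatalyst))
}
```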
@hvanhovell, I added a benchmark file (not run yet). I would appreciate it if you have time to look at it. It is very strange to me that I can run a Benchmark program under
/**
 * Benchmark [[GenericArrayData]] for Dense and Sparse with primitive type
 */
object GenericArrayDataBenchmark {
This is different from `MiscBenchmark`, which is a class that extends `BenchmarkBase`. You probably have to move it to `sql/core` though.
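As a rough illustration (not the benchmark added in this PR), a boxed-vs-primitive read micro-benchmark could be written against the `org.apache.spark.util.Benchmark` utility along these lines; the object name, sizes, and case names are made up:

```scala
import org.apache.spark.util.Benchmark

object GenericArrayDataReadBenchmark {
  def main(args: Array[String]): Unit = {
    val count = 1 << 24
    val intArray = Array.fill(count)(scala.util.Random.nextInt())
    val boxedArray: Array[Any] = intArray.map(_.asInstanceOf[Any]) // boxes once, up front

    val benchmark = new Benchmark("Read int array", count)
    benchmark.addCase("boxed (Array[Any])") { _ =>
      var sum = 0L
      var i = 0
      while (i < count) { sum += boxedArray(i).asInstanceOf[Int]; i += 1 } // unboxes per element
      if (sum == 42) println(sum) // keep the loop from being eliminated
    }
    benchmark.addCase("primitive (Array[Int])") { _ =>
      var sum = 0L
      var i = 0
      while (i < count) { sum += intArray(i); i += 1 } // no boxing at all
      if (sum == 42) println(sum)
    }
    benchmark.run()
  }
}
```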
Test build #61009 has finished for PR 13758 at commit
Test build #61030 has finished for PR 13758 at commit
Test build #61048 has started for PR 13758 at commit
Test build #61057 has finished for PR 13758 at commit
Test build #61090 has finished for PR 13758 at commit
Test build #68345 has finished for PR 13758 at commit
Test build #68344 has finished for PR 13758 at commit
Test build #68349 has finished for PR 13758 at commit
@hvanhovell Yes, as you said, the issue in a projection that writes an array was resolved by #15044.
Test build #68362 has finished for PR 13758 at commit
Test build #68361 has finished for PR 13758 at commit
@cloud-fan yes, we could take the same approach as #15044. I have just implemented it in my local environment, and it achieves a similar performance improvement.
Test build #68397 has finished for PR 13758 at commit
If the input array is a primitive array, it should be in unsafe format and we will just copy the binary, so what's the use case for this optimization? Besides, providing a faster
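For context, the binary-copy path referred to here is `UnsafeArrayData.fromPrimitiveArray`, the same call that appears in the generated code quoted later in this thread. A small hedged usage example (the wrapper object is made up for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

object PrimitiveArrayToUnsafeExample {
  def main(args: Array[String]): Unit = {
    val input = Array(1, 2, 3, 4)
    // Copies the int[] into the Unsafe binary format in one shot; no per-element boxing.
    val unsafe = UnsafeArrayData.fromPrimitiveArray(input)
    println(s"numElements = ${unsafe.numElements()}, first = ${unsafe.getInt(0)}")
  }
}
```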
I'm sorry that if the TODO in
Shall we close it?
I think it is a good time to close this when #13909 is closed.
I think we can close this now.
The issues that this PR addresses have been solved by other approaches (i.e. use
## What changes were proposed in this pull request?

Waiting for merging apache#13680.

This PR optimizes `SerializeFromObject()` for a primitive array. It is derived from apache#13758 and addresses one of its problems in a simpler way.

The current implementation always generates `GenericArrayData` from `SerializeFromObject()` for any type of array in a logical plan. This involves boxing at a constructor of `GenericArrayData` when `SerializeFromObject()` has a primitive array.

This PR makes it possible to generate `UnsafeArrayData` from `SerializeFromObject()` for a primitive array, which avoids boxing when creating an instance of `ArrayData` in the code generated by Catalyst. This PR also generates `UnsafeArrayData` in the cases of `RowEncoder.serializeFor` and `CatalystTypeConverters.createToCatalystConverter`.

Performance improvement of `SerializeFromObject()` is up to 2.0x:

```
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without this PR
Write an array in Dataset:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
Int                                       556 /  608          15.1          66.3       1.0X
Double                                   1668 / 1746           5.0         198.8       0.3X

With this PR
Write an array in Dataset:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
Int                                       352 /  401          23.8          42.0       1.0X
Double                                    821 /  885          10.2          97.9       0.4X
```

Here is an example program of the pattern that occurs in mllib, as described in [SPARK-16070](https://issues.apache.org/jira/browse/SPARK-16070):

```scala
sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).show
```

Generated code before applying this PR:

```java
/* 039 */   protected void processNext() throws java.io.IOException {
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */       int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 043 */
/* 044 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 045 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 046 */
/* 047 */       boolean mapelements_isNull = false || false;
/* 048 */       int[] mapelements_value = null;
/* 049 */       if (!mapelements_isNull) {
/* 050 */         Object mapelements_funcResult = null;
/* 051 */         mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 052 */         if (mapelements_funcResult == null) {
/* 053 */           mapelements_isNull = true;
/* 054 */         } else {
/* 055 */           mapelements_value = (int[]) mapelements_funcResult;
/* 056 */         }
/* 057 */
/* 058 */       }
/* 059 */       mapelements_isNull = mapelements_value == null;
/* 060 */
/* 061 */       serializefromobject_argIsNulls[0] = mapelements_isNull;
/* 062 */       serializefromobject_argValue = mapelements_value;
/* 063 */
/* 064 */       boolean serializefromobject_isNull = false;
/* 065 */       for (int idx = 0; idx < 1; idx++) {
/* 066 */         if (serializefromobject_argIsNulls[idx]) { serializefromobject_isNull = true; break; }
/* 067 */       }
/* 068 */
/* 069 */       final ArrayData serializefromobject_value = serializefromobject_isNull ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
/* 070 */       serializefromobject_holder.reset();
/* 071 */
/* 072 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 073 */
/* 074 */       if (serializefromobject_isNull) {
/* 075 */         serializefromobject_rowWriter.setNullAt(0);
/* 076 */       } else {
/* 077 */         // Remember the current cursor so that we can calculate how many bytes are
/* 078 */         // written later.
/* 079 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 080 */
/* 081 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 082 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 083 */           // grow the global buffer before writing data.
/* 084 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 085 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 086 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 087 */
/* 088 */         } else {
/* 089 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 090 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 091 */
/* 092 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 093 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 094 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 095 */             } else {
/* 096 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 097 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 098 */             }
/* 099 */           }
/* 100 */         }
/* 101 */
/* 102 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 103 */       }
/* 104 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 105 */       append(serializefromobject_result);
/* 106 */       if (shouldStop()) return;
/* 107 */     }
/* 108 */   }
/* 109 */ }
```

Generated code after applying this PR:

```java
/* 035 */   protected void processNext() throws java.io.IOException {
/* 036 */     while (inputadapter_input.hasNext()) {
/* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */       int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 039 */
/* 040 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 041 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 042 */
/* 043 */       boolean mapelements_isNull = false || false;
/* 044 */       int[] mapelements_value = null;
/* 045 */       if (!mapelements_isNull) {
/* 046 */         Object mapelements_funcResult = null;
/* 047 */         mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 048 */         if (mapelements_funcResult == null) {
/* 049 */           mapelements_isNull = true;
/* 050 */         } else {
/* 051 */           mapelements_value = (int[]) mapelements_funcResult;
/* 052 */         }
/* 053 */
/* 054 */       }
/* 055 */       mapelements_isNull = mapelements_value == null;
/* 056 */
/* 057 */       boolean serializefromobject_isNull = mapelements_isNull;
/* 058 */       final ArrayData serializefromobject_value = serializefromobject_isNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(mapelements_value);
/* 059 */       serializefromobject_isNull = serializefromobject_value == null;
/* 060 */       serializefromobject_holder.reset();
/* 061 */
/* 062 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 063 */
/* 064 */       if (serializefromobject_isNull) {
/* 065 */         serializefromobject_rowWriter.setNullAt(0);
/* 066 */       } else {
/* 067 */         // Remember the current cursor so that we can calculate how many bytes are
/* 068 */         // written later.
/* 069 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 070 */
/* 071 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 072 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 073 */           // grow the global buffer before writing data.
/* 074 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 075 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 076 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 077 */
/* 078 */         } else {
/* 079 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 080 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 081 */
/* 082 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 083 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 084 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 085 */             } else {
/* 086 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 087 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 088 */             }
/* 089 */           }
/* 090 */         }
/* 091 */
/* 092 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 093 */       }
/* 094 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 095 */       append(serializefromobject_result);
/* 096 */       if (shouldStop()) return;
/* 097 */     }
/* 098 */   }
/* 099 */ }
```

## How was this patch tested?

Added a test in `DatasetSuite`, `RowEncoderSuite`, and `CatalystTypeConvertersSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#15044 from kiszk/SPARK-17490.
## What changes were proposed in this pull request?

This PR addresses a TODO of the `GenericArrayData` class. The current implementation of `GenericArrayData` leads to boxing/unboxing if the type of the array elements is primitive. It would be good to eliminate boxing/unboxing from the point of view of runtime memory footprint and performance.

This PR eliminates boxing/unboxing by enhancing the implementation of `GenericArrayData`. Since it does not change the current API `new GenericArrayData(...)`, it minimizes the amount of changes.

This PR also optimizes the generated code of a projection for a primitive type array. Since we know a primitive type array does not require null checks and has a contiguous data region, this PR performs bulk data copy by using `Platform.copy` without null checks.

Here are major improvements:
- A constructor keeps a primitive array as is (previously an `Object[]` is used to hold a primitive array and boxing happened in a constructor)
- `get<Type>()` gets a value from a primitive array (previously unboxing happened)
- `to<Type>Array()` performs data copy using `System.arraycopy` (previously unboxing happened)
- `UnsafeArrayWriter.writePrimitive<PrimitiveType>Array()` performs data copy using `Platform.copy`
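As a rough illustration of the second and third points, a simplified sketch of an int-specialized implementation (not the code in the patch; the real class implements the full `ArrayData` interface):

```scala
// Simplified sketch of a primitive-backed array wrapper; only the
// boxing-sensitive methods are shown.
final class GenericIntArrayDataSketch(private val primitiveArray: Array[Int]) {
  def numElements(): Int = primitiveArray.length

  // Reads straight from the int[]; previously the value came out of an
  // Array[Any] and had to be unboxed.
  def getInt(ordinal: Int): Int = primitiveArray(ordinal)

  // Bulk copy with System.arraycopy instead of an element-by-element
  // unboxing loop.
  def toIntArray(): Array[Int] = {
    val copy = new Array[Int](primitiveArray.length)
    System.arraycopy(primitiveArray, 0, copy, 0, primitiveArray.length)
    copy
  }
}
```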
Here are performance results of micro benchmarks.

## How was this patch tested?

Added unit tests.