-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-16792][SQL] Dataset containing a Case Class with a List type causes a CompileException (converting sequence to list) #16240
Conversation
(edit - old commit info; doesn't apply anymore) I would like to add that the conversion is specific to We can also support |
Test build #3488 has finished for PR 16240 at commit
|
WARNING: Breaks Seq.toDS for Seq[Product]
Added support for arbitrary sequences. Now also Queues, ArrayBuffers and such can be used in datasets (all are serialized into ArrayType). I had to alter and add new implicit encoders into However, I encountered another problem with implicits. When constructing a complex Dataset using Example code where the problem manifests: Seq(Queue(Tuple1(1))).toDS() I added a workaround by defining a specific implicit just for If anybody knows how to fix this, let me know. |
Possible optimization: Instead of conversions using I will wait for a response to this PR before attempting any more modifications. |
newLongSeqEncoder | ||
|
||
/** @since 2.2.0 */ | ||
implicit def newDoubleListEncoder[T <: Seq[Double] with Product : TypeTag]: Encoder[T] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be newDoubleSeqWithProductEncoder
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should. Thanks
/cc @cloud-fan |
The overall strategy LGTM.
Does scala have a clear definition for this case? i.e. we have implicit for both type For the optimization, we can do it in follow-up. |
None of them. The compilation will fail. That is why I had to provide those additional implicits.
|
How about we assign priority to implicit rules like http://stackoverflow.com/questions/1886953/is-there-a-way-to-control-which-implicit-conversion-will-be-the-default-used ? I think we should prefer |
I actually read that before but IDEA complained when I tried to place the |
val cls = mirror.runtimeClass(t.typeSymbol.asClass) | ||
import scala.collection.generic.CanBuildFrom | ||
import scala.reflect.ClassTag | ||
import scala.util.{Try, Success} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark code style discourage the usage of Try
and Success
, can you refactor your code a little bit? i.e. move cls.getDeclaredMethod("canBuildFrom", classOf[ClassTag[_]])
out of the Invoke
code block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I tried looking up the code style you mentioned, but only found the Databricks' Scala Code Style Guide. And that is not mentioned in the Spark docs as far as I know.
|
||
/** @since 1.6.1 */ | ||
implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() | ||
implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : TypeTag]: Encoder[T] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is my only concern now. Can you provide more details about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is the same as all the other ones, just with Product subclasses. If you were concerned about the TypeTag
on A
, it was actually not needed as T
's tag already contains all the information. I just tested it to be sure and removed it.
|
||
/** @since 1.6.1 */ | ||
implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() | ||
implicit def newProductSeqEncoder[A <: Product, T <: Seq[A] : TypeTag]: Encoder[T] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just use newProductSeqEncoder[T <: Seq[Product] : TypeTag]: Encoder[T]
here? Then we don't need the workaround implicit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Seq
declares covariance on T
so it works and solves all the problems I was having. Thanks
…lting workarounds
@@ -130,6 +130,30 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { | |||
checkDataset(Seq(Array(Tuple1(1))).toDS(), Array(Tuple1(1))) | |||
} | |||
|
|||
test("arbitrary sequences") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's also test nested sequences, e.g. List(Queue(1))
, and sequences inside product, e.g. List(1) -> Queue(1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some sequence-product combination tests.
Nested sequences were never supported (tried on master and 2.0.2). That would probably be worthy of another ticket.
retest this please |
import org.apache.spark.sql.test.SharedSQLContext | ||
|
||
case class IntClass(value: Int) | ||
|
||
case class SeqCC(s: Seq[Int]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does CC
short for? How about SeqClass
?
LGTM, please create 2 more tickets for the optimization you metioned in #16240 (comment) and the nested custom collection problem. |
Test build #70859 has finished for PR 16240 at commit
|
you need to fix mima:
|
Not sure how to run MiMa tests locally so I tried my best to figure out what was necessary. Hope this fixes it. |
For future reference: https://github.com/apache/spark/blob/master/dev/mima (script to run mima) |
ok to test |
Test build #70943 has finished for PR 16240 at commit
|
* @since 1.6.1 | ||
* @deprecated use [[newIntSequenceEncoder]] | ||
*/ | ||
def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh you doesn't need to do this, just update the project/MimaExcludes.scala
to add something like ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newDoubleSeqEncoder")
. Please see the history of the MimaExcludes.scala
to see how others update this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, I'm not sure I agree... Do we want to break binary compatibility for libraries that might be using this function? That could have even been resolved implicitly, so it would be confusing when it breaks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah i see, makes sense
thanks, merging to master! |
@michalsenkyr please create 2 more tickets for the optimization you metioned in #16240 (comment) and the nested custom collection problem. |
…auses a CompileException (converting sequence to list) ## What changes were proposed in this pull request? Added a `to` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is not a supertype of `WrappedArray[_]` that uses `CanBuildFrom[_, _, _]` to convert result into an arbitrary subtype of `Seq[_]`. Care was taken to preserve the original deserialization where it is possible to avoid the overhead of conversion in cases where it is not needed `ScalaReflection.serializerFor` could already be used to serialize any `Seq[_]` so it was not altered `SQLImplicits` had to be altered and new implicit encoders added to permit serialization of other sequence types Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException ## How was this patch tested? ```bash ./build/mvn -DskipTests clean package && ./dev/run-tests ``` Also manual execution of the following sets of commands in the Spark shell: ```scala case class TestCC(key: Int, letters: List[String]) val ds1 = sc.makeRDD(Seq( (List("D")), (List("S","H")), (List("F","H")), (List("D","L","L")) )).map(x=>(x.length,x)).toDF("key","letters").as[TestCC] val test1=ds1.map{_.key} test1.show ``` ```scala case class X(l: List[String]) spark.createDataset(Seq(List("A"))).map(X).show ``` ```scala spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect ``` After adding arbitrary sequence support also tested with the following commands: ```scala case class QueueClass(q: scala.collection.immutable.Queue[Int]) spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect ``` Author: Michal Senkyr <[email protected]> Closes apache#16240 from michalsenkyr/sql-caseclass-list-fix.
…auses a CompileException (converting sequence to list) ## What changes were proposed in this pull request? Added a `to` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is not a supertype of `WrappedArray[_]` that uses `CanBuildFrom[_, _, _]` to convert result into an arbitrary subtype of `Seq[_]`. Care was taken to preserve the original deserialization where it is possible to avoid the overhead of conversion in cases where it is not needed `ScalaReflection.serializerFor` could already be used to serialize any `Seq[_]` so it was not altered `SQLImplicits` had to be altered and new implicit encoders added to permit serialization of other sequence types Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException ## How was this patch tested? ```bash ./build/mvn -DskipTests clean package && ./dev/run-tests ``` Also manual execution of the following sets of commands in the Spark shell: ```scala case class TestCC(key: Int, letters: List[String]) val ds1 = sc.makeRDD(Seq( (List("D")), (List("S","H")), (List("F","H")), (List("D","L","L")) )).map(x=>(x.length,x)).toDF("key","letters").as[TestCC] val test1=ds1.map{_.key} test1.show ``` ```scala case class X(l: List[String]) spark.createDataset(Seq(List("A"))).map(X).show ``` ```scala spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect ``` After adding arbitrary sequence support also tested with the following commands: ```scala case class QueueClass(q: scala.collection.immutable.Queue[Int]) spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect ``` Author: Michal Senkyr <[email protected]> Closes apache#16240 from michalsenkyr/sql-caseclass-list-fix.
## What changes were proposed in this pull request? Optimization of arbitrary Scala sequence deserialization introduced by apache#16240. The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases. This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly. Example codegen for simple `List` (obtained using `Seq(List(1)).toDS().map(identity).queryExecution.debug.codegen`): Before: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean deserializetoobject_resultIsNull; /* 010 */ private java.lang.Object[] deserializetoobject_argValue; /* 011 */ private boolean MapObjects_loopIsNull1; /* 012 */ private int MapObjects_loopValue0; /* 013 */ private boolean deserializetoobject_resultIsNull1; /* 014 */ private scala.collection.generic.CanBuildFrom deserializetoobject_argValue1; /* 015 */ private UnsafeRow deserializetoobject_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 018 */ private scala.collection.immutable.List mapelements_argValue; /* 019 */ private UnsafeRow mapelements_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 022 */ private scala.collection.immutable.List serializefromobject_argValue; /* 023 */ private UnsafeRow serializefromobject_result; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 027 */ /* 028 */ public GeneratedIterator(Object[] references) { /* 029 */ this.references = references; /* 030 */ } /* 031 */ /* 032 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 033 */ partitionIndex = index; /* 034 */ this.inputs = inputs; /* 035 */ inputadapter_input = inputs[0]; /* 036 */ /* 037 */ deserializetoobject_result = new UnsafeRow(1); /* 038 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 039 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 040 */ /* 041 */ mapelements_result = new UnsafeRow(1); /* 042 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 043 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 044 */ /* 045 */ serializefromobject_result = new UnsafeRow(1); /* 046 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 047 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 048 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 054 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 055 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 056 */ /* 057 */ deserializetoobject_resultIsNull = false; /* 058 */ /* 059 */ if (!deserializetoobject_resultIsNull) { /* 060 */ ArrayData deserializetoobject_value3 = null; /* 061 */ /* 062 */ if (!false) { /* 063 */ Integer[] deserializetoobject_convertedArray = null; /* 064 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 065 */ deserializetoobject_convertedArray = new Integer[deserializetoobject_dataLength]; /* 066 */ /* 067 */ int deserializetoobject_loopIndex = 0; /* 068 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 069 */ MapObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 070 */ MapObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 071 */ /* 072 */ if (MapObjects_loopIsNull1) { /* 073 */ throw new RuntimeException(((java.lang.String) references[0])); /* 074 */ } /* 075 */ if (false) { /* 076 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 077 */ } else { /* 078 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue0; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_loopIndex += 1; /* 082 */ } /* 083 */ /* 084 */ deserializetoobject_value3 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /* 085 */ } /* 086 */ boolean deserializetoobject_isNull2 = true; /* 087 */ java.lang.Object[] deserializetoobject_value2 = null; /* 088 */ if (!false) { /* 089 */ deserializetoobject_isNull2 = false; /* 090 */ if (!deserializetoobject_isNull2) { /* 091 */ Object deserializetoobject_funcResult = null; /* 092 */ deserializetoobject_funcResult = deserializetoobject_value3.array(); /* 093 */ if (deserializetoobject_funcResult == null) { /* 094 */ deserializetoobject_isNull2 = true; /* 095 */ } else { /* 096 */ deserializetoobject_value2 = (java.lang.Object[]) deserializetoobject_funcResult; /* 097 */ } /* 098 */ /* 099 */ } /* 100 */ deserializetoobject_isNull2 = deserializetoobject_value2 == null; /* 101 */ } /* 102 */ deserializetoobject_resultIsNull = deserializetoobject_isNull2; /* 103 */ deserializetoobject_argValue = deserializetoobject_value2; /* 104 */ } /* 105 */ /* 106 */ boolean deserializetoobject_isNull1 = deserializetoobject_resultIsNull; /* 107 */ final scala.collection.Seq deserializetoobject_value1 = deserializetoobject_resultIsNull ? null : scala.collection.mutable.WrappedArray.make(deserializetoobject_argValue); /* 108 */ deserializetoobject_isNull1 = deserializetoobject_value1 == null; /* 109 */ boolean deserializetoobject_isNull = true; /* 110 */ scala.collection.immutable.List deserializetoobject_value = null; /* 111 */ if (!deserializetoobject_isNull1) { /* 112 */ deserializetoobject_resultIsNull1 = false; /* 113 */ /* 114 */ if (!deserializetoobject_resultIsNull1) { /* 115 */ boolean deserializetoobject_isNull6 = false; /* 116 */ final scala.collection.generic.CanBuildFrom deserializetoobject_value6 = false ? null : scala.collection.immutable.List.canBuildFrom(); /* 117 */ deserializetoobject_isNull6 = deserializetoobject_value6 == null; /* 118 */ deserializetoobject_resultIsNull1 = deserializetoobject_isNull6; /* 119 */ deserializetoobject_argValue1 = deserializetoobject_value6; /* 120 */ } /* 121 */ /* 122 */ deserializetoobject_isNull = deserializetoobject_resultIsNull1; /* 123 */ if (!deserializetoobject_isNull) { /* 124 */ Object deserializetoobject_funcResult1 = null; /* 125 */ deserializetoobject_funcResult1 = deserializetoobject_value1.to(deserializetoobject_argValue1); /* 126 */ if (deserializetoobject_funcResult1 == null) { /* 127 */ deserializetoobject_isNull = true; /* 128 */ } else { /* 129 */ deserializetoobject_value = (scala.collection.immutable.List) deserializetoobject_funcResult1; /* 130 */ } /* 131 */ /* 132 */ } /* 133 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 134 */ } /* 135 */ /* 136 */ boolean mapelements_isNull = true; /* 137 */ scala.collection.immutable.List mapelements_value = null; /* 138 */ if (!false) { /* 139 */ mapelements_argValue = deserializetoobject_value; /* 140 */ /* 141 */ mapelements_isNull = false; /* 142 */ if (!mapelements_isNull) { /* 143 */ Object mapelements_funcResult = null; /* 144 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 145 */ if (mapelements_funcResult == null) { /* 146 */ mapelements_isNull = true; /* 147 */ } else { /* 148 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 149 */ } /* 150 */ /* 151 */ } /* 152 */ mapelements_isNull = mapelements_value == null; /* 153 */ } /* 154 */ /* 155 */ if (mapelements_isNull) { /* 156 */ throw new RuntimeException(((java.lang.String) references[2])); /* 157 */ } /* 158 */ serializefromobject_argValue = mapelements_value; /* 159 */ /* 160 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 161 */ serializefromobject_holder.reset(); /* 162 */ /* 163 */ // Remember the current cursor so that we can calculate how many bytes are /* 164 */ // written later. /* 165 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 166 */ /* 167 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 168 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 169 */ // grow the global buffer before writing data. /* 170 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 171 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 172 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 173 */ /* 174 */ } else { /* 175 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 176 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 177 */ /* 178 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 179 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 180 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 181 */ } else { /* 182 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 183 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 184 */ } /* 185 */ } /* 186 */ } /* 187 */ /* 188 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 189 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 190 */ append(serializefromobject_result); /* 191 */ if (shouldStop()) return; /* 192 */ } /* 193 */ } /* 194 */ } ``` After: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjects_loopIsNull1; /* 010 */ private int CollectObjects_loopValue0; /* 011 */ private UnsafeRow deserializetoobject_result; /* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 014 */ private scala.collection.immutable.List mapelements_argValue; /* 015 */ private UnsafeRow mapelements_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 018 */ private scala.collection.immutable.List serializefromobject_argValue; /* 019 */ private UnsafeRow serializefromobject_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 023 */ /* 024 */ public GeneratedIterator(Object[] references) { /* 025 */ this.references = references; /* 026 */ } /* 027 */ /* 028 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 029 */ partitionIndex = index; /* 030 */ this.inputs = inputs; /* 031 */ inputadapter_input = inputs[0]; /* 032 */ /* 033 */ deserializetoobject_result = new UnsafeRow(1); /* 034 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 035 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 036 */ /* 037 */ mapelements_result = new UnsafeRow(1); /* 038 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 039 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 040 */ /* 041 */ serializefromobject_result = new UnsafeRow(1); /* 042 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 043 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 044 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 045 */ /* 046 */ } /* 047 */ /* 048 */ protected void processNext() throws java.io.IOException { /* 049 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 050 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 051 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 052 */ /* 053 */ scala.collection.immutable.List deserializetoobject_value = null; /* 054 */ /* 055 */ if (!false) { /* 056 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 057 */ scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder(); /* 058 */ CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength); /* 059 */ /* 060 */ int deserializetoobject_loopIndex = 0; /* 061 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 062 */ CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 063 */ CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 064 */ /* 065 */ if (CollectObjects_loopIsNull1) { /* 066 */ throw new RuntimeException(((java.lang.String) references[0])); /* 067 */ } /* 068 */ if (false) { /* 069 */ CollectObjects_builderValue2.$plus$eq(null); /* 070 */ } else { /* 071 */ CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0); /* 072 */ } /* 073 */ /* 074 */ deserializetoobject_loopIndex += 1; /* 075 */ } /* 076 */ /* 077 */ deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result(); /* 078 */ } /* 079 */ /* 080 */ boolean mapelements_isNull = true; /* 081 */ scala.collection.immutable.List mapelements_value = null; /* 082 */ if (!false) { /* 083 */ mapelements_argValue = deserializetoobject_value; /* 084 */ /* 085 */ mapelements_isNull = false; /* 086 */ if (!mapelements_isNull) { /* 087 */ Object mapelements_funcResult = null; /* 088 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 089 */ if (mapelements_funcResult == null) { /* 090 */ mapelements_isNull = true; /* 091 */ } else { /* 092 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 093 */ } /* 094 */ /* 095 */ } /* 096 */ mapelements_isNull = mapelements_value == null; /* 097 */ } /* 098 */ /* 099 */ if (mapelements_isNull) { /* 100 */ throw new RuntimeException(((java.lang.String) references[2])); /* 101 */ } /* 102 */ serializefromobject_argValue = mapelements_value; /* 103 */ /* 104 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 105 */ serializefromobject_holder.reset(); /* 106 */ /* 107 */ // Remember the current cursor so that we can calculate how many bytes are /* 108 */ // written later. /* 109 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 110 */ /* 111 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 112 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 113 */ // grow the global buffer before writing data. /* 114 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 115 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 116 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 117 */ /* 118 */ } else { /* 119 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 120 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 121 */ /* 122 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 123 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 124 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 125 */ } else { /* 126 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 127 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 128 */ } /* 129 */ } /* 130 */ } /* 131 */ /* 132 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 133 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 134 */ append(serializefromobject_result); /* 135 */ if (shouldStop()) return; /* 136 */ } /* 137 */ } /* 138 */ } ``` Benchmark results before: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 269 / 370 0.0 269125.8 1.0X List 154 / 176 0.0 154453.5 1.7X mutable.Queue 210 / 233 0.0 209691.6 1.3X ``` Benchmark results after: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 255 / 316 0.0 254697.3 1.0X List 152 / 177 0.0 152410.0 1.7X mutable.Queue 213 / 235 0.0 213470.0 1.2X ``` ## How was this patch tested? ```bash ./build/mvn -DskipTests clean package && ./dev/run-tests ``` Additionally in Spark Shell: ```scala case class QueueClass(q: scala.collection.immutable.Queue[Int]) spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect ``` Author: Michal Senkyr <[email protected]> Closes apache#16541 from michalsenkyr/dataset-seq-builder.
What changes were proposed in this pull request?
Added a
to
call at the end of the code generated byScalaReflection.deserializerFor
if the requested type is not a supertype ofWrappedArray[_]
that usesCanBuildFrom[_, _, _]
to convert result into an arbitrary subtype ofSeq[_]
.Care was taken to preserve the original deserialization where it is possible to avoid the overhead of conversion in cases where it is not needed
ScalaReflection.serializerFor
could already be used to serialize anySeq[_]
so it was not alteredSQLImplicits
had to be altered and new implicit encoders added to permit serialization of other sequence typesAlso fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException
How was this patch tested?
./build/mvn -DskipTests clean package && ./dev/run-tests
Also manual execution of the following sets of commands in the Spark shell:
After adding arbitrary sequence support also tested with the following commands: