From 34ebaa5f1fa58aaddaf2d94642fa0ba7d8f7f609 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 18 Feb 2025 14:29:35 +0800 Subject: [PATCH 1/3] [VL] Fix mode in UnsafeColumnarBuildSideRelation not being properly serialized --- .../UnsafeColumnarBuildSideRelation.scala | 6 +- .../UnsafeColumnarBuildSideRelationTest.scala | 100 ++++++++++++++++++ 2 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index ac9aef3bdd9a..47f659d189a7 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -104,7 +104,7 @@ case class UnsafeColumnarBuildSideRelation( override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException { kryo.writeObject(out, output.toList) - kryo.writeObject(out, mode) + kryo.writeClassAndObject(out, mode) out.writeInt(batches.arraySize) kryo.writeObject(out, batches.bytesBufferLengths) out.writeLong(batches.totalBytes) @@ -136,14 +136,14 @@ case class UnsafeColumnarBuildSideRelation( for (i <- 0 until totalArraySize) { val length = bytesBufferLengths(i) val tmpBuffer = new Array[Byte](length) - in.read(tmpBuffer) + in.readFully(tmpBuffer) batches.putBytesBuffer(i, tmpBuffer) } } override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { output = kryo.readObject(in, classOf[List[_]]).asInstanceOf[Seq[Attribute]] - mode = kryo.readObject(in, classOf[BroadcastMode]) + mode = kryo.readClassAndObject(in).asInstanceOf[BroadcastMode] val totalArraySize = in.readInt() val bytesBufferLengths = kryo.readObject(in, 
classOf[Array[Int]]) val totalBytes = in.readLong() diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala new file mode 100644 index 000000000000..55e224adf33b --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.unsafe; + +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager} +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode +import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StringType; + +class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession { + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.memory.offHeap.enabled", "true") + } + + var unsafeRelWithIdentityMode: UnsafeColumnarBuildSideRelation = null + var unsafeRelWithHashMode: UnsafeColumnarBuildSideRelation = null + + override def beforeAll(): Unit = { + super.beforeAll() + val taskMemoryManager = new TaskMemoryManager( + new UnifiedMemoryManager(SparkEnv.get.conf, Long.MaxValue, Long.MaxValue / 2, 1), + 0) + val a = AttributeReference("a", StringType, nullable = false, null)() + val output = Seq(a) + val totalArraySize = 1 + val perArraySize = new Array[Int](totalArraySize) + perArraySize(0) = 10 + val bytesArray = UnsafeBytesBufferArray( + 1, + perArraySize, + 10, + taskMemoryManager + ) + bytesArray.putBytesBuffer(0, "1234567890".getBytes()) + unsafeRelWithIdentityMode = UnsafeColumnarBuildSideRelation( + output, + bytesArray, + IdentityBroadcastMode + ) + unsafeRelWithHashMode = UnsafeColumnarBuildSideRelation( + output, + bytesArray, + HashedRelationBroadcastMode(output, isNullAware = false) + ) + } + + test("Java default serialization") { + val javaSerialization = new JavaSerializer(SparkEnv.get.conf) + val serializerInstance = javaSerialization.newInstance() + + // test unsafeRelWithIdentityMode + val buffer = serializerInstance.serialize(unsafeRelWithIdentityMode) + val obj = 
serializerInstance.deserialize[UnsafeColumnarBuildSideRelation](buffer) + assert(obj != null) + assert(obj.mode == IdentityBroadcastMode) + + // test unsafeRelWithHashMode + val buffer2 = serializerInstance.serialize(unsafeRelWithHashMode) + val obj2 = serializerInstance.deserialize[UnsafeColumnarBuildSideRelation](buffer2) + assert(obj2 != null) + assert(obj2.mode.isInstanceOf[HashedRelationBroadcastMode]) + } + + test("Kryo serialization") { + val kryo = new KryoSerializer(SparkEnv.get.conf) + val serializerInstance = kryo.newInstance() + + // test unsafeRelWithIdentityMode + val buffer = serializerInstance.serialize(unsafeRelWithIdentityMode) + val obj = serializerInstance.deserialize[UnsafeColumnarBuildSideRelation](buffer) + assert(obj != null) + assert(obj.mode == IdentityBroadcastMode) + + // test unsafeRelWithHashMode + val buffer2 = serializerInstance.serialize(unsafeRelWithHashMode) + val obj2 = serializerInstance.deserialize[UnsafeColumnarBuildSideRelation](buffer2) + assert(obj2 != null) + assert(obj2.mode.isInstanceOf[HashedRelationBroadcastMode]) + } + +} From 785dea394a8ef161c34e0e4e36825c7d2ce6ac07 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 18 Feb 2025 14:37:50 +0800 Subject: [PATCH 2/3] fix init error --- .../execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala index 55e224adf33b..f47c8bd562b8 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StringType; class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession { 
override protected def sparkConf: SparkConf = { super.sparkConf + .set("spark.memory.offHeap.size", "200M") .set("spark.memory.offHeap.enabled", "true") } From aa38bff98f11484d171fd82e1b7b5dc56918a911 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 18 Feb 2025 19:31:53 +0800 Subject: [PATCH 3/3] trigger ci