From 4e538913901de6cb439e9919e958a60ab698fe24 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 30 Jan 2018 19:30:34 +0800 Subject: [PATCH 1/2] add calendar interval type support to ColumnVector --- .../spark/unsafe/types/CalendarInterval.java | 4 +- .../vectorized/MutableColumnarRow.java | 4 +- .../sql/vectorized/ArrowColumnVector.java | 2 +- .../spark/sql/vectorized/ColumnVector.java | 24 +++++++++- .../spark/sql/vectorized/ColumnarArray.java | 4 +- .../spark/sql/vectorized/ColumnarRow.java | 4 +- .../vectorized/ColumnarBatchSuite.scala | 45 +++++++++++++++++-- 7 files changed, 71 insertions(+), 16 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 621f2c6bf3777..abab7289fcf23 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -87,7 +87,7 @@ public static CalendarInterval fromString(String s) { } } - public static long toLongWithRange(String fieldName, + private static long toLongWithRange(String fieldName, String s, long minValue, long maxValue) throws IllegalArgumentException { long result = 0; if (s != null) { @@ -235,7 +235,7 @@ public static CalendarInterval fromSingleUnitString(String unit, String s) /** * Parse second_nano string in ss.nnnnnnnnn format to microseconds */ - public static long parseSecondNano(String secondNano) throws IllegalArgumentException { + private static long parseSecondNano(String secondNano) throws IllegalArgumentException { String[] parts = secondNano.split("\\."); if (parts.length == 1) { return toLongWithRange("second", parts[0], Long.MIN_VALUE / MICROS_PER_SECOND, diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java index 2bab095d4d951..66668f3753604 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java @@ -146,9 +146,7 @@ public byte[] getBinary(int ordinal) { @Override public CalendarInterval getInterval(int ordinal) { if (columns[ordinal].isNullAt(rowId)) return null; - final int months = columns[ordinal].getChild(0).getInt(rowId); - final long microseconds = columns[ordinal].getChild(1).getLong(rowId); - return new CalendarInterval(months, microseconds); + return columns[ordinal].getInterval(rowId); } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java index 9803c3dec6de2..a75d76bd0f82e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java @@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.UTF8String; /** - * A column vector backed by Apache Arrow. Currently time interval type and map type are not + * A column vector backed by Apache Arrow. Currently calendar interval type and map type are not * supported. */ @InterfaceStability.Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index 4b955ceddd0f2..767a110aedf28 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -20,6 +20,7 @@ import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; /** @@ -195,6 +196,7 @@ public double[] getDoubles(int rowId, int count) { * struct field. */ public final ColumnarRow getStruct(int rowId) { + if (isNullAt(rowId)) return null; return new ColumnarRow(this, rowId); } @@ -235,10 +237,30 @@ public MapData getMap(int ordinal) { */ public abstract byte[] getBinary(int rowId); + /** + * Returns the calendar interval type value for rowId. + * + * In Spark, calendar interval type value is basically an integer value representing the number of + * months in this interval, and a long value representing the number of microseconds in this + * interval. A interval type vector is same as a struct type vector with 2 fields: `months` and + * `microseconds`. + * + * To support interval type, implementations must implement {@link #getChild(int)} and define 2 + * child vectors: the first child vector is a int type vector, containing all the month values of + * all the interval values in this vector. The second child vector is a long type vector, + * containing all the microsecond values of all the interval values in this vector. + */ + public final CalendarInterval getInterval(int rowId) { + if (isNullAt(rowId)) return null; + final int months = getChild(0).getInt(rowId); + final long microseconds = getChild(1).getLong(rowId); + return new CalendarInterval(months, microseconds); + } + /** * Returns the ordinal's child column vector. */ - public abstract ColumnVector getChild(int ordinal); + protected abstract ColumnVector getChild(int ordinal); /** * Data type for this column. diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java index 0d2c3ec8648d3..72c07ee7cad3f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java @@ -135,9 +135,7 @@ public byte[] getBinary(int ordinal) { @Override public CalendarInterval getInterval(int ordinal) { - int month = data.getChild(0).getInt(offset + ordinal); - long microseconds = data.getChild(1).getLong(offset + ordinal); - return new CalendarInterval(month, microseconds); + return data.getInterval(offset + ordinal); } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java index 25db7e09d20d0..6ca749d7c6e85 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java @@ -139,9 +139,7 @@ public byte[] getBinary(int ordinal) { @Override public CalendarInterval getInterval(int ordinal) { if (data.getChild(ordinal).isNullAt(rowId)) return null; - final int months = data.getChild(ordinal).getChild(0).getInt(rowId); - final long microseconds = data.getChild(ordinal).getChild(1).getLong(rowId); - return new CalendarInterval(months, microseconds); + return data.getChild(ordinal).getInterval(rowId); } @Override diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 1873c24ab063c..925c101fe1fee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -620,6 +620,39 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 0) } + testVector("CalendarInterval APIs", 4, CalendarIntervalType) { + column => + val reference = mutable.ArrayBuffer.empty[CalendarInterval] + + val months = column.getChild(0) + val microseconds = column.getChild(1) + assert(months.dataType() == IntegerType) + assert(microseconds.dataType() == LongType) + + months.putInt(0, 1) + microseconds.putLong(0, 100) + reference += new CalendarInterval(1, 100) + + months.putInt(1, 0) + microseconds.putLong(1, 2000) + reference += new CalendarInterval(0, 2000) + + column.putNull(2) + reference += null + + months.putInt(3, 20) + microseconds.putLong(3, 0) + reference += new CalendarInterval(20, 0) + + reference.zipWithIndex.foreach { case (v, i) => + val errMsg = "VectorType=" + column.getClass.getSimpleName + assert(v == column.getInterval(i), errMsg) + if (v == null) assert(column.isNullAt(i), errMsg) + } + + column.close() + } + testVector("Int Array", 10, new ArrayType(IntegerType, true)) { column => @@ -739,14 +772,20 @@ class ColumnarBatchSuite extends SparkFunSuite { c1.putInt(0, 123) c2.putDouble(0, 3.45) - c1.putInt(1, 456) - c2.putDouble(1, 5.67) + + column.putNull(1) + + c1.putInt(2, 456) + c2.putDouble(2, 5.67) val s = column.getStruct(0) assert(s.getInt(0) == 123) assert(s.getDouble(1) == 3.45) - val s2 = column.getStruct(1) + assert(column.isNullAt(1)) + assert(column.getStruct(1) == null) + + val s2 = column.getStruct(2) assert(s2.getInt(0) == 456) assert(s2.getDouble(1) == 5.67) } From 2f23a1d4a6f6968b1c1209b94ca340ca25cc67e1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 30 Jan 2018 20:55:25 +0800 Subject: [PATCH 2/2] address comments --- .../org/apache/spark/unsafe/types/CalendarInterval.java | 4 ++-- .../org/apache/spark/sql/vectorized/ColumnVector.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index abab7289fcf23..621f2c6bf3777 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -87,7 +87,7 @@ public static CalendarInterval fromString(String s) { } } - private static long toLongWithRange(String fieldName, + public static long toLongWithRange(String fieldName, String s, long minValue, long maxValue) throws IllegalArgumentException { long result = 0; if (s != null) { @@ -235,7 +235,7 @@ public static CalendarInterval fromSingleUnitString(String unit, String s) /** * Parse second_nano string in ss.nnnnnnnnn format to microseconds */ - private static long parseSecondNano(String secondNano) throws IllegalArgumentException { + public static long parseSecondNano(String secondNano) throws IllegalArgumentException { String[] parts = secondNano.split("\\."); if (parts.length == 1) { return toLongWithRange("second", parts[0], Long.MIN_VALUE / MICROS_PER_SECOND, diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index 767a110aedf28..111f5d9b358d4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -242,11 +242,11 @@ public MapData getMap(int ordinal) { * * In Spark, calendar interval type value is basically an integer value representing the number of * months in this interval, and a long value representing the number of microseconds in this - * interval. A interval type vector is same as a struct type vector with 2 fields: `months` and - * `microseconds`. + * interval. An interval type vector is the same as a struct type vector with 2 fields: `months` + * and `microseconds`. * * To support interval type, implementations must implement {@link #getChild(int)} and define 2 - * child vectors: the first child vector is a int type vector, containing all the month values of + * child vectors: the first child vector is an int type vector, containing all the month values of * all the interval values in this vector. The second child vector is a long type vector, * containing all the microsecond values of all the interval values in this vector. */ @@ -258,7 +258,7 @@ public final CalendarInterval getInterval(int rowId) { } /** - * Returns the ordinal's child column vector. + * @return child [[ColumnVector]] at the given ordinal. */ protected abstract ColumnVector getChild(int ordinal);