[SPARK-23280][SQL] add map type support to ColumnVector #20450

Closed · wants to merge 4 commits
---- OrcColumnVector.java ----
@@ -25,6 +25,7 @@
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

/**
@@ -172,6 +173,11 @@ public ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
throw new UnsupportedOperationException();
---- ColumnVectorUtils.java ----
@@ -20,8 +20,10 @@
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
@@ -30,6 +32,7 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -109,6 +112,18 @@ public static int[] toJavaIntArray(ColumnarArray array) {
return array.toIntArray();
}

public static Map<Integer, Integer> toJavaIntMap(ColumnarMap map) {
int[] keys = toJavaIntArray(map.keyArray());
int[] values = toJavaIntArray(map.valueArray());
assert keys.length == values.length;

Map<Integer, Integer> result = new HashMap<>();
for (int i = 0; i < keys.length; i++) {
result.put(keys[i], values[i]);
}
return result;
}

private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
if (o == null) {
if (t instanceof CalendarIntervalType) {
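For context, this helper collapses one row's ColumnarMap into an ordinary java.util.Map. A minimal usage sketch, assuming `column` is an int-to-int map column like the one built in the test suite at the end of this diff:

// Usage sketch (hypothetical variable names; Map is java.util.Map):
Map<Integer, Integer> row0 = ColumnVectorUtils.toJavaIntMap(column.getMap(0));
// row0 now holds the key/value pairs of the map stored at rowId 0.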
---- OffHeapColumnVector.java ----
@@ -60,7 +60,7 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[]
private long nulls;
private long data;

- // Set iff the type is array.
+ // Only set if type is Array or Map.
private long lengthData;
private long offsetData;

@@ -530,7 +530,7 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) {
@Override
protected void reserveInternal(int newCapacity) {
  int oldCapacity = (nulls == 0L) ? 0 : capacity;
- if (isArray()) {
+ if (isArray() || type instanceof MapType) {
    this.lengthData =
        Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
    this.offsetData =

Review (Contributor): nit: we may also have a method isMap().

Reply (Author): This might be overkill; isArray() has to cover many types, but for isMap() we would accept only one type: MapType.
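The helper the reviewer suggests would be a one-liner. A hypothetical sketch of it (not part of this PR; the author keeps the inline `type instanceof MapType` check instead):

// Hypothetical isMap() helper, as floated in the thread above. Unlike
// isArray(), which must cover several array-backed representations, a map
// check would only ever match the single MapType.
private boolean isMap() {
  return type instanceof MapType;
}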
---- OnHeapColumnVector.java ----
@@ -69,7 +69,7 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] f
private float[] floatData;
private double[] doubleData;

- // Only set if type is Array.
+ // Only set if type is Array or Map.
private int[] arrayLengths;
private int[] arrayOffsets;

@@ -503,7 +503,7 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) {
// Split this function out since it is the slow path.
@Override
protected void reserveInternal(int newCapacity) {
- if (isArray()) {
+ if (isArray() || type instanceof MapType) {
int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
if (this.arrayLengths != null) {
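To make the arrayLengths/arrayOffsets encoding concrete: for array- and map-typed vectors, row r's elements live in the child vector(s) at positions [arrayOffsets[r], arrayOffsets[r] + arrayLengths[r]). A standalone sketch of that indexing scheme in plain Java (illustrative only, no Spark classes):

// Per-row (offset, length) pairs indexing into a flat child buffer,
// as OnHeapColumnVector does for array- and map-typed columns.
public class OffsetLengthDemo {
  public static void main(String[] args) {
    int[] childData    = {0, 1, 2, 3, 4, 5}; // all rows' elements, flattened
    int[] arrayOffsets = {0, 1, 3, 3};       // where each row's slice starts
    int[] arrayLengths = {1, 2, 0, 3};       // how many elements each row has
    for (int r = 0; r < arrayOffsets.length; r++) {
      StringBuilder row = new StringBuilder("row " + r + ": [");
      for (int i = 0; i < arrayLengths[r]; i++) {
        if (i > 0) row.append(", ");
        row.append(childData[arrayOffsets[r] + i]);
      }
      System.out.println(row.append("]")); // row 2 prints the empty slice: []
    }
  }
}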
---- WritableColumnVector.java ----
@@ -25,6 +25,7 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;

@@ -607,6 +608,13 @@ public final ColumnarArray getArray(int rowId) {
return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
}

// `WritableColumnVector` puts the key array in the first child column vector, the value array
// in the second child column vector, and the offsets and lengths in the current column vector.
@Override
public final ColumnarMap getMap(int rowId) {
return new ColumnarMap(getChild(0), getChild(1), getArrayOffset(rowId), getArrayLength(rowId));
}

public WritableColumnVector arrayData() {
return childColumns[0];
}
@@ -700,6 +708,11 @@ protected WritableColumnVector(int capacity, DataType type) {
for (int i = 0; i < childColumns.length; ++i) {
this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
}
} else if (type instanceof MapType) {
MapType mapType = (MapType) type;
this.childColumns = new WritableColumnVector[2];
this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
} else if (type instanceof CalendarIntervalType) {
// Two columns. Months as int. Microseconds as Long.
this.childColumns = new WritableColumnVector[2];
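Putting the layout comment above together with the constructor change, a sketch of how a caller might populate and read back a map<int, int> row through this API (it mirrors the "Int Map" Scala test at the end of this diff; `col` is assumed to be a WritableColumnVector allocated with MapType(IntegerType, IntegerType, false)):

WritableColumnVector keys = col.getChild(0);   // key array child
WritableColumnVector values = col.getChild(1); // value array child
keys.putInt(0, 1); values.putInt(0, 10);       // entry 0: 1 -> 10
keys.putInt(1, 2); values.putInt(1, 20);       // entry 1: 2 -> 20
col.putArray(0, 0, 2);                         // row 0 spans entries [0, 2)
ColumnarMap m = col.getMap(0);                 // view of {1 -> 10, 2 -> 20}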
---- ArrowColumnVector.java ----
@@ -114,6 +114,11 @@ public ColumnarArray getArray(int rowId) {
return accessor.getArray(rowId);
}

@Override
public ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }

---- ColumnVector.java ----
@@ -215,10 +215,18 @@ public final ColumnarRow getStruct(int rowId) {

/**
 * Returns the map type value for rowId.
 *
 * In Spark, a map type value is basically a key data array and a value data array. A key from
 * the key array at an index and the value from the value array at the same index together form
 * an entry of this map type value.
 *
 * To support map type, implementations must construct an {@link ColumnarMap} and return it in
 * this method. {@link ColumnarMap} requires a {@link ColumnVector} that stores the data of all
 * the keys of all the maps in this vector, and another {@link ColumnVector} that stores the data
 * of all the values of all the maps in this vector, and a pair of offset and length which
 * specify the range of the key/value array that belongs to the map type value at rowId.
 */
- public MapData getMap(int ordinal) {
-   throw new UnsupportedOperationException();
- }
+ public abstract ColumnarMap getMap(int ordinal);

Review (Member): construct an -> construct a.

Reply (Author): This is very minor; it may not be worth waiting for another QA round. Maybe we can fix it in your "return null" PR?

Reply (Member): Sure. LGTM.

Review (Contributor): nit: maps -> map entries?

Reply (Author): "keys of map entries" sounds weird...

/**
 * Returns the decimal type value for rowId.
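As a sketch of that contract, a hypothetical subclass might implement the method as below; keysVector, valuesVector, offsetOf and lengthOf are illustrative names, not Spark APIs (the real WritableColumnVector implementation appears earlier in this diff):

@Override
public ColumnarMap getMap(int rowId) {
  // keysVector/valuesVector hold the keys/values of all maps in this vector;
  // (offset, length) select the slice that belongs to this row.
  return new ColumnarMap(keysVector, valuesVector, offsetOf(rowId), lengthOf(rowId));
}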
---- ColumnarArray.java ----
@@ -149,8 +149,8 @@ public ColumnarArray getArray(int ordinal) {
}

@Override
- public MapData getMap(int ordinal) {
-   throw new UnsupportedOperationException();
+ public ColumnarMap getMap(int ordinal) {
+   return data.getMap(offset + ordinal);
}

@Override
---- ColumnarMap.java (new file) ----
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.util.MapData;

/**
* Map abstraction in {@link ColumnVector}.
*/
public final class ColumnarMap extends MapData {
private final ColumnarArray keys;
private final ColumnarArray values;
private final int length;

public ColumnarMap(ColumnVector keys, ColumnVector values, int offset, int length) {
this.length = length;
this.keys = new ColumnarArray(keys, offset, length);
this.values = new ColumnarArray(values, offset, length);
}

@Override
public int numElements() { return length; }
Review (Contributor): numElements or length?

Reply (Author, @cloud-fan, Feb 1, 2018): This is an API inherited from the parent class; we can't change it.


@Override
public ColumnarArray keyArray() {
return keys;
}

@Override
public ColumnarArray valueArray() {
return values;
}

@Override
public ColumnarMap copy() {
throw new UnsupportedOperationException();
}
}
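Since ColumnarMap exposes its contents only through the MapData API, reading entries means walking keyArray() and valueArray() in lockstep. A sketch for an int-to-int map (assuming `map` was obtained from some vector's getMap), much like ColumnVectorUtils.toJavaIntMap earlier in this diff:

for (int i = 0; i < map.numElements(); i++) {
  int k = map.keyArray().getInt(i);
  int v = map.valueArray().getInt(i);
  System.out.println(k + " -> " + v);
}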
---- ColumnarRow.java ----
@@ -155,8 +155,9 @@ public ColumnarArray getArray(int ordinal) {
}

@Override
- public MapData getMap(int ordinal) {
-   throw new UnsupportedOperationException();
+ public ColumnarMap getMap(int ordinal) {
+   if (data.getChild(ordinal).isNullAt(rowId)) return null;
+   return data.getChild(ordinal).getMap(rowId);
}

@Override
---- ColumnarBatchSuite.scala ----
Expand Up @@ -664,35 +664,37 @@ class ColumnarBatchSuite extends SparkFunSuite {
i += 1
}

- // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+ // Populate it with arrays [0], [1, 2], null, [], [3, 4, 5]
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
- column.putArray(2, 2, 0)
- column.putArray(3, 3, 3)
+ column.putNull(2)
+ column.putArray(3, 3, 0)
+ column.putArray(4, 3, 3)

assert(column.getArray(0).numElements == 1)
assert(column.getArray(1).numElements == 2)
assert(column.isNullAt(2))
assert(column.getArray(3).numElements == 0)
assert(column.getArray(4).numElements == 3)

val a1 = ColumnVectorUtils.toJavaIntArray(column.getArray(0))
val a2 = ColumnVectorUtils.toJavaIntArray(column.getArray(1))
- val a3 = ColumnVectorUtils.toJavaIntArray(column.getArray(2))
- val a4 = ColumnVectorUtils.toJavaIntArray(column.getArray(3))
+ val a3 = ColumnVectorUtils.toJavaIntArray(column.getArray(3))
+ val a4 = ColumnVectorUtils.toJavaIntArray(column.getArray(4))
assert(a1 === Array(0))
assert(a2 === Array(1, 2))
assert(a3 === Array.empty[Int])
assert(a4 === Array(3, 4, 5))

- // Verify the ArrayData APIs
- assert(column.getArray(0).numElements() == 1)
+ // Verify the ArrayData get APIs
assert(column.getArray(0).getInt(0) == 0)

- assert(column.getArray(1).numElements() == 2)
assert(column.getArray(1).getInt(0) == 1)
assert(column.getArray(1).getInt(1) == 2)

- assert(column.getArray(2).numElements() == 0)
-
- assert(column.getArray(3).numElements() == 3)
- assert(column.getArray(3).getInt(0) == 3)
- assert(column.getArray(3).getInt(1) == 4)
- assert(column.getArray(3).getInt(2) == 5)
+ assert(column.getArray(4).getInt(0) == 3)
+ assert(column.getArray(4).getInt(1) == 4)
+ assert(column.getArray(4).getInt(2) == 5)

// Add a longer array which requires resizing
column.reset()
@@ -702,8 +704,7 @@
assert(data.capacity == array.length * 2)
data.putInts(0, array.length, array, 0)
column.putArray(0, 0, array.length)
- assert(ColumnVectorUtils.toJavaIntArray(column.getArray(0))
-   === array)
+ assert(ColumnVectorUtils.toJavaIntArray(column.getArray(0)) === array)
}

test("toArray for primitive types") {
@@ -761,6 +762,43 @@
}
}

test("Int Map") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
val column = allocate(10, new MapType(IntegerType, IntegerType, false), memMode)
(0 to 1).foreach { colIndex =>
val data = column.getChild(colIndex)
(0 to 5).foreach { i =>
data.putInt(i, i * (colIndex + 1))
}
}

// Populate it with maps [0->0], [1->2, 2->4], null, [], [3->6, 4->8, 5->10]
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
column.putNull(2)
column.putArray(3, 3, 0)
column.putArray(4, 3, 3)

assert(column.getMap(0).numElements == 1)
assert(column.getMap(1).numElements == 2)
assert(column.isNullAt(2))
assert(column.getMap(3).numElements == 0)
assert(column.getMap(4).numElements == 3)

val a1 = ColumnVectorUtils.toJavaIntMap(column.getMap(0))
val a2 = ColumnVectorUtils.toJavaIntMap(column.getMap(1))
val a4 = ColumnVectorUtils.toJavaIntMap(column.getMap(3))
val a5 = ColumnVectorUtils.toJavaIntMap(column.getMap(4))

assert(a1.asScala == Map(0 -> 0))
assert(a2.asScala == Map(1 -> 2, 2 -> 4))
assert(a4.asScala == Map())
assert(a5.asScala == Map(3 -> 6, 4 -> 8, 5 -> 10))

column.close()
}
}

testVector(
"Struct Column",
10,