[SPARK-23280][SQL] add map type support to ColumnVector
## What changes were proposed in this pull request?

Fill in the last missing piece of `ColumnVector`: map type support.

The idea is similar to the array type support. A map is basically two arrays: one for keys and one for values. We ask implementations to provide a key array, a value array, and an offset and length that specify the range of this map within the key/value arrays.
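For example, the three maps {1 -> 2, 3 -> 4}, {} and {5 -> 6} can be stored flattened into one shared key array and one shared value array, with each row keeping only its (offset, length) pair:

keys:    [1, 3, 5]
values:  [2, 4, 6]
row 0 -> offset 0, length 2
row 1 -> offset 0, length 0   (the empty map)
row 2 -> offset 2, length 1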

In `WritableColumnVector`, we put the key array in the first child vector, the value array in the second child vector, and the offsets and lengths in the current vector, which closely mirrors how the array type is implemented there.
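To make the write/read round trip concrete, here is a minimal sketch against the new API (it mirrors the "Int Map" test added below; the wrapper class `MapVectorSketch` is made up for illustration, while the `ColumnVector` calls are the ones this commit adds or relies on):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.vectorized.ColumnarMap;

public class MapVectorSketch {
  public static void main(String[] args) {
    MapType mapType = DataTypes.createMapType(DataTypes.IntegerType, DataTypes.IntegerType);
    WritableColumnVector column = new OnHeapColumnVector(10, mapType);

    // Child 0 stores the flattened keys of all maps, child 1 the flattened values.
    WritableColumnVector keys = column.getChild(0);
    WritableColumnVector values = column.getChild(1);
    for (int i = 0; i < 3; i++) {
      keys.putInt(i, i);        // keys:   [0, 1, 2]
      values.putInt(i, i * 2);  // values: [0, 2, 4]
    }

    // Each row records only the (offset, length) of its entries in the shared
    // key/value arrays: row 0 is the map {0 -> 0}, row 1 is {1 -> 2, 2 -> 4}.
    column.putArray(0, 0, 1);
    column.putArray(1, 1, 2);

    ColumnarMap m = column.getMap(1);
    assert m.numElements() == 2;
    assert m.keyArray().getInt(0) == 1 && m.valueArray().getInt(0) == 2;

    column.close();
  }
}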

## How was this patch tested?

A new test, "Int Map", in `ColumnarBatchSuite`.

Author: Wenchen Fan <[email protected]>

Closes #20450 from cloud-fan/map.
cloud-fan committed Feb 1, 2018
1 parent f470df2 commit 52e00f7
Showing 11 changed files with 166 additions and 27 deletions.
OrcColumnVector.java
@@ -25,6 +25,7 @@
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

/**
@@ -177,6 +178,11 @@ public ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
throw new UnsupportedOperationException();
ColumnVectorUtils.java
@@ -20,8 +20,10 @@
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
@@ -30,6 +32,7 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -109,6 +112,18 @@ public static int[] toJavaIntArray(ColumnarArray array) {
return array.toIntArray();
}

public static Map<Integer, Integer> toJavaIntMap(ColumnarMap map) {
int[] keys = toJavaIntArray(map.keyArray());
int[] values = toJavaIntArray(map.valueArray());
assert keys.length == values.length;

Map<Integer, Integer> result = new HashMap<>();
for (int i = 0; i < keys.length; i++) {
result.put(keys[i], values[i]);
}
return result;
}

private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
if (o == null) {
if (t instanceof CalendarIntervalType) {
OffHeapColumnVector.java
@@ -60,7 +60,7 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[]
private long nulls;
private long data;

- // Set iff the type is array.
+ // Only set if type is Array or Map.
private long lengthData;
private long offsetData;

@@ -530,7 +530,7 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) {
@Override
protected void reserveInternal(int newCapacity) {
int oldCapacity = (nulls == 0L) ? 0 : capacity;
- if (isArray()) {
+ if (isArray() || type instanceof MapType) {
this.lengthData =
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
this.offsetData =
OnHeapColumnVector.java
@@ -69,7 +69,7 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] f
private float[] floatData;
private double[] doubleData;

- // Only set if type is Array.
+ // Only set if type is Array or Map.
private int[] arrayLengths;
private int[] arrayOffsets;

@@ -503,7 +503,7 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) {
// Split this function out since it is the slow path.
@Override
protected void reserveInternal(int newCapacity) {
- if (isArray()) {
+ if (isArray() || type instanceof MapType) {
int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
if (this.arrayLengths != null) {
WritableColumnVector.java
@@ -25,6 +25,7 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;

@@ -612,6 +613,13 @@ public final ColumnarArray getArray(int rowId) {
return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
}

// `WritableColumnVector` puts the key array in the first child column vector, the value array in
// the second child column vector, and the offsets and lengths in the current column vector.
@Override
public final ColumnarMap getMap(int rowId) {
return new ColumnarMap(getChild(0), getChild(1), getArrayOffset(rowId), getArrayLength(rowId));
}

public WritableColumnVector arrayData() {
return childColumns[0];
}
@@ -705,6 +713,11 @@ protected WritableColumnVector(int capacity, DataType type) {
for (int i = 0; i < childColumns.length; ++i) {
this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
}
} else if (type instanceof MapType) {
MapType mapType = (MapType) type;
this.childColumns = new WritableColumnVector[2];
this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
} else if (type instanceof CalendarIntervalType) {
// Two columns. Months as int. Microseconds as Long.
this.childColumns = new WritableColumnVector[2];
ArrowColumnVector.java
@@ -119,6 +119,11 @@ public ColumnarArray getArray(int rowId) {
return accessor.getArray(rowId);
}

@Override
public ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }

ColumnVector.java
@@ -220,10 +220,18 @@ public final ColumnarRow getStruct(int rowId) {

/**
* Returns the map type value for rowId.
*
* In Spark, a map-type value is basically two arrays: a key array and a value array. The key at
* a given index in the key array and the value at the same index in the value array together
* form one entry of the map-type value.
*
* To support the map type, implementations must construct a {@link ColumnarMap} and return it
* from this method. {@link ColumnarMap} requires a {@link ColumnVector} that stores the data of
* all the keys of all the maps in this vector, another {@link ColumnVector} that stores the data
* of all the values of all the maps in this vector, and an offset and length which specify the
* range of the key/value arrays that belongs to the map-type value at rowId.
*/
- public MapData getMap(int ordinal) {
-   throw new UnsupportedOperationException();
- }
+ public abstract ColumnarMap getMap(int ordinal);

/**
* Returns the decimal type value for rowId.
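Put differently, an implementation only needs enough per-row bookkeeping to hand out these views. The following hedged sketch (the class `SketchMapVector` and its fields are hypothetical, not part of this commit) shows the minimal state a read-only implementation needs to answer getMap:

import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarMap;

// Hypothetical skeleton: a real implementation extends ColumnVector and implements
// all of its abstract methods; only the map-related state is shown here.
final class SketchMapVector {
  private final ColumnVector keys;    // flattened keys of all maps in this vector
  private final ColumnVector values;  // flattened values of all maps in this vector
  private final int[] offsets;        // per-row start index into keys/values
  private final int[] lengths;        // per-row entry count

  SketchMapVector(ColumnVector keys, ColumnVector values, int[] offsets, int[] lengths) {
    this.keys = keys;
    this.values = values;
    this.offsets = offsets;
    this.lengths = lengths;
  }

  public ColumnarMap getMap(int rowId) {
    // ColumnarMap is just a (keys, values, offset, length) view; nothing is copied.
    return new ColumnarMap(keys, values, offsets[rowId], lengths[rowId]);
  }
}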
ColumnarArray.java
@@ -149,8 +149,8 @@ public ColumnarArray getArray(int ordinal) {
}

@Override
- public MapData getMap(int ordinal) {
-   throw new UnsupportedOperationException();
+ public ColumnarMap getMap(int ordinal) {
+   return data.getMap(offset + ordinal);
}

@Override
ColumnarMap.java (new file)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.util.MapData;

/**
* Map abstraction in {@link ColumnVector}.
*/
public final class ColumnarMap extends MapData {
private final ColumnarArray keys;
private final ColumnarArray values;
private final int length;

public ColumnarMap(ColumnVector keys, ColumnVector values, int offset, int length) {
this.length = length;
this.keys = new ColumnarArray(keys, offset, length);
this.values = new ColumnarArray(values, offset, length);
}

@Override
public int numElements() { return length; }

@Override
public ColumnarArray keyArray() {
return keys;
}

@Override
public ColumnarArray valueArray() {
return values;
}

@Override
public ColumnarMap copy() {
throw new UnsupportedOperationException();
}
}
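Worth noting: a `ColumnarMap` is a zero-copy view; the constructor merely wraps the two vectors in `ColumnarArray`s sharing the same offset and length, which is also why `copy()` is left unsupported for now. A small hedged usage sketch (the class `ColumnarMapViewSketch` is made up; the on-heap vectors stand in for any populated `ColumnVector`s):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnarMap;

public class ColumnarMapViewSketch {
  public static void main(String[] args) {
    OnHeapColumnVector keys = new OnHeapColumnVector(3, DataTypes.IntegerType);
    OnHeapColumnVector values = new OnHeapColumnVector(3, DataTypes.IntegerType);
    for (int i = 0; i < 3; i++) {
      keys.putInt(i, i + 1);          // keys:   [1, 2, 3]
      values.putInt(i, (i + 1) * 10); // values: [10, 20, 30]
    }

    // View over entries 1..2, i.e. the map {2 -> 20, 3 -> 30}; no data is copied.
    ColumnarMap map = new ColumnarMap(keys, values, 1, 2);
    assert map.numElements() == 2;
    assert map.keyArray().getInt(0) == 2 && map.valueArray().getInt(1) == 30;

    keys.close();
    values.close();
  }
}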
ColumnarRow.java
@@ -155,8 +155,9 @@ public ColumnarArray getArray(int ordinal) {
}

@Override
- public MapData getMap(int ordinal) {
-   throw new UnsupportedOperationException();
+ public ColumnarMap getMap(int ordinal) {
+   if (data.getChild(ordinal).isNullAt(rowId)) return null;
+   return data.getChild(ordinal).getMap(rowId);
}

@Override
ColumnarBatchSuite.scala
@@ -673,35 +673,37 @@ class ColumnarBatchSuite extends SparkFunSuite {
i += 1
}

- // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+ // Populate it with arrays [0], [1, 2], null, [], [3, 4, 5]
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
- column.putArray(2, 2, 0)
- column.putArray(3, 3, 3)
+ column.putNull(2)
+ column.putArray(3, 3, 0)
+ column.putArray(4, 3, 3)

assert(column.getArray(0).numElements == 1)
assert(column.getArray(1).numElements == 2)
assert(column.isNullAt(2))
assert(column.getArray(3).numElements == 0)
assert(column.getArray(4).numElements == 3)

val a1 = ColumnVectorUtils.toJavaIntArray(column.getArray(0))
val a2 = ColumnVectorUtils.toJavaIntArray(column.getArray(1))
- val a3 = ColumnVectorUtils.toJavaIntArray(column.getArray(2))
- val a4 = ColumnVectorUtils.toJavaIntArray(column.getArray(3))
+ val a3 = ColumnVectorUtils.toJavaIntArray(column.getArray(3))
+ val a4 = ColumnVectorUtils.toJavaIntArray(column.getArray(4))
assert(a1 === Array(0))
assert(a2 === Array(1, 2))
assert(a3 === Array.empty[Int])
assert(a4 === Array(3, 4, 5))

- // Verify the ArrayData APIs
- assert(column.getArray(0).numElements() == 1)
+ // Verify the ArrayData get APIs
assert(column.getArray(0).getInt(0) == 0)

- assert(column.getArray(1).numElements() == 2)
assert(column.getArray(1).getInt(0) == 1)
assert(column.getArray(1).getInt(1) == 2)

- assert(column.getArray(2).numElements() == 0)
-
- assert(column.getArray(3).numElements() == 3)
- assert(column.getArray(3).getInt(0) == 3)
- assert(column.getArray(3).getInt(1) == 4)
- assert(column.getArray(3).getInt(2) == 5)
+ assert(column.getArray(4).getInt(0) == 3)
+ assert(column.getArray(4).getInt(1) == 4)
+ assert(column.getArray(4).getInt(2) == 5)

// Add a longer array which requires resizing
column.reset()
@@ -711,8 +713,7 @@
assert(data.capacity == array.length * 2)
data.putInts(0, array.length, array, 0)
column.putArray(0, 0, array.length)
- assert(ColumnVectorUtils.toJavaIntArray(column.getArray(0))
-   === array)
+ assert(ColumnVectorUtils.toJavaIntArray(column.getArray(0)) === array)
}

test("toArray for primitive types") {
@@ -770,6 +771,43 @@
}
}

test("Int Map") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
val column = allocate(10, new MapType(IntegerType, IntegerType, false), memMode)
(0 to 1).foreach { colIndex =>
val data = column.getChild(colIndex)
(0 to 5).foreach { i =>
data.putInt(i, i * (colIndex + 1))
}
}

// Populate it with maps [0->0], [1->2, 2->4], null, [], [3->6, 4->8, 5->10]
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
column.putNull(2)
column.putArray(3, 3, 0)
column.putArray(4, 3, 3)

assert(column.getMap(0).numElements == 1)
assert(column.getMap(1).numElements == 2)
assert(column.isNullAt(2))
assert(column.getMap(3).numElements == 0)
assert(column.getMap(4).numElements == 3)

val a1 = ColumnVectorUtils.toJavaIntMap(column.getMap(0))
val a2 = ColumnVectorUtils.toJavaIntMap(column.getMap(1))
val a4 = ColumnVectorUtils.toJavaIntMap(column.getMap(3))
val a5 = ColumnVectorUtils.toJavaIntMap(column.getMap(4))

assert(a1.asScala == Map(0 -> 0))
assert(a2.asScala == Map(1 -> 2, 2 -> 4))
assert(a4.asScala == Map())
assert(a5.asScala == Map(3 -> 6, 4 -> 8, 5 -> 10))

column.close()
}
}

testVector(
"Struct Column",
10,
