Skip to content

Commit

Permalink
[SPARK-23218][SQL] simplify ColumnVector.getArray
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

`ColumnVector` is very flexible about how array type is implemented. As a result, `ColumnVector` has 3 abstract methods for array type: `arrayData`, `getArrayOffset`, `getArrayLength`. For example, in `WritableColumnVector` we use the first child vector as the array data vector, and store offsets and lengths in 2 arrays in the parent vector. `ArrowColumnVector` has a different implementation.

This PR simplifies `ColumnVector` by using only one abstract method for array type: `getArray`.

## How was this patch tested?

existing tests.

Reran `ColumnarBatchBenchmark`; there is no performance regression.

Author: Wenchen Fan <[email protected]>

Closes #20395 from cloud-fan/vector.

(cherry picked from commit dd8e257)
Signed-off-by: gatorsmile <[email protected]>
  • Loading branch information
cloud-fan authored and gatorsmile committed Jan 26, 2018
1 parent ab1b5d9 commit ca3613b
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.unsafe.types.UTF8String;

/**
Expand Down Expand Up @@ -145,16 +146,6 @@ public double getDouble(int rowId) {
return doubleData.vector[getRowIndex(rowId)];
}

/**
* Not supported by this vector: always throws {@link UnsupportedOperationException}.
*/
@Override
public int getArrayLength(int rowId) {
throw new UnsupportedOperationException();
}

/**
* Not supported by this vector: always throws {@link UnsupportedOperationException}.
*/
@Override
public int getArrayOffset(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
Expand All @@ -177,7 +168,7 @@ public byte[] getBinary(int rowId) {
}

@Override
public org.apache.spark.sql.vectorized.ColumnVector arrayData() {
public ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;

Expand Down Expand Up @@ -602,7 +603,17 @@ public final int appendStruct(boolean isNull) {
// `WritableColumnVector` puts the data of array in the first child column vector, and puts the
// array offsets and lengths in the current column vector.
@Override
public WritableColumnVector arrayData() { return childColumns[0]; }
public final ColumnarArray getArray(int rowId) {
return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
}

/**
* Returns the first child column vector ({@code childColumns[0]}), which is where this class
* keeps the data of array-typed columns.
*/
public WritableColumnVector arrayData() {
return childColumns[0];
}

/**
* Returns the length of the array for rowId. {@link #getArray(int)} uses it as the length of
* the returned {@link ColumnarArray}.
*/
public abstract int getArrayLength(int rowId);

/**
* Returns the offset of the array for rowId. {@link #getArray(int)} uses it as the start
* position of the returned {@link ColumnarArray} within {@link #arrayData()}.
*/
public abstract int getArrayOffset(int rowId);

/**
* Returns the child column vector stored at the given ordinal of {@code childColumns}.
*/
@Override
public WritableColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package org.apache.spark.sql.vectorized;

import io.netty.buffer.ArrowBuf;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.*;
import org.apache.arrow.vector.holders.NullableVarCharHolder;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;

/**
* A column vector backed by Apache Arrow.
* A column vector backed by Apache Arrow. Currently time interval type and map type are not
* supported.
*/
@InterfaceStability.Evolving
public final class ArrowColumnVector extends ColumnVector {

private final ArrowVectorAccessor accessor;
Expand Down Expand Up @@ -90,16 +94,6 @@ public double getDouble(int rowId) {
return accessor.getDouble(rowId);
}

/**
* Returns the array length for rowId by delegating to the type-specific accessor.
*/
@Override
public int getArrayLength(int rowId) {
return accessor.getArrayLength(rowId);
}

/**
* Returns the array offset for rowId by delegating to the type-specific accessor.
*/
@Override
public int getArrayOffset(int rowId) {
return accessor.getArrayOffset(rowId);
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
return accessor.getDecimal(rowId, precision, scale);
Expand All @@ -116,7 +110,9 @@ public byte[] getBinary(int rowId) {
}

@Override
public ArrowColumnVector arrayData() { return childColumns[0]; }
public ColumnarArray getArray(int rowId) {
return accessor.getArray(rowId);
}

/**
* Returns the child column vector stored at the given ordinal of {@code childColumns}.
*/
@Override
public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
Expand Down Expand Up @@ -151,9 +147,6 @@ public ArrowColumnVector(ValueVector vector) {
} else if (vector instanceof ListVector) {
ListVector listVector = (ListVector) vector;
accessor = new ArrayAccessor(listVector);

childColumns = new ArrowColumnVector[1];
childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
} else if (vector instanceof NullableMapVector) {
NullableMapVector mapVector = (NullableMapVector) vector;
accessor = new StructAccessor(mapVector);
Expand All @@ -180,10 +173,6 @@ boolean isNullAt(int rowId) {
return vector.isNull(rowId);
}

/**
* Returns the value count reported by the wrapped vector.
*/
final int getValueCount() {
return vector.getValueCount();
}

/**
* Returns the null count reported by the wrapped vector.
*/
final int getNullCount() {
return vector.getNullCount();
}
Expand Down Expand Up @@ -232,11 +221,7 @@ byte[] getBinary(int rowId) {
throw new UnsupportedOperationException();
}

int getArrayLength(int rowId) {
throw new UnsupportedOperationException();
}

int getArrayOffset(int rowId) {
ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}
}
Expand Down Expand Up @@ -433,10 +418,12 @@ final long getLong(int rowId) {
private static class ArrayAccessor extends ArrowVectorAccessor {

private final ListVector accessor;
private final ArrowColumnVector arrayData;

/**
* Creates an accessor for the given list vector.
*
* The list's data vector is wrapped in an {@link ArrowColumnVector} once here and kept in
* {@code arrayData}, so that {@code getArray} can hand the same element vector to every
* {@link ColumnarArray} it creates.
*/
ArrayAccessor(ListVector vector) {
super(vector);
this.accessor = vector;
this.arrayData = new ArrowColumnVector(vector.getDataVector());
}

@Override
Expand All @@ -450,13 +437,12 @@ final boolean isNullAt(int rowId) {
}

@Override
final int getArrayLength(int rowId) {
return accessor.getInnerValueCountAt(rowId);
}

@Override
final int getArrayOffset(int rowId) {
return accessor.getOffsetBuffer().getInt(rowId * accessor.OFFSET_WIDTH);
final ColumnarArray getArray(int rowId) {
// Read two consecutive entries of the list's offset buffer: the one for rowId and the one
// right after it. Positions in the buffer are rowId scaled by OFFSET_WIDTH.
ArrowBuf offsets = accessor.getOffsetBuffer();
int index = rowId * accessor.OFFSET_WIDTH;
int start = offsets.getInt(index);
int end = offsets.getInt(index + accessor.OFFSET_WIDTH);
// ColumnarArray takes (data, offset, length), so the length is derived as end - start.
return new ColumnarArray(arrayData, start, end - start);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.vectorized;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
Expand All @@ -29,11 +30,14 @@
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in this ColumnVector.
*
* Spark only calls the specific `get` method that matches the data type of this
* {@link ColumnVector}, e.g. if it's int type, Spark is guaranteed to only call
* {@link #getInt(int)} or {@link #getInts(int, int)}.
*
* ColumnVector supports all the data types including nested types. To handle nested types,
* ColumnVector can have children and is a tree structure. For struct type, it stores the actual
* data of each field in the corresponding child ColumnVector, and only stores null information in
* the parent ColumnVector. For array type, it stores the actual array elements in the child
* ColumnVector, and stores null information, array offsets and lengths in the parent ColumnVector.
* ColumnVector can have children and is a tree structure. Please refer to {@link #getStruct(int)},
* {@link #getArray(int)} and {@link #getMap(int)} for the details about how to implement nested
* types.
*
* ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
* memory again and again.
Expand All @@ -43,6 +47,7 @@
* format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
* footprint is negligible.
*/
@InterfaceStability.Evolving
public abstract class ColumnVector implements AutoCloseable {

/**
Expand Down Expand Up @@ -70,12 +75,12 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract boolean isNullAt(int rowId);

/**
* Returns the value for rowId.
* Returns the boolean type value for rowId.
*/
public abstract boolean getBoolean(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets boolean type values from [rowId, rowId + count)
*/
public boolean[] getBooleans(int rowId, int count) {
boolean[] res = new boolean[count];
Expand All @@ -86,12 +91,12 @@ public boolean[] getBooleans(int rowId, int count) {
}

/**
* Returns the value for rowId.
* Returns the byte type value for rowId.
*/
public abstract byte getByte(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets byte type values from [rowId, rowId + count)
*/
public byte[] getBytes(int rowId, int count) {
byte[] res = new byte[count];
Expand All @@ -102,12 +107,12 @@ public byte[] getBytes(int rowId, int count) {
}

/**
* Returns the value for rowId.
* Returns the short type value for rowId.
*/
public abstract short getShort(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets short type values from [rowId, rowId + count)
*/
public short[] getShorts(int rowId, int count) {
short[] res = new short[count];
Expand All @@ -118,12 +123,12 @@ public short[] getShorts(int rowId, int count) {
}

/**
* Returns the value for rowId.
* Returns the int type value for rowId.
*/
public abstract int getInt(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets int type values from [rowId, rowId + count)
*/
public int[] getInts(int rowId, int count) {
int[] res = new int[count];
Expand All @@ -134,12 +139,12 @@ public int[] getInts(int rowId, int count) {
}

/**
* Returns the value for rowId.
* Returns the long type value for rowId.
*/
public abstract long getLong(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets long type values from [rowId, rowId + count)
*/
public long[] getLongs(int rowId, int count) {
long[] res = new long[count];
Expand All @@ -150,12 +155,12 @@ public long[] getLongs(int rowId, int count) {
}

/**
* Returns the value for rowId.
* Returns the float type value for rowId.
*/
public abstract float getFloat(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets float type values from [rowId, rowId + count)
*/
public float[] getFloats(int rowId, int count) {
float[] res = new float[count];
Expand All @@ -166,12 +171,12 @@ public float[] getFloats(int rowId, int count) {
}

/**
* Returns the value for rowId.
* Returns the double type value for rowId.
*/
public abstract double getDouble(int rowId);

/**
* Gets values from [rowId, rowId + count)
* Gets double type values from [rowId, rowId + count)
*/
public double[] getDoubles(int rowId, int count) {
double[] res = new double[count];
Expand All @@ -182,57 +187,54 @@ public double[] getDoubles(int rowId, int count) {
}

/**
* Returns the length of the array for rowId.
*/
public abstract int getArrayLength(int rowId);

/**
* Returns the offset of the array for rowId.
*/
public abstract int getArrayOffset(int rowId);

/**
* Returns the struct for rowId.
* Returns the struct type value for rowId.
*
* To support struct type, implementations must implement {@link #getChild(int)} and make this
* vector a tree structure. The number of child vectors must be the same as the number of fields
* of the struct type, and each child vector is responsible for storing the data for its
* corresponding struct field.
*/
public final ColumnarRow getStruct(int rowId) {
// Builds a new ColumnarRow from this vector and rowId on every call.
return new ColumnarRow(this, rowId);
}

/**
* Returns the array for rowId.
* Returns the array type value for rowId.
*
* To support array type, implementations must construct a {@link ColumnarArray} and return it in
* this method. {@link ColumnarArray} requires a {@link ColumnVector} that stores the data of all
* the elements of all the arrays in this vector, and an offset and length which point to a range
* in that {@link ColumnVector}; that range represents the array for rowId. Implementations
* are free to decide where to put the data vector and the offsets and lengths. For example, we can
* use the first child vector as the data vector, and store offsets and lengths in 2 int arrays in
* this vector.
*/
public final ColumnarArray getArray(int rowId) {
return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
}
public abstract ColumnarArray getArray(int rowId);

/**
* Returns the map for rowId.
* Returns the map type value for rowId.
*/
public MapData getMap(int ordinal) {
// NOTE(review): the parameter is named `ordinal`, while the Javadoc and the sibling getters
// call it rowId — presumably it is the row id; confirm before renaming.
// Default implementation: always throws.
throw new UnsupportedOperationException();
}

/**
* Returns the decimal for rowId.
* Returns the decimal type value for rowId.
*/
public abstract Decimal getDecimal(int rowId, int precision, int scale);

/**
* Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of
* this column vector, please copy it if you want to keep it after this column vector is freed.
* Returns the string type value for rowId. Note that the returned UTF8String may point to the
* data of this column vector, please copy it if you want to keep it after this column vector is
* freed.
*/
public abstract UTF8String getUTF8String(int rowId);

/**
* Returns the byte array for rowId.
* Returns the binary type value for rowId.
*/
public abstract byte[] getBinary(int rowId);

/**
* Returns the data for the underlying array.
*/
public abstract ColumnVector arrayData();

/**
* Returns the ordinal's child column vector.
*/
Expand Down
Loading

0 comments on commit ca3613b

Please sign in to comment.