Merge branch 'master' into filter_estimation_devision_by_zero
# Conflicts:
#	docs/security.md
#	python/pyspark/ml/tests.py
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
#	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
Mykhailo Shtelma committed Apr 10, 2018
Commit 5883c3c (2 parents: 984faf5 + 6498884)
Showing 175 changed files with 2,657 additions and 832 deletions.
.../org/apache/spark/sql/catalyst/expressions/HiveHasher.java

@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * Simulates Hive's hashing function from Hive v1.2.1
@@ -38,12 +39,21 @@ public static int hashLong(long input) {
     return (int) ((input >>> 32) ^ input);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+  public static int hashUnsafeBytesBlock(MemoryBlock mb) {
+    long lengthInBytes = mb.size();
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int result = 0;
-    for (int i = 0; i < lengthInBytes; i++) {
-      result = (result * 31) + (int) Platform.getByte(base, offset + i);
+    for (long i = 0; i < lengthInBytes; i++) {
+      result = (result * 31) + (int) mb.getByte(i);
     }
     return result;
   }
+
+  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
+  }
+
+  public static int hashUTF8String(UTF8String str) {
+    return hashUnsafeBytesBlock(str.getMemoryBlock());
+  }
 }
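A minimal caller-side sketch (an illustration, not part of the diff; it assumes the API shown above): the legacy byte-based entry point now just wraps its arguments in a MemoryBlock, so both paths should produce the same hash.

    import org.apache.spark.unsafe.types.UTF8String;

    UTF8String s = UTF8String.fromString("hello");
    // getBaseObject()/getBaseOffset()/numBytes() are existing UTF8String accessors.
    int viaString = HiveHasher.hashUTF8String(s);
    int viaBytes = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
    // Both go through hashUnsafeBytesBlock, so the results must match.
    assert viaString == viaBytes;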
.../org/apache/spark/unsafe/Platform.java

@@ -187,7 +187,7 @@ public static void setMemory(long address, byte value, long size) {
   }
 
   public static void copyMemory(
-    Object src, long srcOffset, Object dst, long dstOffset, long length) {
+      Object src, long srcOffset, Object dst, long dstOffset, long length) {
     // Check if dstOffset is before or after srcOffset to determine if we should copy
     // forward or backwards. This is necessary in case src and dst overlap.
     if (dstOffset < srcOffset) {
.../org/apache/spark/unsafe/array/ByteArrayMethods.java

@@ -18,6 +18,7 @@
 package org.apache.spark.unsafe.array;
 
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
 
 public class ByteArrayMethods {
 
@@ -48,15 +49,25 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
   public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
 
   private static final boolean unaligned = Platform.unaligned();
+  /**
+   * MemoryBlock equality check for MemoryBlocks.
+   * @return true if the arrays are equal, false otherwise
+   */
+  public static boolean arrayEqualsBlock(
+      MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) {
+    return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
+      rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
+  }
+
   /**
    * Optimized byte array equality check for byte arrays.
    * @return true if the arrays are equal, false otherwise
    */
   public static boolean arrayEquals(
-      Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
+      Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) {
     int i = 0;
 
-    // check if stars align and we can get both offsets to be aligned
+    // check if starts align and we can get both offsets to be aligned
     if ((leftOffset % 8) == (rightOffset % 8)) {
       while ((leftOffset + i) % 8 != 0 && i < length) {
         if (Platform.getByte(leftBase, leftOffset + i) !=
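A short usage sketch for the new block-based comparison (illustrative, not part of the diff; it uses the ByteArrayMemoryBlock type added later in this commit):

    import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;

    ByteArrayMemoryBlock left = ByteArrayMemoryBlock.fromArray(new byte[] {1, 2, 3, 4});
    ByteArrayMemoryBlock right = ByteArrayMemoryBlock.fromArray(new byte[] {9, 2, 3, 4});
    // Offsets are relative to each block; compare the last three bytes of each.
    boolean equal = ByteArrayMethods.arrayEqualsBlock(left, 1, right, 1, 3);  // true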
.../org/apache/spark/unsafe/array/LongArray.java

@@ -17,7 +17,6 @@
 
 package org.apache.spark.unsafe.array;
 
-import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
 /**
@@ -33,16 +32,12 @@ public final class LongArray {
   private static final long WIDTH = 8;
 
   private final MemoryBlock memory;
-  private final Object baseObj;
-  private final long baseOffset;
 
   private final long length;
 
   public LongArray(MemoryBlock memory) {
     assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
-    this.baseObj = memory.getBaseObject();
-    this.baseOffset = memory.getBaseOffset();
     this.length = memory.size() / WIDTH;
   }
 
@@ -51,11 +46,11 @@ public MemoryBlock memoryBlock() {
   }
 
   public Object getBaseObject() {
-    return baseObj;
+    return memory.getBaseObject();
   }
 
   public long getBaseOffset() {
-    return baseOffset;
+    return memory.getBaseOffset();
   }
 
   /**
@@ -69,8 +64,8 @@ public long size() {
    * Fill this all with 0L.
    */
   public void zeroOut() {
-    for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
-      Platform.putLong(baseObj, off, 0);
+    for (long off = 0; off < length * WIDTH; off += WIDTH) {
+      memory.putLong(off, 0);
     }
   }
 
@@ -80,7 +75,7 @@ public void zeroOut() {
   public void set(int index, long value) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+    memory.putLong(index * WIDTH, value);
   }
 
   /**
@@ -89,6 +84,6 @@ public void set(int index, long value) {
   public long get(int index) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    return Platform.getLong(baseObj, baseOffset + index * WIDTH);
+    return memory.getLong(index * WIDTH);
   }
 }
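Callers are unchanged by this refactoring; every access simply goes through the MemoryBlock now. A minimal sketch (illustrative, using the on-heap block type introduced by this commit):

    import org.apache.spark.unsafe.array.LongArray;
    import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;

    LongArray arr = new LongArray(new ByteArrayMemoryBlock(64));  // 64 bytes = 8 longs
    arr.zeroOut();
    arr.set(3, 42L);
    assert arr.get(3) == 42L && arr.size() == 8;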
.../org/apache/spark/unsafe/hash/Murmur3_x86_32.java

@@ -17,7 +17,10 @@
 
 package org.apache.spark.unsafe.hash;
 
-import org.apache.spark.unsafe.Platform;
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -49,49 +52,70 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-    return hashUnsafeWords(base, offset, lengthInBytes, seed);
+    return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
     // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+    int lengthInBytes = Ints.checkedCast(base.size());
     assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
-    int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+    int h1 = hashBytesByIntBlock(base, seed);
     return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
+    // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+    return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
     // This is not compatible with original and another implementations.
     // But remain it for backward compatibility for the components existing before 2.3.
+    int lengthInBytes = Ints.checkedCast(base.size());
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
-    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+    int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
     for (int i = lengthAligned; i < lengthInBytes; i++) {
-      int halfWord = Platform.getByte(base, offset + i);
+      int halfWord = base.getByte(i);
       int k1 = mixK1(halfWord);
       h1 = mixH1(h1, k1);
     }
     return fmix(h1, lengthInBytes);
   }
 
+  public static int hashUTF8String(UTF8String str, int seed) {
+    return hashUnsafeBytesBlock(str.getMemoryBlock(), seed);
+  }
+
+  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+  }
+
   public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
-    // This is compatible with original and another implementations.
+    return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
+    // This is compatible with original and other implementations.
     // Use this method for new components after Spark 2.3.
-    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+    int lengthInBytes = Ints.checkedCast(base.size());
+    assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
-    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+    int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
     int k1 = 0;
     for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
-      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+      k1 ^= (base.getByte(i) & 0xFF) << shift;
     }
     h1 ^= mixK1(k1);
     return fmix(h1, lengthInBytes);
   }
 
-  private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
+  private static int hashBytesByIntBlock(MemoryBlock base, int seed) {
+    long lengthInBytes = base.size();
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;
-    for (int i = 0; i < lengthInBytes; i += 4) {
-      int halfWord = Platform.getInt(base, offset + i);
+    for (long i = 0; i < lengthInBytes; i += 4) {
+      int halfWord = base.getInt(i);
       int k1 = mixK1(halfWord);
       h1 = mixH1(h1, k1);
     }
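As with HiveHasher, the array-plus-offset overloads now delegate to the Block variants through MemoryBlock.allocateFromObject, so the two paths must agree. A small consistency sketch (illustrative, not part of the diff):

    import org.apache.spark.unsafe.Platform;
    import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;

    byte[] data = {10, 20, 30, 40, 50};
    int seed = 42;
    int viaArray = Murmur3_x86_32.hashUnsafeBytes2(data, Platform.BYTE_ARRAY_OFFSET, data.length, seed);
    int viaBlock = Murmur3_x86_32.hashUnsafeBytes2Block(ByteArrayMemoryBlock.fromArray(data), seed);
    assert viaArray == viaBlock;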
.../org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java (new file)
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.unsafe.memory;

import com.google.common.primitives.Ints;

import org.apache.spark.unsafe.Platform;

/**
 * A consecutive block of memory with a byte array on Java heap.
 */
public final class ByteArrayMemoryBlock extends MemoryBlock {

  private final byte[] array;

  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
    super(obj, offset, size);
    this.array = obj;
    assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
        "the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
  }

  public ByteArrayMemoryBlock(long length) {
    this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
  }

  @Override
  public MemoryBlock subBlock(long offset, long size) {
    checkSubBlockRange(offset, size);
    if (offset == 0 && size == this.size()) return this;
    return new ByteArrayMemoryBlock(array, this.offset + offset, size);
  }

  public byte[] getByteArray() { return array; }

  /**
   * Creates a memory block pointing to the memory used by the byte array.
   */
  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
    return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
  }

  @Override
  public int getInt(long offset) {
    return Platform.getInt(array, this.offset + offset);
  }

  @Override
  public void putInt(long offset, int value) {
    Platform.putInt(array, this.offset + offset, value);
  }

  @Override
  public boolean getBoolean(long offset) {
    return Platform.getBoolean(array, this.offset + offset);
  }

  @Override
  public void putBoolean(long offset, boolean value) {
    Platform.putBoolean(array, this.offset + offset, value);
  }

  @Override
  public byte getByte(long offset) {
    return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
  }

  @Override
  public void putByte(long offset, byte value) {
    array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
  }

  @Override
  public short getShort(long offset) {
    return Platform.getShort(array, this.offset + offset);
  }

  @Override
  public void putShort(long offset, short value) {
    Platform.putShort(array, this.offset + offset, value);
  }

  @Override
  public long getLong(long offset) {
    return Platform.getLong(array, this.offset + offset);
  }

  @Override
  public void putLong(long offset, long value) {
    Platform.putLong(array, this.offset + offset, value);
  }

  @Override
  public float getFloat(long offset) {
    return Platform.getFloat(array, this.offset + offset);
  }

  @Override
  public void putFloat(long offset, float value) {
    Platform.putFloat(array, this.offset + offset, value);
  }

  @Override
  public double getDouble(long offset) {
    return Platform.getDouble(array, this.offset + offset);
  }

  @Override
  public void putDouble(long offset, double value) {
    Platform.putDouble(array, this.offset + offset, value);
  }
}
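A brief usage sketch for the new class (illustrative values, not part of the diff): subBlock returns a view over the same backing array, so writes through a sub-block are visible through the parent.

    ByteArrayMemoryBlock block = ByteArrayMemoryBlock.fromArray(new byte[16]);
    MemoryBlock tail = block.subBlock(8, 8);  // bytes 8..15 of the same array
    tail.putLong(0, 0xCAFEL);
    assert block.getLong(8) == 0xCAFEL;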
