Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8777] [SQL] Add random data generator test utilities to Spark SQL #7176

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.lang.Double.longBitsToDouble
import java.lang.Float.intBitsToFloat
import java.math.MathContext

import scala.util.Random

import org.apache.spark.sql.types._

/**
* Random data generators for Spark SQL DataTypes. These generators do not generate uniformly random
* values; instead, they're biased to return "interesting" values (such as maximum / minimum values)
* with higher probability.
*/
object RandomDataGenerator {

/**
* The conditional probability of a non-null value being drawn from a set of "interesting" values
* instead of being chosen uniformly at random.
*/
private val PROBABILITY_OF_INTERESTING_VALUE: Float = 0.5f

/**
* The probability of the generated value being null
*/
private val PROBABILITY_OF_NULL: Float = 0.1f

private val MAX_STR_LEN: Int = 1024
private val MAX_ARR_SIZE: Int = 128
private val MAX_MAP_SIZE: Int = 128

/**
* Helper function for constructing a biased random number generator which returns "interesting"
* values with a higher probability.
*/
private def randomNumeric[T](
rand: Random,
uniformRand: Random => T,
interestingValues: Seq[T]): Some[() => T] = {
val f = () => {
if (rand.nextFloat() <= PROBABILITY_OF_INTERESTING_VALUE) {
interestingValues(rand.nextInt(interestingValues.length))
} else {
uniformRand(rand)
}
}
Some(f)
}

/**
* Returns a function which generates random values for the given [[DataType]], or `None` if no
* random data generator is defined for that data type. The generated values will use an external
* representation of the data type; for example, the random generator for [[DateType]] will return
* instances of [[java.sql.Date]] and the generator for [[StructType]] will return a
* [[org.apache.spark.Row]].
*
* @param dataType the type to generate values for
* @param nullable whether null values should be generated
* @param seed an optional seed for the random number generator
* @return a function which can be called to generate random values.
*/
def forType(
dataType: DataType,
nullable: Boolean = true,
seed: Option[Long] = None): Option[() => Any] = {
val rand = new Random()
seed.foreach(rand.setSeed)

val valueGenerator: Option[() => Any] = dataType match {
case StringType => Some(() => rand.nextString(rand.nextInt(MAX_STR_LEN)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does nextString cover unicode characters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

case BinaryType => Some(() => {
val arr = new Array[Byte](rand.nextInt(MAX_STR_LEN))
rand.nextBytes(arr)
arr
})
case BooleanType => Some(() => rand.nextBoolean())
case DateType => Some(() => new java.sql.Date(rand.nextInt()))
case TimestampType => Some(() => new java.sql.Timestamp(rand.nextLong()))
case DecimalType.Unlimited => Some(
() => BigDecimal.apply(rand.nextLong, rand.nextInt, MathContext.UNLIMITED))
case DoubleType => randomNumeric[Double](
rand, r => longBitsToDouble(r.nextLong()), Seq(Double.MinValue, Double.MinPositiveValue,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we using longBitsToDouble for better performance? Quoted from longBitsToDouble Javadoc:

     * <p>If the argument is any value in the range
     * {@code 0x7ff0000000000001L} through
     * {@code 0x7fffffffffffffffL} or in the range
     * {@code 0xfff0000000000001L} through
     * {@code 0xffffffffffffffffL}, the result is a NaN.  No IEEE
     * 754 floating-point operation provided by Java can distinguish
     * between two NaN values of the same type with different bit
     * patterns.  Distinct values of NaN are only distinguishable by
     * use of the {@code Double.doubleToRawLongBits} method.

This implies more chances than expected to generate NaNs. But it's probably OK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal here was to produce doubles that were uniformly distributed over the range of possible double values (`rand.nextDouble() just returns doubles in the range 0.0 to 1.0). Empirically, the number of NaNs produced by this seems to be quite small and a back-of-the-envelope calculation seems to back this up; I think that

((0x7fffffffffffffff - 0x7ff0000000000001) + (0xffffffffffffffff - 0xfff0000000000001)) / 2^64

works out to be a roughly 0.05% chance of producing a NaN through this method (see https://www.wolframalpha.com/input/?i=%28%280x7fffffffffffffff+-+0x7ff0000000000001%29+%2B+%280xffffffffffffffff+-+0xfff0000000000001%29%29+%2F+2%5E64). I think this is small enough to ignore for our purposes, but we can revisit later if it's a problem.

Double.MaxValue, Double.PositiveInfinity, Double.NegativeInfinity, Double.NaN, 0.0))
case FloatType => randomNumeric[Float](
rand, r => intBitsToFloat(r.nextInt()), Seq(Float.MinValue, Float.MinPositiveValue,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar issue as above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment at #7176 (comment)

Float.MaxValue, Float.PositiveInfinity, Float.NegativeInfinity, Float.NaN, 0.0f))
case ByteType => randomNumeric[Byte](
rand, _.nextInt().toByte, Seq(Byte.MinValue, Byte.MaxValue, 0.toByte))
case IntegerType => randomNumeric[Int](
rand, _.nextInt(), Seq(Int.MinValue, Int.MaxValue, 0))
case LongType => randomNumeric[Long](
rand, _.nextLong(), Seq(Long.MinValue, Long.MaxValue, 0L))
case ShortType => randomNumeric[Short](
rand, _.nextInt().toShort, Seq(Short.MinValue, Short.MaxValue, 0.toShort))
case NullType => Some(() => null)
case ArrayType(elementType, containsNull) => {
forType(elementType, nullable = containsNull, seed = Some(rand.nextLong())).map {
elementGenerator => () => Array.fill(rand.nextInt(MAX_ARR_SIZE))(elementGenerator())
}
}
case MapType(keyType, valueType, valueContainsNull) => {
for (
keyGenerator <- forType(keyType, nullable = false, seed = Some(rand.nextLong()));
valueGenerator <-
forType(valueType, nullable = valueContainsNull, seed = Some(rand.nextLong()))
) yield {
() => {
Seq.fill(rand.nextInt(MAX_MAP_SIZE))((keyGenerator(), valueGenerator())).toMap
}
}
}
case StructType(fields) => {
val maybeFieldGenerators: Seq[Option[() => Any]] = fields.map { field =>
forType(field.dataType, nullable = field.nullable, seed = Some(rand.nextLong()))
}
if (maybeFieldGenerators.forall(_.isDefined)) {
val fieldGenerators: Seq[() => Any] = maybeFieldGenerators.map(_.get)
Some(() => Row.fromSeq(fieldGenerators.map(_.apply())))
} else {
None
}
}
case unsupportedType => None
}
// Handle nullability by wrapping the non-null value generator:
valueGenerator.map { valueGenerator =>
if (nullable) {
() => {
if (rand.nextFloat() <= PROBABILITY_OF_NULL) {
null
} else {
valueGenerator()
}
}
} else {
valueGenerator
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types._

/**
* Tests of [[RandomDataGenerator]].
*/
class RandomDataGeneratorSuite extends SparkFunSuite {

/**
* Tests random data generation for the given type by using it to generate random values then
* converting those values into their Catalyst equivalents using CatalystTypeConverters.
*/
def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = {
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
val generator = RandomDataGenerator.forType(dataType, nullable).getOrElse {
fail(s"Random data generator was not defined for $dataType")
}
if (nullable) {
assert(Iterator.fill(100)(generator()).contains(null))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the probability is quite low, but there is still a chance that we may break the assertion here(I just met one in a test...), I know it's not a big deal to rerun a test ,but can we pass in the seed when create the generator and make the result deterministic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @JoshRosen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increasing the size of the iterator and fixing the random seed seems like a good fix. Feel free to submit a PR and I'll review quickly.

} else {
assert(Iterator.fill(100)(generator()).forall(_ != null))
}
for (_ <- 1 to 10) {
val generatedValue = generator()
toCatalyst(generatedValue)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw an exception if no generator is defined for the given DataType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea; this uncovered the fact that I forgot to implement a generator for Timestamp.

}

// Basic types:
for (
dataType <- DataTypeTestUtils.atomicTypes;
nullable <- Seq(true, false)
if !dataType.isInstanceOf[DecimalType] ||
dataType.asInstanceOf[DecimalType].precisionInfo.isEmpty
) {
test(s"$dataType (nullable=$nullable)") {
testRandomDataGeneration(dataType)
}
}

for (
arrayType <- DataTypeTestUtils.atomicArrayTypes
if RandomDataGenerator.forType(arrayType.elementType, arrayType.containsNull).isDefined
) {
test(s"$arrayType") {
testRandomDataGeneration(arrayType)
}
}

val atomicTypesWithDataGenerators =
DataTypeTestUtils.atomicTypes.filter(RandomDataGenerator.forType(_).isDefined)

// Complex types:
for (
keyType <- atomicTypesWithDataGenerators;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably use for { ... } for multi-line for-comprehension and thus we can remove the trailing ;.

valueType <- atomicTypesWithDataGenerators
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Value type of a map can be any type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm going to come back and fix this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth deferring this to a followup patch; I think that it should work but don't want to necessarily test all maps-of-maps or maps-of-arrays here.

// Scala's BigDecimal.hashCode can lead to OutOfMemoryError on Scala 2.10 (see SI-6173) and
// Spark can hit NumberFormatException errors when converting certain BigDecimals (SPARK-8802).
// For these reasons, we don't support generation of maps with decimal keys.
if !keyType.isInstanceOf[DecimalType]
) {
val mapType = MapType(keyType, valueType)
test(s"$mapType") {
testRandomDataGeneration(mapType)
}
}

for (
colOneType <- atomicTypesWithDataGenerators;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the trailing ;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the semicolons when we're doing a for comprehension over multiple inputs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I didn't notice you were using (). You can omit the ; if you use {} instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm to me it is less clear to drop the ; here, although i don't have a strong preference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a super strong preference either.

colTwoType <- atomicTypesWithDataGenerators
) {
val structType = StructType(StructField("a", colOneType) :: StructField("b", colTwoType) :: Nil)
test(s"$structType") {
testRandomDataGeneration(structType)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

/**
* Utility functions for working with DataTypes in tests.
*/
object DataTypeTestUtils {

/**
* Instances of all [[IntegralType]]s.
*/
val integralType: Set[IntegralType] = Set(
ByteType, ShortType, IntegerType, LongType
)

/**
* Instances of all [[FractionalType]]s, including both fixed- and unlimited-precision
* decimal types.
*/
val fractionalTypes: Set[FractionalType] = Set(
DecimalType(precisionInfo = None),
DecimalType(2, 1),
DoubleType,
FloatType
)

/**
* Instances of all [[NumericType]]s.
*/
val numericTypes: Set[NumericType] = integralType ++ fractionalTypes

/**
* Instances of all [[AtomicType]]s.
*/
val atomicTypes: Set[DataType] = numericTypes ++ Set(
BinaryType,
BooleanType,
DateType,
StringType,
TimestampType
)

/**
* Instances of [[ArrayType]] for all [[AtomicType]]s. Arrays of these types may contain null.
*/
val atomicArrayTypes: Set[ArrayType] = atomicTypes.map(ArrayType(_, containsNull = true))
}