Merge branch 'master' into support-char-literals
MaxGekk authored Jun 19, 2018
2 parents 4210146 + 9dbe53e commit 450079d
Showing 23 changed files with 402 additions and 175 deletions.
2 changes: 1 addition & 1 deletion build/mvn
@@ -154,4 +154,4 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

# Last, call the `mvn` command as usual
${MVN_BIN} -DzincPort=${ZINC_PORT} "$@"
"${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
11 changes: 8 additions & 3 deletions dev/run-tests.py
@@ -357,7 +357,7 @@ def build_spark_unidoc_sbt(hadoop_version):
exec_sbt(profiles_and_goals)


def build_spark_assembly_sbt(hadoop_version):
def build_spark_assembly_sbt(hadoop_version, checkstyle=False):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["assembly/package"]
@@ -366,6 +366,9 @@ def build_spark_assembly_sbt(hadoop_version):
" ".join(profiles_and_goals))
exec_sbt(profiles_and_goals)

if checkstyle:
run_java_style_checks()

# Note that we skip Unidoc build only if Hadoop 2.6 is explicitly set in this SBT build.
# Due to a different dependency resolution in SBT & Unidoc by an unknown reason, the
# documentation build fails on a specific machine & environment in Jenkins but it was unable
@@ -570,11 +573,13 @@ def main():
or f.endswith("scalastyle-config.xml")
for f in changed_files):
run_scala_style_checks()
should_run_java_style_checks = False
if not changed_files or any(f.endswith(".java")
or f.endswith("checkstyle.xml")
or f.endswith("checkstyle-suppressions.xml")
for f in changed_files):
run_java_style_checks()
# Run SBT Checkstyle after the build to prevent a side-effect to the build.
should_run_java_style_checks = True
if not changed_files or any(f.endswith("lint-python")
or f.endswith("tox.ini")
or f.endswith(".py")
@@ -603,7 +608,7 @@ def main():
detect_binary_inop_with_mima(hadoop_version)
# Since we did not build assembly/package before running dev/mima, we need to
# do it here because the tests still rely on it; see SPARK-13294 for details.
build_spark_assembly_sbt(hadoop_version)
build_spark_assembly_sbt(hadoop_version, should_run_java_style_checks)

# run the test suites
run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
5 changes: 4 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -177,7 +177,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
encoding=None):
dropFieldIfAllNull=None, encoding=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -246,6 +246,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param samplingRatio: defines fraction of input JSON objects used for schema inferring.
If None is set, it uses the default value, ``1.0``.
:param dropFieldIfAllNull: whether to ignore column of all null values or empty
array/struct during schema inference. If None is set, it
uses the default value, ``false``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
UDFXPathUtil.java
@@ -21,6 +21,9 @@
import java.io.Reader;

import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
@@ -37,9 +40,15 @@
* This is based on Hive's UDFXPathUtil implementation.
*/
public class UDFXPathUtil {
public static final String SAX_FEATURE_PREFIX = "http://xml.org/sax/features/";
public static final String EXTERNAL_GENERAL_ENTITIES_FEATURE = "external-general-entities";
public static final String EXTERNAL_PARAMETER_ENTITIES_FEATURE = "external-parameter-entities";
private DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
private DocumentBuilder builder = null;
private XPath xpath = XPathFactory.newInstance().newXPath();
private ReusableStringReader reader = new ReusableStringReader();
private InputSource inputSource = new InputSource(reader);

private XPathExpression expression = null;
private String oldPath = null;

@@ -65,14 +74,31 @@ public Object eval(String xml, String path, QName qname) throws XPathExpressionE
return null;
}

if (builder == null){
try {
initializeDocumentBuilderFactory();
builder = dbf.newDocumentBuilder();
} catch (ParserConfigurationException e) {
throw new RuntimeException(
"Error instantiating DocumentBuilder, cannot build xml parser", e);
}
}

reader.set(xml);
try {
return expression.evaluate(inputSource, qname);
return expression.evaluate(builder.parse(inputSource), qname);
} catch (XPathExpressionException e) {
throw new RuntimeException("Invalid XML document: " + e.getMessage() + "\n" + xml, e);
} catch (Exception e) {
throw new RuntimeException("Error loading expression '" + oldPath + "'", e);
}
}

private void initializeDocumentBuilderFactory() throws ParserConfigurationException {
dbf.setFeature(SAX_FEATURE_PREFIX + EXTERNAL_GENERAL_ENTITIES_FEATURE, false);
dbf.setFeature(SAX_FEATURE_PREFIX + EXTERNAL_PARAMETER_ENTITIES_FEATURE, false);
}

public Boolean evalBoolean(String xml, String path) throws XPathExpressionException {
return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
}
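The change above guards the XPath helper against XXE-style attacks by parsing the XML itself with external entity resolution switched off. Below is a minimal, standalone Scala sketch of the same technique (not part of the patch; the object name and the file path in the DOCTYPE are illustrative): with the two SAX features disabled on the DocumentBuilderFactory, an external entity declared in a DOCTYPE is simply skipped, so the referenced file never reaches the XPath result.

import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.XPathFactory
import org.xml.sax.InputSource

object XxeHardeningSketch {
  def main(args: Array[String]): Unit = {
    // Same feature switches as initializeDocumentBuilderFactory() above.
    val dbf = DocumentBuilderFactory.newInstance()
    dbf.setFeature("http://xml.org/sax/features/external-general-entities", false)
    dbf.setFeature("http://xml.org/sax/features/external-parameter-entities", false)

    // A document that tries to inline a local file via an external entity.
    val xml =
      """<?xml version="1.0" encoding="utf-8"?>
        |<!DOCTYPE test [ <!ENTITY embed SYSTEM "/tmp/secret.txt"> ]>
        |<foo>&embed;</foo>""".stripMargin

    val doc = dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)))
    val result = XPathFactory.newInstance().newXPath().evaluate("/foo", doc)
    println(s"extracted: '$result'")  // the entity is not resolved, so this prints an empty string
  }
}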
Encoders.scala
@@ -132,7 +132,7 @@ object Encoders {
* - primitive types: boolean, int, double, etc.
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal
* - java.math.BigDecimal, java.math.BigInteger
* - time related: java.sql.Date, java.sql.Timestamp
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
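The scaladoc above now lists java.math.BigInteger among the bean field types the encoder accepts. A hedged usage sketch under that assumption (the Account bean, its values, and the session setup are illustrative, not from the commit):

import java.math.BigInteger
import scala.beans.BeanProperty
import org.apache.spark.sql.{Encoders, SparkSession}

class Account {
  @BeanProperty var id: BigInteger = BigInteger.ZERO
  @BeanProperty var name: String = ""
}

object BigIntegerBeanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()

    val acct = new Account
    acct.setId(new BigInteger("12345678901234567890"))
    acct.setName("alice")

    // Encoders.bean inspects the getters/setters generated by @BeanProperty.
    val ds = spark.createDataset(Seq(acct))(Encoders.bean(classOf[Account]))
    ds.printSchema()  // the BigInteger field surfaces as a Spark decimal column
    spark.stop()
  }
}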
ExpressionEncoder.scala
@@ -128,7 +128,7 @@ object ExpressionEncoder {
case b: BoundReference if b == originalInputObject => newInputObject
})

if (enc.flat) {
val serializerExpr = if (enc.flat) {
newSerializer.head
} else {
// For non-flat encoder, the input object is not top level anymore after being combined to
@@ -146,6 +146,7 @@ object ExpressionEncoder {
Invoke(Literal.fromObject(None), "equals", BooleanType, newInputObject :: Nil))
If(nullCheck, Literal.create(null, struct.dataType), struct)
}
Alias(serializerExpr, s"_${index + 1}")()
}

val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index) =>
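The Alias(serializerExpr, s"_${index + 1}") added above gives each component encoder's serializer an explicit _1, _2, ... name, which is the naming scheme combined (tuple) encoders expose. A small illustration of the resulting column names (session setup assumed, not part of the patch):

import org.apache.spark.sql.SparkSession

object TupleColumnNamesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // A Dataset of pairs is encoded with a combined encoder; its columns follow
    // the _1, _2, ... names given to the aliased serializer expressions.
    val ds = Seq((1, "a"), (2, "b")).toDS()
    ds.printSchema()
    // root
    //  |-- _1: integer (nullable = false)
    //  |-- _2: string (nullable = true)
    spark.stop()
  }
}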
JSONOptions.scala
@@ -73,6 +73,9 @@ private[sql] class JSONOptions(
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

// Whether to ignore column of all null values or empty array/struct during schema inference
val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)

val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

UDFXPathUtilSuite.scala
@@ -77,6 +77,27 @@ class UDFXPathUtilSuite extends SparkFunSuite {
assert(ret == "foo")
}

test("embedFailure") {
import org.apache.commons.io.FileUtils
import java.io.File
val secretValue = String.valueOf(Math.random)
val tempFile = File.createTempFile("verifyembed", ".tmp")
tempFile.deleteOnExit()
val fname = tempFile.getAbsolutePath

FileUtils.writeStringToFile(tempFile, secretValue)

val xml =
s"""<?xml version="1.0" encoding="utf-8"?>
|<!DOCTYPE test [
| <!ENTITY embed SYSTEM "$fname">
|]>
|<foo>&embed;</foo>
""".stripMargin
val evaled = new UDFXPathUtil().evalString(xml, "/foo")
assert(evaled.isEmpty)
}

test("number eval") {
var ret =
util.evalNumber("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/c[2]")
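For context (an illustration, not part of the patch): UDFXPathUtil is the helper behind Spark SQL's xpath_* functions, so the hardening exercised by the embedFailure test above also covers plain SQL queries such as the one in this sketch.

import org.apache.spark.sql.SparkSession

object XPathSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    // xpath_string returns the text of the first node matching the XPath expression.
    val result = spark.sql("SELECT xpath_string('<a><b>hello</b></a>', 'a/b')").first().getString(0)
    println(result)  // hello
    spark.stop()
  }
}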
XPathExpressionSuite.scala
@@ -40,8 +40,9 @@ class XPathExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {

// Test error message for invalid XML document
val e1 = intercept[RuntimeException] { testExpr("<a>/a>", "a", null.asInstanceOf[T]) }
assert(e1.getCause.getMessage.contains("Invalid XML document") &&
e1.getCause.getMessage.contains("<a>/a>"))
assert(e1.getCause.getCause.getMessage.contains(
"XML document structures must start and end within the same entity."))
assert(e1.getMessage.contains("<a>/a>"))

// Test error message for invalid xpath
val e2 = intercept[RuntimeException] { testExpr("<a></a>", "!#$", null.asInstanceOf[T]) }
SupportsReportStatistics.java
@@ -23,10 +23,9 @@
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report statistics to Spark.
*
* Statistics are reported to the optimizer before a projection or any filters are pushed to the
* DataSourceReader. Implementations that return more accurate statistics based on projection and
* filters will not improve query performance until the planner can push operators before getting
* stats.
* Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader.
* Implementations that return more accurate statistics based on pushed operators will not improve
* query performance until the planner can push operators before getting stats.
*/
@InterfaceStability.Evolving
public interface SupportsReportStatistics extends DataSourceReader {
DataFrameReader.scala
@@ -381,6 +381,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* that should be used for parsing.</li>
* <li>`samplingRatio` (default is 1.0): defines fraction of input JSON objects used
* for schema inferring.</li>
* <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or
* empty array/struct during schema inference.</li>
* </ul>
*
* @since 2.0.0
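A hedged usage sketch for the dropFieldIfAllNull option documented above (the sample path and records are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession

object DropFieldIfAllNullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Two records in which the "comment" field is never populated.
    val path = "/tmp/people.json"  // placeholder path
    Seq("""{"name":"alice","comment":null}""", """{"name":"bob","comment":null}""")
      .toDS().write.mode("overwrite").text(path)

    // Default behaviour: the all-null field is kept and inferred as string.
    spark.read.json(path).printSchema()

    // With the new option: the all-null field is dropped from the inferred schema.
    spark.read.option("dropFieldIfAllNull", "true").json(path).printSchema()

    spark.stop()
  }
}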
InMemoryTableScanExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.vectorized._
import org.apache.spark.sql.types._
@@ -169,8 +169,8 @@ case class InMemoryTableScanExec(
// But the cached version could alias output, so we need to replace output.
override def outputPartitioning: Partitioning = {
relation.cachedPlan.outputPartitioning match {
case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
case _ => relation.cachedPlan.outputPartitioning
case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
case other => other
}
}

JsonInferSchema.scala
@@ -75,7 +75,7 @@ private[sql] object JsonInferSchema {
// active SparkSession and `SQLConf.get` may point to the wrong configs.
val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)

canonicalizeType(rootType) match {
canonicalizeType(rootType, configOptions) match {
case Some(st: StructType) => st
case _ =>
// canonicalizeType erases all empty structs, including the only one we want to keep
@@ -181,33 +181,33 @@ private[sql] object JsonInferSchema {
}

/**
* Convert NullType to StringType and remove StructTypes with no fields
* Recursively canonicalizes inferred types, e.g., removes StructTypes with no fields,
* drops NullTypes or converts them to StringType based on provided options.
*/
private def canonicalizeType(tpe: DataType): Option[DataType] = tpe match {
case at @ ArrayType(elementType, _) =>
for {
canonicalType <- canonicalizeType(elementType)
} yield {
at.copy(canonicalType)
}
private def canonicalizeType(tpe: DataType, options: JSONOptions): Option[DataType] = tpe match {
case at: ArrayType =>
canonicalizeType(at.elementType, options)
.map(t => at.copy(elementType = t))

case StructType(fields) =>
val canonicalFields: Array[StructField] = for {
field <- fields
if field.name.length > 0
canonicalType <- canonicalizeType(field.dataType)
} yield {
field.copy(dataType = canonicalType)
val canonicalFields = fields.filter(_.name.nonEmpty).flatMap { f =>
canonicalizeType(f.dataType, options)
.map(t => f.copy(dataType = t))
}

if (canonicalFields.length > 0) {
Some(StructType(canonicalFields))
// SPARK-8093: empty structs should be deleted
if (canonicalFields.isEmpty) {
None
} else {
// per SPARK-8093: empty structs should be deleted
Some(StructType(canonicalFields))
}

case NullType =>
if (options.dropFieldIfAllNull) {
None
} else {
Some(StringType)
}

case NullType => Some(StringType)
case other => Some(other)
}
