Merge remote branch 'upstream/master' into SPARK-12708
root authored and root committed Jan 13, 2016
2 parents 62c04ca + cb7b864 commit 3678e30
Showing 280 changed files with 4,173 additions and 3,275 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -60,7 +60,6 @@ dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
-ec2/lib/
rat-results.txt
scalastyle.txt
scalastyle-output.xml
2 changes: 1 addition & 1 deletion LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -130,6 +130,7 @@ exportMethods("%in%",
"count",
"countDistinct",
"crc32",
"hash",
"cume_dist",
"date_add",
"date_format",
20 changes: 20 additions & 0 deletions R/pkg/R/functions.R
@@ -340,6 +340,26 @@ setMethod("crc32",
column(jc)
})

+#' hash
+#'
+#' Calculates the hash code of given columns, and returns the result as an int column.
+#'
+#' @rdname hash
+#' @name hash
+#' @family misc_funcs
+#' @export
+#' @examples \dontrun{hash(df$c)}
+setMethod("hash",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "hash", jcols)
+            column(jc)
+          })

#' dayofmonth
#'
#' Extracts the day of the month as an integer from a given date/timestamp/string.
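
Note: the new SparkR wrapper above simply delegates to the JVM-side org.apache.spark.sql.functions.hash via callJStatic. A minimal Java sketch of the same call follows, assuming a local-mode context and assuming hash is Java-callable varargs (as the Column* functions in functions.scala generally are); the demo DataFrame is illustrative, not part of this change.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.hash;

public class HashColumnExample {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setMaster("local").setAppName("hash-demo"));
    SQLContext sqlContext = new SQLContext(sc);

    // A small demo DataFrame with a single long "id" column.
    DataFrame df = sqlContext.range(0, 3);

    // hash() accepts one or more columns and yields an int column,
    // the same JVM function the SparkR wrapper reaches via callJStatic.
    df.withColumn("h", hash(col("id"))).show();

    sc.stop();
  }
}
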
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -736,6 +736,10 @@ setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct")
#' @export
setGeneric("crc32", function(x) { standardGeneric("crc32") })

+#' @rdname hash
+#' @export
+setGeneric("hash", function(x, ...) { standardGeneric("hash") })

#' @rdname cume_dist
#' @export
setGeneric("cume_dist", function(x) { standardGeneric("cume_dist") })
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -922,7 +922,7 @@ test_that("column functions", {
c <- column("a")
c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
-c3 <- cosh(c) + count(c) + crc32(c) + exp(c)
+c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
c6 <- log(c) + log10(c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.9</version>
+<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
187 changes: 187 additions & 0 deletions core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java;

import java.io.Serializable;

import com.google.common.base.Preconditions;

/**
* <p>Like {@code java.util.Optional} in Java 8, {@code scala.Option} in Scala, and
* {@code com.google.common.base.Optional} in Google Guava, this class represents a
* value of a given type that may or may not exist. It is used in methods that wish
* to optionally return a value, in preference to returning {@code null}.</p>
*
* <p>In fact, the class here is a reimplementation of the essential API of both
* {@code java.util.Optional} and {@code com.google.common.base.Optional}. From
* {@code java.util.Optional}, it implements:</p>
*
* <ul>
* <li>{@link #empty()}</li>
* <li>{@link #of(Object)}</li>
* <li>{@link #ofNullable(Object)}</li>
* <li>{@link #get()}</li>
* <li>{@link #orElse(Object)}</li>
* <li>{@link #isPresent()}</li>
* </ul>
*
* <p>From {@code com.google.common.base.Optional} it implements:</p>
*
* <ul>
* <li>{@link #absent()}</li>
* <li>{@link #of(Object)}</li>
* <li>{@link #fromNullable(Object)}</li>
* <li>{@link #get()}</li>
* <li>{@link #or(Object)}</li>
* <li>{@link #orNull()}</li>
* <li>{@link #isPresent()}</li>
* </ul>
*
* <p>{@code java.util.Optional} itself is not used at this time because the
* project does not require Java 8. Using {@code com.google.common.base.Optional}
* has in the past caused serious library version conflicts with Guava that can't
* be resolved by shading. Hence this work-alike clone.</p>
*
* @param <T> type of value held inside
*/
public final class Optional<T> implements Serializable {

private static final Optional<?> EMPTY = new Optional<>();

private final T value;

private Optional() {
this.value = null;
}

private Optional(T value) {
Preconditions.checkNotNull(value);
this.value = value;
}

// java.util.Optional API (subset)

/**
* @return an empty {@code Optional}
*/
public static <T> Optional<T> empty() {
@SuppressWarnings("unchecked")
Optional<T> t = (Optional<T>) EMPTY;
return t;
}

/**
* @param value non-null value to wrap
* @return {@code Optional} wrapping this value
* @throws NullPointerException if value is null
*/
public static <T> Optional<T> of(T value) {
return new Optional<>(value);
}

/**
* @param value value to wrap, which may be null
* @return {@code Optional} wrapping this value, which may be empty
*/
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return empty();
} else {
return of(value);
}
}

/**
* @return the value wrapped by this {@code Optional}
* @throws NullPointerException if this is empty (contains no value)
*/
public T get() {
Preconditions.checkNotNull(value);
return value;
}

/**
* @param other value to return if this is empty
* @return this {@code Optional}'s value if present, or else the given value
*/
public T orElse(T other) {
return value != null ? value : other;
}

/**
* @return true iff this {@code Optional} contains a value (non-empty)
*/
public boolean isPresent() {
return value != null;
}

// Guava API (subset)
// of(), get() and isPresent() are identically present in the Guava API

/**
* @return an empty {@code Optional}
*/
public static <T> Optional<T> absent() {
return empty();
}

/**
* @param value value to wrap, which may be null
* @return {@code Optional} wrapping this value, which may be empty
*/
public static <T> Optional<T> fromNullable(T value) {
return ofNullable(value);
}

/**
* @param other value to return if this is empty
* @return this {@code Optional}'s value if present, or else the given value
*/
public T or(T other) {
return value != null ? value : other;
}

/**
* @return this {@code Optional}'s value if present, or else null
*/
public T orNull() {
return value;
}

// Common methods

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Optional)) {
return false;
}
Optional<?> other = (Optional<?>) obj;
return value == null ? other.value == null : value.equals(other.value);
}

@Override
public int hashCode() {
return value == null ? 0 : value.hashCode();
}

@Override
public String toString() {
return value == null ? "Optional.empty" : String.format("Optional[%s]", value);
}

}
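
Since this class reimplements two APIs side by side, a short usage sketch may help. It is illustrative only; the java.util-style and Guava-style entry points shown are interchangeable, and every method used below is defined in the class above.

import org.apache.spark.api.java.Optional;

public class OptionalDemo {
  public static void main(String[] args) {
    // java.util.Optional-style construction.
    Optional<String> present = Optional.of("value");
    Optional<String> empty = Optional.empty();

    // Guava-style aliases with identical behavior.
    Optional<String> absent = Optional.absent();
    Optional<String> fromNull = Optional.<String>fromNullable(null);

    System.out.println(present.isPresent());       // true
    System.out.println(present.get());             // value
    System.out.println(empty.orElse("fallback"));  // fallback (java.util style)
    System.out.println(absent.or("fallback"));     // fallback (Guava style)
    System.out.println(fromNull.orNull());         // null
  }
}
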
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag

-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
@@ -655,7 +654,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
-import scala.collection.JavaConverters._
def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -24,7 +24,6 @@ import java.util.{Comparator, Iterator => JIterator, List => JList}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

-import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark._
@@ -122,7 +121,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
-import scala.collection.JavaConverters._
def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -132,7 +130,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
-import scala.collection.JavaConverters._
def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -142,7 +139,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-import scala.collection.JavaConverters._
def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag

-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -22,13 +22,12 @@ import java.util.Map.Entry

import scala.collection.mutable

-import com.google.common.base.Optional
-
private[spark] object JavaUtils {
  def optionToOptional[T](option: Option[T]): Optional[T] =
-    option match {
-      case Some(value) => Optional.of(value)
-      case None => Optional.absent()
+    if (option.isDefined) {
+      Optional.of(option.get)
+    } else {
+      Optional.empty[T]
    }

// Workaround for SPARK-3926 / SI-8911
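
This helper is how Scala Options surface through the Java API; JavaRDDLike.getCheckpointFile(), for example, returns an Optional produced by optionToOptional. A minimal sketch follows, assuming local mode; the checkpoint directory and data are illustrative.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

public class CheckpointFileDemo {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setMaster("local").setAppName("optional-demo"));
    sc.setCheckpointDir("/tmp/spark-checkpoints"); // illustrative path

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));

    // Before checkpointing, the Scala side holds None, which
    // optionToOptional now maps to Optional.empty() rather than Guava's absent().
    System.out.println(rdd.getCheckpointFile().isPresent()); // false

    rdd.checkpoint();
    rdd.count(); // force evaluation so the checkpoint is written

    Optional<String> file = rdd.getCheckpointFile();
    System.out.println(file.orElse("not checkpointed"));

    sc.stop();
  }
}
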
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
+pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
… (the remaining changed files in this 280-file diff are not shown)
