HyukjinKwon committed May 26, 2016
2 parents a1ccd5f + 53d4abe commit 7eadefd
Showing 601 changed files with 12,179 additions and 5,644 deletions.
8 changes: 7 additions & 1 deletion R/README.md
@@ -1,11 +1,13 @@
# R on Spark

SparkR is an R package that provides a light-weight frontend to use Spark from R.

### Installing sparkR

Libraries of sparkR need to be created in `$SPARK_HOME/R/lib`. This can be done by running the script `$SPARK_HOME/R/install-dev.sh`.
By default, the above script uses the system-wide installation of R. However, this can be changed to any user-installed location of R by setting the environment variable `R_HOME` to the full path of the base directory where R is installed, before running the install-dev.sh script.
Example:

```
# where /home/username/R is where R is installed and /home/username/R/bin contains the files R and Rscript
export R_HOME=/home/username/R
Expand All @@ -17,6 +19,7 @@ export R_HOME=/home/username/R
#### Build Spark

Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example, to use the default Hadoop versions you can run

```
build/mvn -DskipTests -Psparkr package
```
@@ -38,6 +41,7 @@ To set other options like driver memory, executor memory etc. you can pass in the
#### Using SparkR from RStudio

If you wish to use SparkR from RStudio or other R frontends, you will need to set some environment variables which point SparkR to your Spark installation. For example:

```
# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/Users/username/spark")
@@ -64,13 +68,15 @@ To run one of them, use `./bin/spark-submit <filename> <args>`. For example:

./bin/spark-submit examples/src/main/r/dataframe.R

You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first):
You can also run the unit tests for SparkR by running the commands below. You need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first:

R -e 'install.packages("testthat", repos="http://cran.us.r-project.org")'
./R/run-tests.sh

### Running on YARN

The `./bin/spark-submit` script can also be used to submit jobs to YARN clusters. You will need to set the YARN conf dir before doing so. For example, on CDH you can run

```
export YARN_CONF_DIR=/etc/hadoop/conf
./bin/spark-submit --master yarn examples/src/main/r/dataframe.R
20 changes: 20 additions & 0 deletions R/WINDOWS.md
@@ -11,3 +11,23 @@ include Rtools and R in `PATH`.
directory in Maven in `PATH`.
4. Set `MAVEN_OPTS` as described in [Building Spark](http://spark.apache.org/docs/latest/building-spark.html).
5. Open a command shell (`cmd`) in the Spark directory and run `mvn -DskipTests -Psparkr package`

## Unit tests

To run the SparkR unit tests on Windows, the following steps are required, assuming you are in the Spark root directory and do not have Apache Hadoop installed already:

1. Create a folder to download Hadoop-related files for Windows. For example, `cd ..` and `mkdir hadoop`.

2. Download the relevant Hadoop bin package from [steveloughran/winutils](https://github.com/steveloughran/winutils). While these are not official ASF artifacts, they are built from the ASF release git hashes by a Hadoop PMC member on a dedicated Windows VM. For further reading, consult [Windows Problems on the Hadoop wiki](https://wiki.apache.org/hadoop/WindowsProblems).

3. Install the files into `hadoop\bin`; make sure that `winutils.exe` and `hadoop.dll` are present.

4. Set the environment variable `HADOOP_HOME` to the full path to the newly created `hadoop` directory.

5. Run unit tests for SparkR by running the command below. You need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first:

```
R -e "install.packages('testthat', repos='http://cran.us.r-project.org')"
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
```
6 changes: 5 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -1445,7 +1445,11 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' }
setMethod("subset", signature(x = "SparkDataFrame"),
function(x, subset, select, drop = F, ...) {
x[subset, select, drop = drop]
if (missing(subset)) {
x[, select, drop = drop, ...]
} else {
x[subset, select, drop = drop, ...]
}
})
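
For context, a minimal illustrative sketch of how `subset` behaves with this change, using a hypothetical SparkDataFrame `df` with columns `name` and `age` (the data, column names, and `sqlContext` are assumptions for illustration, not part of the diff):

```
# Hypothetical local data turned into a SparkDataFrame (illustrative only)
df <- createDataFrame(sqlContext, data.frame(name = c("Ann", "Bob"), age = c(25L, 19L)))

# Filter rows and select columns, as before
collect(subset(df, df$age > 21, c("name", "age")))

# With the change above, the `subset` argument can now be omitted to select columns only
collect(subset(df, select = c("name", "age")))
```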

#' Select
2 changes: 2 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -298,6 +298,8 @@ parquetFile <- function(sqlContext, ...) {
#' Create a SparkDataFrame from a text file.
#'
#' Loads a text file and returns a SparkDataFrame with a single string column named "value".
#' If the directory structure of the text files contains partitioning information, it is
#' ignored in the resulting DataFrame.
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param sqlContext SQLContext to use
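
A rough sketch of the documented behavior, assuming the function covered by this roxygen block is SparkR's `read.text` and that `path/to/text` contains plain text files (both assumptions; neither appears in the visible hunk):

```
# Illustrative only: load text files into a single-column SparkDataFrame
df <- read.text(sqlContext, "path/to/text")
printSchema(df)  # one string column named "value"; partition directories add no columns
head(df)         # each line of the input files becomes one row
```
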
2 changes: 1 addition & 1 deletion R/pkg/R/functions.R
@@ -2226,7 +2226,7 @@ setMethod("window", signature(x = "Column"),
#' @export
#' @examples \dontrun{locate('b', df$c, 1)}
setMethod("locate", signature(substr = "character", str = "Column"),
function(substr, str, pos = 0) {
function(substr, str, pos = 1) {
jc <- callJStatic("org.apache.spark.sql.functions",
"locate",
substr, str@jc, as.integer(pos))
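
The default `pos` changes from 0 to 1 here, matching 1-based string positions. A small sketch of the resulting behavior, mirroring the updated test in test_sparkSQL.R further down (the `sqlContext` is assumed):

```
df <- createDataFrame(sqlContext, list(list(a = "aaads")))
collect(select(df, locate("aa", df$a)))[1, 1]     # 1: search starts at position 1 by default
collect(select(df, locate("aa", df$a, 2)))[1, 1]  # 2: search starts at position 2
```
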
6 changes: 5 additions & 1 deletion R/pkg/inst/tests/testthat/test_context.R
@@ -27,6 +27,11 @@ test_that("Check masked functions", {
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop", "window", "as.data.frame")
namesOfMaskedCompletely <- c("cov", "filter", "sample")
if (as.numeric(R.version$major) == 3 && as.numeric(R.version$minor) > 2) {
namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
namesOfMaskedCompletely <- c("endsWith", "startsWith", namesOfMaskedCompletely)
}
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
@@ -36,7 +41,6 @@ test_that("Check masked functions", {
any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1])))
}))
maskedCompletely <- masked[!funcHasAny]
namesOfMaskedCompletely <- c("cov", "filter", "sample")
expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely))
expect_equal(sort(maskedCompletely), sort(namesOfMaskedCompletely))
})
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1152,7 +1152,7 @@ test_that("string operators", {
l2 <- list(list(a = "aaads"))
df2 <- createDataFrame(sqlContext, l2)
expect_equal(collect(select(df2, locate("aa", df2$a)))[1, 1], 1)
expect_equal(collect(select(df2, locate("aa", df2$a, 1)))[1, 1], 2)
expect_equal(collect(select(df2, locate("aa", df2$a, 2)))[1, 1], 2)
expect_equal(collect(select(df2, lpad(df2$a, 8, "#")))[1, 1], "###aaads") # nolint
expect_equal(collect(select(df2, rpad(df2$a, 8, "#")))[1, 1], "aaads###") # nolint

4 changes: 4 additions & 0 deletions common/network-common/pom.xml
@@ -41,6 +41,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Provided dependencies -->
<dependency>
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -79,14 +80,32 @@ public static String bytesToString(ByteBuffer b) {
return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
}

/*
/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
* Throws an exception if deletion is unsuccessful.
*
* @param file Input file / dir to be deleted
* @throws IOException if deletion is unsuccessful
*/
  public static void deleteRecursively(File file) throws IOException {
    if (file == null) { return; }

    // On Unix systems, use operating system command to run faster
    // If that does not work out, fallback to the Java IO way
    if (SystemUtils.IS_OS_UNIX) {
      try {
        deleteRecursivelyUsingUnixNative(file);
        return;
      } catch (IOException e) {
        logger.warn("Attempt to delete using native Unix OS command failed for path = {}. " +
          "Falling back to Java IO way", file.getAbsolutePath(), e);
      }
    }

    deleteRecursivelyUsingJavaIO(file);
  }

  private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
    if (file.isDirectory() && !isSymlink(file)) {
      IOException savedIOException = null;
      for (File child : listFilesSafely(file)) {
@@ -109,6 +128,32 @@ public static void deleteRecursively(File file) throws IOException {
}
}

  private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
    ProcessBuilder builder = new ProcessBuilder("rm", "-rf", file.getAbsolutePath());
    Process process = null;
    int exitCode = -1;

    try {
      // In order to avoid deadlocks, consume the stdout (and stderr) of the process
      builder.redirectErrorStream(true);
      builder.redirectOutput(new File("/dev/null"));

      process = builder.start();

      exitCode = process.waitFor();
    } catch (Exception e) {
      throw new IOException("Failed to delete: " + file.getAbsolutePath(), e);
    } finally {
      if (process != null) {
        process.destroy();
      }
    }

    if (exitCode != 0 || file.exists()) {
      throw new IOException("Failed to delete: " + file.getAbsolutePath());
    }
  }

private static File[] listFilesSafely(File file) throws IOException {
if (file.exists()) {
File[] files = file.listFiles();
Original file line number Diff line number Diff line change
@@ -27,12 +27,17 @@
import com.google.common.io.Files;

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manages some sort-shuffle data, including the creation
* and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
*/
public class TestShuffleDataContext {
private static final Logger logger = LoggerFactory.getLogger(TestShuffleDataContext.class);

public final String[] localDirs;
public final int subDirsPerLocalDir;

@@ -53,7 +58,11 @@ public void create() {

  public void cleanup() {
    for (String localDir : localDirs) {
      deleteRecursively(new File(localDir));
      try {
        JavaUtils.deleteRecursively(new File(localDir));
      } catch (IOException e) {
        logger.warn("Unable to cleanup localDir = " + localDir, e);
      }
    }
  }

@@ -92,17 +101,4 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr
  public ExecutorShuffleInfo createExecutorInfo(String shuffleManager) {
    return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
  }

  private static void deleteRecursively(File f) {
    assert f != null;
    if (f.isDirectory()) {
      File[] children = f.listFiles();
      if (children != null) {
        for (File child : children) {
          deleteRecursively(child);
        }
      }
    }
    f.delete();
  }
}
25 changes: 17 additions & 8 deletions core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -31,15 +31,24 @@ public abstract class MemoryConsumer {

  protected final TaskMemoryManager taskMemoryManager;
  private final long pageSize;
  private final MemoryMode mode;
  protected long used;

  protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
  protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) {
    this.taskMemoryManager = taskMemoryManager;
    this.pageSize = pageSize;
    this.mode = mode;
  }

  protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
    this(taskMemoryManager, taskMemoryManager.pageSizeBytes());
    this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
  }

  /**
   * Returns the memory mode, ON_HEAP or OFF_HEAP.
   */
  public MemoryMode getMode() {
    return mode;
  }

/**
@@ -132,19 +141,19 @@ protected void freePage(MemoryBlock page) {
}

  /**
   * Allocates a heap memory of `size`.
   * Allocates memory of `size`.
   */
  public long acquireOnHeapMemory(long size) {
    long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
  public long acquireMemory(long size) {
    long granted = taskMemoryManager.acquireExecutionMemory(size, this);
    used += granted;
    return granted;
  }

  /**
   * Release N bytes of heap memory.
   * Release N bytes of memory.
   */
  public void freeOnHeapMemory(long size) {
    taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
  public void freeMemory(long size) {
    taskMemoryManager.releaseExecutionMemory(size, this);
    used -= size;
  }
}