Merge remote-tracking branch 'spark/master' into SPARK-13019
keypointt committed Mar 22, 2016
2 parents 892fe60 + caea152 commit 87b2c56
Showing 946 changed files with 14,090 additions and 16,560 deletions.
10 changes: 5 additions & 5 deletions R/README.md
@@ -40,7 +40,7 @@ To set other options like driver memory, executor memory etc. you can pass in th
If you wish to use SparkR from RStudio or other R frontends you will need to set some environment variables which point SparkR to your Spark installation. For example
```
# Set this to where Spark is installed
-Sys.setenv(SPARK_HOME="/Users/shivaram/spark")
+Sys.setenv(SPARK_HOME="/Users/username/spark")
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
@@ -51,7 +51,7 @@ sc <- sparkR.init(master="local")

The [instructions](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) for making contributions to Spark also apply to SparkR.
If you only make R file changes (i.e. no Scala changes) then you can just re-install the R package using `R/install-dev.sh` and test your changes.
-Once you have made your changes, please include unit tests for them and run existing unit tests using the `run-tests.sh` script as described below.
+Once you have made your changes, please include unit tests for them and run existing unit tests using the `R/run-tests.sh` script as described below.

#### Generating documentation

@@ -60,17 +60,17 @@ The SparkR documentation (Rd files and HTML files) are not a part of the source
### Examples, Unit tests

SparkR comes with several sample programs in the `examples/src/main/r` directory.
-To run one of them, use `./bin/sparkR <filename> <args>`. For example:
+To run one of them, use `./bin/spark-submit <filename> <args>`. For example:

-    ./bin/sparkR examples/src/main/r/dataframe.R
+    ./bin/spark-submit examples/src/main/r/dataframe.R

You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first):

R -e 'install.packages("testthat", repos="http://cran.us.r-project.org")'
./R/run-tests.sh

### Running on YARN
-The `./bin/spark-submit` and `./bin/sparkR` can also be used to submit jobs to YARN clusters. You will need to set YARN conf dir before doing so. For example on CDH you can run
+The `./bin/spark-submit` can also be used to submit jobs to YARN clusters. You will need to set YARN conf dir before doing so. For example on CDH you can run
```
export YARN_CONF_DIR=/etc/hadoop/conf
./bin/spark-submit --master yarn examples/src/main/r/dataframe.R
55 changes: 2 additions & 53 deletions bin/run-example
@@ -21,56 +21,5 @@ if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

-EXAMPLES_DIR="${SPARK_HOME}"/examples
-
-. "${SPARK_HOME}"/bin/load-spark-env.sh
-
-if [ -n "$1" ]; then
-  EXAMPLE_CLASS="$1"
-  shift
-else
-  echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
-  echo " - set MASTER=XX to use a specific master" 1>&2
-  echo " - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
-  echo " (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
-  exit 1
-fi
-
-if [ -f "${SPARK_HOME}/RELEASE" ]; then
-  JAR_PATH="${SPARK_HOME}/lib"
-else
-  JAR_PATH="${EXAMPLES_DIR}/target/scala-${SPARK_SCALA_VERSION}"
-fi
-
-JAR_COUNT=0
-
-for f in "${JAR_PATH}"/spark-examples-*hadoop*.jar; do
-  if [[ ! -e "$f" ]]; then
-    echo "Failed to find Spark examples assembly in ${SPARK_HOME}/lib or ${SPARK_HOME}/examples/target" 1>&2
-    echo "You need to build Spark before running this program" 1>&2
-    exit 1
-  fi
-  SPARK_EXAMPLES_JAR="$f"
-  JAR_COUNT=$((JAR_COUNT+1))
-done
-
-if [ "$JAR_COUNT" -gt "1" ]; then
-  echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
-  ls "${JAR_PATH}"/spark-examples-*hadoop*.jar 1>&2
-  echo "Please remove all but one jar." 1>&2
-  exit 1
-fi
-
-export SPARK_EXAMPLES_JAR
-
-EXAMPLE_MASTER=${MASTER:-"local[*]"}
-
-if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
-  EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
-fi
-
-exec "${SPARK_HOME}"/bin/spark-submit \
-  --master $EXAMPLE_MASTER \
-  --class $EXAMPLE_CLASS \
-  "$SPARK_EXAMPLES_JAR" \
-  "$@"
+export _SPARK_CMD_USAGE="Usage: ./bin/run-example [options] example-class [example args]"
+exec "${SPARK_HOME}"/bin/spark-submit run-example "$@"
7 changes: 3 additions & 4 deletions bin/run-example.cmd
@@ -17,7 +17,6 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

-rem This is the entry point for running a Spark example. To avoid polluting
-rem the environment, it just launches a new cmd to do the real work.
-
-cmd /V /E /C "%~dp0run-example2.cmd" %*
+set SPARK_HOME=%~dp0..
+set _SPARK_CMD_USAGE=Usage: ./bin/run-example [options] example-class [example args]
+cmd /V /E /C "%~dp0spark-submit.cmd" run-example %*
85 changes: 0 additions & 85 deletions bin/run-example2.cmd

This file was deleted.

TransportContext.java
@@ -43,7 +43,8 @@

/**
* Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
-* setup Netty Channel pipelines with a {@link org.apache.spark.network.server.TransportChannelHandler}.
+* setup Netty Channel pipelines with a
+* {@link org.apache.spark.network.server.TransportChannelHandler}.
*
* There are two communication protocols that the TransportClient provides, control-plane RPCs and
* data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of the
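As context for the javadoc above, here is a minimal sketch of wiring both ends through a single TransportContext. This is not part of the commit; `NoOpRpcHandler`, the `"shuffle"` module name, the port, and the exact constructor signatures are assumptions from memory of this era of the network-common API.

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportContextSketch {
  public static void main(String[] args) throws Exception {
    // Module name and config provider are illustrative choices (assumption).
    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

    // One context creates both the server and the client factory, so both ends
    // share the same pipeline setup (the TransportChannelHandler mentioned above).
    TransportServer server = context.createServer(9999);  // arbitrary example port
    TransportClientFactory clientFactory = context.createClientFactory();
    TransportClient client = clientFactory.createClient("localhost", 9999);

    client.close();
    clientFactory.close();
    server.close();
  }
}
```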
NettyManagedBuffer.java
@@ -28,7 +28,7 @@
/**
* A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
*/
-public final class NettyManagedBuffer extends ManagedBuffer {
+public class NettyManagedBuffer extends ManagedBuffer {
private final ByteBuf buf;

public NettyManagedBuffer(ByteBuf buf) {
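For reference, a tiny sketch of constructing one of these buffers, mirroring the `Unpooled.wrappedBuffer(...)` pattern that SaslMessage uses later in this commit. The sketch itself is illustrative and not taken from the diff.

```java
import java.nio.charset.StandardCharsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NettyManagedBuffer;

public class NettyManagedBufferSketch {
  public static void main(String[] args) {
    // Wrap an in-memory byte[] in a ByteBuf, then expose it through the
    // ManagedBuffer abstraction used throughout the network module.
    ByteBuf buf = Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8));
    ManagedBuffer managed = new NettyManagedBuffer(buf);
    System.out.println("size = " + managed.size());  // readable bytes in the ByteBuf
    managed.release();  // releases the underlying Netty buffer
  }
}
```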
StreamCallback.java
@@ -21,9 +21,9 @@
import java.nio.ByteBuffer;

/**
-* Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
-* method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
-* called.
+* Callback for streaming data. Stream data will be offered to the
+* {@link #onData(String, ByteBuffer)} method as it arrives. Once all the stream data is received,
+* {@link #onComplete(String)} will be called.
* <p>
* The network library guarantees that a single thread will call these methods at a time, but
* different call may be made by different threads.
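A minimal sketch of an implementation of this callback contract. The `onFailure` method is not visible in this hunk; treating it as part of the interface is an assumption based on memory of this class.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.spark.network.client.StreamCallback;

// Logs stream chunks as they arrive; onComplete fires once per stream.
public class LoggingStreamCallback implements StreamCallback {
  @Override
  public void onData(String streamId, ByteBuffer buf) throws IOException {
    System.out.println("stream " + streamId + ": got " + buf.remaining() + " bytes");
  }

  @Override
  public void onComplete(String streamId) throws IOException {
    System.out.println("stream " + streamId + " complete");
  }

  @Override
  public void onFailure(String streamId, Throwable cause) throws IOException {
    System.err.println("stream " + streamId + " failed: " + cause);
  }
}
```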
TransportClientFactory.java
@@ -64,7 +64,7 @@ private static class ClientPool {
TransportClient[] clients;
Object[] locks;

-public ClientPool(int size) {
+ClientPool(int size) {
clients = new TransportClient[size];
locks = new Object[size];
for (int i = 0; i < size; i++) {
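A self-contained sketch of the pooling pattern ClientPool supports: parallel arrays of clients and per-slot locks, so creating a connection in one slot never blocks the others. All names below are illustrative stand-ins, not copied from TransportClientFactory.

```java
import java.util.Random;

class ClientPoolSketch {
  private final Object[] locks;
  private final String[] clients;  // stand-in for TransportClient[]
  private final Random rand = new Random();

  ClientPoolSketch(int size) {
    clients = new String[size];
    locks = new Object[size];
    for (int i = 0; i < size; i++) {
      locks[i] = new Object();  // one lock per slot
    }
  }

  String getOrCreate(String host) {
    int slot = rand.nextInt(clients.length);  // random slot spreads load across connections
    synchronized (locks[slot]) {              // serialize creation only within this slot
      if (clients[slot] == null) {
        clients[slot] = "client-to-" + host;  // a real pool would open a connection here
      }
      return clients[slot];
    }
  }
}
```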
Message.java
@@ -33,15 +33,15 @@ public interface Message extends Encodable {
boolean isBodyInFrame();

/** Preceding every serialized Message is its type, which allows us to deserialize it. */
-public static enum Type implements Encodable {
+enum Type implements Encodable {
ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
RpcRequest(3), RpcResponse(4), RpcFailure(5),
StreamRequest(6), StreamResponse(7), StreamFailure(8),
OneWayMessage(9), User(-1);

private final byte id;

-private Type(int id) {
+Type(int id) {
assert id < 128 : "Cannot have more than 128 message types";
this.id = (byte) id;
}
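The ids above define the one-byte type tag that precedes every serialized message. Here is a small illustrative decoder for that tag, grounded only in the ids shown in this hunk; the real enum has its own decode helper, so treat this as a sketch rather than Spark's implementation.

```java
public class MessageTypeSketch {
  // Map the one-byte wire tag back to a name, using the ids from the enum above.
  static String describe(byte id) {
    switch (id) {
      case 0: return "ChunkFetchRequest";
      case 1: return "ChunkFetchSuccess";
      case 2: return "ChunkFetchFailure";
      case 3: return "RpcRequest";
      case 4: return "RpcResponse";
      case 5: return "RpcFailure";
      case 6: return "StreamRequest";
      case 7: return "StreamResponse";
      case 8: return "StreamFailure";
      case 9: return "OneWayMessage";
      default: throw new IllegalArgumentException("Unknown message type: " + id);
    }
  }

  public static void main(String[] args) {
    System.out.println(describe((byte) 3));  // prints RpcRequest
  }
}
```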
RequestMessage.java
@@ -17,8 +17,6 @@

package org.apache.spark.network.protocol;

-import org.apache.spark.network.protocol.Message;
-
/** Messages from the client to the server. */
public interface RequestMessage extends Message {
// token interface
ResponseMessage.java
@@ -17,8 +17,6 @@

package org.apache.spark.network.protocol;

-import org.apache.spark.network.protocol.Message;
-
/** Messages from the server to the client. */
public interface ResponseMessage extends Message {
// token interface
SaslMessage.java
@@ -36,11 +36,11 @@ class SaslMessage extends AbstractMessage {

public final String appId;

-public SaslMessage(String appId, byte[] message) {
+SaslMessage(String appId, byte[] message) {
this(appId, Unpooled.wrappedBuffer(message));
}

-public SaslMessage(String appId, ByteBuf message) {
+SaslMessage(String appId, ByteBuf message) {
super(new NettyManagedBuffer(message), true);
this.appId = appId;
}
OneForOneStreamManager.java
@@ -32,8 +32,8 @@
import org.apache.spark.network.client.TransportClient;

/**
-* StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are individually
-* fetched as chunks by the client. Each registered buffer is one chunk.
+* StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are
+* individually fetched as chunks by the client. Each registered buffer is one chunk.
*/
public class OneForOneStreamManager extends StreamManager {
private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);
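A sketch of the registration flow the javadoc describes: an iterator of buffers goes in, a stream id comes out, and each buffer is later fetched as one chunk. The `registerStream(appId, buffers)` signature is an assumption from memory of this era of the class.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;

public class StreamRegistrationSketch {
  public static void main(String[] args) {
    OneForOneStreamManager manager = new OneForOneStreamManager();
    // Two buffers registered under one stream id; each becomes one fetchable chunk.
    Iterator<ManagedBuffer> chunks = Arrays.<ManagedBuffer>asList(
        new NioManagedBuffer(ByteBuffer.wrap(new byte[]{1, 2, 3})),
        new NioManagedBuffer(ByteBuffer.wrap(new byte[]{4, 5, 6}))).iterator();
    long streamId = manager.registerStream("app-1", chunks);  // appId param is an assumption
    System.out.println("registered stream " + streamId);
  }
}
```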
TransportChannelHandler.java
@@ -141,8 +141,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (responseHandler.numOutstandingRequests() > 0) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
client.timeOut();
ctx.close();
} else if (closeIdleConnections) {
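The log message names the relevant knob. As a quick sketch, raising it via SparkConf looks like this; the `300s` value is arbitrary and just for illustration.

```java
import org.apache.spark.SparkConf;

public class TimeoutConfigSketch {
  public static void main(String[] args) {
    // If healthy-but-busy peers hit the "connection is dead" path above,
    // raising the idle timeout is the documented remedy.
    SparkConf conf = new SparkConf().set("spark.network.timeout", "300s");
    System.out.println(conf.get("spark.network.timeout"));
  }
}
```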
ByteUnit.java
@@ -24,7 +24,7 @@ public enum ByteUnit {
TiB ((long) Math.pow(1024L, 4L)),
PiB ((long) Math.pow(1024L, 5L));

-private ByteUnit(long multiplier) {
+ByteUnit(long multiplier) {
this.multiplier = multiplier;
}

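Since each constant's multiplier is a power of 1024, conversions reduce to plain multiplication. A short sketch; the `toBytes` helper is assumed from memory of this class and is not shown in the hunk.

```java
import org.apache.spark.network.util.ByteUnit;

public class ByteUnitSketch {
  public static void main(String[] args) {
    // MiB -> bytes is 5 * 1024^2, matching the multipliers in the enum above.
    long fromEnum = ByteUnit.MiB.toBytes(5);        // assumed helper on ByteUnit
    long byHand = 5L * (long) Math.pow(1024L, 2L);  // same arithmetic, spelled out
    System.out.println(fromEnum + " == " + byHand); // 5242880 == 5242880
  }
}
```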
SystemPropertyConfigProvider.java
@@ -19,8 +19,6 @@

import java.util.NoSuchElementException;

-import org.apache.spark.network.util.ConfigProvider;
-
/** Uses System properties to obtain config values. */
public class SystemPropertyConfigProvider extends ConfigProvider {
@Override
TransportFrameDecoder.java
@@ -205,7 +205,7 @@ private boolean feedInterceptor(ByteBuf buf) throws Exception {
return interceptor != null;
}

-public static interface Interceptor {
+public interface Interceptor {

/**
* Handles data received from the remote end.
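A sketch of implementing this interceptor hook to consume a fixed number of raw bytes before normal framing resumes. The three method signatures are assumptions from memory of TransportFrameDecoder.Interceptor in this era.

```java
import io.netty.buffer.ByteBuf;
import org.apache.spark.network.util.TransportFrameDecoder;

// Consumes the first `remaining` bytes out-of-band, then detaches so the
// frame decoder goes back to normal framing.
class SkipBytesInterceptor implements TransportFrameDecoder.Interceptor {
  private long remaining;

  SkipBytesInterceptor(long bytesToSkip) {
    this.remaining = bytesToSkip;
  }

  @Override
  public boolean handle(ByteBuf data) throws Exception {
    int toSkip = (int) Math.min(remaining, data.readableBytes());
    data.skipBytes(toSkip);
    remaining -= toSkip;
    return remaining > 0;  // returning false signals the interceptor is done
  }

  @Override
  public void exceptionCaught(Throwable cause) throws Exception { }

  @Override
  public void channelInactive() throws Exception { }
}
```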