Skip to content

Commit

Permalink
Merge pull request #15 from apache/master
Browse files Browse the repository at this point in the history
apache spark latest
  • Loading branch information
rekhajoshm authored Jun 30, 2018
2 parents ae51f60 + d54d8b8 commit 1c48d4f
Show file tree
Hide file tree
Showing 192 changed files with 7,182 additions and 1,431 deletions.
12 changes: 6 additions & 6 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -243,18 +243,18 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
(BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
(BSD) JLine (jline:jline:0.9.94 - http://jline.sourceforge.net)
(BSD) JLine (jline:jline:2.14.3 - https://github.com/jline/jline2)
(BSD) ParaNamer Core (com.thoughtworks.paranamer:paranamer:2.3 - http://paranamer.codehaus.org/paranamer)
(BSD) ParaNamer Core (com.thoughtworks.paranamer:paranamer:2.6 - http://paranamer.codehaus.org/paranamer)
(BSD 3 Clause) Scala (http://www.scala-lang.org/download/#License)
(Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala)
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.12 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.12 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.12 - http://www.scala-lang.org/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.11.12 - http://www.scala-lang.org/)
(BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org)
(BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org)
(BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org)
Expand Down
4 changes: 2 additions & 2 deletions build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ install_mvn() {
install_zinc() {
local zinc_path="zinc-0.3.15/bin/zinc"
[ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}

install_app \
"${TYPESAFE_MIRROR}/zinc/0.3.15" \
Expand All @@ -109,7 +109,7 @@ install_scala() {
# determine the Scala version used in Spark
local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | head -n1 | awk -F '[<>]' '{print $3}'`
local scala_bin="${_DIR}/scala-${scala_version}/bin/scala"
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}

install_app \
"${TYPESAFE_MIRROR}/scala/${scala_version}" \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.client;

public interface StreamCallbackWithID extends StreamCallback {
String getID();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@

import io.netty.buffer.ByteBuf;

import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.server.MessageHandler;
import org.apache.spark.network.util.TransportFrameDecoder;

/**
* An interceptor that is registered with the frame decoder to feed stream data to a
* callback.
*/
class StreamInterceptor implements TransportFrameDecoder.Interceptor {
public class StreamInterceptor<T extends Message> implements TransportFrameDecoder.Interceptor {

private final TransportResponseHandler handler;
private final MessageHandler<T> handler;
private final String streamId;
private final long byteCount;
private final StreamCallback callback;
private long bytesRead;

StreamInterceptor(
TransportResponseHandler handler,
public StreamInterceptor(
MessageHandler<T> handler,
String streamId,
long byteCount,
StreamCallback callback) {
Expand All @@ -50,16 +52,24 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {

@Override
public void exceptionCaught(Throwable cause) throws Exception {
handler.deactivateStream();
deactivateStream();
callback.onFailure(streamId, cause);
}

@Override
public void channelInactive() throws Exception {
handler.deactivateStream();
deactivateStream();
callback.onFailure(streamId, new ClosedChannelException());
}

private void deactivateStream() {
if (handler instanceof TransportResponseHandler) {
// we only have to do this for TransportResponseHandler as it exposes numOutstandingFetches
// (there is no extra cleanup that needs to happen)
((TransportResponseHandler) handler).deactivateStream();
}
}

@Override
public boolean handle(ByteBuf buf) throws Exception {
int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
Expand All @@ -72,10 +82,10 @@ public boolean handle(ByteBuf buf) throws Exception {
RuntimeException re = new IllegalStateException(String.format(
"Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
callback.onFailure(streamId, re);
handler.deactivateStream();
deactivateStream();
throw re;
} else if (bytesRead == byteCount) {
handler.deactivateStream();
deactivateStream();
callback.onComplete(streamId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.OneWayMessage;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.*;

import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

/**
Expand Down Expand Up @@ -133,34 +133,21 @@ public void fetchChunk(
long streamId,
int chunkIndex,
ChunkReceivedCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}

StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);

channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
StdChannelListener listener = new StdChannelListener(streamChunkId) {
@Override
void handleFailure(String errorMsg, Throwable cause) {
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
callback.onFailure(chunkIndex, new IOException(errorMsg, cause));
}
});
};
handler.addFetchRequest(streamChunkId, callback);

channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(listener);
}

/**
Expand All @@ -170,7 +157,12 @@ public void fetchChunk(
* @param callback Object to call with the stream data.
*/
public void stream(String streamId, StreamCallback callback) {
long startTime = System.currentTimeMillis();
StdChannelListener listener = new StdChannelListener(streamId) {
@Override
void handleFailure(String errorMsg, Throwable cause) throws Exception {
callback.onFailure(streamId, new IOException(errorMsg, cause));
}
};
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}
Expand All @@ -180,25 +172,7 @@ public void stream(String streamId, StreamCallback callback) {
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(streamId, callback);
channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});
channel.writeAndFlush(new StreamRequest(streamId)).addListener(listener);
}
}

Expand All @@ -211,35 +185,44 @@ public void stream(String streamId, StreamCallback callback) {
* @return The RPC's id.
*/
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
long requestId = requestId();
handler.addRpcRequest(requestId, callback);

RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
.addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
try {
callback.onFailure(new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});
.addListener(listener);

return requestId;
}

/**
* Send data to the remote end as a stream. This differs from stream() in that this is a request
* to *send* data to the remote end, not to receive it from the remote.
*
* @param meta meta data associated with the stream, which will be read completely on the
* receiving end before the stream itself.
* @param data this will be streamed to the remote end to allow for transferring large amounts
* of data without reading into memory.
* @param callback handles the reply -- onSuccess will only be called when both message and data
* are received successfully.
*/
public long uploadStream(
ManagedBuffer meta,
ManagedBuffer data,
RpcResponseCallback callback) {
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

long requestId = requestId();
handler.addRpcRequest(requestId, callback);

RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(new UploadStream(requestId, meta, data)).addListener(listener);

return requestId;
}
Expand Down Expand Up @@ -319,4 +302,60 @@ public String toString() {
.add("isActive", isActive())
.toString();
}

private static long requestId() {
return Math.abs(UUID.randomUUID().getLeastSignificantBits());
}

private class StdChannelListener
implements GenericFutureListener<Future<? super Void>> {
final long startTime;
final Object requestId;

StdChannelListener(Object requestId) {
this.startTime = System.currentTimeMillis();
this.requestId = requestId;
}

@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
if (logger.isTraceEnabled()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
handleFailure(errorMsg, future.cause());
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}

void handleFailure(String errorMsg, Throwable cause) throws Exception {}
}

private class RpcChannelListener extends StdChannelListener {
final long rpcRequestId;
final RpcResponseCallback callback;

RpcChannelListener(long rpcRequestId, RpcResponseCallback callback) {
super("RPC " + rpcRequestId);
this.rpcRequestId = rpcRequestId;
this.callback = callback;
}

@Override
void handleFailure(String errorMsg, Throwable cause) {
handler.removeRpcRequest(rpcRequestId);
callback.onFailure(new IOException(errorMsg, cause));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.SaslRpcHandler;
Expand Down Expand Up @@ -149,6 +150,14 @@ public void receive(TransportClient client, ByteBuffer message) {
delegate.receive(client, message);
}

@Override
public StreamCallbackWithID receiveStream(
TransportClient client,
ByteBuffer message,
RpcResponseCallback callback) {
return delegate.receiveStream(client, message, callback);
}

@Override
public StreamManager getStreamManager() {
return delegate.getStreamManager();
Expand Down
Loading

0 comments on commit 1c48d4f

Please sign in to comment.