[SPARK-6237][NETWORK] Network-layer changes to allow stream upload. #21346
Conversation
Test build #90693 has finished for PR 21346 at commit
It's been a little while since I've thought about this issue, so I have a few clarifying questions to help me understand the high-level changes:
Test build #90694 has finished for PR 21346 at commit
I plan to take a look at this at some point but I basically wanted to ask the same thing as Josh's question 2.
project/MimaExcludes.scala
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this"),

// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
I think we started adding these at the top since that is cleaner (doesn't require changing the previous exclusion rule).
All good questions and stuff I had wondered about too -- I should actually be sure to comment on these on the jira as well:
yes
that was already handled by https://issues.apache.org/jira/browse/SPARK-22062 (despite the title saying it's something else entirely). That said, I recently discovered that my tests doing this for large blocks were incorrect, so I need to reconfirm this (I need to rearrange my test a little, and I've got a different aspect of this in flight, so it will probably take a couple of days).
It's certainly possible to do this, and I started taking this approach, but I stopped because replication is synchronous. So you'd have to add a callback for when the block is finally fetched, to go back to this initial call -- but also add timeout logic to avoid waiting forever if the destination went away. It all seemed much more complicated than doing it the way I'm proposing here.
Correct; I'm currently investigating what we can do to address this. (Sorry, again I discovered my test was broken shortly after posting this.) It would certainly simplify things if we only supported this for disk-cached blocks -- what exactly are you proposing? Just failing when it's cached in memory, and telling the user to rerun with disk caching? Changing the block manager to automatically cache on disk as well when the block is > 2GB? Or, when sending the block, just writing it to a temp file and then sending from that? The problem here is on the sending side, not the receiving side; netty uses an

But that would go into another jira under SPARK-6235
btw I may have made the pull-based approach sound more complex than I meant to; I'm happy to take that approach if you think it's better. The fact that replication is synchronous doesn't really matter, I just meant it's not a fire-and-forget msg, we have to set up the callbacks to confirm the block has been fetched (or a failure). It just seemed like extra indirection to me, and I thought it would be better to stay closer to the UploadBlock path. Are there particular reasons you think that approach would be better? I guess the receiver can throttle the requests, but on the other hand the task on the sender will block waiting for the replication to finish (whether it succeeds or fails), so we really don't want it to wait too long.
These changes allow an RPCHandler to receive an upload as a stream of data, without having to buffer the entire message in the FrameDecoder. The primary use case is for replicating large blocks. Added unit tests.
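To make the intended usage concrete, here is a rough client-side sketch of the push path. The uploadStream(meta, data, callback) signature matches the hunks quoted later in this review; the setup around it (buffers, header contents, class and method names) is illustrative only:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;

public class UploadStreamSketch {
  // Pushes a large block without framing the whole payload in memory.
  static void replicateBlock(TransportClient client, ManagedBuffer blockData) {
    // Small metadata header; the remote RpcHandler reads this eagerly, then
    // consumes the block bytes as a stream outside the frame decoder.
    ManagedBuffer meta = new NioManagedBuffer(
        ByteBuffer.wrap("block-header".getBytes(StandardCharsets.UTF_8)));
    client.uploadStream(meta, blockData, new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        // remote side consumed and acknowledged the entire stream
      }
      @Override
      public void onFailure(Throwable e) {
        // on stream failure the channel is torn down rather than recovered
      }
    });
  }
}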
Test build #91056 has finished for PR 21346 at commit
Test build #4188 has finished for PR 21346 at commit
Test build #91121 has finished for PR 21346 at commit
Test build #91122 has finished for PR 21346 at commit
Test build #91136 has finished for PR 21346 at commit
Test build #91138 has finished for PR 21346 at commit
Test build #4189 has finished for PR 21346 at commit
Last failures are known flakies. A few updates here from my last set of comments. I've posted an overall design doc, and shared the tests I'm running on a cluster. I think the tests cover all the cases we care about, but I would appreciate review of those tests too. I can change this to use the existing pull approach for large blocks, rather than updating the push one, if you want. If you're OK with this, there will be one more PR on top of this to make use of the new uploadStream functionality. There will be another PR as well to cover reading large remote blocks in memory, for SPARK-24307.
After reading the code, the changes seem simpler than I expected. While it would have been nice to re-use the existing feature if possible, I can see how a "please pull this" approach could make resource management on the server side a little more complicated; the server would have to keep around some list of things that are waiting to be pulled, whereas with the new RPC it just sends the message and doesn't need to keep state around for cleanup.
I need to go through the stuff you attached to the bug still.
channel.writeAndFlush(new UploadStream(requestId, meta, data))
  .addListener(future -> {
    if (future.isSuccess()) {
First reaction is that it's about the right time to refactor this into a helper method... all instances in this class look quite similar.
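For reference, this refactor landed as a shared listener (StdChannelListener, visible in later hunks and in the final commit summary). A self-contained sketch of the shape, with the exact fields and constructor assumed:

import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Shared listener: logs timing on success, logs and delegates on failure.
class StdChannelListener implements GenericFutureListener<Future<? super Void>> {
  private static final Logger logger = LoggerFactory.getLogger(StdChannelListener.class);

  private final Channel channel;
  private final long startTime;
  private final Object requestId;

  StdChannelListener(Channel channel, long startTime, Object requestId) {
    this.channel = channel;
    this.startTime = startTime;
    this.requestId = requestId;
  }

  @Override
  public void operationComplete(Future<? super Void> future) {
    if (future.isSuccess()) {
      long timeTaken = System.currentTimeMillis() - startTime;
      logger.trace("Sending request {} to {} took {} ms", requestId,
        channel.remoteAddress(), timeTaken);
    } else {
      logger.error("Failed to send request {} to {}", requestId,
        channel.remoteAddress(), future.cause());
      handleFailure(future.cause()); // per-call-site hook, e.g. fail the pending RPC callback
    }
  }

  void handleFailure(Throwable cause) {}
}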
import org.apache.spark.network.buffer.NettyManagedBuffer;

/**
 * An RPC with data that is sent outside of the frame, so it can be read in a stream.
as a stream?
@@ -38,15 +38,24 @@
 *
 * This method will not be called in parallel for a single TransportClient (i.e., channel).
 *
 * The rpc *might* included a data stream in <code>streamData</code>(eg. for uploading a large
space before (
 *
 * If an exception is thrown from the callback, it will be propogated back to the sender as an rpc
 * failure.
 * @param callback
either remove or document all parameters (and add an empty line before).
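For instance, the tag could be filled in along these lines (the wording is illustrative, not from the patch):

/**
 * If an exception is thrown from the callback, it will be propagated back to the sender
 * as an rpc failure.
 *
 * @param callback Used to send an rpc response (or an rpc failure) back to the sender.
 */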
@@ -23,25 +23,16 @@
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.apache.spark.network.protocol.*;
These are in the wrong place.
res.successMessages = Collections.synchronizedSet(new HashSet<String>());
res.errorMessages = Collections.synchronizedSet(new HashSet<String>());

for (String stream: streams) {
space before :
final String streamId;
final RpcResult res;
final Semaphore sem;
RpcStreamCallback(String streamId, RpcResult res, Semaphore sem) {
add empty line
@@ -193,10 +299,78 @@ public void sendOneWayMessage() throws Exception {
  }
}

@Test
public void sendRpcWithStreamOneAtATime() throws Exception {
  for (String stream: StreamTestHelper.STREAMS) {
space before :
package org.apache.spark.network;

import com.google.common.io.Files;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
Wrong place... basically in every file you've changed.
ooops, sorry got used to the style checker warning finding these in scala. fixed these now.
@@ -36,6 +36,9 @@ object MimaExcludes {

// Exclude rules for 2.4.x
lazy val v24excludes = v23excludes ++ Seq(
  // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
  ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
Kinda wondering why this class is public in the first place... along with SparkTransportConf in the same package.
I suspect that it's because we might want to access these across Java package boundaries, and Java doesn't have the equivalent of Scala's nested package-scoped private[package].
I only see references to them in Scala code... also private[package] translates to public in Java, so that would at least avoid the mima checks.
Where did you post those? Couldn't find them on the bug, nor the bug linked from that bug. EDIT: nevermind, it's linked from the "epic" (SPARK-6235).
Test build #91192 has finished for PR 21346 at commit
Test build #91195 has finished for PR 21346 at commit
Test build #4190 has finished for PR 21346 at commit
* @param client A channel client which enables the handler to make requests back to the sender
*               of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
* @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
I'm wondering if a separate callback for these streams wouldn't be better. It would at the very least avoid having to change all the existing handlers.
But it would also make it clearer what the contract is. For example, the callback could return the stream callback to be registered.
It also doesn't seem like StreamData itself has a lot of useful information other than the registration method, so it could be replaced with parameters in the new callback, avoiding having to expose that type to RPC handlers.
I've done this refactoring, and I agree it made the change significantly simpler.
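For reference, the refactored entry point ends up shaped roughly like this on RpcHandler (the receiveStream name and three arguments appear in the hunks below; the default body throwing UnsupportedOperationException is my reading of the final patch):

import java.nio.ByteBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;

public abstract class RpcHandler {
  // Existing entry point: the full message was framed and buffered before this is called.
  public abstract void receive(
      TransportClient client, ByteBuffer message, RpcResponseCallback callback);

  // New entry point for stream uploads: the handler only sees the metadata header here,
  // and returns the StreamCallbackWithID that is registered to consume the stream data
  // arriving outside the frame.
  public StreamCallbackWithID receiveStream(
      TransportClient client, ByteBuffer messageHeader, RpcResponseCallback callback) {
    throw new UnsupportedOperationException();
  }
}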
Test build #91450 has finished for PR 21346 at commit
Test build #91791 has started for PR 21346 at commit
Test build #91792 has finished for PR 21346 at commit
Test build #91793 has finished for PR 21346 at commit
logger.trace("Sending RPC to {}", getRemoteAddress(channel)); | ||
} | ||
|
||
long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); |
This Math.abs(UUID.randomUUID().getLeastSignificantBits()) is repeated twice. Move it to a separate method.
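Something like this (sketch; assumes java.util.UUID is already imported):

private static long requestId() {
  return Math.abs(UUID.randomUUID().getLeastSignificantBits());
}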
done
Test build #91854 has finished for PR 21346 at commit
Mostly style nits, aside from the test issue.
logger.trace("Sending request {} to {} took {} ms", streamChunkId, | ||
getRemoteAddress(channel), timeTaken); | ||
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)) | ||
.addListener( new StdChannelListener(startTime, streamChunkId) { |
nit: no space after (
return requestId;
}

/**
 * Send data to the remote end as a stream.   This differs from stream() in that this is a request
I know you're in the "2 spaces after period camp", but that's 3.
return requestId;
}

private class StdChannelListener
I personally try to keep nested classes at the bottom of the enclosing class, but up to you.
    ManagedBuffer meta,
    ManagedBuffer data,
    RpcResponseCallback callback) {
  long startTime = System.currentTimeMillis();
Seems like it should be easy to move this to StdChannelListener's constructor. Looks pretty similar in all methods.
I didn't do that originally, as I figured you wanted the startTime to be before writeAndFlush, but I can work around that too.
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
// We choose to totally fail the channel, rather than trying to recover as we do in other
// cases. We don't know how many bytes of the stream the client has already sent for the
// stream, its not worth trying to recover.
it's
final StreamSuite.TestCallback helper;
final OutputStream out;
final File outFile;
VerifyingStreamCallback(String streamId) throws IOException {
nit: add empty line
ping
whoops, sorry I missed this one. fixed now
base.get(expected);
assertEquals(expected.length, result.length);
assertTrue("buffers don't match", Arrays.equals(expected, result));

nit: remove
static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };

final File testFile;
File tempDir;
final for all these?
  }
}

nit: remove
void cleanup() {
  if (tempDir != null) {
    for (File f : tempDir.listFiles()) {
JavaUtils.deleteRecursively.
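i.e., roughly (a sketch; assumes org.apache.spark.network.util.JavaUtils is imported and the helper has a logger field):

void cleanup() {
  if (tempDir != null) {
    try {
      // deletes nested files and the directory itself
      JavaUtils.deleteRecursively(tempDir);
    } catch (IOException e) {
      logger.error("failed to delete temp dir {}", tempDir, e);
    }
  }
}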
/**
 * An RPC with data that is sent outside of the frame, so it can be read as a stream.
 */
public final class UploadStream extends AbstractMessage implements RequestMessage {
Is it possible to merge UploadStream and RpcRequest into one class?
perhaps, but do you think that is really that useful? the handling of them is different (both in the network layer and the outer RpcHandler). And other things being equal, I'm biased to fewer changes to existing code paths.
Looks good overall, I'll give Josh some time to comment if he has anything to say.
I'd have moved the debug logging into the new listener too, but that's minor.
TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
  channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
ByteBuffer meta = req.meta.nioByteBuffer();
StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
Check for null? Otherwise you'll get some weird NPE buried in some other code path.
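e.g., a guard right after the call in the hunk above (sketch):

StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
if (streamHandler == null) {
  // fail fast with a clear message instead of an NPE deep in the stream machinery
  throw new NullPointerException("rpcHandler returned a null streamHandler");
}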
}
streamCallbacks.values().forEach(streamCallback -> {
  try {
    streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
Isn't the wait part now redundant, after you waited for the semaphore?
streamCallbacks.values().forEach(streamCallback -> {
  try {
    streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
  } catch (IOException e) {
Method throws Exception, so this seems unnecessary.
forEach doesn't like the IOException.
Test build #91925 has finished for PR 21346 at commit
Test build #92101 has finished for PR 21346 at commit
LGTM pending tests.
final StreamSuite.TestCallback helper;
final OutputStream out;
final File outFile;
VerifyingStreamCallback(String streamId) throws IOException {
ping
Test build #92347 has finished for PR 21346 at commit
Test build #92349 has finished for PR 21346 at commit
Merging to master.
These changes allow an RPCHandler to receive an upload as a stream of data, without having to buffer the entire message in the FrameDecoder. The primary use case is for replicating large blocks. By itself, this change is adding dead-code that is not being used -- it is a step towards SPARK-24296.

Added unit tests for handling streaming data, including successfully sending data, and failures in reading the stream with concurrent requests.

Summary of changes:

* Introduce a new UploadStream RPC which is sent to push a large payload as a stream (in contrast, the pre-existing StreamRequest and StreamResponse RPCs are used for pull-based streaming).
* Generalize RpcHandler.receive() to support requests which contain streams.
* Generalize StreamInterceptor to handle both request and response messages (previously it only handled responses).
* Introduce StdChannelListener to abstract away common logging logic in ChannelFuture listeners.

Author: Imran Rashid <[email protected]>

Closes apache#21346 from squito/upload_stream.

Ref: LIHADOOP-52972 RB=2070064