[SPARK-6237][NETWORK] Network-layer changes to allow stream upload. #21346

Closed
wants to merge 21 commits

Conversation

squito
Contributor

@squito squito commented May 16, 2018

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks. By itself, this change adds dead code that is not yet used -- it is a step towards SPARK-24296.

Added unit tests for handling streaming data, including successfully sending data, and failures in reading the stream with concurrent requests.

Summary of changes:

  • Introduce a new UploadStream RPC which is sent to push a large payload as a stream (in contrast, the pre-existing StreamRequest and StreamResponse RPCs are used for pull-based streaming); an illustrative usage sketch follows this list.
  • Generalize RpcHandler.receive() to support requests which contain streams.
  • Generalize StreamInterceptor to handle both request and response messages (previously it only handled responses).
  • Introduce StdChannelListener to abstract away common logging logic in ChannelFuture listeners.
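For illustration only, a rough sketch of how a client might use the new push-based path. The uploadStream signature and the names below are assumptions based on this description, not a verbatim copy of the merged API.

import java.nio.ByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;

public class UploadStreamSketch {
  // Push `data` to the remote RpcHandler as a stream; only the small metadata header
  // goes through the normal frame. Assumes client.uploadStream(meta, data, callback).
  static void replicateBlock(TransportClient client, ByteBuffer header, ManagedBuffer data) {
    ManagedBuffer meta = new NioManagedBuffer(header);  // small, framed normally
    client.uploadStream(meta, data, new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        // the remote end finished reading the stream and acknowledged the block
      }

      @Override
      public void onFailure(Throwable e) {
        // the stream failed part-way; the caller decides whether to retry
      }
    });
  }
}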

@SparkQA

SparkQA commented May 16, 2018

Test build #90693 has finished for PR 21346 at commit 49e0a80.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StreamInterceptor<T extends Message> implements TransportFrameDecoder.Interceptor
  • public final class UploadStream extends AbstractMessage implements RequestMessage
  • public class StreamData

@JoshRosen
Contributor

It's been a little while since I've thought about this issue, so I have a few clarifying questions to help me understand the high-level changes:

  1. I recall that the problem with large shuffle blocks was that the OneForOneBlockFetcher strategy basically read the entire block as a single chunk, which becomes a problem for large blocks. I understand that we have now removed this limitation for shuffles by using a streaming transfer strategy only for large blocks (above some threshold). Is this patch conceptually doing the same thing for push-based communication where the action is initiated by a sender (e.g. to push a block for replication)? Does it also affect pull-based remote cache block reads or will that be handled separately?
  2. Given that we already seem to have pull-based openStream() calls which can be initiated from the receive side, could we simplify things here by pushing a "this value is big, pull it" message and then have the remote end initiate a streaming read, similar to how DirectTaskResult and IndirectTaskResult work?
  3. For remote reads of large cached blocks: is it true that this works today only if the block is on disk but fails if the block is in memory? If certain size limit problems only occur when things are cached in memory, can we simplify anything if we add a requirement that blocks above 2GB can only be cached on disk (regardless of storage level)?

@SparkQA

SparkQA commented May 16, 2018

Test build #90694 has finished for PR 21346 at commit fa3ac4e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

I plan to take a look at this at some point but I basically wanted to ask the same thing as Josh's question 2.

ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this"),

// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
Contributor

I think we started adding these at the top since that is cleaner (doesn't require changing the previous exclusion rule).

@squito
Contributor Author

squito commented May 17, 2018

All good questions and stuff I had wondered about too -- I should actually be sure to comment on these on the jira as well:

I recall that the problem with large shuffle blocks was that the OneForOneBlockFetcher strategy basically read the entire block as a single chunk, which becomes a problem for large blocks. I understand that we have now removed this limitation for shuffles by using a streaming transfer strategy only for large blocks (above some threshold). Is this patch conceptually doing the same thing for push-based communication where the action is initiated by a sender (e.g. to push a block for replication)?

yes

Does it also affect pull-based remote cache block reads or will that be handled separately?

that was already handled by https://issues.apache.org/jira/browse/SPARK-22062 (despite the title saying it's something else entirely). That said, I recently discovered that my test doing this for large blocks was incorrect, so I need to reconfirm this (I need to rearrange my test a little, and I've got a different aspect of this in flight, so it will probably take a couple of days).

Given that we already seem to have pull-based openStream() calls which can be initiated from the receive side, could we simplify things here by pushing a "this value is big, pull it" message and then have the remote end initiate a streaming read, similar to how DirectTaskResult and IndirectTaskResult work?

It's certainly possible to do this, and I started taking this approach, but I stopped because replication is synchronous. So you'd have to add a callback for when the block is finally fetched, to go back to this initial call -- but also add timeout logic to avoid waiting forever if the destination went away. It all seemed much more complicated than doing it the way I'm proposing here.

For remote reads of large cached blocks: is it true that this works today only if the block is on disk but fails if the block is in memory? If certain size limit problems only occur when things are cached in memory, can we simplify anything if we add a requirement that blocks above 2GB can only be cached on disk (regardless of storage level)?

Correct; I'm currently investigating what we can do to address this. (Sorry, again I discovered my test was broken shortly after posting this.) It would certainly simplify things if we only supported this for disk-cached blocks -- what exactly are you proposing? Just failing when it's cached in memory, and telling the user to rerun with disk caching? Changing the block manager to also cache on disk when the block is > 2GB? Or, when sending the block, just writing it to a temp file and then sending from that?

The problem here is on the sending side, not the receiving side; netty uses an int to manage the length of a ByteBuf-based msg, but it uses a long for a FileRegion-based msg (the code is a little different in the latest on branch 4.1, but the same problem is still there). I'm investigating making a "FileRegion" that is actually backed by a ChunkedByteBuffer.

But that would go into another jira under SPARK-6235
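For illustration, the core of that idea (a conceptual sketch only, not the FileRegion wrapper under investigation) is to track the transferred bytes in a long while writing out the chunks of a ChunkedByteBuffer-like structure, so the total is never squeezed through an int:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.List;

// Conceptual sketch only: a real version would implement netty's FileRegion so the
// transport layer can send the data lazily; the names here are made up.
class ChunkedTransferSketch {
  static long transferTo(List<ByteBuffer> chunks, WritableByteChannel target) throws IOException {
    long transferred = 0L;  // long, not int: totals above Integer.MAX_VALUE are fine
    for (ByteBuffer chunk : chunks) {
      ByteBuffer dup = chunk.duplicate();  // leave the original position/limit untouched
      while (dup.hasRemaining()) {
        transferred += target.write(dup);
      }
    }
    return transferred;
  }
}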

@squito
Contributor Author

squito commented May 17, 2018

btw I may have made the pull-based approach sound more complex than I meant to; I'm happy to take that approach if you think it's better. The fact that replication is synchronous doesn't really matter, I just meant it's not a fire-and-forget msg, we have to set up the callbacks to confirm the block has been fetched (or a failure). It just seemed like extra indirection to me, and I thought it would be better to stay closer to the UploadBlock path.

Are there particular reasons you think that approach would be better? I guess the receiver can throttle the requests, but on the other hand the task on the sender will block waiting for the replication to finish (whether it succeeds or fails), so we really don't want it to wait too long.

squito added 2 commits May 23, 2018 12:49
These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests.
@SparkQA

SparkQA commented May 23, 2018

Test build #91056 has finished for PR 21346 at commit 3098b9c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 23, 2018

Test build #4188 has finished for PR 21346 at commit 3098b9c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 24, 2018

Test build #91121 has finished for PR 21346 at commit 2fef75f.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 24, 2018

Test build #91122 has finished for PR 21346 at commit 54533c2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 25, 2018

Test build #91136 has finished for PR 21346 at commit 32f4f94.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 25, 2018

Test build #91138 has finished for PR 21346 at commit 331124b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 25, 2018

Test build #4189 has finished for PR 21346 at commit 331124b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented May 25, 2018

Last failures are known flakies.

A few updates here from my last set of comments. I've posted an overall design doc, and shared the tests I'm running on a cluster. I think the tests cover all the cases we care about, but I would appreciate review of those tests too. I can change this to use the existing pull approach for large blocks, rather than updating the push one, if you want. If you're OK with this, there will be one more PR on top of this to make use of the new uploadStream functionality.

There will be another PR as well to cover reading large remote blocks in memory for SPARK-24307

Contributor

@vanzin vanzin left a comment

After reading the code, the changes seem simpler than I expected. While it would have been nice to re-use the existing feature if possible, I can see how a "please pull this" approach could make resource management on the server side a little more complicated; the server would have to keep around some list of things that are waiting to be pulled, whereas with the new RPC it just sends the message and doesn't need to keep state around for cleanup.

I need to go through the stuff you attached to the bug still.


channel.writeAndFlush(new UploadStream(requestId, meta, data))
.addListener(future -> {
if (future.isSuccess()) {
Contributor

First reaction is that it's about the right time to refactor this into a helper method... all instances in this class look quite similar.
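One possible shape for such a helper, purely as an illustration (the names are guesses, and this is not the StdChannelListener the PR ultimately adds):

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

// Hypothetical shared listener: logs the outcome of any outbound request and
// closes the channel on failure, so each call site only supplies the request id.
class LoggingChannelListener implements ChannelFutureListener {
  private final long startTime = System.currentTimeMillis();
  private final Object requestId;

  LoggingChannelListener(Object requestId) {
    this.requestId = requestId;
  }

  @Override
  public void operationComplete(ChannelFuture future) {
    Channel channel = future.channel();
    if (future.isSuccess()) {
      long elapsed = System.currentTimeMillis() - startTime;
      System.out.printf("Request %s to %s took %d ms%n",
          requestId, channel.remoteAddress(), elapsed);
    } else {
      System.err.printf("Failed to send request %s to %s: %s%n",
          requestId, channel.remoteAddress(), future.cause());
      channel.close();
    }
  }
}

Each writeAndFlush(...) call site would then just do .addListener(new LoggingChannelListener(requestId)).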

import org.apache.spark.network.buffer.NettyManagedBuffer;

/**
* An RPC with data that is sent outside of the frame, so it can be read in a stream.
Contributor

as a stream?

@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single TransportClient (i.e., channel).
*
* The rpc *might* included a data stream in <code>streamData</code>(eg. for uploading a large
Contributor

space before (

*
* If an exception is thrown from the callback, it will be propogated back to the sender as an rpc
* failure.
* @param callback
Contributor

either remove or document all parameters (and add an empty line before).

@@ -23,25 +23,16 @@
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.apache.spark.network.protocol.*;
Contributor

These are in the wrong place.

res.successMessages = Collections.synchronizedSet(new HashSet<String>());
res.errorMessages = Collections.synchronizedSet(new HashSet<String>());

for (String stream: streams) {
Contributor

space before :

final String streamId;
final RpcResult res;
final Semaphore sem;
RpcStreamCallback(String streamId, RpcResult res, Semaphore sem) {
Contributor

add empty line

@@ -193,10 +299,78 @@ public void sendOneWayMessage() throws Exception {
}
}

@Test
public void sendRpcWithStreamOneAtATime() throws Exception {
for (String stream: StreamTestHelper.STREAMS) {
Contributor

space before :

package org.apache.spark.network;

import com.google.common.io.Files;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
Contributor

Wrong place... basically in every file you've changed.

Contributor Author

Oops, sorry -- I got used to the style checker finding these in Scala. Fixed these now.

@@ -36,6 +36,9 @@ object MimaExcludes {

// Exclude rules for 2.4.x
lazy val v24excludes = v23excludes ++ Seq(
// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
Contributor

Kinda wondering why this class is public in the first place... along with SparkTransportConf in the same package.

Contributor

I suspect that it's because we might want to access these across Java package boundaries and Java doesn't have the equivalent of Scala's nested package scoped private[package].

Contributor

I only see references to them in Scala code... also private[package] translates to public in Java, so that would at least avoid the mima checks.

@vanzin
Contributor

vanzin commented May 25, 2018

I've posted an overall design doc, and shared the tests I'm running on a cluster.

Where did you post those? I couldn't find them on the bug, nor on the bug linked from it.

EDIT: nevermind, it's linked from the "epic" (SPARK-6235).

@SparkQA

SparkQA commented May 27, 2018

Test build #91192 has finished for PR 21346 at commit f4d9123.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RpcChannelListener extends StdChannelListener

@SparkQA

SparkQA commented May 27, 2018

Test build #91195 has finished for PR 21346 at commit 7bd1b43.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 29, 2018

Test build #4190 has finished for PR 21346 at commit 7bd1b43.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

* @param client A channel client which enables the handler to make requests back to the sender
* of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
* @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
Contributor

I'm wondering if a separate callback for these streams wouldn't be better. It would at the very least avoid having to change all the existing handlers.

But it would also make it clearer what the contract is. For example, the callback could return the stream callback to be registered.

It also doesn't seem like StreamData itself has a lot of useful information other than the registration method, so it could be replaced with parameters in the new callback, avoiding having to expose that type to RPC handlers.

Contributor Author

I've done this refactoring, and I agree it made the change significantly simpler.
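For readers following the thread, the resulting contract looks roughly like this; it is a sketch pieced together from the review comments, so treat the exact class and method names as assumptions:

import java.nio.ByteBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;

// Sketch of an RpcHandler-style override: the handler inspects the framed metadata and
// returns the callback that the transport layer feeds the out-of-frame stream into.
class ReceiveStreamSketch {
  StreamCallbackWithID receiveStream(
      TransportClient client, ByteBuffer messageHeader, RpcResponseCallback callback) {
    String id = "upload-" + System.nanoTime();  // hypothetical stream id
    return new StreamCallbackWithID() {
      @Override
      public String getID() {
        return id;
      }

      @Override
      public void onData(String streamId, ByteBuffer buf) {
        // append buf to wherever the block is being written
      }

      @Override
      public void onComplete(String streamId) {
        callback.onSuccess(ByteBuffer.allocate(0));  // ack the upload
      }

      @Override
      public void onFailure(String streamId, Throwable cause) {
        callback.onFailure(cause);
      }
    };
  }
}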

@SparkQA

SparkQA commented Jun 4, 2018

Test build #91450 has finished for PR 21346 at commit 6c086c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 13, 2018

Test build #91791 has started for PR 21346 at commit 3d28a1b.

@SparkQA

SparkQA commented Jun 13, 2018

Test build #91792 has finished for PR 21346 at commit cf991a9.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91793 has finished for PR 21346 at commit 8a18da5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
Contributor

This Math.abs(UUID.randomUUID().getLeastSignificantBits()) is repeated twice. Move it to a separate method.

Contributor Author

done
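(For reference, the extracted helper is presumably something along these lines; the method name is a guess.)

import java.util.UUID;

class RequestIds {
  // Hypothetical extracted helper: a random, non-negative id for outbound requests.
  static long nextRequestId() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }
}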

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91854 has finished for PR 21346 at commit 1a222aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

Mostly style nits, aside from the test issue.

logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId))
.addListener( new StdChannelListener(startTime, streamChunkId) {
Contributor

nit: no space after (


return requestId;
}

/**
* Send data to the remote end as a stream. This differs from stream() in that this is a request
Contributor

I know you're in the "2 spaces after period camp", but that's 3.

return requestId;
}

private class StdChannelListener
Contributor

I personally try to keep nested classes at the bottom of the enclosing class, but up to you.

ManagedBuffer meta,
ManagedBuffer data,
RpcResponseCallback callback) {
long startTime = System.currentTimeMillis();
Contributor

Seems like it should be easy to move this to StdChannelListener's constructor. Looks pretty similar in all methods.

Contributor Author

I didn't do that originally, as I figured you wanted the startTime to be captured before writeAndFlush, but I can work around that too.

respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
// We choose to totally fail the channel, rather than trying to recover as we do in other
// cases. We don't know how many bytes of the stream the client has already sent for the
// stream, its not worth trying to recover.
Contributor

it's

final StreamSuite.TestCallback helper;
final OutputStream out;
final File outFile;
VerifyingStreamCallback(String streamId) throws IOException {
Contributor

nit: add empty line

Contributor

ping

Contributor Author

Whoops, sorry, I missed this one. Fixed now.

base.get(expected);
assertEquals(expected.length, result.length);
assertTrue("buffers don't match", Arrays.equals(expected, result));

Contributor

nit: remove

static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };

final File testFile;
File tempDir;
Contributor

final for all these?

}
}


Contributor

nit: remove


void cleanup() {
if (tempDir != null) {
for (File f : tempDir.listFiles()) {
Contributor

JavaUtils.deleteRecursively.
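(In other words, the loop could presumably collapse to a single call, something like the sketch below, assuming the method is allowed to propagate the IOException.)

import java.io.File;
import java.io.IOException;
import org.apache.spark.network.util.JavaUtils;

class CleanupSketch {
  File tempDir;

  // JavaUtils.deleteRecursively removes the directory and everything under it.
  void cleanup() throws IOException {
    if (tempDir != null) {
      JavaUtils.deleteRecursively(tempDir);
    }
  }
}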

/**
* An RPC with data that is sent outside of the frame, so it can be read as a stream.
*/
public final class UploadStream extends AbstractMessage implements RequestMessage {
Contributor

Is it possible to merge UploadStream and RpcRequest into a single class?

Contributor Author

Perhaps, but do you think that is really that useful? The handling of them is different (both in the network layer and in the outer RpcHandler), and other things being equal, I'm biased toward fewer changes to existing code paths.

Contributor

@vanzin vanzin left a comment

Looks good overall, I'll give Josh some time to comment if he has anything to say.

I'd have moved the debug logging into the new listener too, but that's minor.

TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
ByteBuffer meta = req.meta.nioByteBuffer();
StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
Contributor

Check for null? Otherwise you'll get some weird NPE buried in some other code path.
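(Something like the following guard right after the receiveStream call quoted above is what is being suggested, as I read it; the exception type and message are made up.)

// Fail fast with a clear message instead of a delayed NPE deeper in the pipeline.
if (streamHandler == null) {
  throw new NullPointerException(
      "rpcHandler.receiveStream() returned null for request " + req.requestId);
}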

}
streamCallbacks.values().forEach(streamCallback -> {
try {
streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
Contributor

Isn't the wait part now redundant, after you waited for the semaphore?

streamCallbacks.values().forEach(streamCallback -> {
try {
streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
} catch (IOException e) {
Contributor

Method throws Exception, so this seems unnecessary.

Contributor Author

forEach doesn't like the IOException
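(Context for readers: Consumer.accept declares no checked exceptions, so the checked IOException has to be handled inside the lambda even though the enclosing test method declares throws Exception. A minimal illustration with made-up names:)

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

class ForEachCheckedExceptionExample {
  interface Verifier {
    void verify() throws IOException;  // stands in for the stream callbacks
  }

  static void verifyAll(List<Verifier> callbacks) {
    callbacks.forEach(cb -> {
      try {
        cb.verify();
      } catch (IOException ioe) {
        // Consumer.accept cannot throw a checked exception, so wrap it.
        throw new UncheckedIOException(ioe);
      }
    });
  }
}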

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91925 has finished for PR 21346 at commit ea4a1f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92101 has finished for PR 21346 at commit fd62f61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

LGTM pending tests.

final StreamSuite.TestCallback helper;
final OutputStream out;
final File outFile;
VerifyingStreamCallback(String streamId) throws IOException {
Contributor

ping

@SparkQA

SparkQA commented Jun 26, 2018

Test build #92347 has finished for PR 21346 at commit 58d52b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 26, 2018

Test build #92349 has finished for PR 21346 at commit cd11abc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jun 26, 2018

Merging to master.

@asfgit asfgit closed this in 16f2c3e Jun 26, 2018
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
Author: Imran Rashid <[email protected]>

Closes apache#21346 from squito/upload_stream.
Ref: LIHADOOP-52972

RB=2070064