[SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files #25620

Closed
wants to merge 15 commits

Conversation

xuanyuanking
Member

After the new shuffle block fetching protocol added in #24565, we can continue this work by extending the FetchShuffleBlocks message.

What changes were proposed in this pull request?

In this patch, we achieve the indeterminate shuffle rerun by reusing the task attempt id (a unique id within an application) in the shuffle id, so that each shuffle write attempt has a different file name. For an indeterminate stage, when the stage is resubmitted, we clear all existing map statuses and rerun all partitions.

All changes are summarized as follows:

  • Change mapId to mapTaskAttemptId in the shuffle-related ids (see the sketch after this list).
  • Record the mapTaskAttemptId in MapStatus.
  • Still keep mapId in ShuffleBlockFetcherIterator for the fetch-failed scenario.
  • Add a determinate flag in Stage, use it in DAGScheduler, and add the cleanup work for the intermediate stage.
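
To make the id change concrete, here is a rough sketch of the resulting block id shape (a hedged illustration based on this description and the test bot output later in the thread, not the exact merged code; the field was later renamed to mapId):

// Sketch only: the map task attempt id is a Long that is unique within an
// application, so each shuffle write attempt produces a distinct file name.
case class ShuffleBlockId(shuffleId: Int, mapTaskAttemptId: Long, reduceId: Int) {
  def name: String = s"shuffle_${shuffleId}_${mapTaskAttemptId}_${reduceId}"
}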

Why are the changes needed?

This is a follow-up to #22112's future improvement [1]: currently we can't roll back and rerun a shuffle map stage; we just fail the job.

Spark reruns a finished shuffle map stage when it hits fetch failures. Currently, the rerun shuffle map stage only resubmits tasks for the missing partitions and reuses the output of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), multiple shuffle write attempts may write different data, so rerunning only the missing partitions leads to a correctness bug. For the shuffle map stage of an indeterminate operation, we therefore need to support rolling back the shuffle map stage and regenerating the shuffle files.

Does this PR introduce any user-facing change?

Yes. After this PR, an indeterminate stage rerun is handled by rerunning the whole stage; the original behavior was to abort the stage and fail the job.

How was this patch tested?

  • UT: added unit tests for all the changed code and newly added functions.
  • Manual test: a manual test to verify the effect is also provided below.
import scala.sys.process._
import org.apache.spark.TaskContext

val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
  // Kill the executor JVM on the first attempt of partition 190 in stage attempt 0,
  // which triggers fetch failures in the downstream stages.
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length

It's a simple job with multiple indeterminate stages. It gets a wrong answer on older Spark versions like 2.2/2.3, and is killed after #22112. With this fix, the job can retry all the indeterminate stages, as in the screenshot below, and get the right result.
[screenshot]

@SparkQA

SparkQA commented Aug 29, 2019

Test build #109907 has finished for PR 25620 at commit c6cbb06.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long, Int)])

*/
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, ArrayBuffer[Long]]()
Member Author

After using the map task attempt id as part of the shuffle file name, we must record all the map task attempt ids for each shuffle id here. So compared with the original implementation, it wastes some memory. But considering the typical number of shuffle map tasks, is this maybe an acceptable change?

Contributor

mostly it's hundreds or thousands of mappers, which is about several KB. I think it's fine. If there are many many mappers, our DAG scheduler will burn out first.

Member Author

Yeah.

Contributor

Well, @xuanyuanking should already be aware that 100K mappers is not that rare for large production jobs. That would be ~10MB for a single map stage.

Maybe we should remove an old shuffleId's data just like the scheduler removes old stages. However, I do believe it's fine for now. Let's revisit this when it actually becomes a problem.

Member Author

Yeah, that's why I highlighted this question here: we have seen such huge jobs before. :)
In the current implementation of ContextCleaner, the cleanup of in-memory shuffle state is tied to JVM GC, and the config spark.cleaner.periodicGC.interval can help us here.
Sure, let's keep tracking it.

Contributor

But this map is only touched on executors, right? So it's only the number of tasks which run on one executor that matters here.

Contributor

This doesn't seem too worrying at first (even 10MB per stage isn't that much overhead if it's cleaned up eventually). But perhaps using OpenHashSet can help with larger stages (vs. an ArrayBuffer), although it will use more memory for smaller ones.

Member Author

Thanks for the guidance; changed it to use OpenHashSet with a smaller initial size of 16.

@xuanyuanking
Member Author

As @squito suggested in #24892 (comment), this PR reuses the map task attempt id as part of the shuffle file name.
cc @cloud-fan @vanzin

@@ -300,7 +300,7 @@ public ShuffleMetrics() {
}

ManagedBufferIterator(FetchShuffleBlocks msg, int numBlockIds) {
final int[] mapIdAndReduceIds = new int[2 * numBlockIds];
final long[] mapIdAndReduceIds = new long[2 * numBlockIds];
Contributor

Maybe we should use one long[] for the map ids and one int[] for the reduce ids.

Member Author

Actually we already have a long[] for the map ids and an int[] for the reduce ids in the message; what we need here is some assembly work to flatten the reduce ids with their corresponding map ids.
The current way wastes memory. We could also do it in a CPU-consuming way: for each index, compute which map id and reduce id correspond to that index.

Member Author

After taking a further look, I split out the managed buffer iterator for the new protocol in 539d725; that gives us more flexibility in controlling the iterator, and no extra array is created.
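
For illustration, iterating the (mapId, reduceId) pairs lazily could look roughly like this Scala sketch (the real iterator is Java, and the message type here is a stand-in mirroring FetchShuffleBlocks); it walks the per-map reduce id arrays directly instead of materializing a flattened mapIdAndReduceIds array:

// Hypothetical message shape: one long map id per entry, with the reduce ids
// requested from that map output.
case class FetchShuffleBlocksMsg(
    shuffleId: Int,
    mapIds: Array[Long],
    reduceIds: Array[Array[Int]])

// Lazily yields (mapId, reduceId) pairs; no flattened array is created.
def blockPairs(msg: FetchShuffleBlocksMsg): Iterator[(Long, Int)] =
  msg.mapIds.iterator.zip(msg.reduceIds.iterator).flatMap { case (mapId, reduces) =>
    reduces.iterator.map(reduceId => (mapId, reduceId))
  }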

*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId,
Contributor

shall we name it mapId? To be consistent with the codebase.

Member Author

The original mapId is still used in mapOutputTracker and the scheduler; wouldn't people be confused if these two different ids used the same name?

Member Author

I changed all the mapTaskAttemptId stuff to mapId in 539d725.
So after the change, mapId is the unique id for a map task. If we think it's confusing to also have a mapId that represents the map index within a stage or a task set, mapIndex may be a much better name for that one.

ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"),

// [SPARK-25341][CORE] Support rolling back a shuffle map stage and re-generate the shuffle files
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"),
Contributor

@cloud-fan cloud-fan Aug 29, 2019

I'm surprised that this is tracked by mima. It's obviously an internal class. cc @srowen @HyukjinKwon

Member

You can probably customize what is ignored by adding some logic to GenerateMIMAIgnore. I think it's OK to just add exclusions here too.

Contributor

Maybe GenerateMIMAIgnore isn't really ignoring classes annotated with @Private.

@SparkQA

SparkQA commented Aug 29, 2019

Test build #109909 has finished for PR 25620 at commit fa25005.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2019

Test build #109940 has finished for PR 25620 at commit 539d725.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -106,7 +106,7 @@ protected void handleMessage(
numBlockIds += ids.length;
}
streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg, numBlockIds), client.getChannel());
new ShuffleManagedBufferIterator(msg), client.getChannel());
Contributor

we can also remove

numBlockIds = 0;
for (int[] ids : msg.reduceIds) {
  numBlockIds += ids.length;
}

Member Author

numBlockIds is used in the callback:

callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());

@@ -706,6 +714,7 @@ object ShuffleBlockFetcherIterator {
*/
private[storage] case class SuccessFetchResult(
blockId: BlockId,
mapId: Int,
Contributor

Why do we need the map index here?

Contributor

If I follow correctly, the reason is that even a SuccessFetchResult can still result in a FetchFailure back to the driver (e.g. an error decompressing the buffer). And the FetchFailure needs the mapIndex, because the map status is still stored by mapIndex, so this tells us what we need to remove in the handling in DAGScheduler.

Member Author

Yeah, that's right. Here we need to guarantee that all paths to throwFetchFailedException have the mapIndex passed through, since even a SuccessFetchResult can still trigger a fetch-failed exception.

Contributor

@squito squito left a comment

do I understand right that here in DAGScheduler:

} else if (mapId != -1) {
  // Mark the map whose fetch failed as broken in the map stage
  mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

you'd only have mapId == -1 if using the old shuffle protocol? Is that worth an assert?

I also find it tough to follow whether an id is the index within the stage or the global task id -- @cloud-fan pointed out a couple of cases where things could be named mapIndex. It's unfortunate we already have confusing names here ... what do you think of using mapTid consistently for all the places where you mean the global id? I am at least used to seeing "TID" in Spark logs for the global id, so maybe that would make it clearer?

handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
taskIdMapsForShuffle.synchronized {
taskIdMapsForShuffle.putIfAbsent(handle.shuffleId, ArrayBuffer.empty[Long])
taskIdMapsForShuffle.get(handle.shuffleId).append(context.taskAttemptId())
Contributor

you're trying to protect concurrent access to the ArrayBuffer[Long] with that synchronized block, right? minor, but you could avoid locking the entire map, and instead do

val mapTaskIds = taskIdMapsForShuffle.putIfAbsent(handle.shuffleId, ArrayBuffer.empty[Long])
mapTaskIds.synchronized { mapTaskIds.append(context.taskAttemptId()) }

Member Author

Yep, thanks a lot for the optimization advice! Done in 4bd9e00.
(I think you mean taskIdMapsForShuffle.computeIfAbsent; I used that in the new commit.)
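
A minimal sketch of the resulting pattern, combining computeIfAbsent with per-set locking and the OpenHashSet change discussed earlier (names and the initial size follow this discussion, and the code is assumed to live inside Spark's own shuffle manager since OpenHashSet is Spark-internal; it is not a verbatim copy of the commit):

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.collection.OpenHashSet

object ShuffleTaskIdTracking {
  private val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()

  def recordMapTaskId(shuffleId: Int, taskAttemptId: Long): Unit = {
    // computeIfAbsent creates the set once per shuffle id; only the small
    // per-shuffle set is locked while adding, not the whole map.
    val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
      shuffleId, _ => new OpenHashSet[Long](16))
    mapTaskIds.synchronized { mapTaskIds.add(taskAttemptId) }
  }
}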

@xuanyuanking
Member Author

@squito Thanks for reviewing this. As @cloud-fan suggested, I'll do the follow-up work to normalize all the names using mapIndex and mapId: mapIndex indicates the index of this map task in the task set or stage, and mapId refers to the unique id of this task. WDYT?

@SparkQA

SparkQA commented Aug 31, 2019

Test build #109986 has finished for PR 25620 at commit 4bd9e00.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

cloud-fan commented Sep 3, 2019

I agree with @squito that mapId is vague now. How about mapTaskId and mapIndex?


@Override
public boolean hasNext() {
// mapIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
Contributor

Shall we add check logic here to be safe?

Member Author

Does Xingbo mean a double check here? Basically there is existing checking for both the length and non-emptiness:

if (blockIds.length == 0) {
  throw new IllegalArgumentException("Zero-sized blockIds array");
}

Contributor

Yea, if the place is not super performance-critical I'd prefer a double check here.

Member Author

Sure, done the double check in 00e78b2.
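
For reference, a double check of this shape (Scala for brevity; the real iterator is Java, and the field names mirror FetchShuffleBlocks as used elsewhere in this thread) could be:

// Defensive checks before iterating: the request must be non-empty and the
// per-map reduce id arrays must line up with the map ids.
def validateFetchShuffleBlocks(mapIds: Array[Long], reduceIds: Array[Array[Int]]): Unit = {
  require(mapIds.length == reduceIds.length,
    s"mapIds.length (${mapIds.length}) must equal reduceIds.length (${reduceIds.length})")
  require(mapIds.nonEmpty, "Zero-sized mapIds array")
}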

// Before finding the missing partitions, do the intermediate-state cleanup first.
// The operation here makes sure that for the intermediate stage, `findMissingPartitions()`
// returns all partitions every time.
stage match {
Contributor

Just curious, why not do unregister during failure handling?

Member Author

That's for the executor-lost scenario. When an executor is lost, the indeterminate stage rerun can be triggered by submitParentStage.
So if we only unregister during failure handling, only the fetch-failed stage and its parent stage do the unregistering; that logic would not cover the scenario where its parent's parent stage is indeterminate and has missing tasks.

Contributor

It seems we can still put the unregister logic into the block:
https://github.com/apache/spark/pull/25620/files#diff-6a9ff7fb74fd490a50462d45db2d5e11R1626 ?

Member Author

@xuanyuanking xuanyuanking Sep 7, 2019

Actually, the place Xingbo points out was my first attempt :). I found it's not enough to fix this problem while doing the integration test.
This is because the logic that calculates stagesToRollback in collectStagesToRollback only cares about the downstream stages of the current fetch-failed stage. For upstream indeterminate stages, putting the unregister logic in failure handling doesn't cover them, so the correctness bug would still happen.
Also, the newly added UT SPARK-25341: retry all the succeeding stages when the map stage is indeterminate covers this check; if we do the unregistering in failure handling, the UT will fail.

Contributor

I'm a little concerned about putting this here, as you'll see lower down in this method there is some handling for the case that submitMissingTasks is called but there are actually no tasks to run. I'm not seeing how that happens now, but your change would make those cases always re-evaluate all partitions of the stage.

I think @jiangxb1987's suggestion makes sense; couldn't you do the unregistering there? I agree the logic is currently insufficient, as it's not building up the full set of stages that need to be recomputed, but maybe we need to combine both.

or maybe we understand the old cases of submitting a stage with no missing partitions and my concern is not relevant?

Member Author

Thanks for the detailed comment.
I tried to combine both at first, but it seemed to produce duplicated code.
For the concern about submitting a stage with no missing partitions, I now check shuffleMapStage.isAvailable before the unregistering in aa3a409; what we need here is to make sure a partially completed indeterminate stage gets a whole-stage rerun.
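
The resulting shape of that cleanup in submitMissingTasks is roughly the following fragment (a hedged sketch of the idea from this thread, meant to live inside DAGScheduler; treat the exact helper names, such as unregisterAllMapOutput, as assumptions):

// Roll back a partially finished indeterminate shuffle map stage before computing
// the missing partitions, so findMissingPartitions() returns every partition.
stage match {
  case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
    mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
  case _ =>
}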

Contributor

@vanzin vanzin left a comment

I see people already commented on the mapId vs. mapIndex thing, so not gonna touch that.

startPartition: Int,
endPartition: Int,
useOldFetchProtocol: Boolean)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
Contributor

nit: indent more or less. It should be at a different indent level than the method body.

Member Author

Thanks, done in 3bfb6e6.
I was pretty confused before about how to indent the line starting with :, thanks for your guidance :)

@@ -1047,6 +1047,14 @@ package object config {
.checkValue(v => v > 0, "The value should be a positive integer.")
.createWithDefault(2000)

private[spark] val SHUFFLE_USE_OLD_FETCH_PROTOCOL =
Contributor

What happens if you connect to an old shuffle service without setting this? Will things just fail (and always fail)?

Probably ok if they do. Although a more user-friendly error, if possible, might be good.

Member Author

What happens if you connect to an old shuffle service without setting this? Will things just fail (and always fail)?

We'll always fail with UnsupportedOperationException: Unexpected message: FetchShuffleBlocks. That behavior was added in #24565.

Although a more user-friendly error, if possible, might be good.

Yeah, in this PR we have detailed logging in DAGScheduler.
Let me think about how to add a more user-friendly error for the old shuffle service in follow-up work; currently we only have some documentation in the migration guide. https://github.com/apache/spark/pull/24565/files#diff-3f19ec3d15dcd8cd42bb25dde1c5c1a9R139
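
For context, the switch discussed here is the boolean config added in #24565 inside core's config package object; its shape is roughly the following (the doc wording below is an assumption):

private[spark] val SHUFFLE_USE_OLD_FETCH_PROTOCOL =
  ConfigBuilder("spark.shuffle.useOldFetchProtocol")
    .doc("Whether to use the old protocol while doing the shuffle block fetching. " +
      "Only enable it when targeting an old external shuffle service that does not " +
      "support the new FetchShuffleBlocks protocol.")
    .booleanConf
    .createWithDefault(false)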

@xuanyuanking
Member Author

Great thanks for all the comments that came in yesterday. I'll address the comments from Xingbo and Vanzin, and make all the naming changes suggested by Wenchen and Squito, today (Beijing time).

@SparkQA

SparkQA commented Sep 5, 2019

Test build #110185 has finished for PR 25620 at commit b527fe7.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ShuffleBlockId(shuffleId: Int, mapTaskId: Long, reduceId: Int) extends BlockId
  • case class ShuffleDataBlockId(shuffleId: Int, mapTaskId: Long, reduceId: Int) extends BlockId
  • case class ShuffleIndexBlockId(shuffleId: Int, mapTaskId: Long, reduceId: Int) extends BlockId

@SparkQA

SparkQA commented Sep 5, 2019

Test build #110186 has finished for PR 25620 at commit 3bfb6e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ShuffleBlockId(shuffleId: Int, mapTaskId: Long, reduceId: Int) extends BlockId
  • case class ShuffleDataBlockId(shuffleId: Int, mapTaskId: Long, reduceId: Int) extends BlockId
  • case class ShuffleIndexBlockId(shuffleId: Int, mapTaskId: Long, reduceId: Int) extends BlockId

@SparkQA

SparkQA commented Sep 6, 2019

Test build #110229 has finished for PR 25620 at commit 8b51720.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -88,23 +88,23 @@ private class ShuffleStatus(numPartitions: Int) {
* Register a map output. If there is already a registered location for the map output then it
* will be replaced by the new location.
*/
def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
if (mapStatuses(mapId) == null) {
def addMapOutput(mapIndex: Int, status: MapStatus): Unit = synchronized {
Contributor

This is a place where I think mapIndex makes sense.

Member Author

Thanks for pointing this out.

}

/**
* Remove the map output which was served by the specified block manager.
* This is a no-op if there is no registered map output or if the registered output is from a
* different block manager.
*/
def removeMapOutput(mapId: Int, bmAddress: BlockManagerId): Unit = synchronized {
if (mapStatuses(mapId) != null && mapStatuses(mapId).location == bmAddress) {
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = synchronized {
Contributor

ditto

@cloud-fan
Contributor

@xuanyuanking thanks for the renaming work! After taking a quick look, I think we can go further.

It looks to me that we should only use the names mapIndex and mapTaskId when we really mean them, e.g. ShuffleStatus.addMapOutput, MapStatus.mapTaskId, etc. When we refer to an identifier of the map, then we should use mapId.

There are only a few places where we explicitly mean mapIndex and/or mapTaskId; we can keep the name mapId unchanged in other places to reduce the diff.

What do you think?

@xuanyuanking
Member Author

xuanyuanking commented Sep 18, 2019

Sure, done in c86f6cc; keeping mapId works in most cases and we can still get the real meaning from context.

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110923 has finished for PR 25620 at commit caa949d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@cloud-fan cloud-fan left a comment

LGTM except some code style comments

*/
case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long)]) {
case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long, Int)]) {
Contributor

Now it's a Tuple3 with Int and Long elements. I think it's better to create a class for it to make the code easier to read.

Member Author

Thanks, added a FetchBlockInfo class for this in d2215b2.

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110924 has finished for PR 25620 at commit c86f6cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110930 has finished for PR 25620 at commit 28c9f9c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110928 has finished for PR 25620 at commit d2215b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FetchRequest(address: BlockManagerId, blocks: Seq[FetchBlockInfo])

@cloud-fan
Contributor

@squito @vanzin @jiangxb1987 do you have any more comments? This looks good to me now and I'd like to merge it within a few days if none of you object.

@cloud-fan cloud-fan closed this in f725d47 Sep 23, 2019
@cloud-fan
Contributor

since there is no objection, I'm merging it, thanks!

@xuanyuanking
Member Author

Finally! Thank you all for the review.

@@ -172,7 +172,7 @@ public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
long mapId,
Member

@xuanyuanking why change this from int to long? Is it possible that a mapId can be greater than 2^31?

Contributor

Previously the map id was the index of the mapper and could conflict when we re-run the task. Now the map id is the task id, which is unique. Task ids need to be long.

Member Author

Yes, after this patch we set mapId to the taskAttemptId of the map task, which is a unique id within the same SparkContext. You can see the comment at #25620 (comment).
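
As a small illustration (a hedged sketch; the surrounding plumbing is assumed), the write path can take the long map id directly from the task context:

import org.apache.spark.TaskContext

// For example, the first attempt of map index 5 might run as task attempt 1234 and a
// retry as 5678; with mapId = taskAttemptId the two attempts write differently named
// shuffle files instead of overwriting each other. Only valid inside a running task.
def currentMapId(): Long = TaskContext.get.taskAttemptId()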
