Skip to content

Commit

Permalink
Resolve conflict with SPARK-28209
Browse files Browse the repository at this point in the history
Add indeterminate stage rerun support in shuffle writer api

(cherry picked from commit 99c2b4a)
Signed-off-by: Yuanjian Li <[email protected]>
  • Loading branch information
xuanyuanking committed Aug 5, 2019
1 parent cae500a commit dc9759b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,20 @@ public interface ShuffleExecutorComponents {
/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
*
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to.
* It equals the stage attempt number when the stage is indeterminate,
* and -1 otherwise.
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
* @param numPartitions The number of partitions that will be written by the map task. Some of
* these partitions may be empty.
* @return the {@link ShuffleMapOutputWriter} that will persist the partitioned bytes written by
* the map task
* @throws IOException if an I/O error prevents the writer from being created
*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int shuffleGenerationId,
int mapId,
long mapTaskAttemptId,
int numPartitions) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
shuffleId, -1, mapId, mapTaskAttemptId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void initializeExecutor(String appId, String execId) {
@Override
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int shuffleGenerationId,
int mapId,
long mapTaskAttemptId,
int numPartitions) {
Expand All @@ -66,6 +67,6 @@ public ShuffleMapOutputWriter createMapOutputWriter(
"Executor components must be initialized before getting writers.");
}
return new LocalDiskShuffleMapOutputWriter(
shuffleId, mapId, numPartitions, blockResolver, sparkConf);
shuffleId, shuffleGenerationId, mapId, numPartitions, blockResolver, sparkConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
LoggerFactory.getLogger(LocalDiskShuffleMapOutputWriter.class);

private final int shuffleId;
private final int shuffleGenerationId;
private final int mapId;
private final IndexShuffleBlockResolver blockResolver;
private final long[] partitionLengths;
Expand All @@ -63,11 +64,13 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {

public LocalDiskShuffleMapOutputWriter(
int shuffleId,
int shuffleGenerationId,
int mapId,
int numPartitions,
IndexShuffleBlockResolver blockResolver,
SparkConf sparkConf) {
this.shuffleId = shuffleId;
this.shuffleGenerationId = shuffleGenerationId;
this.mapId = mapId;
this.blockResolver = blockResolver;
this.bufferSize =
Expand Down Expand Up @@ -99,7 +102,8 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
public void commitAllPartitions() throws IOException {
// Commits this writer's output: runs cleanUp() first, then asks the block resolver to write
// the index file and commit, using the recorded partition lengths.
cleanUp();
// Hand over the temp file only if it was created and is a regular file; otherwise pass null.
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
// NOTE(review): the same call appears twice below, once on a single line and once re-wrapped.
// This looks like the before/after lines of a diff hunk rather than two real invocations;
// confirm against the actual file.
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
// NOTE(review): shuffleGenerationId is not forwarded to the resolver here; confirm that this
// is intended.
blockResolver.writeIndexFileAndCommit(
shuffleId, mapId, partitionLengths, resolvedTmp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA
}
mapOutputWriter = new LocalDiskShuffleMapOutputWriter(
0,
-1,
0,
NUM_PARTITIONS,
blockResolver,
Expand Down

0 comments on commit dc9759b

Please sign in to comment.