rename mapId to mapTaskId and mapIndex, address comments
xuanyuanking committed Sep 5, 2019
1 parent 7518ddf commit 3bfb6e6
Showing 34 changed files with 199 additions and 194 deletions.

@@ -321,31 +321,32 @@ private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {
   private final String appId;
   private final String execId;
   private final int shuffleId;
-  private final long[] mapIds;
+  private final long[] mapTaskIds;
   private final int[][] reduceIds;

   ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
     appId = msg.appId;
     execId = msg.execId;
     shuffleId = msg.shuffleId;
-    mapIds = msg.mapIds;
+    mapTaskIds = msg.mapTaskIds;
     reduceIds = msg.reduceIds;
   }

   @Override
   public boolean hasNext() {
-    // mapIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
-    // must have non-empty mapIds and reduceIds, see the checking logic in OneForOneBlockFetcher.
-    return mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length;
+    // mapTaskIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
+    // must have non-empty mapTaskIds and reduceIds, see the checking logic in
+    // OneForOneBlockFetcher.
+    return mapIdx < mapTaskIds.length && reduceIdx < reduceIds[mapIdx].length;
   }

   @Override
   public ManagedBuffer next() {
     final ManagedBuffer block = blockManager.getBlockData(
-      appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
-    reduceIdx += 1;
-    if (reduceIdx == reduceIds[mapIdx].length) {
-      // Reach the end of current mapId, move to next mapId and its reduceIds.
+      appId, execId, shuffleId, mapTaskIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
+    if (reduceIdx < reduceIds[mapIdx].length - 1) {
+      reduceIdx += 1;
+    } else {
       reduceIdx = 0;
       mapIdx += 1;
     }
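
The rewritten next() above replaces the old increment-then-reset sequence with an explicit branch over the jagged (mapTaskId, reduceIds) structure. A minimal standalone sketch of that nested-index walk, using made-up placeholder ids in place of a real FetchShuffleBlocks message and BlockManager:

public class NestedIndexWalkSketch {
  public static void main(String[] args) {
    // Hypothetical ids; the real iterator takes these from FetchShuffleBlocks.
    long[] mapTaskIds = {100L, 101L};
    int[][] reduceIds = {{0, 1, 2}, {5}};
    int mapIdx = 0;
    int reduceIdx = 0;
    while (mapIdx < mapTaskIds.length && reduceIdx < reduceIds[mapIdx].length) {
      System.out.println("fetch mapTaskId=" + mapTaskIds[mapIdx]
          + " reduceId=" + reduceIds[mapIdx][reduceIdx]);
      if (reduceIdx < reduceIds[mapIdx].length - 1) {
        reduceIdx += 1;  // more reduce ids under the current map task
      } else {
        reduceIdx = 0;   // exhausted this map task's reduce ids
        mapIdx += 1;     // advance to the next map task
      }
    }
  }
}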

@@ -165,21 +165,21 @@ public void registerExecutor(
   }

   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapTaskId, reduceId). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int reduceId) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapTaskId, reduceId);
   }

   public ManagedBuffer getRddBlockData(
@@ -291,22 +291,23 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
   }

   /**
-   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
-   * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
-   * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
+   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapTaskId_0.index" into a data
+   * file called "shuffle_ShuffleId_MapTaskId_0.data".
+   * This logic is from IndexShuffleBlockResolver, and the block id format is from
+   * ShuffleDataBlockId and ShuffleIndexBlockId.
    */
   private ManagedBuffer getSortBasedShuffleBlockData(
-      ExecutorShuffleInfo executor, int shuffleId, long mapId, int reduceId) {
+      ExecutorShuffleInfo executor, int shuffleId, long mapTaskId, int reduceId) {
     File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
-      "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+      "shuffle_" + shuffleId + "_" + mapTaskId + "_0.index");

     try {
       ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
       ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
       return new FileSegmentManagedBuffer(
         conf,
         ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
-          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+          "shuffle_" + shuffleId + "_" + mapTaskId + "_0.data"),
         shuffleIndexRecord.getOffset(),
         shuffleIndexRecord.getLength());
     } catch (ExecutionException e) {
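
The index/data naming above means serving any (mapTaskId, reduceId) block is one cached index lookup plus a file-segment handle. A hedged sketch of that lookup, assuming (as in Spark's sort-based shuffle) that the .index file is a sequence of 8-byte offsets where reduce partition r's bytes span [offset[r], offset[r+1]) of the matching .data file; the class and method names here are illustrative, not part of this diff:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class IndexLookupSketch {
  // Returns {offset, length} of partition reduceId's segment in the .data file.
  static long[] segmentFor(String indexPath, int reduceId) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexPath))) {
      in.skipBytes(reduceId * 8);       // skip the first reduceId offsets
      long offset = in.readLong();      // where this partition's bytes start
      long nextOffset = in.readLong();  // where the next partition's bytes start
      return new long[] {offset, nextOffset - offset};
    }
  }
}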

@@ -108,7 +108,7 @@ private boolean isShuffleBlocks(String[] blockIds) {

   /**
    * Analyze the pass in blockIds and create FetchShuffleBlocks message.
-   * The blockIds has been sorted by mapId and reduceId. It's produced in
+   * The blockIds has been sorted by mapTaskId and reduceId. It's produced in
    * org.apache.spark.MapOutputTracker.convertMapStatuses.
    */
   private FetchShuffleBlocks createFetchShuffleBlocksMsg(
@@ -121,21 +121,21 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
           ", got:" + blockId);
       }
-      long mapId = blockIdParts.middle;
-      if (!mapIdToReduceIds.containsKey(mapId)) {
-        mapIdToReduceIds.put(mapId, new ArrayList<>());
+      long mapTaskId = blockIdParts.middle;
+      if (!mapIdToReduceIds.containsKey(mapTaskId)) {
+        mapIdToReduceIds.put(mapTaskId, new ArrayList<>());
       }
-      mapIdToReduceIds.get(mapId).add(blockIdParts.right);
+      mapIdToReduceIds.get(mapTaskId).add(blockIdParts.right);
     }
-    long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
-    int[][] reduceIdArr = new int[mapIds.length][];
-    for (int i = 0; i < mapIds.length; i++) {
-      reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
+    long[] mapTaskIds = Longs.toArray(mapIdToReduceIds.keySet());
+    int[][] reduceIdArr = new int[mapTaskIds.length][];
+    for (int i = 0; i < mapTaskIds.length; i++) {
+      reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapTaskIds[i]));
     }
-    return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr);
+    return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIdArr);
   }

-  /** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
+  /** Split the shuffleBlockId and return shuffleId, mapTaskId and reduceId. */
   private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
     String[] blockIdParts = blockId.split("_");
     if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
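
Since the incoming blockIds arrive already sorted, the grouping above just buckets consecutive reduce ids under their map task id. A hedged re-implementation with plain JDK collections in place of the Guava Longs/Ints helpers, using the "shuffle_<shuffleId>_<mapTaskId>_<reduceId>" format that splitBlockId parses:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupBlockIdsSketch {
  public static void main(String[] args) {
    String[] blockIds = {"shuffle_0_5_0", "shuffle_0_5_1", "shuffle_0_6_0"};
    // LinkedHashMap keeps the sorted arrival order of map task ids.
    Map<Long, List<Integer>> mapTaskIdToReduceIds = new LinkedHashMap<>();
    for (String blockId : blockIds) {
      String[] parts = blockId.split("_");
      long mapTaskId = Long.parseLong(parts[2]);
      int reduceId = Integer.parseInt(parts[3]);
      mapTaskIdToReduceIds.computeIfAbsent(mapTaskId, k -> new ArrayList<>()).add(reduceId);
    }
    System.out.println(mapTaskIdToReduceIds);  // prints {5=[0, 1], 6=[0]}
  }
}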

@@ -32,23 +32,23 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
   public final String appId;
   public final String execId;
   public final int shuffleId;
-  // The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
-  // it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
-  public final long[] mapIds;
+  // The length of mapTaskIds must equal to reduceIds.size(), for the i-th mapTaskId in mapTaskIds,
+  // it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map task.
+  public final long[] mapTaskIds;
   public final int[][] reduceIds;

   public FetchShuffleBlocks(
       String appId,
       String execId,
       int shuffleId,
-      long[] mapIds,
+      long[] mapTaskIds,
       int[][] reduceIds) {
     this.appId = appId;
     this.execId = execId;
     this.shuffleId = shuffleId;
-    this.mapIds = mapIds;
+    this.mapTaskIds = mapTaskIds;
     this.reduceIds = reduceIds;
-    assert(mapIds.length == reduceIds.length);
+    assert(mapTaskIds.length == reduceIds.length);
   }

   @Override
@@ -60,7 +60,7 @@ public String toString() {
       .add("appId", appId)
       .add("execId", execId)
       .add("shuffleId", shuffleId)
-      .add("mapIds", Arrays.toString(mapIds))
+      .add("mapTaskIds", Arrays.toString(mapTaskIds))
       .add("reduceIds", Arrays.deepToString(reduceIds))
       .toString();
   }
@@ -75,7 +75,7 @@ public boolean equals(Object o) {
     if (shuffleId != that.shuffleId) return false;
     if (!appId.equals(that.appId)) return false;
     if (!execId.equals(that.execId)) return false;
-    if (!Arrays.equals(mapIds, that.mapIds)) return false;
+    if (!Arrays.equals(mapTaskIds, that.mapTaskIds)) return false;
     return Arrays.deepEquals(reduceIds, that.reduceIds);
   }

@@ -84,7 +84,7 @@ public int hashCode() {
     int result = appId.hashCode();
     result = 31 * result + execId.hashCode();
     result = 31 * result + shuffleId;
-    result = 31 * result + Arrays.hashCode(mapIds);
+    result = 31 * result + Arrays.hashCode(mapTaskIds);
     result = 31 * result + Arrays.deepHashCode(reduceIds);
     return result;
   }
@@ -98,7 +98,7 @@ public int encodedLength() {
     return Encoders.Strings.encodedLength(appId)
       + Encoders.Strings.encodedLength(execId)
       + 4 /* encoded length of shuffleId */
-      + Encoders.LongArrays.encodedLength(mapIds)
+      + Encoders.LongArrays.encodedLength(mapTaskIds)
       + 4 /* encoded length of reduceIds.size() */
       + encodedLengthOfReduceIds;
   }
@@ -108,7 +108,7 @@ public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
     Encoders.Strings.encode(buf, execId);
     buf.writeInt(shuffleId);
-    Encoders.LongArrays.encode(buf, mapIds);
+    Encoders.LongArrays.encode(buf, mapTaskIds);
     buf.writeInt(reduceIds.length);
     for (int[] ids: reduceIds) {
       Encoders.IntArrays.encode(buf, ids);
@@ -119,12 +119,12 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
     String execId = Encoders.Strings.decode(buf);
     int shuffleId = buf.readInt();
-    long[] mapIds = Encoders.LongArrays.decode(buf);
+    long[] mapTaskIds = Encoders.LongArrays.decode(buf);
     int reduceIdsSize = buf.readInt();
     int[][] reduceIds = new int[reduceIdsSize][];
     for (int i = 0; i < reduceIdsSize; i++) {
       reduceIds[i] = Encoders.IntArrays.decode(buf);
     }
-    return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds);
+    return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIds);
   }
 }
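
Because encode and decode are exact mirrors and only a field was renamed, the bytes written for mapTaskIds are laid out exactly as they were for mapIds. A hedged round-trip sketch, assuming Netty's Unpooled and this FetchShuffleBlocks class are on the classpath:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class FetchShuffleBlocksRoundTrip {
  public static void main(String[] args) {
    FetchShuffleBlocks msg = new FetchShuffleBlocks(
        "app-1", "exec-1", /* shuffleId */ 0,
        new long[] {5L, 6L},           // mapTaskIds
        new int[][] {{0, 1}, {0}});    // reduceIds, one int[] per map task id
    ByteBuf buf = Unpooled.buffer(msg.encodedLength());
    msg.encode(buf);
    FetchShuffleBlocks decoded = FetchShuffleBlocks.decode(buf);
    System.out.println("round trip ok: " + msg.equals(decoded));  // true
  }
}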

@@ -40,12 +40,12 @@ public interface ShuffleExecutorComponents {
    * Called once per map task to create a writer that will be responsible for persisting all the
    * partitioned bytes written by that map task.
    * @param shuffleId Unique identifier for the shuffle the map task is a part of
-   * @param mapId An ID of the map task. The ID is unique within this Spark application.
+   * @param mapTaskId An ID of the map task. The ID is unique within this Spark application.
    * @param numPartitions The number of partitions that will be written by the map task. Some of
    *                      these partitions may be empty.
    */
   ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) throws IOException;
 }
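
The javadoc above implies a strict per-map-task lifecycle: one writer per (shuffleId, mapTaskId), one partition writer per reduce partition, then a single commit. A hedged sketch of that call pattern, assuming getPartitionWriter and commitAllPartitions sit on ShuffleMapOutputWriter, as the LocalDiskShuffleMapOutputWriter methods later in this diff suggest (Spark imports omitted):

import java.io.IOException;

public class MapOutputWriteSketch {
  static long[] writeMapOutput(
      ShuffleExecutorComponents components,
      int shuffleId,
      long mapTaskId,
      int numPartitions) throws IOException {
    ShuffleMapOutputWriter writer =
        components.createMapOutputWriter(shuffleId, mapTaskId, numPartitions);
    for (int reduceId = 0; reduceId < numPartitions; reduceId++) {
      ShufflePartitionWriter partitionWriter = writer.getPartitionWriter(reduceId);
      // ... stream this reduce partition's serialized records into partitionWriter ...
    }
    return writer.commitAllPartitions();  // returns the per-partition byte lengths
  }
}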

@@ -85,7 +85,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final Partitioner partitioner;
   private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
-  private final long mapId;
+  private final long mapTaskId;
   private final Serializer serializer;
   private final ShuffleExecutorComponents shuffleExecutorComponents;

@@ -105,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   BypassMergeSortShuffleWriter(
       BlockManager blockManager,
       BypassMergeSortShuffleHandle<K, V> handle,
-      long mapId,
+      long mapTaskId,
       SparkConf conf,
       ShuffleWriteMetricsReporter writeMetrics,
       ShuffleExecutorComponents shuffleExecutorComponents) {
@@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
-    this.mapId = mapId;
+    this.mapTaskId = mapTaskId;
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
@@ -127,12 +127,12 @@
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
     ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
-      .createMapOutputWriter(shuffleId, mapId, numPartitions);
+      .createMapOutputWriter(shuffleId, mapTaskId, numPartitions);
     try {
       if (!records.hasNext()) {
         partitionLengths = mapOutputWriter.commitAllPartitions();
         mapStatus = MapStatus$.MODULE$.apply(
-          blockManager.shuffleServerId(), partitionLengths, mapId);
+          blockManager.shuffleServerId(), partitionLengths, mapTaskId);
         return;
       }
       final SerializerInstance serInstance = serializer.newInstance();
@@ -166,7 +166,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

       partitionLengths = writePartitionedData(mapOutputWriter);
       mapStatus = MapStatus$.MODULE$.apply(
-        blockManager.shuffleServerId(), partitionLengths, mapId);
+        blockManager.shuffleServerId(), partitionLengths, mapTaskId);
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);

@@ -75,7 +75,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final Partitioner partitioner;
   private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
-  private final long mapId;
+  private final long mapTaskId;
   private final TaskContext taskContext;
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;
@@ -133,7 +133,7 @@ public UnsafeShuffleWriter(
     this.blockManager = blockManager;
     this.shuffleBlockResolver = shuffleBlockResolver;
     this.memoryManager = memoryManager;
-    this.mapId = taskContext.taskAttemptId();
+    this.mapTaskId = taskContext.taskAttemptId();
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.shuffleId = dep.shuffleId();
     this.serializer = dep.serializer().newInstance();
@@ -230,7 +230,7 @@ void closeAndWriteOutput() throws IOException {
     final SpillInfo[] spills = sorter.closeAndGetSpills();
     sorter = null;
     final long[] partitionLengths;
-    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapTaskId);
     final File tmp = Utils.tempFileWith(output);
     try {
       try {
@@ -242,13 +242,14 @@ void closeAndWriteOutput() throws IOException {
         }
       }
     }
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapTaskId, partitionLengths, tmp);
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
+    mapStatus = MapStatus$.MODULE$.apply(
+      blockManager.shuffleServerId(), partitionLengths, mapTaskId);
   }

   @VisibleForTesting

@@ -58,13 +58,13 @@ public void initializeExecutor(String appId, String execId) {
   @Override
   public ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) {
     if (blockResolver == null) {
       throw new IllegalStateException(
         "Executor components must be initialized before getting writers.");
     }
     return new LocalDiskShuffleMapOutputWriter(
-      shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+      shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
   }
 }

@@ -48,7 +48,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
       LoggerFactory.getLogger(LocalDiskShuffleMapOutputWriter.class);

   private final int shuffleId;
-  private final long mapId;
+  private final long mapTaskId;
   private final IndexShuffleBlockResolver blockResolver;
   private final long[] partitionLengths;
   private final int bufferSize;
@@ -63,18 +63,18 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {

   public LocalDiskShuffleMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions,
       IndexShuffleBlockResolver blockResolver,
       SparkConf sparkConf) {
     this.shuffleId = shuffleId;
-    this.mapId = mapId;
+    this.mapTaskId = mapTaskId;
     this.blockResolver = blockResolver;
     this.bufferSize =
       (int) (long) sparkConf.get(
         package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     this.partitionLengths = new long[numPartitions];
-    this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
+    this.outputFile = blockResolver.getDataFile(shuffleId, mapTaskId);
     this.outputTempFile = null;
   }

@@ -99,7 +99,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
   public long[] commitAllPartitions() throws IOException {
     cleanUp();
     File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
-    blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
+    blockResolver.writeIndexFileAndCommit(shuffleId, mapTaskId, partitionLengths, resolvedTmp);
     return partitionLengths;
   }