1. [FEATURE] Add metrics about fetch chunk size, commit files time and get reducer file group time. #660
2. [FEATURE] Add a configuration to enable the map id filter mechanism. #662
3. [FEATURE][OPTIMIZE] Add map id info for each PartitionLocation to enable filtering for map range reading. #607
FMX committed Sep 28, 2022
1 parent f5044eb commit 1b95437
Showing 18 changed files with 244 additions and 59 deletions.
1 change: 1 addition & 0 deletions CONFIGURATION_GUIDE.md
@@ -44,6 +44,7 @@ memory. Empirically, RSS worker off-heap memory should be set to `(numDirs * que
| spark.rss.shuffle.writer.mode | hash | RSS supports two different shuffle writers. Hash-based shuffle writer works fine when shuffle partition count is normal. Sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. |
| spark.rss.client.compression.codec | lz4 | The codec used to compress shuffle data. By default, RSS provides two codecs: `lz4` and `zstd`. |
| spark.rss.client.compression.zstd.level | 1 | Compression level for the Zstd compression codec; its value should be an integer between -5 and 22. Increasing the compression level results in better compression at the expense of more CPU and memory. |
| spark.rss.range.read.filter.enabled | false | If a Spark application has skewed partitions, setting this to `true` can improve performance. |
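For reference, a minimal sketch of enabling the new flag from the application side, assuming the standard `SparkConf` API (the class name and app name below are placeholders, not part of this commit):

```java
import org.apache.spark.SparkConf;

public class RangeReadFilterDemo {
  public static void main(String[] args) {
    // Hypothetical setup: enable the map id filter so range reads of a
    // skewed partition only open locations that contain relevant map ids.
    SparkConf conf = new SparkConf()
        .setAppName("rss-range-read-filter-demo")
        .set("spark.rss.range.read.filter.enabled", "true");
    // Build the SparkContext / SparkSession from this conf as usual.
  }
}
```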

### RSS Master Configurations

@@ -829,6 +829,7 @@ public RssInputStream readPartition(String applicationId, int shuffleId, int red
public RssInputStream readPartition(String applicationId, int shuffleId, int reduceId,
int attemptNumber, int startMapIndex, int endMapIndex) throws IOException {
ReduceFileGroups fileGroups = reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> {
long getReducerFileGroupStartTime = System.nanoTime();
try {
if (driverRssMetaService == null) {
logger.warn("Driver endpoint is null!");
@@ -843,6 +844,10 @@ public RssInputStream readPartition(String applicationId, int shuffleId, int red
driverRssMetaService.<GetReducerFileGroupResponse>askSync(getReducerFileGroup, classTag);

if (response != null && response.status() == StatusCode.Success) {
logger.info(
"Shuffle {} request reducer file group success using time:{} ms",
shuffleId,
(System.nanoTime() - getReducerFileGroupStartTime) / 1000_000);
return new ReduceFileGroups(response.fileGroup(), response.attempts());
}
} catch (Exception e) {
@@ -30,6 +30,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
@@ -115,6 +116,8 @@ private static final class RssInputStreamImpl extends RssInputStream {
// mapId, attemptId, batchId, size
private final int BATCH_HEADER_SIZE = 4 * 4;
private final byte[] sizeBuf = new byte[BATCH_HEADER_SIZE];
private LongAdder skipCount = new LongAdder();
private final boolean rangeReadFilter;

RssInputStreamImpl(
RssConf conf,
@@ -140,6 +143,7 @@ private static final class RssInputStreamImpl extends RssInputStream {
this.attemptNumber = attemptNumber;
this.startMapIndex = startMapIndex;
this.endMapIndex = endMapIndex;
this.rangeReadFilter = RssConf.rangeReadFilterEnabled(conf);

maxInFlight = RssConf.fetchChunkMaxReqsInFlight(conf);

@@ -153,30 +157,60 @@ private static final class RssInputStreamImpl extends RssInputStream {
moveToNextReader();
}

private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) {
  if (!rangeReadFilter) {
    return false;
  }
  if (endMapIndex == Integer.MAX_VALUE) {
    return false;
  }
  for (int i = startMapIndex; i < endMapIndex; i++) {
    if (location.getMapIdBitMap().contains(i)) {
      return false;
    }
  }
  return true;
}

private PartitionLocation nextReadableLocation() {
  int locationCount = locations.length;
  if (fileIndex >= locationCount) {
    return null;
  }
  PartitionLocation currentLocation = locations[fileIndex];
  while (skipLocation(startMapIndex, endMapIndex, currentLocation)) {
    skipCount.increment();
    fileIndex++;
    if (fileIndex == locationCount) {
      return null;
    }
    currentLocation = locations[fileIndex];
  }
  return currentLocation;
}

private void moveToNextReader() throws IOException {
  if (currentReader != null) {
    currentReader.close();
    currentReader = null;
  }
  PartitionLocation currentLocation = nextReadableLocation();
  if (currentLocation == null) {
    return;
  }
  currentReader = createReader(currentLocation);
  fileIndex++;
  while (!currentReader.hasNext()) {
    currentReader.close();
    currentReader = null;
    currentLocation = nextReadableLocation();
    if (currentLocation == null) {
      return;
    }
    currentReader = createReader(currentLocation);
    fileIndex++;
  }
  currentChunk = currentReader.next();
}

private PartitionReader createReader(PartitionLocation location) throws IOException {
@@ -246,6 +280,12 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public void close() {
int locationsCount = locations.length;
logger.info(
"total location count {} read {} skip {}",
locationsCount,
locationsCount - skipCount.sum(),
skipCount.sum());
if (currentChunk != null) {
logger.debug("Release chunk {}!", currentChunk);
currentChunk.release();
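To make the reader-side change above concrete: a location can be skipped for a map range read only when its committed map id bitmap contains none of the map indices in `[startMapIndex, endMapIndex)`. Below is a small, self-contained sketch mirroring the `skipLocation` logic; the class and method names are illustrative and not part of the patch, and only `RoaringBitmap` comes from the new dependency.

```java
import org.roaringbitmap.RoaringBitmap;

public class RangeReadSkipSketch {

  // Returns true when none of the map ids in [startMapIndex, endMapIndex)
  // ever wrote to this location, mirroring skipLocation above.
  static boolean canSkip(RoaringBitmap mapIdBitmap, int startMapIndex, int endMapIndex) {
    if (endMapIndex == Integer.MAX_VALUE) {
      return false; // full-range read: nothing can be skipped
    }
    for (int mapId = startMapIndex; mapId < endMapIndex; mapId++) {
      if (mapIdBitmap.contains(mapId)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Location that only received data from map tasks 0, 1 and 7.
    RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 7);

    System.out.println(canSkip(bitmap, 2, 6)); // true  -> location is skipped
    System.out.println(canSkip(bitmap, 6, 9)); // false -> map 7 is in range
  }
}
```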
@@ -19,13 +19,15 @@ package com.aliyun.emr.rss.client.write

import java.util
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.LongAdder

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.Random

import io.netty.util.internal.ConcurrentSet
import org.roaringbitmap.RoaringBitmap

import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.haclient.RssHARetryClient
@@ -49,6 +51,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
private val splitThreshold = RssConf.partitionSplitThreshold(conf)
private val splitMode = RssConf.partitionSplitMode(conf)
private val storageHint = RssConf.storageHint(conf)
private val rangeReadFilter = RssConf.rangeReadFilterEnabled(conf)

private val unregisterShuffleTime = new ConcurrentHashMap[Int, Long]()

@@ -611,10 +614,13 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
val committedSlaveIds = new ConcurrentSet[String]
val failedMasterIds = new ConcurrentSet[String]
val failedSlaveIds = new ConcurrentSet[String]
val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]()

val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
val commitFilesFailedWorkers = new ConcurrentSet[WorkerInfo]

val commitFileStartTime = System.nanoTime()

val parallelism = Math.min(workerSnapshots(shuffleId).size(),
RssConf.rpcMaxParallelism(conf))
ThreadUtils.parmap(
@@ -663,6 +669,9 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
if (res.failedSlaveIds != null) {
failedSlaveIds.addAll(res.failedSlaveIds)
}
if (!res.committedMapIdBitMap.isEmpty) {
committedMapIdBitmap.putAll(res.committedMapIdBitMap)
}
}
}

@@ -695,9 +704,11 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
val committedPartitions = new util.HashMap[String, PartitionLocation]
committedMasterIds.asScala.foreach { id =>
committedPartitions.put(id, masterPartMap.get(id))
masterPartMap.get(id).setMapIdBitMap(committedMapIdBitmap.get(id))
}
committedSlaveIds.asScala.foreach { id =>
val slavePartition = slavePartMap.get(id)
slavePartition.setMapIdBitMap(committedMapIdBitmap.get(id))
val masterPartition = committedPartitions.get(id)
if (masterPartition ne null) {
masterPartition.setPeer(slavePartition)
@@ -719,6 +730,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
fileGroups(i) = sets(i).toArray(new Array[PartitionLocation](0))
i += 1
}
logInfo(s"Shuffle $shuffleId commit files complete " +
s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms")
}

// reply
@@ -783,7 +796,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
} else {
val res = requestReserveSlots(entry._1.endpoint,
ReserveSlots(applicationId, shuffleId, entry._2._1, entry._2._2, splitThreshold,
splitMode, storageHint))
splitMode, storageHint, rangeReadFilter))
if (res.status.equals(StatusCode.Success)) {
logDebug(s"Successfully allocated " +
s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
@@ -1129,7 +1142,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
} catch {
case e: Exception =>
logError(s"AskSync CommitFiles for ${message.shuffleId} failed.", e)
CommitFilesResponse(StatusCode.Failed, null, null, message.masterIds, message.slaveIds)
CommitFilesResponse(StatusCode.Failed, null, null,
message.masterIds, message.slaveIds, Map.empty[String, RoaringBitmap].asJava)
}
}

4 changes: 4 additions & 0 deletions common/pom.xml
@@ -141,6 +141,10 @@
<artifactId>scalatest-funsuite_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
</dependencies>

<build>
@@ -19,7 +19,10 @@

import java.io.Serializable;

import org.roaringbitmap.RoaringBitmap;

import com.aliyun.emr.rss.common.meta.WorkerInfo;
import com.aliyun.emr.rss.common.util.Utils;

public class PartitionLocation implements Serializable {
public enum Mode {
@@ -61,6 +64,7 @@ public static PartitionLocation.Mode getMode(byte mode) {
private Mode mode;
private PartitionLocation peer;
private StorageHint storageHint;
private RoaringBitmap mapIdBitMap = null;

public PartitionLocation(PartitionLocation loc) {
this.reduceId = loc.reduceId;
@@ -85,7 +89,7 @@ public PartitionLocation(
int replicatePort,
Mode mode) {
this(reduceId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort,
mode, null, StorageHint.MEMORY);
mode, null, StorageHint.MEMORY, null);
}

public PartitionLocation(
@@ -99,7 +103,7 @@ public PartitionLocation(
Mode mode,
StorageHint storageHint) {
this(reduceId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort,
mode, null, storageHint);
mode, null, storageHint, null);
}

public PartitionLocation(
Expand All @@ -113,7 +117,7 @@ public PartitionLocation(
Mode mode,
PartitionLocation peer) {
this(reduceId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort, mode, peer,
StorageHint.MEMORY);
StorageHint.MEMORY, null);
}

public PartitionLocation(
Expand All @@ -126,7 +130,8 @@ public PartitionLocation(
int replicatePort,
Mode mode,
PartitionLocation peer,
StorageHint hint) {
StorageHint hint,
RoaringBitmap mapIdBitMap) {
this.reduceId = reduceId;
this.epoch = epoch;
this.host = host;
Expand All @@ -137,6 +142,7 @@ public PartitionLocation(
this.mode = mode;
this.peer = peer;
this.storageHint = hint;
this.mapIdBitMap = mapIdBitMap;
}

public int getReduceId()
@@ -271,22 +277,33 @@ public WorkerInfo getWorker() {
return new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort);
}

public RoaringBitmap getMapIdBitMap() {
return mapIdBitMap;
}

public void setMapIdBitMap(RoaringBitmap mapIdBitMap) {
this.mapIdBitMap = mapIdBitMap;
}

public static PartitionLocation fromPbPartitionLocation(TransportMessages.PbPartitionLocation
pbPartitionLocation) {
Mode mode = Mode.Master;
if (pbPartitionLocation.getMode() == TransportMessages.PbPartitionLocation.Mode.Slave) {
mode = Mode.Slave;
}

    PartitionLocation partitionLocation = new PartitionLocation(
        pbPartitionLocation.getReduceId(),
        pbPartitionLocation.getEpoch(),
        pbPartitionLocation.getHost(),
        pbPartitionLocation.getRpcPort(),
        pbPartitionLocation.getPushPort(),
        pbPartitionLocation.getFetchPort(),
        pbPartitionLocation.getReplicatePort(),
        mode,
        null,
        PartitionLocation.StorageHint.values()[pbPartitionLocation.getStorageHintOrdinal()],
        Utils.byteStringToRoaringBitmap(pbPartitionLocation.getMapIdBitmap()));

if (pbPartitionLocation.hasPeer()) {
TransportMessages.PbPartitionLocation peerPb = pbPartitionLocation.getPeer();
Expand All @@ -297,7 +314,8 @@ public static PartitionLocation fromPbPartitionLocation(TransportMessages.PbPart
PartitionLocation peerLocation = new PartitionLocation(peerPb.getReduceId(),
peerPb.getEpoch(), peerPb.getHost(), peerPb.getRpcPort(), peerPb.getPushPort(),
peerPb.getFetchPort(), peerPb.getReplicatePort(), peerMode, partitionLocation,
        PartitionLocation.StorageHint.values()[peerPb.getStorageHintOrdinal()],
        Utils.byteStringToRoaringBitmap(peerPb.getMapIdBitmap()));
partitionLocation.setPeer(peerLocation);
}

@@ -321,6 +339,8 @@ public static TransportMessages.PbPartitionLocation toPbPartitionLocation(Partit
pbPartitionLocationBuilder.setFetchPort(partitionLocation.getFetchPort());
pbPartitionLocationBuilder.setReplicatePort(partitionLocation.getReplicatePort());
pbPartitionLocationBuilder.setStorageHintOrdinal(partitionLocation.getStorageHint().ordinal());
pbPartitionLocationBuilder.setMapIdBitmap(
Utils.roaringBitmapToByteString(partitionLocation.getMapIdBitMap()));

if (partitionLocation.getPeer() != null) {
TransportMessages.PbPartitionLocation.Builder peerPbPartionLocationBuilder = TransportMessages
Expand All @@ -339,6 +359,8 @@ public static TransportMessages.PbPartitionLocation toPbPartitionLocation(Partit
peerPbPartionLocationBuilder.setReplicatePort(partitionLocation.getPeer().getReplicatePort());
peerPbPartionLocationBuilder.setStorageHintOrdinal(
partitionLocation.getPeer().getStorageHint().ordinal());
peerPbPartionLocationBuilder.setMapIdBitmap(Utils.roaringBitmapToByteString(
partitionLocation.getPeer().getMapIdBitMap()));
pbPartitionLocationBuilder.setPeer(peerPbPartionLocationBuilder.build());
}

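`PartitionLocation` above round-trips the bitmap through `Utils.roaringBitmapToByteString` and `Utils.byteStringToRoaringBitmap`, whose bodies are outside this diff; the new `bytes mapIdBitmap` proto field below is the wire form. The following is only a plausible sketch of such a conversion, built from the public `RoaringBitmap` serialization API and protobuf's `ByteString`; the class name and null handling are assumptions, not the actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import com.google.protobuf.ByteString;
import org.roaringbitmap.RoaringBitmap;

public final class BitmapCodecSketch {

  // Serialize a (possibly null) bitmap into a protobuf ByteString.
  static ByteString toByteString(RoaringBitmap bitmap) throws IOException {
    if (bitmap == null) {
      return ByteString.EMPTY;
    }
    ByteArrayOutputStream bytes = new ByteArrayOutputStream(bitmap.serializedSizeInBytes());
    bitmap.serialize(new DataOutputStream(bytes));
    return ByteString.copyFrom(bytes.toByteArray());
  }

  // Rebuild the bitmap from the wire form; an empty payload yields an empty bitmap.
  static RoaringBitmap fromByteString(ByteString data) throws IOException {
    RoaringBitmap bitmap = new RoaringBitmap();
    if (data != null && !data.isEmpty()) {
      bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(data.toByteArray())));
    }
    return bitmap;
  }
}
```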
3 changes: 3 additions & 0 deletions common/src/main/proto/TransportMessages.proto
@@ -68,6 +68,7 @@ message PbPartitionLocation {
int32 replicatePort = 8;
PbPartitionLocation peer = 9;
int32 storageHintOrdinal = 10;
bytes mapIdBitmap = 11;
}

message PbWorkerResource {
@@ -263,6 +264,7 @@ message PbReserveSlots {
int64 splitThreshold = 5;
int32 splitMode = 6;
int32 storageHintOrdinal = 7;
bool rangeReadFilter = 8;
}

message PbReserveSlotsResponse {
@@ -284,6 +286,7 @@ message PbCommitFilesResponse {
repeated string committedSlaveIds = 3;
repeated string failedMasterIds = 4;
repeated string failedSlaveIds = 5;
map<string, bytes> mapIdBitmap = 10;
}

message PbDestroy {