diff --git a/CONFIGURATION_GUIDE.md b/CONFIGURATION_GUIDE.md index 17eb3234579..2f3cdcf29ca 100644 --- a/CONFIGURATION_GUIDE.md +++ b/CONFIGURATION_GUIDE.md @@ -44,6 +44,7 @@ memory. Empirically, RSS worker off-heap memory should be set to `(numDirs * que | spark.rss.shuffle.writer.mode | hash | RSS support two different shuffle writers. Hash-based shuffle writer works fine when shuffle partition count is normal. Sort-based shuffle writer works fine when memory pressure is high or shuffle partition count it huge. | | spark.rss.client.compression.codec | lz4 | The codec used to compress shuffle data. By default, RSS provides two codecs: `lz4` and `zstd`. | | spark.rss.client.compression.zstd.level | 1 | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | +| spark.rss.range.read.filter.enabled | false | If a Spark application has skewed partitions, this can be set to true to improve read performance. | ### RSS Master Configurations diff --git a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java index 1bc9693348e..afb322fbabb 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java @@ -829,6 +829,7 @@ public RssInputStream readPartition(String applicationId, int shuffleId, int red public RssInputStream readPartition(String applicationId, int shuffleId, int reduceId, int attemptNumber, int startMapIndex, int endMapIndex) throws IOException { ReduceFileGroups fileGroups = reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> { + long getReducerFileGroupStartTime = System.nanoTime(); try { if (driverRssMetaService == null) { logger.warn("Driver endpoint is null!"); @@ -843,6 +844,10 @@ public RssInputStream readPartition(String applicationId, int shuffleId, int red driverRssMetaService.askSync(getReducerFileGroup, classTag); if (response != null && response.status() == StatusCode.Success) { + logger.info( + "Shuffle {} request reducer file group success, took {} ms", + shuffleId, + (System.nanoTime() - getReducerFileGroupStartTime) / 1000_000); return new ReduceFileGroups(response.fileGroup(), response.attempts()); } } catch (Exception e) { diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index db38dafe131..947d6ca7544 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import io.netty.buffer.ByteBuf; import org.slf4j.Logger; @@ -115,6 +116,8 @@ private static final class RssInputStreamImpl extends RssInputStream { // mapId, attempId, batchId, size private final int BATCH_HEADER_SIZE = 4 * 4; private final byte[] sizeBuf = new byte[BATCH_HEADER_SIZE]; + private LongAdder skipCount = new LongAdder(); + private final boolean rangeReadFilter; RssInputStreamImpl( RssConf conf, @@ -140,6 +143,7 @@ private static final class RssInputStreamImpl extends RssInputStream { this.attemptNumber = attemptNumber; this.startMapIndex = startMapIndex;
this.endMapIndex = endMapIndex; + this.rangeReadFilter = RssConf.rangeReadFilterEnabled(conf); maxInFlight = RssConf.fetchChunkMaxReqsInFlight(conf); @@ -153,30 +157,60 @@ private static final class RssInputStreamImpl extends RssInputStream { moveToNextReader(); } - private void moveToNextReader() throws IOException { - if (currentReader != null) { - currentReader.close(); + private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) { + if (!rangeReadFilter) { + return false; + } + if (endMapIndex == Integer.MAX_VALUE) { + return false; } + for (int i = startMapIndex; i < endMapIndex; i++) { + if (location.getMapIdBitMap().contains(i)) { + return false; + } + } + return true; + } - currentReader = createReader(locations[fileIndex]); - logger.debug("Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read , " + - "get chunks size {}", locations[fileIndex], startMapIndex, endMapIndex, - fileIndex, locations.length, currentReader.numChunks); - while (currentReader.numChunks < 1 && fileIndex < locations.length - 1) { + private PartitionLocation nextReadableLocation() { + int locationCount = locations.length; + if (fileIndex >= locationCount) { + return null; + } + PartitionLocation currentLocation = locations[fileIndex]; + while (skipLocation(startMapIndex, endMapIndex, currentLocation)) { + skipCount.increment(); fileIndex++; + if (fileIndex == locationCount) { + return null; + } + currentLocation = locations[fileIndex]; + } + return currentLocation; + } + + private void moveToNextReader() throws IOException { + if (currentReader != null) { currentReader.close(); - currentReader = createReader(locations[fileIndex]); - logger.debug("Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read , " + - "get chunks size {}", locations[fileIndex], startMapIndex, endMapIndex, - fileIndex, locations.length, currentReader.numChunks); + currentReader = null; } - if (currentReader.numChunks > 0) { - currentChunk = currentReader.next(); - fileIndex++; - } else { + PartitionLocation currentLocation = nextReadableLocation(); + if (currentLocation == null) { + return; + } + currentReader = createReader(currentLocation); + fileIndex++; + while (!currentReader.hasNext()) { currentReader.close(); currentReader = null; + currentLocation = nextReadableLocation(); + if (currentLocation == null) { + return; + } + currentReader = createReader(currentLocation); + fileIndex++; } + currentChunk = currentReader.next(); } private PartitionReader createReader(PartitionLocation location) throws IOException { @@ -246,6 +280,12 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public void close() { + int locationsCount = locations.length; + logger.info( + "total location count {} read {} skip {}", + locationsCount, + locationsCount - skipCount.sum(), + skipCount.sum()); if (currentChunk != null) { logger.debug("Release chunk {}!", currentChunk); currentChunk.release(); diff --git a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala index f8be8c7aa7e..45cb82295cc 100644 --- a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala +++ b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala @@ -19,6 +19,7 @@ package com.aliyun.emr.rss.client.write import java.util import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.LongAdder import 
scala.collection.JavaConverters._ import scala.collection.mutable @@ -26,6 +27,7 @@ import scala.collection.mutable.ListBuffer import scala.util.Random import io.netty.util.internal.ConcurrentSet +import org.roaringbitmap.RoaringBitmap import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.haclient.RssHARetryClient @@ -49,6 +51,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit private val splitThreshold = RssConf.partitionSplitThreshold(conf) private val splitMode = RssConf.partitionSplitMode(conf) private val storageHint = RssConf.storageHint(conf) + private val rangeReadFilter = RssConf.rangeReadFilterEnabled(conf) private val unregisterShuffleTime = new ConcurrentHashMap[Int, Long]() @@ -611,10 +614,13 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit val committedSlaveIds = new ConcurrentSet[String] val failedMasterIds = new ConcurrentSet[String] val failedSlaveIds = new ConcurrentSet[String] + val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]() val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId) val commitFilesFailedWorkers = new ConcurrentSet[WorkerInfo] + val commitFileStartTime = System.nanoTime() + val parallelism = Math.min(workerSnapshots(shuffleId).size(), RssConf.rpcMaxParallelism(conf)) ThreadUtils.parmap( @@ -663,6 +669,9 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit if (res.failedSlaveIds != null) { failedSlaveIds.addAll(res.failedSlaveIds) } + if (!res.committedMapIdBitMap.isEmpty) { + committedMapIdBitmap.putAll(res.committedMapIdBitMap) + } } } @@ -695,9 +704,11 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit val committedPartitions = new util.HashMap[String, PartitionLocation] committedMasterIds.asScala.foreach { id => committedPartitions.put(id, masterPartMap.get(id)) + masterPartMap.get(id).setMapIdBitMap(committedMapIdBitmap.get(id)) } committedSlaveIds.asScala.foreach { id => val slavePartition = slavePartMap.get(id) + slavePartition.setMapIdBitMap(committedMapIdBitmap.get(id)) val masterPartition = committedPartitions.get(id) if (masterPartition ne null) { masterPartition.setPeer(slavePartition) @@ -719,6 +730,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit fileGroups(i) = sets(i).toArray(new Array[PartitionLocation](0)) i += 1 } + logInfo(s"Shuffle $shuffleId commit files complete " + + s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms") } // reply @@ -783,7 +796,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit } else { val res = requestReserveSlots(entry._1.endpoint, ReserveSlots(applicationId, shuffleId, entry._2._1, entry._2._2, splitThreshold, - splitMode, storageHint)) + splitMode, storageHint, rangeReadFilter)) if (res.status.equals(StatusCode.Success)) { logDebug(s"Successfully allocated " + s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" + @@ -1129,7 +1142,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit } catch { case e: Exception => logError(s"AskSync CommitFiles for ${message.shuffleId} failed.", e) - CommitFilesResponse(StatusCode.Failed, null, null, message.masterIds, message.slaveIds) + CommitFilesResponse(StatusCode.Failed, null, null, + message.masterIds, message.slaveIds, Map.empty[String, RoaringBitmap].asJava) } } diff --git a/common/pom.xml b/common/pom.xml index 9d92b95b75c..590d9b10e6f 100644 --- 
a/common/pom.xml +++ b/common/pom.xml @@ -141,6 +141,10 @@ scalatest-funsuite_${scala.binary.version} test + + org.roaringbitmap + RoaringBitmap + diff --git a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java index e7a4570cfeb..a6b8de8a97f 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java @@ -19,7 +19,10 @@ import java.io.Serializable; +import org.roaringbitmap.RoaringBitmap; + import com.aliyun.emr.rss.common.meta.WorkerInfo; +import com.aliyun.emr.rss.common.util.Utils; public class PartitionLocation implements Serializable { public enum Mode { @@ -61,6 +64,7 @@ public static PartitionLocation.Mode getMode(byte mode) { private Mode mode; private PartitionLocation peer; private StorageHint storageHint; + private RoaringBitmap mapIdBitMap = null; public PartitionLocation(PartitionLocation loc) { this.reduceId = loc.reduceId; @@ -85,7 +89,7 @@ public PartitionLocation( int replicatePort, Mode mode) { this(reduceId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort, - mode, null, StorageHint.MEMORY); + mode, null, StorageHint.MEMORY,null); } public PartitionLocation( @@ -99,7 +103,7 @@ public PartitionLocation( Mode mode, StorageHint storageHint) { this(reduceId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort, - mode, null, storageHint); + mode, null, storageHint,null); } public PartitionLocation( @@ -113,7 +117,7 @@ public PartitionLocation( Mode mode, PartitionLocation peer) { this(reduceId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort, mode, peer, - StorageHint.MEMORY); + StorageHint.MEMORY,null); } public PartitionLocation( @@ -126,7 +130,8 @@ public PartitionLocation( int replicatePort, Mode mode, PartitionLocation peer, - StorageHint hint) { + StorageHint hint, + RoaringBitmap mapIdBitMap) { this.reduceId = reduceId; this.epoch = epoch; this.host = host; @@ -137,6 +142,7 @@ public PartitionLocation( this.mode = mode; this.peer = peer; this.storageHint = hint; + this.mapIdBitMap = mapIdBitMap; } public int getReduceId() @@ -271,6 +277,14 @@ public WorkerInfo getWorker() { return new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort); } + public RoaringBitmap getMapIdBitMap() { + return mapIdBitMap; + } + + public void setMapIdBitMap(RoaringBitmap mapIdBitMap) { + this.mapIdBitMap = mapIdBitMap; + } + public static PartitionLocation fromPbPartitionLocation(TransportMessages.PbPartitionLocation pbPartitionLocation) { Mode mode = Mode.Master; @@ -278,15 +292,18 @@ public static PartitionLocation fromPbPartitionLocation(TransportMessages.PbPart mode = Mode.Slave; } - PartitionLocation partitionLocation = new PartitionLocation(pbPartitionLocation.getReduceId(), - pbPartitionLocation.getEpoch(), - pbPartitionLocation.getHost(), - pbPartitionLocation.getRpcPort(), - pbPartitionLocation.getPushPort(), - pbPartitionLocation.getFetchPort(), - pbPartitionLocation.getReplicatePort(), - mode, - PartitionLocation.StorageHint.values()[pbPartitionLocation.getStorageHintOrdinal()]); + PartitionLocation partitionLocation = new PartitionLocation( + pbPartitionLocation.getReduceId(), + pbPartitionLocation.getEpoch(), + pbPartitionLocation.getHost(), + pbPartitionLocation.getRpcPort(), + pbPartitionLocation.getPushPort(), + pbPartitionLocation.getFetchPort(), + pbPartitionLocation.getReplicatePort(), + mode, + null, + 
PartitionLocation.StorageHint.values()[pbPartitionLocation.getStorageHintOrdinal()], + Utils.byteStringToRoaringBitmap(pbPartitionLocation.getMapIdBitmap())); if (pbPartitionLocation.hasPeer()) { TransportMessages.PbPartitionLocation peerPb = pbPartitionLocation.getPeer(); @@ -297,7 +314,8 @@ public static PartitionLocation fromPbPartitionLocation(TransportMessages.PbPart PartitionLocation peerLocation = new PartitionLocation(peerPb.getReduceId(), peerPb.getEpoch(), peerPb.getHost(), peerPb.getRpcPort(), peerPb.getPushPort(), peerPb.getFetchPort(), peerPb.getReplicatePort(), peerMode, partitionLocation, - PartitionLocation.StorageHint.values()[peerPb.getStorageHintOrdinal()]); + PartitionLocation.StorageHint.values()[peerPb.getStorageHintOrdinal()], + Utils.byteStringToRoaringBitmap(peerPb.getMapIdBitmap())); partitionLocation.setPeer(peerLocation); } @@ -321,6 +339,8 @@ public static TransportMessages.PbPartitionLocation toPbPartitionLocation(Partit pbPartitionLocationBuilder.setFetchPort(partitionLocation.getFetchPort()); pbPartitionLocationBuilder.setReplicatePort(partitionLocation.getReplicatePort()); pbPartitionLocationBuilder.setStorageHintOrdinal(partitionLocation.getStorageHint().ordinal()); + pbPartitionLocationBuilder.setMapIdBitmap( + Utils.roaringBitmapToByteString(partitionLocation.getMapIdBitMap())); if (partitionLocation.getPeer() != null) { TransportMessages.PbPartitionLocation.Builder peerPbPartionLocationBuilder = TransportMessages @@ -339,6 +359,8 @@ public static TransportMessages.PbPartitionLocation toPbPartitionLocation(Partit peerPbPartionLocationBuilder.setReplicatePort(partitionLocation.getPeer().getReplicatePort()); peerPbPartionLocationBuilder.setStorageHintOrdinal( partitionLocation.getPeer().getStorageHint().ordinal()); + peerPbPartionLocationBuilder.setMapIdBitmap(Utils.roaringBitmapToByteString( + partitionLocation.getPeer().getMapIdBitMap())); pbPartitionLocationBuilder.setPeer(peerPbPartionLocationBuilder.build()); } diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index a979c1e02a0..7eb346a3166 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -68,6 +68,7 @@ message PbPartitionLocation { int32 replicatePort = 8; PbPartitionLocation peer = 9; int32 storageHintOrdinal = 10; + bytes mapIdBitmap = 11; } message PbWorkerResource { @@ -263,6 +264,7 @@ message PbReserveSlots { int64 splitThreshold = 5; int32 splitMode = 6; int32 storageHintOrdinal = 7; + bool rangeReadFilter = 8; } message PbReserveSlotsResponse { @@ -284,6 +286,7 @@ message PbCommitFilesResponse { repeated string committedSlaveIds = 3; repeated string failedMasterIds = 4; repeated string failedSlaveIds = 5; + map mapIdBitmap = 10; } message PbDestroy { diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala index 2b8ecbd30d8..d494f8761f9 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala @@ -819,6 +819,10 @@ object RssConf extends Logging { } } + def rangeReadFilterEnabled(conf: RssConf): Boolean = { + conf.getBoolean("rss.range.read.filter.enabled", false) + } + val WorkingDirName = "hadoop/rss-worker/shuffle_data" // If we want to use multi-raft group we can diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/AbstractSource.scala 
b/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/AbstractSource.scala index 0f98e5fdc67..e93ea043167 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/AbstractSource.scala @@ -222,7 +222,11 @@ abstract class AbstractSource(essConf: RssConf, role: String) def recordGauge(ng: NamedGauge[_]): Unit = { val timestamp = System.currentTimeMillis() val sb = new StringBuilder - sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gaurge.getValue} $timestamp\n") + if (ng.gaurge == null) { + sb.append(s"${normalizeKey(ng.name)}Value$label 0 $timestamp\n") + } else { + sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gaurge.getValue} $timestamp\n") + } updateInnerMetrics(sb.toString()) } diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index a6245c1b91a..a65a62feb8a 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -22,6 +22,8 @@ import java.util.UUID import scala.collection.JavaConverters._ +import org.roaringbitmap.RoaringBitmap + import com.aliyun.emr.rss.common.internal.Logging import com.aliyun.emr.rss.common.meta.WorkerInfo import com.aliyun.emr.rss.common.network.protocol.TransportMessage @@ -304,7 +306,7 @@ sealed trait Message extends Serializable{ new TransportMessage(TransportMessages.MessageType.REREGISTER_WORKER_RESPONSE, payload) case ReserveSlots(applicationId, shuffleId, masterLocations, slaveLocations, - splitThreshold, splitMode, storageHint) => + splitThreshold, splitMode, storageHint, rangeReadFilter) => val payload = TransportMessages.PbReserveSlots.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) @@ -315,6 +317,7 @@ sealed trait Message extends Serializable{ .setSplitThreshold(splitThreshold) .setSplitMode(splitMode.getValue) .setStorageHintOrdinal(storageHint.ordinal()) + .setRangeReadFilter(rangeReadFilter) .build().toByteArray new TransportMessage(TransportMessages.MessageType.RESERVE_SLOTS, payload) @@ -335,7 +338,7 @@ sealed trait Message extends Serializable{ new TransportMessage(TransportMessages.MessageType.COMMIT_FILES, payload) case CommitFilesResponse(status, committedMasterIds, committedSlaveIds, - failedMasterIds, failedSlaveIds) => + failedMasterIds, failedSlaveIds, committedMapIdBitMap) => val builder = TransportMessages.PbCommitFilesResponse.newBuilder() .setStatus(status.getValue) if (committedMasterIds != null) { @@ -350,6 +353,9 @@ sealed trait Message extends Serializable{ if (failedSlaveIds != null) { builder.addAllFailedSlaveIds(failedSlaveIds) } + committedMapIdBitMap.asScala.foreach(entry => { + builder.putMapIdBitmap(entry._1, Utils.roaringBitmapToByteString(entry._2)) + }) val payload = builder.build().toByteArray new TransportMessage(TransportMessages.MessageType.COMMIT_FILES_RESPONSE, payload) @@ -604,7 +610,8 @@ object ControlMessages extends Logging{ slaveLocations: util.List[PartitionLocation], splitThreshold: Long, splitMode: PartitionSplitMode, - storageHint: PartitionLocation.StorageHint) + storageHint: PartitionLocation.StorageHint, + rangeReadFilter: Boolean) extends WorkerMessage case class ReserveSlotsResponse( @@ -623,7 +630,8 @@ object ControlMessages extends Logging{ committedMasterIds: 
util.List[String], committedSlaveIds: util.List[String], failedMasterIds: util.List[String], - failedSlaveIds: util.List[String]) + failedSlaveIds: util.List[String], + committedMapIdBitMap: util.Map[String, RoaringBitmap]) extends WorkerMessage case class Destroy( @@ -845,7 +853,8 @@ object ControlMessages extends Logging{ new util.ArrayList[PartitionLocation](pbReserveSlots.getSlaveLocationsList.asScala .map(PartitionLocation.fromPbPartitionLocation(_)).toList.asJava), pbReserveSlots.getSplitThreshold, Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode), - PartitionLocation.StorageHint.values()(pbReserveSlots.getStorageHintOrdinal)) + PartitionLocation.StorageHint.values()(pbReserveSlots.getStorageHintOrdinal), + pbReserveSlots.getRangeReadFilter) case RESERVE_SLOTS_RESPONSE => val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload) @@ -860,11 +869,16 @@ object ControlMessages extends Logging{ case COMMIT_FILES_RESPONSE => val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload) + val committedBitMap = new util.HashMap[String, RoaringBitmap]() + pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry => + committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2)) + } CommitFilesResponse(Utils.toStatusCode(pbCommitFilesResponse.getStatus), pbCommitFilesResponse.getCommittedMasterIdsList, pbCommitFilesResponse.getCommittedSlaveIdsList, pbCommitFilesResponse.getFailedMasterIdsList, - pbCommitFilesResponse.getFailedSlaveIdsList) + pbCommitFilesResponse.getFailedSlaveIdsList, + committedBitMap) case DESTROY => val pbDestroy = PbDestroy.parseFrom(message.getPayload) diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala index 63ae65ee603..89a108d6c47 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory import java.math.{MathContext, RoundingMode} import java.net._ import java.nio.charset.StandardCharsets +import java.nio.ByteBuffer import java.text.SimpleDateFormat import java.util.{Locale, Properties, UUID} import java.util @@ -33,8 +34,10 @@ import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.net.InetAddresses +import com.google.protobuf.ByteString import io.netty.channel.unix.Errors.NativeIoException import org.apache.commons.lang3.SystemUtils +import org.roaringbitmap.RoaringBitmap import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.exception.RssException @@ -751,4 +754,27 @@ object Utils extends Logging { PartitionSplitMode.soft } } + + def roaringBitmapToByteString(roaringBitMap: RoaringBitmap): ByteString = { + if (roaringBitMap != null && !roaringBitMap.isEmpty) { + val buf = ByteBuffer.allocate(roaringBitMap.serializedSizeInBytes()) + roaringBitMap.serialize(buf) + buf.rewind() + ByteString.copyFrom(buf) + } else { + ByteString.EMPTY + } + } + + def byteStringToRoaringBitmap(bytes: ByteString): RoaringBitmap = { + if (!bytes.isEmpty) { + val roaringBitmap = new RoaringBitmap() + val buf = bytes.asReadOnlyByteBuffer() + buf.rewind() + roaringBitmap.deserialize(buf) + roaringBitmap + } else { + null + } + } } diff --git a/pom.xml b/pom.xml index 0f2c02cb0b0..a9dc950a6f6 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ prepare-package 5.1.0 1.5.2-3 + 0.9.32 @@ -247,6 +248,11 @@ protobuf-java 
${protobuf.version} + + org.roaringbitmap + RoaringBitmap + ${roaringbitmap.version} + org.scalamock scalamock_${scala.binary.version} diff --git a/server-common/pom.xml b/server-common/pom.xml index 3de02f2f2d5..2da3904b19a 100644 --- a/server-common/pom.xml +++ b/server-common/pom.xml @@ -70,6 +70,10 @@ org.apache.commons commons-crypto + + org.roaringbitmap + RoaringBitmap + diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java index 998e89c9b2f..aae53371e0d 100644 --- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java +++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java @@ -31,6 +31,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import com.aliyun.emr.rss.common.metrics.source.AbstractSource; import com.aliyun.emr.rss.common.network.server.MemoryTracker; import com.aliyun.emr.rss.common.protocol.PartitionSplitMode; +import com.aliyun.emr.rss.common.unsafe.Platform; /* * Note: Once FlushNotifier.exception is set, the whole file is not available. @@ -74,6 +76,8 @@ public final class FileWriter extends DeviceObserver { private long splitThreshold = 0; private final AtomicBoolean splitted = new AtomicBoolean(false); private final PartitionSplitMode splitMode; + private final boolean rangeReadFilter; + private RoaringBitmap mapIdBitMap = null; @Override public void notifyError(String deviceName, ListBuffer dirs, @@ -117,7 +121,8 @@ public FileWriter( RssConf rssConf, DeviceMonitor deviceMonitor, long splitThreshold, - PartitionSplitMode splitMode) throws IOException { + PartitionSplitMode splitMode, + boolean rangeReadFilter) throws IOException { this.file = file; this.flusher = flusher; this.flushWorkerIndex = flusher.getWorkerIndex(); @@ -130,9 +135,13 @@ public FileWriter( this.flushBufferSize = flushBufferSize; this.deviceMonitor = deviceMonitor; this.splitMode = splitMode; + this.rangeReadFilter = rangeReadFilter; channel = new FileOutputStream(file).getChannel(); source = workerSource; logger.debug("FileWriter {} split threshold {} mode {}", this, splitThreshold, splitMode); + if (rangeReadFilter) { + this.mapIdBitMap = new RoaringBitmap(); + } takeBuffer(); } @@ -202,9 +211,21 @@ public void write(ByteBuf data) throws IOException { return; } + int mapId = 0; + if (rangeReadFilter) { + byte[] header = new byte[16]; + data.markReaderIndex(); + data.readBytes(header); + data.resetReaderIndex(); + mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET); + } + final int numBytes = data.readableBytes(); MemoryTracker.instance().incrementDiskBuffer(numBytes); synchronized (this) { + if (rangeReadFilter) { + mapIdBitMap.add(mapId); + } if (flushBuffer.readableBytes() != 0 && flushBuffer.readableBytes() + numBytes >= this.flushBufferSize) { flush(false); @@ -218,6 +239,10 @@ public void write(ByteBuf data) throws IOException { } } + public RoaringBitmap getMapIdBitMap() { + return mapIdBitMap; + } + public long close() throws IOException { if (closed) { String msg = "FileWriter has already closed! 
fileName " + file.getAbsolutePath(); diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala index 03eae9652b4..a085f19a33f 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala @@ -18,7 +18,7 @@ package com.aliyun.emr.rss.service.deploy.worker import java.io.IOException -import java.util.{ArrayList => jArrayList, List => jList} +import java.util.{ArrayList => jArrayList, List => jList, HashMap => jHashMap} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import java.util.function.BiFunction @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import io.netty.util.{HashedWheelTimer, Timeout, TimerTask} import io.netty.util.internal.ConcurrentSet +import org.roaringbitmap.RoaringBitmap import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.internal.Logging @@ -69,14 +70,14 @@ private[deploy] class Controller( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case ReserveSlots(applicationId, shuffleId, masterLocations, slaveLocations, splitThreashold, - splitMode, storageHint) => + splitMode, storageHint, rangeReadFileter: Boolean) => val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) workerSource.sample(WorkerSource.ReserveSlotsTime, shuffleKey) { logDebug(s"Received ReserveSlots request, $shuffleKey, " + s"master partitions: ${masterLocations.asScala.map(_.getUniqueId).mkString(",")}; " + s"slave partitions: ${slaveLocations.asScala.map(_.getUniqueId).mkString(",")}.") handleReserveSlots(context, applicationId, shuffleId, masterLocations, - slaveLocations, splitThreashold, splitMode, storageHint) + slaveLocations, splitThreashold, splitMode, storageHint, rangeReadFileter) logDebug(s"ReserveSlots for $shuffleKey succeed.") } @@ -110,7 +111,8 @@ private[deploy] class Controller( slaveLocations: jList[PartitionLocation], splitThreshold: Long, splitMode: PartitionSplitMode, - storageHint: StorageHint): Unit = { + storageHint: StorageHint, + rangeReadFilter: Boolean): Unit = { val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) if (!localStorageManager.hasAvailableWorkingDirs) { val msg = "Local storage has no available dirs!" 
@@ -123,7 +125,7 @@ private[deploy] class Controller( for (ind <- 0 until masterLocations.size()) { val location = masterLocations.get(ind) val writer = localStorageManager.createWriter(applicationId, shuffleId, location, - splitThreshold, splitMode) + splitThreshold, splitMode, rangeReadFilter) masterPartitions.add(new WorkingPartition(location, writer)) } } catch { @@ -143,7 +145,7 @@ private[deploy] class Controller( for (ind <- 0 until slaveLocations.size()) { val location = slaveLocations.get(ind) val writer = localStorageManager.createWriter(applicationId, shuffleId, - location, splitThreshold, splitMode) + location, splitThreshold, splitMode, rangeReadFilter) slavePartitions.add(new WorkingPartition(location, writer)) } } catch { @@ -173,6 +175,7 @@ private[deploy] class Controller( uniqueIds: jList[String], committedIds: ConcurrentSet[String], failedIds: ConcurrentSet[String], + committedMapIdBitMap: ConcurrentHashMap[String, RoaringBitmap], master: Boolean = true): CompletableFuture[Void] = { var future: CompletableFuture[Void] = null @@ -195,6 +198,9 @@ private[deploy] class Controller( val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter val bytes = fileWriter.close() if (bytes > 0L) { + if (fileWriter.getMapIdBitMap != null) { + committedMapIdBitMap.put(uniqueId, fileWriter.getMapIdBitMap) + } committedIds.add(uniqueId) } } catch { @@ -227,7 +233,7 @@ private[deploy] class Controller( logError(s"Shuffle $shuffleKey doesn't exist!") context.reply(CommitFilesResponse( StatusCode.ShuffleNotRegistered, List.empty.asJava, List.empty.asJava, - masterIds, slaveIds)) + masterIds, slaveIds, Map.empty[String, RoaringBitmap].asJava)) return } @@ -239,9 +245,12 @@ private[deploy] class Controller( val committedSlaveIds = new ConcurrentSet[String]() val failedMasterIds = new ConcurrentSet[String]() val failedSlaveIds = new ConcurrentSet[String]() + val committedMapIdBitMap = new ConcurrentHashMap[String, RoaringBitmap]() - val masterFuture = commitFiles(shuffleKey, masterIds, committedMasterIds, failedMasterIds) - val slaveFuture = commitFiles(shuffleKey, slaveIds, committedSlaveIds, failedSlaveIds, false) + val masterFuture = commitFiles(shuffleKey, masterIds, committedMasterIds, + failedMasterIds, committedMapIdBitMap) + val slaveFuture = commitFiles(shuffleKey, slaveIds, committedSlaveIds, + failedSlaveIds, committedMapIdBitMap, false) val future = if (masterFuture != null && slaveFuture != null) { CompletableFuture.allOf(masterFuture, slaveFuture) @@ -264,18 +273,20 @@ private[deploy] class Controller( val committedSlaveIdList = new jArrayList[String](committedSlaveIds) val failedMasterIdList = new jArrayList[String](failedMasterIds) val failedSlaveIdList = new jArrayList[String](failedSlaveIds) + val committedMapIdBitMapList = new jHashMap[String, RoaringBitmap](committedMapIdBitMap) // reply if (failedMasterIds.isEmpty && failedSlaveIds.isEmpty) { logInfo(s"CommitFiles for $shuffleKey success with ${committedMasterIds.size()}" + s" master partitions and ${committedSlaveIds.size()} slave partitions!") context.reply(CommitFilesResponse( StatusCode.Success, committedMasterIdList, committedSlaveIdList, - List.empty.asJava, List.empty.asJava)) + List.empty.asJava, List.empty.asJava, committedMapIdBitMapList)) } else { logWarning(s"CommitFiles for $shuffleKey failed with ${failedMasterIds.size()} master" + s" partitions and ${failedSlaveIds.size()} slave partitions!") context.reply(CommitFilesResponse(StatusCode.PartialSuccess, committedMasterIdList, - 
committedSlaveIdList, failedMasterIdList, failedSlaveIdList)) + committedSlaveIdList, failedMasterIdList, + failedSlaveIdList, Map.empty[String, RoaringBitmap].asJava)) } } diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala index cb91301b63d..94604647971 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala @@ -346,12 +346,12 @@ private[worker] final class LocalStorageManager( @throws[IOException] def createWriter(appId: String, shuffleId: Int, location: PartitionLocation, - splitThreshold: Long, splitMode: PartitionSplitMode): FileWriter = { + splitThreshold: Long, splitMode: PartitionSplitMode,rangeReadFilter: Boolean): FileWriter = { if (!hasAvailableWorkingDirs()) { throw new IOException("No available working dirs!") } createWriter(appId, shuffleId, location.getReduceId, location.getEpoch, - location.getMode, splitThreshold, splitMode) + location.getMode, splitThreshold, splitMode, rangeReadFilter) } @throws[IOException] @@ -362,7 +362,8 @@ private[worker] final class LocalStorageManager( epoch: Int, mode: PartitionLocation.Mode, splitThreshold: Long, - splitMode: PartitionSplitMode): FileWriter = { + splitMode: PartitionSplitMode, + rangeReadFilter: Boolean): FileWriter = { val fileName = s"$reduceId-$epoch-${mode.mode()}" var retryCount = 0 @@ -381,7 +382,8 @@ private[worker] final class LocalStorageManager( throw new RssException("create app shuffle data dir or file failed") } val fileWriter = new FileWriter(file, diskFlushers.get(dir), dir, fetchChunkSize, - writerFlushBufferSize, workerSource, conf, deviceMonitor, splitThreshold, splitMode) + writerFlushBufferSize, workerSource, conf, deviceMonitor, splitThreshold, splitMode, + rangeReadFilter) deviceMonitor.registerFileWriter(fileWriter) val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) val shuffleMap = writers.computeIfAbsent(shuffleKey, newMapFunc) diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerSource.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerSource.scala index 675d71c05e3..14412571d14 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerSource.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerSource.scala @@ -42,6 +42,7 @@ class WorkerSource(essConf: RssConf) addTimer(TakeBufferTime) addTimer(SortTime) + // start cleaner thread startCleaner() } @@ -59,7 +60,6 @@ object WorkerSource { val FetchChunkTime = "FetchChunkTime" - // push data val MasterPushDataTime = "MasterPushDataTime" val SlavePushDataTime = "SlavePushDataTime" diff --git a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java index 41c1e303152..6f7c49d18fe 100644 --- a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java +++ b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java @@ -212,7 +212,7 @@ public void testMultiThreadWrite() throws IOException, ExecutionException, Inter File file = getTemporaryFile(); FileWriter writer = new FileWriter(file, flusher, file.getParentFile(), CHUNK_SIZE, 
FLUSH_BUFFER_SIZE_LIMIT, source, new RssConf(), - DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode); + DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode, false); List> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1"); @@ -247,7 +247,7 @@ public void testAfterStressfulWriteWillReadCorrect() File file = getTemporaryFile(); FileWriter writer = new FileWriter(file, flusher, file.getParentFile(), CHUNK_SIZE, FLUSH_BUFFER_SIZE_LIMIT, source, new RssConf(), - DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode); + DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode, false); List> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -286,7 +286,7 @@ public void testWriteAndChunkRead() throws Exception { File file = getTemporaryFile(); FileWriter writer = new FileWriter(file, flusher, file.getParentFile(), CHUNK_SIZE, FLUSH_BUFFER_SIZE_LIMIT, source, new RssConf(), - DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode); + DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode, false); List> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2");
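
A few sketches of how the pieces of this patch fit together follow; none of this code is part of the patch, and all class and variable names in the sketches are invented for illustration. On the write path, FileWriter learns which map task produced a batch by peeking at the batch header before buffering it. The header layout comes from the comment in RssInputStream (mapId, attemptId, batchId, size, four ints); the patch reads the first int with Platform.getInt, so the sketch assumes native byte order matches what the pusher wrote:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class BatchHeaderSketch {
      // mapId, attemptId, batchId, size -- four ints, 16 bytes in total.
      static final int BATCH_HEADER_SIZE = 4 * 4;

      // Rough equivalent of mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET)
      // in FileWriter.write(); byte order assumption is the sketch's, not the patch's.
      static int mapIdOf(byte[] header) {
        return ByteBuffer.wrap(header, 0, 4).order(ByteOrder.nativeOrder()).getInt();
      }

      public static void main(String[] args) {
        byte[] header = new byte[BATCH_HEADER_SIZE];
        ByteBuffer.wrap(header).order(ByteOrder.nativeOrder())
            .putInt(42)    // mapId
            .putInt(0)     // attemptId
            .putInt(7)     // batchId
            .putInt(1024); // batch size
        System.out.println(mapIdOf(header)); // prints 42
      }
    }

In the patch, the extracted mapId is added to a per-FileWriter RoaringBitmap, and only when range read filtering was requested in ReserveSlots.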
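The per-file bitmap then travels worker -> driver -> reader: it is returned in the new map field of PbCommitFilesResponse and later attached to PbPartitionLocation. Below is a minimal, self-contained sketch of the RoaringBitmap/ByteString round trip that the new Utils.roaringBitmapToByteString and Utils.byteStringToRoaringBitmap helpers perform; the helpers themselves are not called here, and the map indices are made up:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import com.google.protobuf.ByteString;
    import org.roaringbitmap.RoaringBitmap;

    public class MapIdBitmapRoundTripSketch {
      public static void main(String[] args) throws IOException {
        // Worker side: map tasks 3, 7 and 1024 wrote batches into this partition file.
        RoaringBitmap written = new RoaringBitmap();
        written.add(3);
        written.add(7);
        written.add(1024);

        // Serialize into protobuf bytes, as toPbPartitionLocation / CommitFilesResponse do.
        // (An empty or null bitmap is sent as ByteString.EMPTY in the patch.)
        ByteBuffer buf = ByteBuffer.allocate(written.serializedSizeInBytes());
        written.serialize(buf);
        buf.rewind();
        ByteString wire = ByteString.copyFrom(buf);

        // Reader side: rebuild the bitmap from the wire bytes.
        RoaringBitmap restored = new RoaringBitmap();
        restored.deserialize(wire.asReadOnlyByteBuffer());

        System.out.println(restored.contains(7)); // true
        System.out.println(restored.contains(8)); // false
      }
    }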
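On the read path, RssInputStreamImpl.skipLocation uses that bitmap to drop partition files containing no data from the requested map range, which is what the new spark.rss.range.read.filter.enabled switch (rss.range.read.filter.enabled on the RssConf side) targets: skewed partitions read in [startMapIndex, endMapIndex) sub-ranges. A standalone sketch of the same decision follows; the null check is an addition of the sketch, not of the patch:

    import org.roaringbitmap.RoaringBitmap;

    public class RangeReadFilterSketch {
      /**
       * True when a partition file can be skipped: filtering is on, the read is a
       * sub-range read, and no map task in [startMapIndex, endMapIndex) wrote to it.
       */
      static boolean canSkip(boolean rangeReadFilter, int startMapIndex, int endMapIndex,
          RoaringBitmap mapIdBitmap) {
        if (!rangeReadFilter || endMapIndex == Integer.MAX_VALUE || mapIdBitmap == null) {
          return false;
        }
        for (int mapId = startMapIndex; mapId < endMapIndex; mapId++) {
          if (mapIdBitmap.contains(mapId)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        RoaringBitmap bitmap = new RoaringBitmap();
        bitmap.add(100);
        bitmap.add(101);
        System.out.println(canSkip(true, 0, 10, bitmap));   // true: maps 0..9 wrote nothing here
        System.out.println(canSkip(true, 90, 110, bitmap)); // false: maps 100 and 101 did
      }
    }

In the patch, each skipped location is counted with a LongAdder and reported in the log line added to RssInputStreamImpl.close().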