From bbe0210ed839e93da5148d6e8b67dfad5345a8fb Mon Sep 17 00:00:00 2001 From: mingji Date: Thu, 15 Sep 2022 23:47:33 +0800 Subject: [PATCH 1/8] Add map ids info for each PartitionLocation to enable filtering for map range reading --- .../emr/rss/client/read/RssInputStream.java | 40 +++++++++++++- .../rss/client/write/LifecycleManager.scala | 5 ++ common/pom.xml | 4 ++ .../common/protocol/PartitionLocation.java | 53 ++++++++++++++++--- common/src/main/proto/TransportMessages.proto | 2 + .../protocol/message/ControlMessages.scala | 27 ++++++++++ pom.xml | 5 ++ .../service/deploy/master/SlotsAllocator.java | 4 +- .../deploy/worker/storage/FileWriter.java | 13 +++++ .../service/deploy/worker/Controller.scala | 9 ++++ 10 files changed, 153 insertions(+), 9 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index fb2acbba4e4..64ac5cf52b8 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -155,13 +155,45 @@ private static final class RssInputStreamImpl extends RssInputStream { moveToNextReader(); } + private boolean locationHasMapIdToRead( + int startMapIndex, int endMapIndex, PartitionLocation location) { + boolean hasMapId = false; + if (endMapIndex == Integer.MAX_VALUE) { + return true; + } + for (int i = startMapIndex; i < endMapIndex; i++) { + hasMapId = hasMapId | location.getMapIdBitMap().contains(i); + } + return hasMapId; + } + + private PartitionLocation nextReadableLocation() { + int locationCount = locations.length; + PartitionLocation currentLocation = locations[fileIndex]; + while (!locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation) + && fileIndex < locationCount - 1) { + fileIndex++; + currentLocation = locations[fileIndex]; + } + if (fileIndex == locationCount - 1 + && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { + return currentLocation; + } else { + return null; + } + } + private void moveToNextReader() throws IOException { if (currentReader != null) { currentReader.close(); } int locationCount = locations.length; - PartitionLocation currentLocation = locations[fileIndex]; + PartitionLocation currentLocation = nextReadableLocation(); + if (currentLocation == null) { + currentReader = null; + return; + } currentReader = createReader(currentLocation); logger.debug( "Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read ", @@ -172,8 +204,12 @@ private void moveToNextReader() throws IOException { locationCount); while (!currentReader.hasNext() && fileIndex < locationCount - 1) { fileIndex++; - currentLocation = locations[fileIndex]; currentReader.close(); + currentLocation = nextReadableLocation(); + if (currentLocation == null) { + currentReader = null; + return; + } currentReader = createReader(currentLocation); logger.debug( "Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read ", diff --git a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala index 2687b93ba0b..beb0852c0dd 100644 --- a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala +++ b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala @@ -26,6 +26,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random +import org.roaringbitmap.RoaringBitmap + import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.haclient.RssHARetryClient import com.aliyun.emr.rss.common.internal.Logging @@ -671,6 +673,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit val committedSlaveIds = ConcurrentHashMap.newKeySet[String]() val committedMasterStorageInfos = new ConcurrentHashMap[String, StorageInfo]() val committedSlaveStorageInfos = new ConcurrentHashMap[String, StorageInfo]() + val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]() val failedMasterIds = ConcurrentHashMap.newKeySet[String]() val failedSlaveIds = ConcurrentHashMap.newKeySet[String]() @@ -767,6 +770,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit logDebug(s"$applicationId-$shuffleId $id storage hint was not returned") } else { masterPartMap.get(id).setStorageInfo(committedMasterStorageInfos.get(id)) + masterPartMap.get(id).setMapIdBitMap(committedMapIdBitmap.get(id)) committedPartitions.put(id, masterPartMap.get(id)) } } @@ -777,6 +781,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit logDebug(s"$applicationId-$shuffleId $id storage hint was not returned") } else { slavePartition.setStorageInfo(committedSlaveStorageInfos.get(id)) + slavePartition.setMapIdBitMap(committedMapIdBitmap.get(id)) val masterPartition = committedPartitions.get(id) if (masterPartition ne null) { masterPartition.setPeer(slavePartition) diff --git a/common/pom.xml b/common/pom.xml index 2c8a2a8f31f..070c4503a28 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -107,6 +107,10 @@ org.apache.hadoop hadoop-client + + org.roaringbitmap + RoaringBitmap + diff --git a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java index b05894571cc..ba00a0e71a6 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java @@ -17,7 +17,12 @@ package com.aliyun.emr.rss.common.protocol; +import java.io.IOException; import java.io.Serializable; +import java.nio.ByteBuffer; + +import com.google.protobuf.ByteString; +import org.roaringbitmap.RoaringBitmap; import com.aliyun.emr.rss.common.meta.WorkerInfo; @@ -56,6 +61,7 @@ public static PartitionLocation.Mode getMode(byte mode) { private Mode mode; private PartitionLocation peer; private StorageInfo storageInfo = new StorageInfo(); + private RoaringBitmap mapIdBitMap = new RoaringBitmap(); public PartitionLocation(PartitionLocation loc) { this.id = loc.id; @@ -89,7 +95,8 @@ public PartitionLocation( replicatePort, mode, null, - new StorageInfo()); + new StorageInfo(), + new RoaringBitmap()); } public PartitionLocation( @@ -112,7 +119,8 @@ public PartitionLocation( replicatePort, mode, peer, - new StorageInfo()); + new StorageInfo(), + new RoaringBitmap()); } public PartitionLocation( @@ -125,7 +133,8 @@ public PartitionLocation( int replicatePort, Mode mode, PartitionLocation peer, - StorageInfo hint) { + StorageInfo hint, + RoaringBitmap mapIdBitMap) { this.id = id; this.epoch = epoch; this.host = host; @@ -136,6 +145,7 @@ public PartitionLocation( this.mode = mode; this.peer = peer; this.storageInfo = hint; + this.mapIdBitMap = mapIdBitMap; } public int getId() { @@ -286,7 +296,34 @@ public WorkerInfo getWorker() { return new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort); } - public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLoc) { + public RoaringBitmap getMapIdBitMap() { + return mapIdBitMap; + } + + public void setMapIdBitMap(RoaringBitmap mapIdBitMap) { + this.mapIdBitMap = mapIdBitMap; + } + + private static ByteString roaringBitmapToPb(RoaringBitmap roaringBitmap) { + if (!roaringBitmap.isEmpty()) { + int serializedSize = roaringBitmap.serializedSizeInBytes(); + ByteBuffer buffer = ByteBuffer.allocate(serializedSize); + roaringBitmap.serialize(buffer); + return ByteString.copyFrom(buffer); + } + return ByteString.EMPTY; + } + + private static RoaringBitmap fromPbToRoaringBitmap(ByteString bytes) throws IOException { + RoaringBitmap roaringBitmap = new RoaringBitmap(); + if (!bytes.isEmpty()) { + roaringBitmap.deserialize(ByteBuffer.wrap(bytes.toByteArray())); + } + return roaringBitmap; + } + + public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLoc) + throws IOException { Mode mode = Mode.MASTER; if (pbLoc.getMode() == PbPartitionLocation.Mode.Slave) { mode = Mode.SLAVE; @@ -303,7 +340,8 @@ public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLo pbLoc.getReplicatePort(), mode, null, - StorageInfo.fromPb(pbLoc.getStorageInfo())); + StorageInfo.fromPb(pbLoc.getStorageInfo()), + fromPbToRoaringBitmap(pbLoc.getMapIdBitmap())); if (pbLoc.hasPeer()) { PbPartitionLocation peerPb = pbLoc.getPeer(); Mode peerMode = Mode.MASTER; @@ -321,7 +359,8 @@ public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLo peerPb.getReplicatePort(), peerMode, partitionLocation, - StorageInfo.fromPb(peerPb.getStorageInfo())); + StorageInfo.fromPb(peerPb.getStorageInfo()), + fromPbToRoaringBitmap(peerPb.getMapIdBitmap())); partitionLocation.setPeer(peerLocation); } @@ -343,6 +382,7 @@ public static PbPartitionLocation toPbPartitionLocation(PartitionLocation locati builder.setFetchPort(location.getFetchPort()); builder.setReplicatePort(location.getReplicatePort()); builder.setStorageInfo(StorageInfo.toPb(location.storageInfo)); + builder.setMapIdBitmap(roaringBitmapToPb(location.getMapIdBitMap())); if (location.getPeer() != null) { PbPartitionLocation.Builder peerBuilder = PbPartitionLocation.newBuilder(); @@ -359,6 +399,7 @@ public static PbPartitionLocation toPbPartitionLocation(PartitionLocation locati peerBuilder.setFetchPort(location.getPeer().getFetchPort()); peerBuilder.setReplicatePort(location.getPeer().getReplicatePort()); peerBuilder.setStorageInfo(StorageInfo.toPb(location.getPeer().getStorageInfo())); + peerBuilder.setMapIdBitmap(roaringBitmapToPb(location.getMapIdBitMap())); builder.setPeer(peerBuilder.build()); } diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index e088c4bb353..7ba43393797 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -90,6 +90,7 @@ message PbPartitionLocation { int32 replicatePort = 8; PbPartitionLocation peer = 9; PbStorageInfo storageInfo = 10; + bytes mapIdBitmap = 11; } message PbWorkerResource { @@ -321,6 +322,7 @@ message PbCommitFilesResponse { repeated string failedSlaveIds = 5; map committedMasterStorageInfos = 6; map committedSlaveStorageInfos = 7; + map mapIdBitmap = 10; int64 totalWritten = 8; int32 fileCount = 9; } diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index ab6448c7b34..99d28aa8568 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -17,11 +17,15 @@ package com.aliyun.emr.rss.common.protocol.message +import java.nio.ByteBuffer import java.util import java.util.UUID import scala.collection.JavaConverters._ +import com.google.protobuf.ByteString +import org.roaringbitmap.RoaringBitmap + import com.aliyun.emr.rss.common.internal.Logging import com.aliyun.emr.rss.common.meta.{DiskInfo, WorkerInfo} import com.aliyun.emr.rss.common.network.protocol.TransportMessage @@ -385,6 +389,7 @@ sealed trait Message extends Serializable { failedSlaveIds, committedMasterStorageInfos, committedSlaveStorageInfos, + committedMapIdBitMap, totalWritten, fileCount) => val builder = PbCommitFilesResponse.newBuilder() @@ -397,6 +402,17 @@ sealed trait Message extends Serializable { builder.putCommittedMasterStorageInfos(entry._1, StorageInfo.toPb(entry._2))) committedSlaveStorageInfos.asScala.foreach(entry => builder.putCommittedSlaveStorageInfos(entry._1, StorageInfo.toPb(entry._2))) + if (!committedMapIdBitMap.isEmpty) { + committedMapIdBitMap.asScala.foreach(entry => { + if (!entry._2.isEmpty) { + val bitMapBuf = ByteBuffer.allocate(entry._2.serializedSizeInBytes()) + entry._2.serialize(bitMapBuf) + builder.putMapIdBitmap(entry._1, ByteString.copyFrom(bitMapBuf)) + } else { + builder.putMapIdBitmap(entry._1, ByteString.EMPTY) + } + }) + } builder.setTotalWritten(totalWritten) builder.setFileCount(fileCount) val payload = builder.build().toByteArray @@ -688,6 +704,8 @@ object ControlMessages extends Logging { Map.empty[String, StorageInfo].asJava, committedSlaveStorageInfos: util.Map[String, StorageInfo] = Map.empty[String, StorageInfo].asJava, + committedMapIdBitMap: util.Map[String, RoaringBitmap] = + Map.empty[String, RoaringBitmap].asJava, totalWritten: Long = 0, fileCount: Int = 0) extends WorkerMessage @@ -983,10 +1001,18 @@ object ControlMessages extends Logging { val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload) val committedMasterStorageInfos = new util.HashMap[String, StorageInfo]() val committedSlaveStorageInfos = new util.HashMap[String, StorageInfo]() + val committedBitMap = new util.HashMap[String, RoaringBitmap]() pbCommitFilesResponse.getCommittedMasterStorageInfosMap.asScala.foreach(entry => committedMasterStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2))) pbCommitFilesResponse.getCommittedSlaveStorageInfosMap.asScala.foreach(entry => committedSlaveStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2))) + pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry => + val bitmap = new RoaringBitmap() + if (!entry._2.isEmpty) { + bitmap.deserialize(ByteBuffer.wrap(entry._2.toByteArray)) + } + committedBitMap.put(entry._1, bitmap) + } CommitFilesResponse( Utils.toStatusCode(pbCommitFilesResponse.getStatus), pbCommitFilesResponse.getCommittedMasterIdsList, @@ -995,6 +1021,7 @@ object ControlMessages extends Logging { pbCommitFilesResponse.getFailedSlaveIdsList, committedMasterStorageInfos, committedSlaveStorageInfos, + committedBitMap, pbCommitFilesResponse.getTotalWritten, pbCommitFilesResponse.getFileCount) diff --git a/pom.xml b/pom.xml index 8db3a24e2b1..ba6f4936834 100644 --- a/pom.xml +++ b/pom.xml @@ -381,6 +381,11 @@ + + org.roaringbitmap + RoaringBitmap + 0.9.32 + junit junit diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/SlotsAllocator.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/SlotsAllocator.java index acc5a079278..3ed63e8d2c2 100644 --- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/SlotsAllocator.java +++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/SlotsAllocator.java @@ -22,6 +22,7 @@ import scala.Tuple2; import org.apache.commons.lang3.StringUtils; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -393,7 +394,8 @@ private static PartitionLocation createLocation( workerInfo.replicatePort(), isMaster ? PartitionLocation.Mode.MASTER : PartitionLocation.Mode.SLAVE, peer, - storageInfo); + storageInfo, + new RoaringBitmap()); } public static Map> slotsToDiskAllocations( diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java index 2afd18886a9..59372959154 100644 --- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java +++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java @@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import org.apache.hadoop.fs.FSDataOutputStream; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ import com.aliyun.emr.rss.common.protocol.PartitionSplitMode; import com.aliyun.emr.rss.common.protocol.PartitionType; import com.aliyun.emr.rss.common.protocol.StorageInfo; +import com.aliyun.emr.rss.common.unsafe.Platform; import com.aliyun.emr.rss.service.deploy.worker.WorkerSource; /* @@ -77,6 +79,7 @@ public final class FileWriter implements DeviceObserver { private Runnable destroyHook; private boolean deleted = false; + private RoaringBitmap mapIdBitMap = new RoaringBitmap(); @Override public void notifyError(String mountPoint, DiskStatus diskStatus) { @@ -187,6 +190,11 @@ public void write(ByteBuf data) throws IOException { return; } + byte[] headerBuf = new byte[16]; + data.getBytes(0, headerBuf); + int mapId = Platform.getInt(headerBuf, Platform.BYTE_ARRAY_OFFSET); + mapIdBitMap.add(mapId); + final int numBytes = data.readableBytes(); MemoryTracker.instance().incrementDiskBuffer(numBytes); synchronized (this) { @@ -203,6 +211,11 @@ public void write(ByteBuf data) throws IOException { } } + public RoaringBitmap getMapIdBitMap() { + mapIdBitMap.runOptimize(); + return mapIdBitMap; + } + public StorageInfo getStorageInfo() { if (flusher instanceof LocalFlusher) { LocalFlusher localFlusher = (LocalFlusher) flusher; diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala index 77d056e0cdc..28a6c52c892 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala @@ -26,6 +26,7 @@ import java.util.function.BiFunction import scala.collection.JavaConverters._ import io.netty.util.{HashedWheelTimer, Timeout, TimerTask} +import org.roaringbitmap.RoaringBitmap import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.internal.Logging @@ -209,6 +210,7 @@ private[deploy] class Controller( committedIds: jSet[String], failedIds: jSet[String], committedStorageInfos: ConcurrentHashMap[String, StorageInfo], + committedMapIdBitMap: ConcurrentHashMap[String, RoaringBitmap], partitionSizeList: LinkedBlockingQueue[Long], master: Boolean = true): CompletableFuture[Void] = { var future: CompletableFuture[Void] = null @@ -236,6 +238,7 @@ private[deploy] class Controller( if (bytes > 0L) { if (fileWriter.getStorageInfo != null) { committedStorageInfos.put(uniqueId, fileWriter.getStorageInfo) + committedMapIdBitMap.put(uniqueId, fileWriter.getMapIdBitMap) } if (bytes >= minimumPartitionSizeForEstimation) { partitionSizeList.add(bytes) @@ -291,6 +294,7 @@ private[deploy] class Controller( val failedSlaveIds = ConcurrentHashMap.newKeySet[String]() val committedMasterStorageInfos = new ConcurrentHashMap[String, StorageInfo]() val committedSlaveStorageInfos = new ConcurrentHashMap[String, StorageInfo]() + val committedMapIdBitMap = new ConcurrentHashMap[String, RoaringBitmap]() val partitionSizeList = new LinkedBlockingQueue[Long]() val masterFuture = @@ -300,6 +304,7 @@ private[deploy] class Controller( committedMasterIds, failedMasterIds, committedMasterStorageInfos, + committedMapIdBitMap, partitionSizeList) val slaveFuture = commitFiles( shuffleKey, @@ -307,6 +312,7 @@ private[deploy] class Controller( committedSlaveIds, failedSlaveIds, committedSlaveStorageInfos, + committedMapIdBitMap, partitionSizeList, false) @@ -341,6 +347,7 @@ private[deploy] class Controller( new jHashMap[String, StorageInfo](committedMasterStorageInfos) val committedSlaveStorageAndDiskHintList = new jHashMap[String, StorageInfo](committedSlaveStorageInfos) + val committedMapIdBitMapList = new jHashMap[String, RoaringBitmap](committedMapIdBitMap) val totalSize = partitionSizeList.asScala.sum val fileCount = partitionSizeList.size() // reply @@ -356,6 +363,7 @@ private[deploy] class Controller( List.empty.asJava, committedMasterStorageAndDiskHintList, committedSlaveStorageAndDiskHintList, + committedMapIdBitMapList, totalSize, fileCount)) } else { @@ -370,6 +378,7 @@ private[deploy] class Controller( failedSlaveIdList, committedMasterStorageAndDiskHintList, committedSlaveStorageAndDiskHintList, + committedMapIdBitMapList, totalSize, fileCount)) } From 8aa01ff54d2381832c34d291206f8822d076fb0a Mon Sep 17 00:00:00 2001 From: mingji Date: Mon, 19 Sep 2022 20:35:09 +0800 Subject: [PATCH 2/8] fix uts. --- .../rss/client/write/LifecycleManager.scala | 4 +++ .../common/protocol/PartitionLocation.java | 33 ++++--------------- .../protocol/message/ControlMessages.scala | 20 +++-------- .../aliyun/emr/rss/common/util/Utils.scala | 25 ++++++++++++++ 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala index beb0852c0dd..3d6356bad74 100644 --- a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala +++ b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala @@ -733,6 +733,10 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit failedMasterIds.addAll(res.failedMasterIds) failedSlaveIds.addAll(res.failedSlaveIds) + if (!res.committedMapIdBitMap.isEmpty) { + committedMapIdBitmap.putAll(res.committedMapIdBitMap) + } + totalWritten.add(res.totalWritten) fileCount.add(res.fileCount) } diff --git a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java index ba00a0e71a6..68fad3d6b08 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java @@ -17,14 +17,12 @@ package com.aliyun.emr.rss.common.protocol; -import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; -import com.google.protobuf.ByteString; import org.roaringbitmap.RoaringBitmap; import com.aliyun.emr.rss.common.meta.WorkerInfo; +import com.aliyun.emr.rss.common.util.Utils; public class PartitionLocation implements Serializable { public enum Mode { @@ -304,26 +302,7 @@ public void setMapIdBitMap(RoaringBitmap mapIdBitMap) { this.mapIdBitMap = mapIdBitMap; } - private static ByteString roaringBitmapToPb(RoaringBitmap roaringBitmap) { - if (!roaringBitmap.isEmpty()) { - int serializedSize = roaringBitmap.serializedSizeInBytes(); - ByteBuffer buffer = ByteBuffer.allocate(serializedSize); - roaringBitmap.serialize(buffer); - return ByteString.copyFrom(buffer); - } - return ByteString.EMPTY; - } - - private static RoaringBitmap fromPbToRoaringBitmap(ByteString bytes) throws IOException { - RoaringBitmap roaringBitmap = new RoaringBitmap(); - if (!bytes.isEmpty()) { - roaringBitmap.deserialize(ByteBuffer.wrap(bytes.toByteArray())); - } - return roaringBitmap; - } - - public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLoc) - throws IOException { + public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLoc) { Mode mode = Mode.MASTER; if (pbLoc.getMode() == PbPartitionLocation.Mode.Slave) { mode = Mode.SLAVE; @@ -341,7 +320,7 @@ public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLo mode, null, StorageInfo.fromPb(pbLoc.getStorageInfo()), - fromPbToRoaringBitmap(pbLoc.getMapIdBitmap())); + Utils.byteStringToRoaringBitmap(pbLoc.getMapIdBitmap())); if (pbLoc.hasPeer()) { PbPartitionLocation peerPb = pbLoc.getPeer(); Mode peerMode = Mode.MASTER; @@ -360,7 +339,7 @@ public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLo peerMode, partitionLocation, StorageInfo.fromPb(peerPb.getStorageInfo()), - fromPbToRoaringBitmap(peerPb.getMapIdBitmap())); + Utils.byteStringToRoaringBitmap(peerPb.getMapIdBitmap())); partitionLocation.setPeer(peerLocation); } @@ -382,7 +361,7 @@ public static PbPartitionLocation toPbPartitionLocation(PartitionLocation locati builder.setFetchPort(location.getFetchPort()); builder.setReplicatePort(location.getReplicatePort()); builder.setStorageInfo(StorageInfo.toPb(location.storageInfo)); - builder.setMapIdBitmap(roaringBitmapToPb(location.getMapIdBitMap())); + builder.setMapIdBitmap(Utils.roaringBitmapToByteString(location.getMapIdBitMap())); if (location.getPeer() != null) { PbPartitionLocation.Builder peerBuilder = PbPartitionLocation.newBuilder(); @@ -399,7 +378,7 @@ public static PbPartitionLocation toPbPartitionLocation(PartitionLocation locati peerBuilder.setFetchPort(location.getPeer().getFetchPort()); peerBuilder.setReplicatePort(location.getPeer().getReplicatePort()); peerBuilder.setStorageInfo(StorageInfo.toPb(location.getPeer().getStorageInfo())); - peerBuilder.setMapIdBitmap(roaringBitmapToPb(location.getMapIdBitMap())); + peerBuilder.setMapIdBitmap(Utils.roaringBitmapToByteString(location.getMapIdBitMap())); builder.setPeer(peerBuilder.build()); } diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index 99d28aa8568..5e4296e7a2c 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -402,17 +402,9 @@ sealed trait Message extends Serializable { builder.putCommittedMasterStorageInfos(entry._1, StorageInfo.toPb(entry._2))) committedSlaveStorageInfos.asScala.foreach(entry => builder.putCommittedSlaveStorageInfos(entry._1, StorageInfo.toPb(entry._2))) - if (!committedMapIdBitMap.isEmpty) { - committedMapIdBitMap.asScala.foreach(entry => { - if (!entry._2.isEmpty) { - val bitMapBuf = ByteBuffer.allocate(entry._2.serializedSizeInBytes()) - entry._2.serialize(bitMapBuf) - builder.putMapIdBitmap(entry._1, ByteString.copyFrom(bitMapBuf)) - } else { - builder.putMapIdBitmap(entry._1, ByteString.EMPTY) - } - }) - } + committedMapIdBitMap.asScala.foreach(entry => { + builder.putMapIdBitmap(entry._1, Utils.roaringBitmapToByteString(entry._2)) + }) builder.setTotalWritten(totalWritten) builder.setFileCount(fileCount) val payload = builder.build().toByteArray @@ -1007,11 +999,7 @@ object ControlMessages extends Logging { pbCommitFilesResponse.getCommittedSlaveStorageInfosMap.asScala.foreach(entry => committedSlaveStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2))) pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry => - val bitmap = new RoaringBitmap() - if (!entry._2.isEmpty) { - bitmap.deserialize(ByteBuffer.wrap(entry._2.toByteArray)) - } - committedBitMap.put(entry._1, bitmap) + committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2)) } CommitFilesResponse( Utils.toStatusCode(pbCommitFilesResponse.getStatus), diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala index 4f040a1c3d5..a2fca932119 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala @@ -21,6 +21,7 @@ import java.io.{File, FileInputStream, InputStreamReader, IOException} import java.lang.management.ManagementFactory import java.math.{MathContext, RoundingMode} import java.net._ +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.text.SimpleDateFormat import java.util @@ -33,8 +34,10 @@ import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.net.InetAddresses +import com.google.protobuf.ByteString import io.netty.channel.unix.Errors.NativeIoException import org.apache.commons.lang3.SystemUtils +import org.roaringbitmap.RoaringBitmap import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.exception.RssException @@ -851,4 +854,26 @@ object Utils extends Logging { def getWriteSuccessFilePath(path: String): String = { path + SUFFIX_HDFS_WRITE_SUCCESS } + + def roaringBitmapToByteString(roaringBitMap: RoaringBitmap): ByteString = { + if (!roaringBitMap.isEmpty) { + val buf = ByteBuffer.allocate(roaringBitMap.serializedSizeInBytes()) + roaringBitMap.serialize(buf) + buf.rewind() + ByteString.copyFrom(buf) + } else { + ByteString.EMPTY + } + } + + def byteStringToRoaringBitmap(bytes: ByteString): RoaringBitmap = { + val roaringBitmap = new RoaringBitmap() + if (!bytes.isEmpty) { + val buf = bytes.asReadOnlyByteBuffer() + buf.rewind() + roaringBitmap.deserialize(buf) + } + roaringBitmap + } + } From 407638ac56a6ae7d71c52699e6c9f38291f1c72c Mon Sep 17 00:00:00 2001 From: mingji Date: Tue, 20 Sep 2022 10:36:59 +0800 Subject: [PATCH 3/8] refine. --- .../emr/rss/client/read/RssInputStream.java | 42 ++++--------------- .../protocol/message/ControlMessages.scala | 2 - pom.xml | 3 +- .../deploy/cluster/ClusterReadWriteTest.scala | 2 +- 4 files changed, 10 insertions(+), 39 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index 64ac5cf52b8..4668d0985c0 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -171,12 +171,13 @@ private PartitionLocation nextReadableLocation() { int locationCount = locations.length; PartitionLocation currentLocation = locations[fileIndex]; while (!locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation) - && fileIndex < locationCount - 1) { + && fileIndex < locationCount - 1) { fileIndex++; currentLocation = locations[fileIndex]; } - if (fileIndex == locationCount - 1 - && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { + if (((fileIndex == (locationCount - 1)) + && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) || + locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { return currentLocation; } else { return null; @@ -186,46 +187,17 @@ && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { private void moveToNextReader() throws IOException { if (currentReader != null) { currentReader.close(); + currentReader = null; } - int locationCount = locations.length; PartitionLocation currentLocation = nextReadableLocation(); if (currentLocation == null) { currentReader = null; return; } currentReader = createReader(currentLocation); - logger.debug( - "Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read ", - currentLocation, - startMapIndex, - endMapIndex, - fileIndex, - locationCount); - while (!currentReader.hasNext() && fileIndex < locationCount - 1) { - fileIndex++; - currentReader.close(); - currentLocation = nextReadableLocation(); - if (currentLocation == null) { - currentReader = null; - return; - } - currentReader = createReader(currentLocation); - logger.debug( - "Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read ", - currentLocation, - startMapIndex, - endMapIndex, - fileIndex, - locationCount); - } - if (currentReader.hasNext()) { - currentChunk = currentReader.next(); - fileIndex++; - } else { - currentReader.close(); - currentReader = null; - } + currentChunk = currentReader.next(); + fileIndex++; } private PartitionReader createReader(PartitionLocation location) throws IOException { diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index 5e4296e7a2c..3d71b284645 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -17,13 +17,11 @@ package com.aliyun.emr.rss.common.protocol.message -import java.nio.ByteBuffer import java.util import java.util.UUID import scala.collection.JavaConverters._ -import com.google.protobuf.ByteString import org.roaringbitmap.RoaringBitmap import com.aliyun.emr.rss.common.internal.Logging diff --git a/pom.xml b/pom.xml index ba6f4936834..4c356251731 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ 5.1.0 3.2.3 1.7.16 + 0.9.32 3.3.1 @@ -384,7 +385,7 @@ org.roaringbitmap RoaringBitmap - 0.9.32 + ${roaringbitmap.version} junit diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReadWriteTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReadWriteTest.scala index 74f9d345b2b..2c1d5b60f0f 100644 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReadWriteTest.scala +++ b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReadWriteTest.scala @@ -85,7 +85,7 @@ class ClusterReadWriteTest extends MiniClusterFeature { val readBytes = outputStream.toByteArray - assert(readBytes.length == LENGTH1 + LENGTH2 + LENGTH3 + LENGTH4) + Assert.assertEquals(LENGTH1 + LENGTH2 + LENGTH3 + LENGTH4, readBytes.length) val targetArr = Array.concat(DATA1, DATA2, DATA3, DATA4) Assert.assertArrayEquals(targetArr, readBytes) From 8edda0aae7eaeb7f800428293563dfb767da2003 Mon Sep 17 00:00:00 2001 From: mingji Date: Tue, 20 Sep 2022 10:41:42 +0800 Subject: [PATCH 4/8] refine. --- .../com/aliyun/emr/rss/client/read/RssInputStream.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index 4668d0985c0..c2ed3dd28bb 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -171,13 +171,13 @@ private PartitionLocation nextReadableLocation() { int locationCount = locations.length; PartitionLocation currentLocation = locations[fileIndex]; while (!locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation) - && fileIndex < locationCount - 1) { + && fileIndex < locationCount - 1) { fileIndex++; currentLocation = locations[fileIndex]; } if (((fileIndex == (locationCount - 1)) - && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) || - locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { + && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) + || locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { return currentLocation; } else { return null; @@ -189,10 +189,8 @@ private void moveToNextReader() throws IOException { currentReader.close(); currentReader = null; } - PartitionLocation currentLocation = nextReadableLocation(); if (currentLocation == null) { - currentReader = null; return; } currentReader = createReader(currentLocation); From 5c7b2a34e989a6860342dc840d046c4b5fdf5581 Mon Sep 17 00:00:00 2001 From: "zky.zhoukeyong" Date: Tue, 20 Sep 2022 21:11:16 +0800 Subject: [PATCH 5/8] address comments --- .../emr/rss/client/read/RssInputStream.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index c2ed3dd28bb..033df1dca0f 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -155,33 +155,30 @@ private static final class RssInputStreamImpl extends RssInputStream { moveToNextReader(); } - private boolean locationHasMapIdToRead( + private boolean skipLocation( int startMapIndex, int endMapIndex, PartitionLocation location) { - boolean hasMapId = false; if (endMapIndex == Integer.MAX_VALUE) { - return true; + return false; } for (int i = startMapIndex; i < endMapIndex; i++) { - hasMapId = hasMapId | location.getMapIdBitMap().contains(i); + if (location.getMapIdBitMap().contains(i)) { + return false; + } } - return hasMapId; + return true; } private PartitionLocation nextReadableLocation() { int locationCount = locations.length; PartitionLocation currentLocation = locations[fileIndex]; - while (!locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation) - && fileIndex < locationCount - 1) { + while (skipLocation(startMapIndex, endMapIndex, currentLocation)) { fileIndex++; + if (fileIndex == locationCount) { + return null; + } currentLocation = locations[fileIndex]; } - if (((fileIndex == (locationCount - 1)) - && locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) - || locationHasMapIdToRead(startMapIndex, endMapIndex, currentLocation)) { - return currentLocation; - } else { - return null; - } + return currentLocation; } private void moveToNextReader() throws IOException { From 3460630f965654997b6ab5aa8fe3430e5b6fb614 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 21 Sep 2022 12:55:15 +0800 Subject: [PATCH 6/8] add reader double check. --- .../emr/rss/client/read/RssInputStream.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index 033df1dca0f..056facbef42 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -155,8 +155,7 @@ private static final class RssInputStreamImpl extends RssInputStream { moveToNextReader(); } - private boolean skipLocation( - int startMapIndex, int endMapIndex, PartitionLocation location) { + private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) { if (endMapIndex == Integer.MAX_VALUE) { return false; } @@ -191,8 +190,18 @@ private void moveToNextReader() throws IOException { return; } currentReader = createReader(currentLocation); - currentChunk = currentReader.next(); fileIndex++; + while (!currentReader.hasNext()) { + currentReader.close(); + currentReader = null; + currentLocation = nextReadableLocation(); + if (currentLocation == null) { + return; + } + currentReader = createReader(currentLocation); + fileIndex++; + } + currentChunk = currentReader.next(); } private PartitionReader createReader(PartitionLocation location) throws IOException { From a46abe87c1a9c148dc5c4431f5ff4e281605cdfc Mon Sep 17 00:00:00 2001 From: mingji Date: Thu, 22 Sep 2022 13:05:00 +0800 Subject: [PATCH 7/8] fix a multithread problem. --- .../emr/rss/service/deploy/worker/storage/FileWriter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java index 59372959154..d31a8f3ec5d 100644 --- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java +++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java @@ -193,11 +193,11 @@ public void write(ByteBuf data) throws IOException { byte[] headerBuf = new byte[16]; data.getBytes(0, headerBuf); int mapId = Platform.getInt(headerBuf, Platform.BYTE_ARRAY_OFFSET); - mapIdBitMap.add(mapId); final int numBytes = data.readableBytes(); MemoryTracker.instance().incrementDiskBuffer(numBytes); synchronized (this) { + mapIdBitMap.add(mapId); if (flushBuffer.readableBytes() != 0 && flushBuffer.readableBytes() + numBytes >= this.flushBufferSize) { flush(false); @@ -212,7 +212,6 @@ public void write(ByteBuf data) throws IOException { } public RoaringBitmap getMapIdBitMap() { - mapIdBitMap.runOptimize(); return mapIdBitMap; } From 4d9659d1dfeedfe0280d04c363cdfaeb8900f1fa Mon Sep 17 00:00:00 2001 From: mingji Date: Thu, 22 Sep 2022 15:42:31 +0800 Subject: [PATCH 8/8] update by tests. --- .../com/aliyun/emr/rss/client/read/RssInputStream.java | 3 +++ .../emr/rss/service/deploy/worker/storage/FileWriter.java | 8 +++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index 0a919648218..5b36584ff22 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -156,6 +156,9 @@ private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocati private PartitionLocation nextReadableLocation() { int locationCount = locations.length; + if (fileIndex >= locationCount) { + return null; + } PartitionLocation currentLocation = locations[fileIndex]; while (skipLocation(startMapIndex, endMapIndex, currentLocation)) { fileIndex++; diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java index d31a8f3ec5d..efc772340f8 100644 --- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java +++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java @@ -190,9 +190,11 @@ public void write(ByteBuf data) throws IOException { return; } - byte[] headerBuf = new byte[16]; - data.getBytes(0, headerBuf); - int mapId = Platform.getInt(headerBuf, Platform.BYTE_ARRAY_OFFSET); + byte[] header = new byte[16]; + data.markReaderIndex(); + data.readBytes(header); + data.resetReaderIndex(); + int mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET); final int numBytes = data.readableBytes(); MemoryTracker.instance().incrementDiskBuffer(numBytes);