Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-607]Add map ids info for each PartitionLocation to enable filtering for m… #619

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,41 +142,56 @@ private static final class RssInputStreamImpl extends RssInputStream {
moveToNextReader();
}

private void moveToNextReader() throws IOException {
if (currentReader != null) {
currentReader.close();
private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) {
if (endMapIndex == Integer.MAX_VALUE) {
return false;
}
for (int i = startMapIndex; i < endMapIndex; i++) {
if (location.getMapIdBitMap().contains(i)) {
return false;
}
}
return true;
}

private PartitionLocation nextReadableLocation() {
int locationCount = locations.length;
if (fileIndex >= locationCount) {
return null;
}
PartitionLocation currentLocation = locations[fileIndex];
currentReader = createReader(currentLocation);
logger.debug(
"Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read ",
currentLocation,
startMapIndex,
endMapIndex,
fileIndex,
locationCount);
while (!currentReader.hasNext() && fileIndex < locationCount - 1) {
while (skipLocation(startMapIndex, endMapIndex, currentLocation)) {
fileIndex++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (fileIndex == locationCount) {
   break;
}

if (fileIndex == locationCount) {
return null;
}
currentLocation = locations[fileIndex];
}
return currentLocation;
}

private void moveToNextReader() throws IOException {
if (currentReader != null) {
currentReader.close();
currentReader = createReader(currentLocation);
logger.debug(
"Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read ",
currentLocation,
startMapIndex,
endMapIndex,
fileIndex,
locationCount);
currentReader = null;
}
if (currentReader.hasNext()) {
currentChunk = currentReader.next();
fileIndex++;
} else {
PartitionLocation currentLocation = nextReadableLocation();
if (currentLocation == null) {
return;
}
currentReader = createReader(currentLocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete the following logic? For normal read we don't know if currentReader has chunks.

while (!currentReader.hasNext()...)

fileIndex++;
while (!currentReader.hasNext()) {
currentReader.close();
currentReader = null;
currentLocation = nextReadableLocation();
if (currentLocation == null) {
return;
}
currentReader = createReader(currentLocation);
fileIndex++;
}
currentChunk = currentReader.next();
}

private PartitionReader createReader(PartitionLocation location) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random

import org.roaringbitmap.RoaringBitmap

import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.haclient.RssHARetryClient
import com.aliyun.emr.rss.common.identity.IdentityProvider
Expand Down Expand Up @@ -677,6 +679,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
val committedSlaveIds = ConcurrentHashMap.newKeySet[String]()
val committedMasterStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
val committedSlaveStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]()
val failedMasterIds = ConcurrentHashMap.newKeySet[String]()
val failedSlaveIds = ConcurrentHashMap.newKeySet[String]()

Expand Down Expand Up @@ -736,6 +739,10 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
failedMasterIds.addAll(res.failedMasterIds)
failedSlaveIds.addAll(res.failedSlaveIds)

if (!res.committedMapIdBitMap.isEmpty) {
committedMapIdBitmap.putAll(res.committedMapIdBitMap)
}

totalWritten.add(res.totalWritten)
fileCount.add(res.fileCount)
}
Expand Down Expand Up @@ -773,6 +780,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
logDebug(s"$applicationId-$shuffleId $id storage hint was not returned")
} else {
masterPartMap.get(id).setStorageInfo(committedMasterStorageInfos.get(id))
masterPartMap.get(id).setMapIdBitMap(committedMapIdBitmap.get(id))
committedPartitions.put(id, masterPartMap.get(id))
}
}
Expand All @@ -783,6 +791,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
logDebug(s"$applicationId-$shuffleId $id storage hint was not returned")
} else {
slavePartition.setStorageInfo(committedSlaveStorageInfos.get(id))
slavePartition.setMapIdBitMap(committedMapIdBitmap.get(id))
val masterPartition = committedPartitions.get(id)
if (masterPartition ne null) {
masterPartition.setPeer(slavePartition)
Expand Down
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import java.io.Serializable;

import org.roaringbitmap.RoaringBitmap;

import com.aliyun.emr.rss.common.meta.WorkerInfo;
import com.aliyun.emr.rss.common.util.Utils;

public class PartitionLocation implements Serializable {
public enum Mode {
Expand Down Expand Up @@ -56,6 +59,7 @@ public static PartitionLocation.Mode getMode(byte mode) {
private Mode mode;
private PartitionLocation peer;
private StorageInfo storageInfo = new StorageInfo();
private RoaringBitmap mapIdBitMap = new RoaringBitmap();

public PartitionLocation(PartitionLocation loc) {
this.id = loc.id;
Expand Down Expand Up @@ -89,7 +93,8 @@ public PartitionLocation(
replicatePort,
mode,
null,
new StorageInfo());
new StorageInfo(),
new RoaringBitmap());
}

public PartitionLocation(
Expand All @@ -112,7 +117,8 @@ public PartitionLocation(
replicatePort,
mode,
peer,
new StorageInfo());
new StorageInfo(),
new RoaringBitmap());
}

public PartitionLocation(
Expand All @@ -125,7 +131,8 @@ public PartitionLocation(
int replicatePort,
Mode mode,
PartitionLocation peer,
StorageInfo hint) {
StorageInfo hint,
RoaringBitmap mapIdBitMap) {
this.id = id;
this.epoch = epoch;
this.host = host;
Expand All @@ -136,6 +143,7 @@ public PartitionLocation(
this.mode = mode;
this.peer = peer;
this.storageInfo = hint;
this.mapIdBitMap = mapIdBitMap;
}

public int getId() {
Expand Down Expand Up @@ -286,6 +294,14 @@ public WorkerInfo getWorker() {
return new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort);
}

public RoaringBitmap getMapIdBitMap() {
return mapIdBitMap;
}

public void setMapIdBitMap(RoaringBitmap mapIdBitMap) {
this.mapIdBitMap = mapIdBitMap;
}

public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLoc) {
Mode mode = Mode.MASTER;
if (pbLoc.getMode() == PbPartitionLocation.Mode.Slave) {
Expand All @@ -303,7 +319,8 @@ public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLo
pbLoc.getReplicatePort(),
mode,
null,
StorageInfo.fromPb(pbLoc.getStorageInfo()));
StorageInfo.fromPb(pbLoc.getStorageInfo()),
Utils.byteStringToRoaringBitmap(pbLoc.getMapIdBitmap()));
if (pbLoc.hasPeer()) {
PbPartitionLocation peerPb = pbLoc.getPeer();
Mode peerMode = Mode.MASTER;
Expand All @@ -321,7 +338,8 @@ public static PartitionLocation fromPbPartitionLocation(PbPartitionLocation pbLo
peerPb.getReplicatePort(),
peerMode,
partitionLocation,
StorageInfo.fromPb(peerPb.getStorageInfo()));
StorageInfo.fromPb(peerPb.getStorageInfo()),
Utils.byteStringToRoaringBitmap(peerPb.getMapIdBitmap()));
partitionLocation.setPeer(peerLocation);
}

Expand All @@ -343,6 +361,7 @@ public static PbPartitionLocation toPbPartitionLocation(PartitionLocation locati
builder.setFetchPort(location.getFetchPort());
builder.setReplicatePort(location.getReplicatePort());
builder.setStorageInfo(StorageInfo.toPb(location.storageInfo));
builder.setMapIdBitmap(Utils.roaringBitmapToByteString(location.getMapIdBitMap()));

if (location.getPeer() != null) {
PbPartitionLocation.Builder peerBuilder = PbPartitionLocation.newBuilder();
Expand All @@ -359,6 +378,7 @@ public static PbPartitionLocation toPbPartitionLocation(PartitionLocation locati
peerBuilder.setFetchPort(location.getPeer().getFetchPort());
peerBuilder.setReplicatePort(location.getPeer().getReplicatePort());
peerBuilder.setStorageInfo(StorageInfo.toPb(location.getPeer().getStorageInfo()));
peerBuilder.setMapIdBitmap(Utils.roaringBitmapToByteString(location.getMapIdBitMap()));
builder.setPeer(peerBuilder.build());
}

Expand Down
2 changes: 2 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ message PbPartitionLocation {
int32 replicatePort = 8;
PbPartitionLocation peer = 9;
PbStorageInfo storageInfo = 10;
bytes mapIdBitmap = 11;
}

message PbWorkerResource {
Expand Down Expand Up @@ -321,6 +322,7 @@ message PbCommitFilesResponse {
repeated string failedSlaveIds = 5;
map<string, PbStorageInfo> committedMasterStorageInfos = 6;
map<string, PbStorageInfo> committedSlaveStorageInfos = 7;
map<string, bytes> mapIdBitmap = 10;
int64 totalWritten = 8;
int32 fileCount = 9;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.util.UUID

import scala.collection.JavaConverters._

import org.roaringbitmap.RoaringBitmap

import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.{DiskInfo, WorkerInfo}
import com.aliyun.emr.rss.common.network.protocol.TransportMessage
Expand Down Expand Up @@ -385,6 +387,7 @@ sealed trait Message extends Serializable {
failedSlaveIds,
committedMasterStorageInfos,
committedSlaveStorageInfos,
committedMapIdBitMap,
totalWritten,
fileCount) =>
val builder = PbCommitFilesResponse.newBuilder()
Expand All @@ -397,6 +400,9 @@ sealed trait Message extends Serializable {
builder.putCommittedMasterStorageInfos(entry._1, StorageInfo.toPb(entry._2)))
committedSlaveStorageInfos.asScala.foreach(entry =>
builder.putCommittedSlaveStorageInfos(entry._1, StorageInfo.toPb(entry._2)))
committedMapIdBitMap.asScala.foreach(entry => {
builder.putMapIdBitmap(entry._1, Utils.roaringBitmapToByteString(entry._2))
})
builder.setTotalWritten(totalWritten)
builder.setFileCount(fileCount)
val payload = builder.build().toByteArray
Expand Down Expand Up @@ -688,6 +694,8 @@ object ControlMessages extends Logging {
Map.empty[String, StorageInfo].asJava,
committedSlaveStorageInfos: util.Map[String, StorageInfo] =
Map.empty[String, StorageInfo].asJava,
committedMapIdBitMap: util.Map[String, RoaringBitmap] =
Map.empty[String, RoaringBitmap].asJava,
totalWritten: Long = 0,
fileCount: Int = 0) extends WorkerMessage

Expand Down Expand Up @@ -991,10 +999,14 @@ object ControlMessages extends Logging {
val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload)
val committedMasterStorageInfos = new util.HashMap[String, StorageInfo]()
val committedSlaveStorageInfos = new util.HashMap[String, StorageInfo]()
val committedBitMap = new util.HashMap[String, RoaringBitmap]()
pbCommitFilesResponse.getCommittedMasterStorageInfosMap.asScala.foreach(entry =>
committedMasterStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
pbCommitFilesResponse.getCommittedSlaveStorageInfosMap.asScala.foreach(entry =>
committedSlaveStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry =>
committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2))
}
CommitFilesResponse(
Utils.toStatusCode(pbCommitFilesResponse.getStatus),
pbCommitFilesResponse.getCommittedMasterIdsList,
Expand All @@ -1003,6 +1015,7 @@ object ControlMessages extends Logging {
pbCommitFilesResponse.getFailedSlaveIdsList,
committedMasterStorageInfos,
committedSlaveStorageInfos,
committedBitMap,
pbCommitFilesResponse.getTotalWritten,
pbCommitFilesResponse.getFileCount)

Expand Down
25 changes: 25 additions & 0 deletions common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{File, FileInputStream, InputStreamReader, IOException}
import java.lang.management.ManagementFactory
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util
Expand All @@ -34,8 +35,10 @@ import scala.util.Try
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.net.InetAddresses
import com.google.protobuf.ByteString
import io.netty.channel.unix.Errors.NativeIoException
import org.apache.commons.lang3.SystemUtils
import org.roaringbitmap.RoaringBitmap

import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.exception.RssException
Expand Down Expand Up @@ -875,4 +878,26 @@ object Utils extends Logging {
def getWriteSuccessFilePath(path: String): String = {
path + SUFFIX_HDFS_WRITE_SUCCESS
}

def roaringBitmapToByteString(roaringBitMap: RoaringBitmap): ByteString = {
if (!roaringBitMap.isEmpty) {
val buf = ByteBuffer.allocate(roaringBitMap.serializedSizeInBytes())
roaringBitMap.serialize(buf)
buf.rewind()
ByteString.copyFrom(buf)
} else {
ByteString.EMPTY
}
}

def byteStringToRoaringBitmap(bytes: ByteString): RoaringBitmap = {
val roaringBitmap = new RoaringBitmap()
if (!bytes.isEmpty) {
val buf = bytes.asReadOnlyByteBuffer()
buf.rewind()
roaringBitmap.deserialize(buf)
}
roaringBitmap
}

}
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<scalamock.version>5.1.0</scalamock.version>
<scalatest.version>3.2.3</scalatest.version>
<slf4j.version>1.7.16</slf4j.version>
<roaringbitmap.version>0.9.32</roaringbitmap.version>
<!-- default hadoop version -->
<hadoop.version>3.3.1</hadoop.version>

Expand Down Expand Up @@ -381,6 +382,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>${roaringbitmap.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Loading