Skip to content

Commit

Permalink
refactor(stream): make CompactionManager#compact public (#877)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Jan 3, 2024
1 parent e2a2871 commit d578c12
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 59 deletions.
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.12.0-SNAPSHOT</version>
<version>0.13.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ public Builder withItem(StreamObjectsCompactionTask.CompactionSummary compaction
return this;
}
this.involvedStreamCount++;
this.sourceObjectsTotalSize += compactionSummary.getTotalObjectSize();
this.sourceObjectsCount += compactionSummary.getSourceObjectsCount();
this.targetObjectsCount += compactionSummary.getTargetObjectCount();
this.smallSizeCopyWriteCount += compactionSummary.getSmallSizeCopyWriteCount();
this.sourceObjectsTotalSize += compactionSummary.totalObjectSize();
this.sourceObjectsCount += compactionSummary.sourceObjectsCount();
this.targetObjectsCount += compactionSummary.targetObjectCount();
this.smallSizeCopyWriteCount += compactionSummary.smallSizeCopyWriteCount();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ private Queue<List<S3StreamObjectMetadataSplitWrapper>> groupEligibleObjects(Lis
}
}
if (groups.isEmpty()) {
long startOffset = streamObjects.get(0).getOffsetRanges().get(0).getStartOffset();
long endOffset = streamObjects.get(streamObjects.size() - 1).getOffsetRanges().get(0).getEndOffset();
long startOffset = streamObjects.get(0).getOffsetRanges().get(0).startOffset();
long endOffset = streamObjects.get(streamObjects.size() - 1).getOffsetRanges().get(0).endOffset();
LOGGER.trace("{} no eligible stream objects found for range [{}, {})", logIdent, startOffset, endOffset);
}
return groups;
Expand Down Expand Up @@ -464,35 +464,35 @@ public CompactionSummary(long streamId, long startOffset, long endOffset, long t
this.smallSizeCopyWriteCount = smallSizeCopyWriteCount;
}

public long getStreamId() {
public long streamId() {
return streamId;
}

public long getStartOffset() {
public long startOffset() {
return startOffset;
}

public long getEndOffset() {
public long endOffset() {
return endOffset;
}

public long getTimeCostInMs() {
public long timeCostInMs() {
return timeCostInMs;
}

public long getTotalObjectSize() {
public long totalObjectSize() {
return totalObjectSize;
}

public long getSourceObjectsCount() {
public long sourceObjectsCount() {
return sourceObjectsCount;
}

public long getTargetObjectCount() {
public long targetObjectCount() {
return targetObjectCount;
}

public long getSmallSizeCopyWriteCount() {
public long smallSizeCopyWriteCount() {
return smallSizeCopyWriteCount;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ public void shutdown() {
this.uploader.stop();
}

private CompletableFuture<Void> compact() {
public CompletableFuture<Void> compact() {
return this.objectManager.getServerObjects().thenComposeAsync(objectMetadataList -> {
List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
.map(StreamOffsetRange::getStreamId).distinct().toList();
.map(StreamOffsetRange::streamId).distinct().toList();
return this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList ->
this.compact(streamMetadataList, objectMetadataList), compactThreadPool);
}, compactThreadPool);
Expand Down Expand Up @@ -287,7 +287,7 @@ public CompletableFuture<Void> forceSplitAll() {
//TODO: deal with metadata delay
this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
.map(StreamOffsetRange::getStreamId).distinct().toList();
.map(StreamOffsetRange::streamId).distinct().toList();
this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
if (objectMetadataList.isEmpty()) {
logger.info("No stream set objects to force split");
Expand Down Expand Up @@ -488,32 +488,32 @@ boolean isSanityCheckFailed(List<StreamMetadata> streamMetadataList, List<S3Obje
request.getStreamRanges().forEach(o -> compactedStreamOffsetRanges.add(new StreamOffsetRange(o.getStreamId(), o.getStartOffset(), o.getEndOffset())));
request.getStreamObjects().forEach(o -> compactedStreamOffsetRanges.add(new StreamOffsetRange(o.getStreamId(), o.getStartOffset(), o.getEndOffset())));
Map<Long, List<StreamOffsetRange>> sortedStreamOffsetRanges = compactedStreamOffsetRanges.stream()
.collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
.collect(Collectors.groupingBy(StreamOffsetRange::streamId));
sortedStreamOffsetRanges.replaceAll((k, v) -> sortAndMerge(v));
for (long objectId : request.getCompactedObjectIds()) {
S3ObjectMetadata metadata = objectMetadataMap.get(objectId);
for (StreamOffsetRange streamOffsetRange : metadata.getOffsetRanges()) {
if (!streamMetadataMap.containsKey(streamOffsetRange.getStreamId())) {
if (!streamMetadataMap.containsKey(streamOffsetRange.streamId())) {
// skip non-exist stream
continue;
}
long streamStartOffset = streamMetadataMap.get(streamOffsetRange.getStreamId()).startOffset();
if (streamOffsetRange.getEndOffset() <= streamStartOffset) {
long streamStartOffset = streamMetadataMap.get(streamOffsetRange.streamId()).startOffset();
if (streamOffsetRange.endOffset() <= streamStartOffset) {
// skip stream offset range that has been trimmed
continue;
}
if (streamOffsetRange.getStartOffset() < streamStartOffset) {
if (streamOffsetRange.startOffset() < streamStartOffset) {
// trim stream offset range
streamOffsetRange = new StreamOffsetRange(streamOffsetRange.getStreamId(), streamStartOffset, streamOffsetRange.getEndOffset());
streamOffsetRange = new StreamOffsetRange(streamOffsetRange.streamId(), streamStartOffset, streamOffsetRange.endOffset());
}
if (!sortedStreamOffsetRanges.containsKey(streamOffsetRange.getStreamId())) {
logger.error("Sanity check failed, stream {} is missing after compact", streamOffsetRange.getStreamId());
if (!sortedStreamOffsetRanges.containsKey(streamOffsetRange.streamId())) {
logger.error("Sanity check failed, stream {} is missing after compact", streamOffsetRange.streamId());
return true;
}
boolean contained = false;
for (StreamOffsetRange compactedStreamOffsetRange : sortedStreamOffsetRanges.get(streamOffsetRange.getStreamId())) {
if (streamOffsetRange.getStartOffset() >= compactedStreamOffsetRange.getStartOffset()
&& streamOffsetRange.getEndOffset() <= compactedStreamOffsetRange.getEndOffset()) {
for (StreamOffsetRange compactedStreamOffsetRange : sortedStreamOffsetRanges.get(streamOffsetRange.streamId())) {
if (streamOffsetRange.startOffset() >= compactedStreamOffsetRange.startOffset()
&& streamOffsetRange.endOffset() <= compactedStreamOffsetRange.endOffset()) {
contained = true;
break;
}
Expand All @@ -532,7 +532,7 @@ private List<StreamOffsetRange> sortAndMerge(List<StreamOffsetRange> streamOffse
if (streamOffsetRangeList.size() < 2) {
return streamOffsetRangeList;
}
long streamId = streamOffsetRangeList.get(0).getStreamId();
long streamId = streamOffsetRangeList.get(0).streamId();
Collections.sort(streamOffsetRangeList);
List<StreamOffsetRange> mergedList = new ArrayList<>();
long start = -1L;
Expand All @@ -541,14 +541,14 @@ private List<StreamOffsetRange> sortAndMerge(List<StreamOffsetRange> streamOffse
StreamOffsetRange curr = streamOffsetRangeList.get(i);
StreamOffsetRange next = streamOffsetRangeList.get(i + 1);
if (start == -1) {
start = curr.getStartOffset();
end = curr.getEndOffset();
start = curr.startOffset();
end = curr.endOffset();
}
if (curr.getEndOffset() < next.getStartOffset()) {
mergedList.add(new StreamOffsetRange(curr.getStreamId(), start, end));
start = next.getStartOffset();
if (curr.endOffset() < next.startOffset()) {
mergedList.add(new StreamOffsetRange(curr.streamId(), start, end));
start = next.startOffset();
}
end = next.getEndOffset();
end = next.endOffset();
}
mergedList.add(new StreamOffsetRange(streamId, start, end));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,19 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long st
List<S3ObjectMetadata> streamSetObjectList = streamSetObjects.values()
.stream()
.map(Pair::getRight)
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1)))
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.toList();
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1)))
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.toList();

List<S3ObjectMetadata> result = new ArrayList<>();
result.addAll(streamSetObjectList);
result.addAll(streamObjectList);
result.sort((o1, o2) -> {
long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.streamId() == streamId).findFirst().get().startOffset();
long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.streamId() == streamId).findFirst().get().startOffset();
return Long.compare(startOffset1, startOffset2);
});

Expand All @@ -199,7 +199,7 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(l
long endOffset, int limit) {
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1)))
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.limit(limit)
.toList();
return CompletableFuture.completedFuture(streamObjectList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,22 @@ public long startOffset() {
if (offsetRanges == null || offsetRanges.isEmpty()) {
return S3StreamConstant.INVALID_OFFSET;
}
return offsetRanges.get(0).getStartOffset();
return offsetRanges.get(0).startOffset();
}

public long endOffset() {
if (offsetRanges == null || offsetRanges.isEmpty()) {
return S3StreamConstant.INVALID_OFFSET;
}
return offsetRanges.get(offsetRanges.size() - 1).getEndOffset();
return offsetRanges.get(offsetRanges.size() - 1).endOffset();
}

public boolean intersect(long streamId, long startOffset, long endOffset) {
if (offsetRanges == null || offsetRanges.isEmpty()) {
return false;
}
for (StreamOffsetRange offsetRange : offsetRanges) {
if (offsetRange.getStreamId() == streamId && offsetRange.intersect(startOffset, endOffset)) {
if (offsetRange.streamId() == streamId && offsetRange.intersect(startOffset, endOffset)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ public StreamOffsetRange(long streamId, long startOffset, long endOffset) {
this.endOffset = endOffset;
}

public long getStreamId() {
public long streamId() {
return streamId;
}

public long getStartOffset() {
public long startOffset() {
return startOffset;
}

public long getEndOffset() {
public long endOffset() {
return endOffset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ void testCommitAndCompact() {
List<StreamOffsetRange> ranges = streamSetMetadata.getOffsetRanges();
assertEquals(2, ranges.size());

assertEquals(0, ranges.get(0).getStreamId());
assertEquals(0, ranges.get(0).getStartOffset());
assertEquals(3, ranges.get(0).getEndOffset());
assertEquals(0, ranges.get(0).streamId());
assertEquals(0, ranges.get(0).startOffset());
assertEquals(3, ranges.get(0).endOffset());

assertEquals(1, ranges.get(1).getStreamId());
assertEquals(0, ranges.get(1).getStartOffset());
assertEquals(5, ranges.get(1).getEndOffset());
assertEquals(1, ranges.get(1).streamId());
assertEquals(0, ranges.get(1).startOffset());
assertEquals(5, ranges.get(1).endOffset());

List<S3ObjectMetadata> streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join();
assertEquals(1, streamObjectMetadataList.size());
ranges = streamObjectMetadataList.get(0).getOffsetRanges();
assertEquals(1, ranges.size());
assertEquals(2, ranges.get(0).getStreamId());
assertEquals(0, ranges.get(0).getStartOffset());
assertEquals(10, ranges.get(0).getEndOffset());
assertEquals(2, ranges.get(0).streamId());
assertEquals(0, ranges.get(0).startOffset());
assertEquals(10, ranges.get(0).endOffset());

streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 20, 100).join();
assertEquals(2, streamObjectMetadataList.size());
Expand Down Expand Up @@ -153,9 +153,9 @@ void testCommitAndCompact() {
assertEquals(1, streamObjectMetadataList.size());
ranges = streamObjectMetadataList.get(0).getOffsetRanges();
assertEquals(1, ranges.size());
assertEquals(2, ranges.get(0).getStreamId());
assertEquals(0, ranges.get(0).getStartOffset());
assertEquals(20, ranges.get(0).getEndOffset());
assertEquals(2, ranges.get(0).streamId());
assertEquals(0, ranges.get(0).startOffset());
assertEquals(20, ranges.get(0).endOffset());
}

@Test
Expand Down

0 comments on commit d578c12

Please sign in to comment.