diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index cdfaca1621..92d77f1f08 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -46,13 +46,13 @@ public static Config to(KafkaConfig s) { .streamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes()) .controllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount()) .controllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs()) - .sstCompactionInterval(s.s3SSTCompactionInterval()) - .sstCompactionCacheSize(s.s3SSTCompactionCacheSize()) - .maxStreamNumPerSST(s.s3MaxStreamNumPerSST()) + .streamSetObjectCompactionInterval(s.s3StreamSetObjectCompactionInterval()) + .streamSetObjectCompactionCacheSize(s.s3StreamSetObjectCompactionCacheSize()) + .maxStreamNumPerStreamSetObject(s.s3MaxStreamNumPerStreamSetObject()) .maxStreamObjectNumPerCommit(s.s3MaxStreamObjectNumPerCommit()) - .sstCompactionStreamSplitSize(s.s3SSTCompactionStreamSplitSize()) - .sstCompactionForceSplitPeriod(s.s3SSTCompactionForceSplitMinutes()) - .sstCompactionMaxObjectNum(s.s3SSTCompactionMaxObjectNum()) + .streamSetObjectCompactionStreamSplitSize(s.s3StreamSetObjectCompactionStreamSplitSize()) + .streamSetObjectCompactionForceSplitPeriod(s.s3StreamSetObjectCompactionForceSplitMinutes()) + .streamSetObjectCompactionMaxObjectNum(s.s3StreamSetObjectCompactionMaxObjectNum()) .mockEnable(s.s3MockEnable()) .objectLogEnable(s.s3ObjectLogEnable()) .networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp()) diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index 906fea8319..0a72522890 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -96,7 +96,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) { this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator); this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build(); this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator); - // stream object compactions share the same s3Operator with SST object compactions + // stream object compactions share the same s3Operator with stream set object compactions this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionS3Operator, this.config, networkInboundLimiter, networkOutboundLimiter); this.kvClient = new ControllerKVClient(this.requestSender); } diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java index 964473d9b7..8bb850265f 100644 --- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java @@ -101,9 +101,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) { } } - public CompletableFuture> getSSTObjects() { + public CompletableFuture> getStreamSetObjects() { synchronized (this) { - List s3ObjectMetadataList = this.streamsImage.getSSTObjects(config.brokerId()).stream() + List s3ObjectMetadataList = 
this.streamsImage.getStreamSetObjects(config.brokerId()).stream() .map(object -> { S3Object s3Object = this.objectsImage.getObjectMetadata(object.objectId()); return new S3ObjectMetadata(object.objectId(), object.objectType(), diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java index 909787532d..b3f708b44c 100644 --- a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java @@ -18,9 +18,10 @@ package kafka.log.stream.s3.objects; -import com.automq.stream.s3.objects.CommitStreamObjectRequest; -import com.automq.stream.s3.objects.CommitSSTObjectRequest; -import com.automq.stream.s3.objects.CommitSSTObjectResponse; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.objects.CommitStreamSetObjectRequest; +import com.automq.stream.s3.objects.CommitStreamSetObjectResponse; +import com.automq.stream.s3.objects.CompactStreamObjectRequest; import com.automq.stream.s3.objects.ObjectManager; import kafka.log.stream.s3.metadata.StreamMetadataManager; import kafka.log.stream.s3.network.ControllerRequestSender; @@ -28,10 +29,10 @@ import kafka.log.stream.s3.network.ControllerRequestSender.ResponseHandleResult; import kafka.log.stream.s3.network.request.WrapRequest; import kafka.server.KafkaConfig; +import org.apache.kafka.common.message.CommitStreamSetObjectRequestData; +import org.apache.kafka.common.message.CommitStreamSetObjectResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitSSTObjectRequestData; -import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -41,7 +42,6 @@ import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse; import org.apache.kafka.metadata.stream.InRangeObjects; -import com.automq.stream.s3.metadata.S3ObjectMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,9 +71,9 @@ public ControllerObjectManager(ControllerRequestSender requestSender, StreamMeta @Override public CompletableFuture prepareObject(int count, long ttl) { PrepareS3ObjectRequestData request = new PrepareS3ObjectRequestData() - .setNodeId(nodeId) - .setPreparedCount(count) - .setTimeToLiveInMs(ttl); + .setNodeId(nodeId) + .setPreparedCount(count) + .setTimeToLiveInMs(ttl); WrapRequest req = new WrapRequest() { @Override public ApiKeys apiKey() { @@ -101,38 +101,38 @@ public Builder toRequestBuilder() { } @Override - public CompletableFuture commitSSTObject(CommitSSTObjectRequest commitSSTObjectRequest) { - CommitSSTObjectRequestData request = new CommitSSTObjectRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch) - .setOrderId(commitSSTObjectRequest.getOrderId()) - .setObjectId(commitSSTObjectRequest.getObjectId()) - .setObjectSize(commitSSTObjectRequest.getObjectSize()) - .setObjectStreamRanges(commitSSTObjectRequest.getStreamRanges() - .stream() - .map(Convertor::toObjectStreamRangeInRequest).collect(Collectors.toList())) - .setStreamObjects(commitSSTObjectRequest.getStreamObjects() - .stream() - 
.map(Convertor::toStreamObjectInRequest).collect(Collectors.toList())) - .setCompactedObjectIds(commitSSTObjectRequest.getCompactedObjectIds()); + public CompletableFuture commitStreamSetObject(CommitStreamSetObjectRequest commitStreamSetObjectRequest) { + CommitStreamSetObjectRequestData request = new CommitStreamSetObjectRequestData() + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch) + .setOrderId(commitStreamSetObjectRequest.getOrderId()) + .setObjectId(commitStreamSetObjectRequest.getObjectId()) + .setObjectSize(commitStreamSetObjectRequest.getObjectSize()) + .setObjectStreamRanges(commitStreamSetObjectRequest.getStreamRanges() + .stream() + .map(Convertor::toObjectStreamRangeInRequest).collect(Collectors.toList())) + .setStreamObjects(commitStreamSetObjectRequest.getStreamObjects() + .stream() + .map(Convertor::toStreamObjectInRequest).collect(Collectors.toList())) + .setCompactedObjectIds(commitStreamSetObjectRequest.getCompactedObjectIds()); WrapRequest req = new WrapRequest() { @Override public ApiKeys apiKey() { - return ApiKeys.COMMIT_SST_OBJECT; + return ApiKeys.COMMIT_STREAM_SET_OBJECT; } @Override public Builder toRequestBuilder() { - return new org.apache.kafka.common.requests.s3.CommitSSTObjectRequest.Builder(request); + return new org.apache.kafka.common.requests.s3.CommitStreamSetObjectRequest.Builder(request); } }; - CompletableFuture future = new CompletableFuture<>(); - RequestTask task = new RequestTask<>(req, future, response -> { - CommitSSTObjectResponseData resp = response.data(); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + CommitStreamSetObjectResponseData resp = response.data(); Errors code = Errors.forCode(resp.errorCode()); switch (code) { case NONE: - return ResponseHandleResult.withSuccess(new CommitSSTObjectResponse()); + return ResponseHandleResult.withSuccess(new CommitStreamSetObjectResponse()); case NODE_EPOCH_EXPIRED: case NODE_EPOCH_NOT_EXIST: LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode())); @@ -141,7 +141,7 @@ public Builder toRequestBuilder() { case COMPACTED_OBJECTS_NOT_FOUND: throw code.exception(); default: - LOGGER.error("Error while committing SST object: {}, code: {}, retry later", request, code); + LOGGER.error("Error while committing stream set object: {}, code: {}, retry later", request, code); return ResponseHandleResult.withRetry(); } }); @@ -149,17 +149,16 @@ public Builder toRequestBuilder() { return future; } - @Override - public CompletableFuture commitStreamObject(CommitStreamObjectRequest commitStreamObjectRequest) { + public CompletableFuture compactStreamObject(CompactStreamObjectRequest compactStreamObjectRequest) { CommitStreamObjectRequestData request = new CommitStreamObjectRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch) - .setObjectId(commitStreamObjectRequest.getObjectId()) - .setObjectSize(commitStreamObjectRequest.getObjectSize()) - .setStreamId(commitStreamObjectRequest.getStreamId()) - .setStartOffset(commitStreamObjectRequest.getStartOffset()) - .setEndOffset(commitStreamObjectRequest.getEndOffset()) - .setSourceObjectIds(commitStreamObjectRequest.getSourceObjectIds()); + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch) + .setObjectId(compactStreamObjectRequest.getObjectId()) + .setObjectSize(compactStreamObjectRequest.getObjectSize()) + .setStreamId(compactStreamObjectRequest.getStreamId()) + .setStartOffset(compactStreamObjectRequest.getStartOffset()) + 
.setEndOffset(compactStreamObjectRequest.getEndOffset()) + .setSourceObjectIds(compactStreamObjectRequest.getSourceObjectIds()); WrapRequest req = new WrapRequest() { @Override public ApiKeys apiKey() { @@ -207,7 +206,7 @@ public CompletableFuture> getObjects(long streamId, long @Override public CompletableFuture> getServerObjects() { try { - return this.metadataManager.getSSTObjects(); + return this.metadataManager.getStreamSetObjects(); } catch (Exception e) { LOGGER.error("Error while get server objects", e); return CompletableFuture.completedFuture(Collections.emptyList()); diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java b/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java index c785645800..2afe72a607 100644 --- a/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java +++ b/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java @@ -19,12 +19,12 @@ import com.automq.stream.s3.objects.ObjectStreamRange; import com.automq.stream.s3.objects.StreamObject; -import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitStreamSetObjectRequestData; public class Convertor { - public static CommitSSTObjectRequestData.StreamObject toStreamObjectInRequest(StreamObject s) { - return new CommitSSTObjectRequestData.StreamObject() + public static CommitStreamSetObjectRequestData.StreamObject toStreamObjectInRequest(StreamObject s) { + return new CommitStreamSetObjectRequestData.StreamObject() .setStreamId(s.getStreamId()) .setObjectId(s.getObjectId()) .setObjectSize(s.getObjectSize()) @@ -32,8 +32,8 @@ public static CommitSSTObjectRequestData.StreamObject toStreamObjectInRequest(St .setEndOffset(s.getEndOffset()); } - public static CommitSSTObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest(ObjectStreamRange s) { - return new CommitSSTObjectRequestData.ObjectStreamRange() + public static CommitStreamSetObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest(ObjectStreamRange s) { + return new CommitStreamSetObjectRequestData.ObjectStreamRange() .setStreamId(s.getStreamId()) .setStartOffset(s.getStartOffset()) .setEndOffset(s.getEndOffset()); diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java index 04125a2b7a..35b7c5bf6c 100644 --- a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java @@ -23,7 +23,7 @@ import kafka.server.KafkaConfig; import kafka.server.metadata.BrokerMetadataListener; import kafka.server.metadata.KRaftMetadataCache; -import org.apache.kafka.image.NodeS3SSTMetadataImage; +import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.S3ObjectsImage; @@ -35,7 +35,7 @@ import org.apache.kafka.metadata.stream.S3ObjectState; import com.automq.stream.s3.metadata.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3SSTObject; +import org.apache.kafka.metadata.stream.S3StreamSetObject; import com.automq.stream.s3.metadata.StreamOffsetRange; import com.automq.stream.s3.metadata.StreamState; import org.junit.jupiter.api.BeforeEach; @@ -106,10 +106,10 @@ public void setUp() { 0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS)); S3StreamMetadataImage streamImage 
= new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); - NodeS3SSTMetadataImage walMetadataImage0 = new NodeS3SSTMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of( - 1L, new S3SSTObject(1L, BROKER0, Map.of( + NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of( + 1L, new S3StreamSetObject(1L, BROKER0, Map.of( STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), - 2L, new S3SSTObject(2L, BROKER0, Map.of( + 2L, new S3StreamSetObject(2L, BROKER0, Map.of( STREAM2, new StreamOffsetRange(STREAM2, 0L, 100L)), 2L))); S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), @@ -122,7 +122,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, NodeS3SSTMetadataImage.EMPTY)); + Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null); ranges = new HashMap<>(ranges); @@ -131,7 +131,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, NodeS3SSTMetadataImage.EMPTY)); + Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index d91b350491..23f323658a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -172,7 +172,7 @@ public ControllerResult commitObject(long objectId, long objectSize, lon } S3Object object = this.objectsMetadata.get(objectId); if (object == null) { - log.error("object {} not exist when commit SST object", objectId); + log.error("object {} not exist when commit stream set object", objectId); return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST); } // verify the state diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 1f03da5a65..dd4ea38a32 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -25,10 +25,10 @@ import org.apache.kafka.common.message.CloseStreamsResponseData.CloseStreamResponse; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import 
org.apache.kafka.common.message.CommitSSTObjectRequestData; -import org.apache.kafka.common.message.CommitSSTObjectRequestData.ObjectStreamRange; -import org.apache.kafka.common.message.CommitSSTObjectRequestData.StreamObject; -import org.apache.kafka.common.message.CommitSSTObjectResponseData; +import org.apache.kafka.common.message.CommitStreamSetObjectRequestData; +import org.apache.kafka.common.message.CommitStreamSetObjectRequestData.ObjectStreamRange; +import org.apache.kafka.common.message.CommitStreamSetObjectRequestData.StreamObject; +import org.apache.kafka.common.message.CommitStreamSetObjectResponseData; import org.apache.kafka.common.message.CreateStreamsRequestData.CreateStreamRequest; import org.apache.kafka.common.message.CreateStreamsResponseData.CreateStreamResponse; import org.apache.kafka.common.message.DeleteStreamsRequestData.DeleteStreamRequest; @@ -47,18 +47,18 @@ import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; -import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; +import org.apache.kafka.common.metadata.RemoveStreamSetObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; -import org.apache.kafka.common.metadata.S3SSTObjectRecord; -import org.apache.kafka.common.metadata.S3SSTObjectRecord.StreamIndex; +import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; +import org.apache.kafka.common.metadata.S3StreamSetObjectRecord.StreamIndex; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.Convertor; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3SSTObject; +import org.apache.kafka.metadata.stream.S3StreamSetObject; import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; @@ -153,17 +153,17 @@ public String toString() { } } - public static class NodeS3SSTMetadata { + public static class NodeS3StreamSetObjectMetadata { private final int nodeId; private final TimelineLong nodeEpoch; - private final TimelineHashMap sstObjects; + private final TimelineHashMap streamSetObjects; - public NodeS3SSTMetadata(int nodeId, long nodeEpoch, SnapshotRegistry registry) { + public NodeS3StreamSetObjectMetadata(int nodeId, long nodeEpoch, SnapshotRegistry registry) { this.nodeId = nodeId; this.nodeEpoch = new TimelineLong(registry); this.nodeEpoch.set(nodeEpoch); - this.sstObjects = new TimelineHashMap<>(registry, 0); + this.streamSetObjects = new TimelineHashMap<>(registry, 0); } public int getNodeId() { @@ -174,16 +174,16 @@ public long getNodeEpoch() { return nodeEpoch.get(); } - public TimelineHashMap sstObjects() { - return sstObjects; + public TimelineHashMap streamSetObjects() { + return streamSetObjects; } @Override public String toString() { - return "NodeS3SSTMetadata{" + + return "NodeS3StreamSetObjectMetadata{" + "nodeId=" + nodeId + ", nodeEpoch=" + nodeEpoch + - ", sstObjects=" + sstObjects + + ", streamSetObjects=" + streamSetObjects + '}'; } } @@ -201,7 +201,7 @@ public String toString() { private final TimelineHashMap streamsMetadata; - private final TimelineHashMap nodesMetadata; + private final 
TimelineHashMap nodesMetadata; public StreamControlManager( SnapshotRegistry snapshotRegistry, @@ -551,40 +551,40 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); } - // remove SST object or remove stream range in SST object + // remove stream set object or remove stream range in stream set object // TODO: optimize this.nodesMetadata.values() .stream() - .flatMap(entry -> entry.sstObjects.values().stream()) - .filter(sstObject -> sstObject.offsetRanges().containsKey(streamId)) - .filter(sstObject -> sstObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset) - .forEach(sstObj -> { - if (sstObj.offsetRanges().size() == 1) { - // only this range, but we will remove this range, so now we can remove this SST object + .flatMap(entry -> entry.streamSetObjects.values().stream()) + .filter(streamSetObject -> streamSetObject.offsetRanges().containsKey(streamId)) + .filter(streamSetObject -> streamSetObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset) + .forEach(streamSetObj -> { + if (streamSetObj.offsetRanges().size() == 1) { + // only this range, but we will remove this range, so now we can remove this stream set object records.add(new ApiMessageAndVersion( - new RemoveSSTObjectRecord() - .setNodeId(sstObj.nodeId()) - .setObjectId(sstObj.objectId()), (short) 0 + new RemoveStreamSetObjectRecord() + .setNodeId(streamSetObj.nodeId()) + .setObjectId(streamSetObj.objectId()), (short) 0 )); ControllerResult markDestroyResult = this.s3ObjectControlManager.markDestroyObjects( - List.of(sstObj.objectId())); + List.of(streamSetObj.objectId())); if (!markDestroyResult.response()) { - log.error("[TrimStream] Mark destroy SST object: {} failed", sstObj.objectId()); + log.error("[TrimStream] Mark destroy stream set object: {} failed", streamSetObj.objectId()); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return; } records.addAll(markDestroyResult.records()); return; } - Map newOffsetRange = new HashMap<>(sstObj.offsetRanges()); + Map newOffsetRange = new HashMap<>(streamSetObj.offsetRanges()); // remove offset range newOffsetRange.remove(streamId); - records.add(new ApiMessageAndVersion(new S3SSTObjectRecord() - .setObjectId(sstObj.objectId()) - .setNodeId(sstObj.nodeId()) + records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord() + .setObjectId(streamSetObj.objectId()) + .setNodeId(streamSetObj.nodeId()) .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) - .setDataTimeInMs(sstObj.dataTimeInMs()) - .setOrderId(sstObj.orderId()), (short) 0)); + .setDataTimeInMs(streamSetObj.dataTimeInMs()) + .setOrderId(streamSetObj.orderId()), (short) 0)); }); if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); @@ -629,38 +629,38 @@ public ControllerResult deleteStream(int nodeId, long node return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(markDestroyResult.records()); - // remove SST object or remove stream-offset-range in SST object + // remove stream set object or remove stream-offset-range in stream set object this.nodesMetadata.values() .stream() - .flatMap(entry -> entry.sstObjects.values().stream()) - .filter(sstObject -> sstObject.offsetRanges().containsKey(streamId)) - .forEach(sstObj -> { - if (sstObj.offsetRanges().size() == 1) { - // only this range, but we will remove this range, so now we can remove this SST object + 
.flatMap(entry -> entry.streamSetObjects.values().stream()) + .filter(streamsSetObject -> streamsSetObject.offsetRanges().containsKey(streamId)) + .forEach(streamSetObj -> { + if (streamSetObj.offsetRanges().size() == 1) { + // only this range, but we will remove this range, so now we can remove this stream set object records.add(new ApiMessageAndVersion( - new RemoveSSTObjectRecord() - .setNodeId(sstObj.nodeId()) - .setObjectId(sstObj.objectId()), (short) 0 + new RemoveStreamSetObjectRecord() + .setNodeId(streamSetObj.nodeId()) + .setObjectId(streamSetObj.objectId()), (short) 0 )); ControllerResult result = this.s3ObjectControlManager.markDestroyObjects( - List.of(sstObj.objectId())); + List.of(streamSetObj.objectId())); if (!result.response()) { - log.error("[DeleteStream]: Mark destroy SST object: {} failed", sstObj.objectId()); + log.error("[DeleteStream]: Mark destroy stream set object: {} failed", streamSetObj.objectId()); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return; } records.addAll(result.records()); return; } - Map newOffsetRange = new HashMap<>(sstObj.offsetRanges()); + Map newOffsetRange = new HashMap<>(streamSetObj.offsetRanges()); // remove offset range newOffsetRange.remove(streamId); - records.add(new ApiMessageAndVersion(new S3SSTObjectRecord() - .setObjectId(sstObj.objectId()) - .setNodeId(sstObj.nodeId()) + records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord() + .setObjectId(streamSetObj.objectId()) + .setNodeId(streamSetObj.nodeId()) .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) - .setDataTimeInMs(sstObj.dataTimeInMs()) - .setOrderId(sstObj.orderId()), (short) 0)); + .setDataTimeInMs(streamSetObj.dataTimeInMs()) + .setOrderId(streamSetObj.orderId()), (short) 0)); }); if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); @@ -670,14 +670,14 @@ public ControllerResult deleteStream(int nodeId, long node } /** - * Commit SST object. + * Commit stream set object. *

 * <p>
 * Response Errors Enum:
 * <ul>
 * <li>OBJECT_NOT_EXIST
 * <ol>
- * <li>SST object not exist when commit</li>
+ * <li>stream set object not exist when commit</li>
 * <li>stream object not exist when commit</li>
 * </ol>
 * </li>
@@ -690,8 +690,8 @@ public ControllerResult deleteStream(int nodeId, long node
*/ @SuppressWarnings("all") - public ControllerResult commitSSTObject(CommitSSTObjectRequestData data) { - CommitSSTObjectResponseData resp = new CommitSSTObjectResponseData(); + public ControllerResult commitStreamSetObject(CommitStreamSetObjectRequestData data) { + CommitStreamSetObjectResponseData resp = new CommitStreamSetObjectResponseData(); long objectId = data.objectId(); int nodeId = data.nodeId(); long nodeEpoch = data.nodeEpoch(); @@ -702,7 +702,7 @@ public ControllerResult commitSSTObject(CommitSSTOb Errors nodeEpochCheckResult = nodeEpochCheck(nodeId, nodeEpoch); if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); - log.warn("[CommitSSTObject] nodeId={}'s epoch={} check failed, code: {}", + log.warn("[CommitStreamSetObject] nodeId={}'s epoch={} check failed, code: {}", nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -724,7 +724,7 @@ public ControllerResult commitSSTObject(CommitSSTOb .collect(Collectors.toList()); Errors continuityCheckResult = streamAdvanceCheck(offsetRanges, data.nodeId()); if (continuityCheckResult != Errors.NONE) { - log.error("[CommitSSTObject] streamId={} advance check failed, error: {}", offsetRanges, continuityCheckResult); + log.error("[CommitStreamSetObject] streamId={} advance check failed, error: {}", offsetRanges, continuityCheckResult); resp.setErrorCode(continuityCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -733,13 +733,13 @@ public ControllerResult commitSSTObject(CommitSSTOb // commit object ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize, committedTs); if (commitResult.response() == Errors.OBJECT_NOT_EXIST) { - log.error("[CommitSSTObject] object={} not exist when commit SST object", objectId); + log.error("[CommitStreamSetObject] object={} not exist when commit stream set object", objectId); resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); return ControllerResult.of(Collections.emptyList(), resp); } if (commitResult.response() == Errors.REDUNDANT_OPERATION) { // regard it as redundant commit operation, just return success - log.warn("[CommitSSTObject] object={} already committed", objectId); + log.warn("[CommitStreamSetObject] object={} already committed", objectId); return ControllerResult.of(Collections.emptyList(), resp); } List records = new ArrayList<>(commitResult.records()); @@ -748,7 +748,7 @@ public ControllerResult commitSSTObject(CommitSSTOb if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) { ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(compactedObjectIds); if (!destroyResult.response()) { - log.error("[CommitSSTObject]: Mark destroy compacted objects: {} failed", compactedObjectIds); + log.error("[CommitStreamSetObject]: Mark destroy compacted objects: {} failed", compactedObjectIds); resp.setErrorCode(Errors.COMPACTED_OBJECTS_NOT_FOUND.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -756,32 +756,32 @@ public ControllerResult commitSSTObject(CommitSSTOb // update dataTs to the min compacted object's dataTs //noinspection OptionalGetWithoutIsPresent dataTs = compactedObjectIds.stream() - .map(id -> this.nodesMetadata.get(nodeId).sstObjects.get(id)) - .map(S3SSTObject::dataTimeInMs) + .map(id -> this.nodesMetadata.get(nodeId).streamSetObjects.get(id)) + .map(S3StreamSetObject::dataTimeInMs) .min(Long::compareTo).get(); } List indexes = streamRanges.stream() .map(range 
-> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())) .collect(Collectors.toList()); - // update node's SST object - NodeS3SSTMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + // update node's stream set object + NodeS3StreamSetObjectMetadata nodeMetadata = this.nodesMetadata.get(nodeId); if (nodeMetadata == null) { - // first time commit SST object, generate node's metadata record + // first time commit stream set object, generate node's metadata record records.add(new ApiMessageAndVersion(new NodeWALMetadataRecord() .setNodeId(nodeId), (short) 0)); } if (objectId != NOOP_OBJECT_ID) { - // generate node's SST object record + // generate node's stream set object record List streamIndexes = indexes.stream() .map(Convertor::to) .collect(Collectors.toList()); - S3SSTObjectRecord S3SSTObjectRecord = new S3SSTObjectRecord() + S3StreamSetObjectRecord S3StreamSetObjectRecord = new S3StreamSetObjectRecord() .setObjectId(objectId) .setDataTimeInMs(dataTs) .setOrderId(orderId) .setNodeId(nodeId) .setStreamsIndex(streamIndexes); - records.add(new ApiMessageAndVersion(S3SSTObjectRecord, (short) 0)); + records.add(new ApiMessageAndVersion(S3StreamSetObjectRecord, (short) 0)); } // commit stream objects if (streamObjects != null && !streamObjects.isEmpty()) { @@ -790,7 +790,7 @@ public ControllerResult commitSSTObject(CommitSSTOb ControllerResult streamObjectCommitResult = this.s3ObjectControlManager.commitObject(streamObject.objectId(), streamObject.objectSize(), committedTs); if (streamObjectCommitResult.response() != Errors.NONE) { - log.error("[CommitSSTObject]: stream object={} not exist when commit SST object: {}", streamObject.objectId(), objectId); + log.error("[CommitStreamSetObject]: stream object={} not exist when commit stream set object: {}", streamObject.objectId(), objectId); resp.setErrorCode(streamObjectCommitResult.response().code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -806,11 +806,11 @@ public ControllerResult commitSSTObject(CommitSSTOb } // generate compacted objects' remove record if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) { - compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveSSTObjectRecord() + compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveStreamSetObjectRecord() .setNodeId(nodeId) .setObjectId(id), (short) 0))); } - log.info("[CommitSSTObject]: nodeId={} commit SST object: {} success, compacted objects: {}, SST stream range: {}, stream objects: {}", + log.info("[CommitStreamSetObject]: nodeId={} commit object: {} success, compacted objects: {}, stream range: {}, stream objects: {}", nodeId, objectId, compactedObjectIds, data.objectStreamRanges(), streamObjects); return ControllerResult.atomicOf(records, resp); } @@ -1110,32 +1110,32 @@ public void replay(NodeWALMetadataRecord record) { long nodeEpoch = record.nodeEpoch(); // already exist, update the node's self metadata if (this.nodesMetadata.containsKey(nodeId)) { - NodeS3SSTMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + NodeS3StreamSetObjectMetadata nodeMetadata = this.nodesMetadata.get(nodeId); nodeMetadata.nodeEpoch.set(nodeEpoch); return; } // not exist, create a new node - this.nodesMetadata.put(nodeId, new NodeS3SSTMetadata(nodeId, nodeEpoch, this.snapshotRegistry)); + this.nodesMetadata.put(nodeId, new NodeS3StreamSetObjectMetadata(nodeId, nodeEpoch, this.snapshotRegistry)); } - public void replay(S3SSTObjectRecord record) { + public void 
replay(S3StreamSetObjectRecord record) { long objectId = record.objectId(); int nodeId = record.nodeId(); long orderId = record.orderId(); long dataTs = record.dataTimeInMs(); List streamIndexes = record.streamsIndex(); - NodeS3SSTMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + NodeS3StreamSetObjectMetadata nodeMetadata = this.nodesMetadata.get(nodeId); if (nodeMetadata == null) { // should not happen - log.error("nodeId={} not exist when replay SST object record {}", nodeId, record); + log.error("nodeId={} not exist when replay stream set object record {}", nodeId, record); return; } - // create SST object + // create stream set object Map indexMap = streamIndexes .stream() .collect(Collectors.toMap(StreamIndex::streamId, Convertor::to)); - nodeMetadata.sstObjects.put(objectId, new S3SSTObject(objectId, nodeId, indexMap, orderId, dataTs)); + nodeMetadata.streamSetObjects.put(objectId, new S3StreamSetObject(objectId, nodeId, indexMap, orderId, dataTs)); // update range record.streamsIndex().forEach(index -> { @@ -1143,36 +1143,36 @@ public void replay(S3SSTObjectRecord record) { S3StreamMetadata metadata = this.streamsMetadata.get(streamId); if (metadata == null) { // ignore it - LOGGER.error("[REPLAY_SST_FAIL] cannot find streamId={} metadata", streamId); + LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} metadata", streamId); return; } RangeMetadata rangeMetadata = metadata.currentRangeMetadata(); if (rangeMetadata == null) { // ignore it - LOGGER.error("[REPLAY_SST_FAIL] cannot find streamId={} stream range metadata", streamId); + LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} stream range metadata", streamId); return; } if (rangeMetadata.endOffset() < index.startOffset()) { - LOGGER.error("[REPLAY_SST_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, + LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, rangeMetadata.endOffset(), index.startOffset()); return; } else if (rangeMetadata.endOffset() > index.startOffset()) { - // ignore it, the SST object is the compacted SST object. + // ignore it, the stream set object is the compacted stream set object. 
return; } rangeMetadata.setEndOffset(index.endOffset()); }); } - public void replay(RemoveSSTObjectRecord record) { + public void replay(RemoveStreamSetObjectRecord record) { long objectId = record.objectId(); - NodeS3SSTMetadata walMetadata = this.nodesMetadata.get(record.nodeId()); + NodeS3StreamSetObjectMetadata walMetadata = this.nodesMetadata.get(record.nodeId()); if (walMetadata == null) { // should not happen - log.error("node {} not exist when replay remove SST object record {}", record.nodeId(), record); + log.error("node {} not exist when replay remove stream set object record {}", record.nodeId(), record); return; } - walMetadata.sstObjects.remove(objectId); + walMetadata.streamSetObjects.remove(objectId); } public void replay(S3StreamObjectRecord record) { @@ -1192,15 +1192,15 @@ public void replay(S3StreamObjectRecord record) { // update range RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata(); if (rangeMetadata == null) { - LOGGER.error("[REPLAY_SST_FAIL] cannot find streamId={} stream range metadata", streamId); + LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} stream range metadata", streamId); return; } if (rangeMetadata.endOffset() < startOffset) { - LOGGER.error("[REPLAY_SST_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, + LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, rangeMetadata.endOffset(), startOffset); return; } else if (rangeMetadata.endOffset() > startOffset) { - // ignore it, the SST object compact and stream compact may generate this StreamObjectRecord. + // ignore it, the stream set object compact and stream compact may generate this StreamObjectRecord. return; } rangeMetadata.setEndOffset(endOffset); @@ -1228,7 +1228,7 @@ public Map streamsMetadata() { return streamsMetadata; } - public Map nodesMetadata() { + public Map nodesMetadata() { return nodesMetadata; }
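
For reference, a minimal, hypothetical sketch of how a stream set object commit request is assembled under the renamed API. It only uses setter names that appear in this diff (CommitStreamSetObjectRequestData and its nested ObjectStreamRange/StreamObject types); the example class, the literal ids, sizes, offsets, and exact field types are illustrative assumptions, not part of this change.

    import org.apache.kafka.common.message.CommitStreamSetObjectRequestData;

    import java.util.List;

    // Hypothetical illustration of the renamed commit path (not part of this patch).
    public class CommitStreamSetObjectExample {
        public static void main(String[] args) {
            // A stream range that stays inside the stream set object.
            CommitStreamSetObjectRequestData.ObjectStreamRange range =
                    new CommitStreamSetObjectRequestData.ObjectStreamRange()
                            .setStreamId(1L)
                            .setStartOffset(0L)
                            .setEndOffset(100L);
            // A stream that was split out of the stream set object into its own stream object.
            CommitStreamSetObjectRequestData.StreamObject streamObject =
                    new CommitStreamSetObjectRequestData.StreamObject()
                            .setStreamId(2L)
                            .setObjectId(11L)
                            .setObjectSize(4096L)
                            .setStartOffset(0L)
                            .setEndOffset(50L);
            CommitStreamSetObjectRequestData request = new CommitStreamSetObjectRequestData()
                    .setNodeId(0)
                    .setNodeEpoch(1L)
                    .setOrderId(10L)
                    .setObjectId(10L)
                    .setObjectSize(8192L)
                    .setObjectStreamRanges(List.of(range))
                    .setStreamObjects(List.of(streamObject))
                    // Ids of previously committed stream set objects replaced by this compaction;
                    // empty for a plain upload.
                    .setCompactedObjectIds(List.of(2L, 3L));
            // In ControllerObjectManager.commitStreamSetObject the request is wrapped in a WrapRequest
            // with ApiKeys.COMMIT_STREAM_SET_OBJECT and submitted through the ControllerRequestSender.
            System.out.println(request);
        }
    }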