Skip to content

Commit

Permalink
refactor: rename SST to stream set object (#435)
Browse files Browse the repository at this point in the history
* refactor: adaptor s3stream 0.3.0

Signed-off-by: Robin Han <[email protected]>

* refactor: rename config sst to stream set object

Signed-off-by: Robin Han <[email protected]>

* refactor: rename remaining sst to stream set object

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Nov 10, 2023
1 parent 8d33291 commit 3bb0ac2
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 154 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public static Config to(KafkaConfig s) {
.streamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes())
.controllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount())
.controllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs())
.sstCompactionInterval(s.s3SSTCompactionInterval())
.sstCompactionCacheSize(s.s3SSTCompactionCacheSize())
.maxStreamNumPerSST(s.s3MaxStreamNumPerSST())
.streamSetObjectCompactionInterval(s.s3StreamSetObjectCompactionInterval())
.streamSetObjectCompactionCacheSize(s.s3StreamSetObjectCompactionCacheSize())
.maxStreamNumPerStreamSetObject(s.s3MaxStreamNumPerStreamSetObject())
.maxStreamObjectNumPerCommit(s.s3MaxStreamObjectNumPerCommit())
.sstCompactionStreamSplitSize(s.s3SSTCompactionStreamSplitSize())
.sstCompactionForceSplitPeriod(s.s3SSTCompactionForceSplitMinutes())
.sstCompactionMaxObjectNum(s.s3SSTCompactionMaxObjectNum())
.streamSetObjectCompactionStreamSplitSize(s.s3StreamSetObjectCompactionStreamSplitSize())
.streamSetObjectCompactionForceSplitPeriod(s.s3StreamSetObjectCompactionForceSplitMinutes())
.streamSetObjectCompactionMaxObjectNum(s.s3StreamSetObjectCompactionMaxObjectNum())
.mockEnable(s.s3MockEnable())
.objectLogEnable(s.s3ObjectLogEnable())
.networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator);
this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator);
// stream object compactions share the same s3Operator with SST object compactions
// stream object compactions share the same s3Operator with stream set object compactions
this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionS3Operator, this.config, networkInboundLimiter, networkOutboundLimiter);
this.kvClient = new ControllerKVClient(this.requestSender);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) {
}
}

public CompletableFuture<List<S3ObjectMetadata>> getSSTObjects() {
public CompletableFuture<List<S3ObjectMetadata>> getStreamSetObjects() {
synchronized (this) {
List<S3ObjectMetadata> s3ObjectMetadataList = this.streamsImage.getSSTObjects(config.brokerId()).stream()
List<S3ObjectMetadata> s3ObjectMetadataList = this.streamsImage.getStreamSetObjects(config.brokerId()).stream()
.map(object -> {
S3Object s3Object = this.objectsImage.getObjectMetadata(object.objectId());
return new S3ObjectMetadata(object.objectId(), object.objectType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@
package kafka.log.stream.s3.objects;


import com.automq.stream.s3.objects.CommitStreamObjectRequest;
import com.automq.stream.s3.objects.CommitSSTObjectRequest;
import com.automq.stream.s3.objects.CommitSSTObjectResponse;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
import com.automq.stream.s3.objects.CommitStreamSetObjectResponse;
import com.automq.stream.s3.objects.CompactStreamObjectRequest;
import com.automq.stream.s3.objects.ObjectManager;
import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.log.stream.s3.network.ControllerRequestSender;
import kafka.log.stream.s3.network.ControllerRequestSender.RequestTask;
import kafka.log.stream.s3.network.ControllerRequestSender.ResponseHandleResult;
import kafka.log.stream.s3.network.request.WrapRequest;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.message.CommitStreamSetObjectRequestData;
import org.apache.kafka.common.message.CommitStreamSetObjectResponseData;
import org.apache.kafka.common.message.CommitStreamObjectRequestData;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.message.CommitSSTObjectRequestData;
import org.apache.kafka.common.message.CommitSSTObjectResponseData;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -41,7 +42,6 @@
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;
import org.apache.kafka.metadata.stream.InRangeObjects;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -71,9 +71,9 @@ public ControllerObjectManager(ControllerRequestSender requestSender, StreamMeta
@Override
public CompletableFuture<Long> prepareObject(int count, long ttl) {
PrepareS3ObjectRequestData request = new PrepareS3ObjectRequestData()
.setNodeId(nodeId)
.setPreparedCount(count)
.setTimeToLiveInMs(ttl);
.setNodeId(nodeId)
.setPreparedCount(count)
.setTimeToLiveInMs(ttl);
WrapRequest req = new WrapRequest() {
@Override
public ApiKeys apiKey() {
Expand Down Expand Up @@ -101,38 +101,38 @@ public Builder toRequestBuilder() {
}

@Override
public CompletableFuture<CommitSSTObjectResponse> commitSSTObject(CommitSSTObjectRequest commitSSTObjectRequest) {
CommitSSTObjectRequestData request = new CommitSSTObjectRequestData()
.setNodeId(nodeId)
.setNodeEpoch(nodeEpoch)
.setOrderId(commitSSTObjectRequest.getOrderId())
.setObjectId(commitSSTObjectRequest.getObjectId())
.setObjectSize(commitSSTObjectRequest.getObjectSize())
.setObjectStreamRanges(commitSSTObjectRequest.getStreamRanges()
.stream()
.map(Convertor::toObjectStreamRangeInRequest).collect(Collectors.toList()))
.setStreamObjects(commitSSTObjectRequest.getStreamObjects()
.stream()
.map(Convertor::toStreamObjectInRequest).collect(Collectors.toList()))
.setCompactedObjectIds(commitSSTObjectRequest.getCompactedObjectIds());
public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(CommitStreamSetObjectRequest commitStreamSetObjectRequest) {
CommitStreamSetObjectRequestData request = new CommitStreamSetObjectRequestData()
.setNodeId(nodeId)
.setNodeEpoch(nodeEpoch)
.setOrderId(commitStreamSetObjectRequest.getOrderId())
.setObjectId(commitStreamSetObjectRequest.getObjectId())
.setObjectSize(commitStreamSetObjectRequest.getObjectSize())
.setObjectStreamRanges(commitStreamSetObjectRequest.getStreamRanges()
.stream()
.map(Convertor::toObjectStreamRangeInRequest).collect(Collectors.toList()))
.setStreamObjects(commitStreamSetObjectRequest.getStreamObjects()
.stream()
.map(Convertor::toStreamObjectInRequest).collect(Collectors.toList()))
.setCompactedObjectIds(commitStreamSetObjectRequest.getCompactedObjectIds());
WrapRequest req = new WrapRequest() {
@Override
public ApiKeys apiKey() {
return ApiKeys.COMMIT_SST_OBJECT;
return ApiKeys.COMMIT_STREAM_SET_OBJECT;
}

@Override
public Builder toRequestBuilder() {
return new org.apache.kafka.common.requests.s3.CommitSSTObjectRequest.Builder(request);
return new org.apache.kafka.common.requests.s3.CommitStreamSetObjectRequest.Builder(request);
}
};
CompletableFuture<CommitSSTObjectResponse> future = new CompletableFuture<>();
RequestTask<org.apache.kafka.common.requests.s3.CommitSSTObjectResponse, CommitSSTObjectResponse> task = new RequestTask<>(req, future, response -> {
CommitSSTObjectResponseData resp = response.data();
CompletableFuture<CommitStreamSetObjectResponse> future = new CompletableFuture<>();
RequestTask<org.apache.kafka.common.requests.s3.CommitStreamSetObjectResponse, CommitStreamSetObjectResponse> task = new RequestTask<>(req, future, response -> {
CommitStreamSetObjectResponseData resp = response.data();
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
return ResponseHandleResult.withSuccess(new CommitSSTObjectResponse());
return ResponseHandleResult.withSuccess(new CommitStreamSetObjectResponse());
case NODE_EPOCH_EXPIRED:
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
Expand All @@ -141,25 +141,24 @@ public Builder toRequestBuilder() {
case COMPACTED_OBJECTS_NOT_FOUND:
throw code.exception();
default:
LOGGER.error("Error while committing SST object: {}, code: {}, retry later", request, code);
LOGGER.error("Error while committing stream set object: {}, code: {}, retry later", request, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}

@Override
public CompletableFuture<Void> commitStreamObject(CommitStreamObjectRequest commitStreamObjectRequest) {
public CompletableFuture<Void> compactStreamObject(CompactStreamObjectRequest compactStreamObjectRequest) {
CommitStreamObjectRequestData request = new CommitStreamObjectRequestData()
.setNodeId(nodeId)
.setNodeEpoch(nodeEpoch)
.setObjectId(commitStreamObjectRequest.getObjectId())
.setObjectSize(commitStreamObjectRequest.getObjectSize())
.setStreamId(commitStreamObjectRequest.getStreamId())
.setStartOffset(commitStreamObjectRequest.getStartOffset())
.setEndOffset(commitStreamObjectRequest.getEndOffset())
.setSourceObjectIds(commitStreamObjectRequest.getSourceObjectIds());
.setNodeId(nodeId)
.setNodeEpoch(nodeEpoch)
.setObjectId(compactStreamObjectRequest.getObjectId())
.setObjectSize(compactStreamObjectRequest.getObjectSize())
.setStreamId(compactStreamObjectRequest.getStreamId())
.setStartOffset(compactStreamObjectRequest.getStartOffset())
.setEndOffset(compactStreamObjectRequest.getEndOffset())
.setSourceObjectIds(compactStreamObjectRequest.getSourceObjectIds());
WrapRequest req = new WrapRequest() {
@Override
public ApiKeys apiKey() {
Expand Down Expand Up @@ -207,7 +206,7 @@ public CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long
@Override
public CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
try {
return this.metadataManager.getSSTObjects();
return this.metadataManager.getStreamSetObjects();
} catch (Exception e) {
LOGGER.error("Error while get server objects", e);
return CompletableFuture.completedFuture(Collections.emptyList());
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@

import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
import org.apache.kafka.common.message.CommitSSTObjectRequestData;
import org.apache.kafka.common.message.CommitStreamSetObjectRequestData;

public class Convertor {

public static CommitSSTObjectRequestData.StreamObject toStreamObjectInRequest(StreamObject s) {
return new CommitSSTObjectRequestData.StreamObject()
public static CommitStreamSetObjectRequestData.StreamObject toStreamObjectInRequest(StreamObject s) {
return new CommitStreamSetObjectRequestData.StreamObject()
.setStreamId(s.getStreamId())
.setObjectId(s.getObjectId())
.setObjectSize(s.getObjectSize())
.setStartOffset(s.getStartOffset())
.setEndOffset(s.getEndOffset());
}

public static CommitSSTObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest(ObjectStreamRange s) {
return new CommitSSTObjectRequestData.ObjectStreamRange()
public static CommitStreamSetObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest(ObjectStreamRange s) {
return new CommitStreamSetObjectRequestData.ObjectStreamRange()
.setStreamId(s.getStreamId())
.setStartOffset(s.getStartOffset())
.setEndOffset(s.getEndOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import kafka.server.KafkaConfig;
import kafka.server.metadata.BrokerMetadataListener;
import kafka.server.metadata.KRaftMetadataCache;
import org.apache.kafka.image.NodeS3SSTMetadataImage;
import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.S3ObjectsImage;
Expand All @@ -35,7 +35,7 @@
import org.apache.kafka.metadata.stream.S3ObjectState;
import com.automq.stream.s3.metadata.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3SSTObject;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.metadata.StreamState;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -106,10 +106,10 @@ public void setUp() {
0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects);

NodeS3SSTMetadataImage walMetadataImage0 = new NodeS3SSTMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of(
1L, new S3SSTObject(1L, BROKER0, Map.of(
NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of(
1L, new S3StreamSetObject(1L, BROKER0, Map.of(
STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
2L, new S3SSTObject(2L, BROKER0, Map.of(
2L, new S3StreamSetObject(2L, BROKER0, Map.of(
STREAM2, new StreamOffsetRange(STREAM2, 0L, 100L)), 2L)));

S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Expand All @@ -122,7 +122,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS));
streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects);
streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, NodeS3SSTMetadataImage.EMPTY));
Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null);

ranges = new HashMap<>(ranges);
Expand All @@ -131,7 +131,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS));
streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects);
streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, NodeS3SSTMetadataImage.EMPTY));
Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public ControllerResult<Errors> commitObject(long objectId, long objectSize, lon
}
S3Object object = this.objectsMetadata.get(objectId);
if (object == null) {
log.error("object {} not exist when commit SST object", objectId);
log.error("object {} not exist when commit stream set object", objectId);
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
// verify the state
Expand Down
Loading

0 comments on commit 3bb0ac2

Please sign in to comment.