Merge pull request #27 from AutoMQ/add_assigned_record_replay
feat(s3): generate and replay assigned record in controller
superhx authored Aug 28, 2023
2 parents 436f347 + 9bfe6d0 commit de5411b
Showing 2 changed files with 64 additions and 37 deletions.
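In brief: rather than reconstructing the next-ID counters as a side effect of replaying S3ObjectRecord/S3StreamRecord (the old Math.max approach, deleted below), the controller now emits a dedicated AssignedS3ObjectIdRecord or AssignedStreamIdRecord next to the records that consume those IDs, and a matching replay() overload restores the counter. The following sketch illustrates the pattern in isolation; IdAssignmentManager and AssignedIdRecord are hypothetical stand-ins, not classes from this PR.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the "assigned record" pattern (hypothetical names).
class IdAssignmentManager {
    // Persisted to the metadata log: "all ids up to assignedId are taken".
    record AssignedIdRecord(long assignedId) {}

    private long nextAssignedId = 0L;

    // Request path: only generate records; state changes happen at replay,
    // after the records have been committed to the metadata log.
    List<AssignedIdRecord> assignIds(int count) {
        List<AssignedIdRecord> records = new ArrayList<>();
        records.add(new AssignedIdRecord(nextAssignedId + count - 1));
        return records;
    }

    // Replay path: the only place the counter advances, so the active
    // controller, standby controllers, and brokers all converge on the
    // same value when they replay the same log.
    void replay(AssignedIdRecord record) {
        nextAssignedId = record.assignedId() + 1;
    }
}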
S3ObjectControlManager.java
@@ -31,6 +31,7 @@
 import java.util.stream.Collectors;
 import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
 import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
+import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
 import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
 import org.apache.kafka.common.metadata.S3ObjectRecord;
 import org.apache.kafka.common.utils.LogContext;
@@ -44,6 +45,7 @@
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineLong;
 import org.slf4j.Logger;

 /**
@@ -69,7 +71,7 @@ public class S3ObjectControlManager {
     /**
      * The objectId of the next object to be prepared. (starts from 0)
      */
-    private Long nextAssignedObjectId = 0L;
+    private TimelineLong nextAssignedObjectId;

     private final Queue<Long/*objectId*/> preparedObjects;

@@ -93,6 +95,7 @@ public S3ObjectControlManager(
         this.log = logContext.logger(S3ObjectControlManager.class);
         this.clusterId = clusterId;
         this.config = config;
+        this.nextAssignedObjectId = new TimelineLong(snapshotRegistry);
         this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
         this.preparedObjects = new LinkedBlockingDeque<>();
         this.markDestroyedObjects = new LinkedBlockingDeque<>();
@@ -119,17 +122,23 @@ public void registerListener(S3ObjectLifeCycleListener listener) {
     }

     public Long nextAssignedObjectId() {
-        return nextAssignedObjectId;
+        return nextAssignedObjectId.get();
     }

     public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData request) {
         // TODO: support batch prepare objects
+        // TODO: pre-assign a batch of objectIds in controller
         List<ApiMessageAndVersion> records = new ArrayList<>();
         PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData();
         int count = request.preparedCount();
         List<Long> prepareObjectIds = new ArrayList<>(count);

+        // update assigned object id
+        long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
+        records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
+            .setAssignedS3ObjectId(newAssignedObjectId), (short) 0));
+
         for (int i = 0; i < count; i++) {
-            Long objectId = nextAssignedObjectId + i;
+            Long objectId = nextAssignedObjectId.get() + i;
             prepareObjectIds.add(objectId);
             long preparedTs = System.currentTimeMillis();
             long expiredTs = preparedTs + request.timeToLiveInMs();
@@ -141,7 +150,11 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData request) {
             records.add(new ApiMessageAndVersion(record, (short) 0));
         }
         response.setS3ObjectIds(prepareObjectIds);
-        return ControllerResult.of(records, response);
+        return ControllerResult.atomicOf(records, response);
     }

+    public void replay(AssignedS3ObjectIdRecord record) {
+        nextAssignedObjectId.set(record.assignedS3ObjectId() + 1);
+    }
+
     public void replay(S3ObjectRecord record) {
@@ -157,7 +170,6 @@ public void replay(S3ObjectRecord record) {
         } else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
             markDestroyedObjects.add(object.getObjectId());
         }
-        nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1);
     }

     public void replay(RemoveS3ObjectRecord record) {
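The switch from plain long/int fields to TimelineLong/TimelineInteger is the companion change: these timeline structures register with the controller's SnapshotRegistry, so if a batch of records is aborted before commit, the registry can revert every registered structure to an earlier snapshot instead of leaving a half-applied counter behind. A minimal usage sketch, assuming the org.apache.kafka.timeline API as it ships in Kafka 3.x:

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;

public class TimelineLongDemo {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        TimelineLong nextId = new TimelineLong(registry);

        nextId.set(5L);
        registry.getOrCreateSnapshot(10L); // snapshot at offset/epoch 10

        nextId.set(9L); // tentative update, records not yet committed

        // If the batch fails to commit, the controller reverts the registry
        // and the counter rolls back with every other timeline structure.
        registry.revertToSnapshot(10L);
        System.out.println(nextId.get()); // prints 5
    }
}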
StreamControlManager.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.message.DeleteStreamResponseData;
 import org.apache.kafka.common.message.OpenStreamRequestData;
 import org.apache.kafka.common.message.OpenStreamResponseData;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
@@ -51,6 +52,8 @@
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineHashSet;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineLong;
 import org.slf4j.Logger;

 /**
@@ -62,32 +65,35 @@ public class StreamControlManager {
     // TODO: timeline check
     public static class S3StreamMetadata {
         // current epoch; when created but not yet opened, 0 is used
-        private long currentEpoch;
+        private TimelineLong currentEpoch;
         // rangeIndex; when created but not yet opened there is no range, so -1 is used
-        private int currentRangeIndex = -1;
-        private long startOffset;
+        private TimelineInteger currentRangeIndex;
+        private TimelineLong startOffset;
         private TimelineHashMap<Integer/*rangeIndex*/, RangeMetadata> ranges;
         private TimelineHashSet<S3StreamObject> streamObjects;

         public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset,
             SnapshotRegistry registry) {
-            this.currentEpoch = currentEpoch;
-            this.currentRangeIndex = currentRangeIndex;
-            this.startOffset = startOffset;
+            this.currentEpoch = new TimelineLong(registry);
+            this.currentEpoch.set(currentEpoch);
+            this.currentRangeIndex = new TimelineInteger(registry);
+            this.currentRangeIndex.set(currentRangeIndex);
+            this.startOffset = new TimelineLong(registry);
+            this.startOffset.set(startOffset);
             this.ranges = new TimelineHashMap<>(registry, 0);
             this.streamObjects = new TimelineHashSet<>(registry, 0);
         }

         public long currentEpoch() {
-            return currentEpoch;
+            return currentEpoch.get();
         }

         public int currentRangeIndex() {
-            return currentRangeIndex;
+            return currentRangeIndex.get();
         }

         public long startOffset() {
-            return startOffset;
+            return startOffset.get();
         }

         public Map<Integer, RangeMetadata> ranges() {
@@ -145,7 +151,7 @@ public String toString() {
     /**
      * The next stream id to be assigned.
      */
-    private Long nextAssignedStreamId = 0L;
+    private final TimelineLong nextAssignedStreamId;

     private final TimelineHashMap<Long/*streamId*/, S3StreamMetadata> streamsMetadata;

@@ -158,24 +164,27 @@ public StreamControlManager(
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(StreamControlManager.class);
         this.s3ObjectControlManager = s3ObjectControlManager;
+        this.nextAssignedStreamId = new TimelineLong(snapshotRegistry);
         this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
     }

     // TODO: refactor to return next offset of stream in response
     // TODO: lazy update range's end offset
-    // TODO: controller allocate the stream id
     public ControllerResult<CreateStreamResponseData> createStream(CreateStreamRequestData data) {
         // TODO: pre-assign a batch of stream ids in controller
         CreateStreamResponseData resp = new CreateStreamResponseData();
-        long streamId = nextAssignedStreamId;
+        long streamId = nextAssignedStreamId.get();
+        // update assigned id
+        ApiMessageAndVersion record0 = new ApiMessageAndVersion(new AssignedStreamIdRecord()
+            .setAssignedStreamId(streamId), (short) 0);
         // create stream
         ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord()
             .setStreamId(streamId)
             .setEpoch(0)
             .setStartOffset(0L)
             .setRangeIndex(-1), (short) 0);
         resp.setStreamId(streamId);
-        return ControllerResult.of(Arrays.asList(record), resp);
+        return ControllerResult.atomicOf(Arrays.asList(record0, record), resp);
     }

     public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData data) {
@@ -190,37 +199,40 @@ public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData data) {
         }
         // verify epoch match
         S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
-        if (streamMetadata.currentEpoch > epoch) {
+        if (streamMetadata.currentEpoch.get() > epoch) {
             resp.setErrorCode(Errors.STREAM_FENCED.code());
             return ControllerResult.of(Collections.emptyList(), resp);
         }
-        if (streamMetadata.currentEpoch == epoch) {
+        if (streamMetadata.currentEpoch.get() == epoch) {
             // epoch equals, verify broker
-            RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
-            if (rangeMetadata == null || rangeMetadata.brokerId() != brokerId) {
-                resp.setErrorCode(Errors.STREAM_FENCED.code());
+            RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
+            if (rangeMetadata != null) {
+                if (rangeMetadata.brokerId() != brokerId) {
+                    resp.setErrorCode(Errors.STREAM_FENCED.code());
                     return ControllerResult.of(Collections.emptyList(), resp);
                 }
+                // epoch and broker both match, so regard it as a redundant open operation and just return success
+                resp.setStartOffset(streamMetadata.startOffset.get());
+                resp.setNextOffset(rangeMetadata.endOffset());
+                return ControllerResult.of(Collections.emptyList(), resp);
+            }
-            // epoch equals, broker equals, regard it as redundant open operation, just return success
-            resp.setStartOffset(streamMetadata.startOffset);
-            return ControllerResult.of(Collections.emptyList(), resp);
         }
         // now the request is valid, update the stream's epoch and create a new range for this broker
         List<ApiMessageAndVersion> records = new ArrayList<>();
         long newEpoch = epoch;
-        int newRangeIndex = streamMetadata.currentRangeIndex + 1;
+        int newRangeIndex = streamMetadata.currentRangeIndex.get() + 1;
         // stream update record
         records.add(new ApiMessageAndVersion(new S3StreamRecord()
             .setStreamId(streamId)
             .setEpoch(newEpoch)
             .setRangeIndex(newRangeIndex)
-            .setStartOffset(streamMetadata.startOffset), (short) 0));
+            .setStartOffset(streamMetadata.startOffset.get()), (short) 0));
         // get new range's start offset
         // by default, regard this range as the first range in the stream and use 0 as the start offset
         long startOffset = 0;
         if (newRangeIndex > 0) {
             // the new range is not the first range in the stream, so take the last range's end offset
-            RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
+            RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
             startOffset = lastRangeMetadata.endOffset();
         }
         // range create record
@@ -232,7 +244,8 @@
             .setEpoch(newEpoch)
             .setRangeIndex(newRangeIndex), (short) 0));
         resp.setStartOffset(startOffset);
-        return ControllerResult.of(records, resp);
+        resp.setNextOffset(startOffset);
+        return ControllerResult.atomicOf(records, resp);
     }

     public ControllerResult<CloseStreamResponseData> closeStream(CloseStreamRequestData data) {
@@ -255,22 +268,24 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(CommitStreamObjectRequestData data) {
         throw new UnsupportedOperationException();
     }

+    public void replay(AssignedStreamIdRecord record) {
+        this.nextAssignedStreamId.set(record.assignedStreamId() + 1);
+    }
+
     public void replay(S3StreamRecord record) {
         long streamId = record.streamId();
         // already exists, update the stream's own metadata
         if (this.streamsMetadata.containsKey(streamId)) {
             S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
-            streamMetadata.startOffset = record.startOffset();
-            streamMetadata.currentEpoch = record.epoch();
-            streamMetadata.currentRangeIndex = record.rangeIndex();
+            streamMetadata.startOffset.set(record.startOffset());
+            streamMetadata.currentEpoch.set(record.epoch());
+            streamMetadata.currentRangeIndex.set(record.rangeIndex());
             return;
         }
         // does not exist, create a new stream
         S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(),
             record.startOffset(), this.snapshotRegistry);
         this.streamsMetadata.put(streamId, streamMetadata);
-        this.nextAssignedStreamId = Math.max(this.nextAssignedStreamId, streamId + 1);
     }

     public void replay(RemoveS3StreamRecord record) {
@@ -310,7 +325,7 @@ public Map<Integer, BrokerS3WALMetadata> brokersMetadata() {
     }

     public Long nextAssignedStreamId() {
-        return nextAssignedStreamId;
+        return nextAssignedStreamId.get();
     }

     @Override
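Note also that both request paths now return ControllerResult.atomicOf(...) instead of ControllerResult.of(...): the assigned-ID record and the records that consume those IDs must commit as one atomic batch, otherwise a failure between them could strand reserved IDs or replay an object whose ID was never marked assigned. The sketch below shows how such a batch might be applied on replay; ReplaySketch is hypothetical, and Kafka's controller actually dispatches on the generated record-type enum rather than instanceof checks.

import java.util.List;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.protocol.ApiMessage;

// Hypothetical replay dispatcher; assumes it lives in the same package as
// S3ObjectControlManager so no extra import is needed for it.
class ReplaySketch {
    private final S3ObjectControlManager objectControlManager;

    ReplaySketch(S3ObjectControlManager objectControlManager) {
        this.objectControlManager = objectControlManager;
    }

    void replayBatch(List<ApiMessage> atomicBatch) {
        // Because prepareObject() returns ControllerResult.atomicOf(...),
        // the AssignedS3ObjectIdRecord and the S3ObjectRecords it covers
        // are committed, and therefore replayed, together or not at all.
        for (ApiMessage message : atomicBatch) {
            if (message instanceof AssignedS3ObjectIdRecord) {
                objectControlManager.replay((AssignedS3ObjectIdRecord) message);
            } else if (message instanceof S3ObjectRecord) {
                objectControlManager.replay((S3ObjectRecord) message);
            }
        }
    }
}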
