Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix controller check prepare object NPE #124

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,6 @@

package org.apache.kafka.controller.stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
Expand All @@ -47,9 +34,24 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.kafka.metadata.stream.ObjectUtils.NOOP_OBJECT_ID;

/**
Expand All @@ -75,9 +77,9 @@ public class S3ObjectControlManager {
/**
* The objectId of the next object to be prepared. (start from 0)
*/
private TimelineLong nextAssignedObjectId;
private final TimelineLong nextAssignedObjectId;

private final Queue<Long/*objectId*/> preparedObjects;
private final TimelineHashSet<Long /* objectId */> preparedObjects;

// TODO: support different deletion policies, based on time dimension or space dimension?
private final Queue<Long/*objectId*/> markDestroyedObjects;
Expand All @@ -89,32 +91,31 @@ public class S3ObjectControlManager {
private final ScheduledExecutorService lifecycleCheckTimer;

public S3ObjectControlManager(
QuorumController quorumController,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
String clusterId,
S3Config config,
S3Operator operator) {
QuorumController quorumController,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
String clusterId,
S3Config config,
S3Operator operator) {
this.quorumController = quorumController;
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
this.config = config;
this.nextAssignedObjectId = new TimelineLong(snapshotRegistry);
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.preparedObjects = new LinkedBlockingDeque<>();
this.preparedObjects = new TimelineHashSet<>(snapshotRegistry, 0);
this.markDestroyedObjects = new LinkedBlockingDeque<>();
this.operator = operator;
this.lifecycleListeners = new ArrayList<>();
this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor();
this.lifecycleCheckTimer.scheduleWithFixedDelay(() -> {
triggerCheckEvent();
}, DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
this.lifecycleCheckTimer.scheduleWithFixedDelay(this::triggerCheckEvent,
DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
}

private void triggerCheckEvent() {
ControllerRequestContext ctx = new ControllerRequestContext(
null, null, OptionalLong.empty());
null, null, OptionalLong.empty());
this.quorumController.checkS3ObjectsLifecycle(ctx).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to check the S3Object's lifecycle", exp);
Expand All @@ -139,18 +140,18 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
// update assigned stream id
long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));
.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));

long firstAssignedObjectId = nextAssignedObjectId.get();
for (int i = 0; i < count; i++) {
Long objectId = nextAssignedObjectId.get() + i;
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectState(S3ObjectState.PREPARED.toByte())
.setPreparedTimeInMs(preparedTs)
.setExpiredTimeInMs(expiredTs);
.setObjectId(objectId)
.setObjectState(S3ObjectState.PREPARED.toByte())
.setPreparedTimeInMs(preparedTs)
.setExpiredTimeInMs(expiredTs);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setFirstS3ObjectId(firstAssignedObjectId);
Expand All @@ -176,14 +177,14 @@ public ControllerResult<Errors> commitObject(long objectId, long objectSize, lon
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState(S3ObjectState.COMMITTED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(committedTs);
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState(S3ObjectState.COMMITTED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(committedTs);
return ControllerResult.of(List.of(
new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
Expand All @@ -195,12 +196,12 @@ public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
return ControllerResult.of(Collections.emptyList(), false);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
.setObjectId(objectId)
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
records.add(new ApiMessageAndVersion(record, (short) 0));
}
return ControllerResult.atomicOf(records, true);
Expand All @@ -214,20 +215,24 @@ public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
// TODO: recover the prepared objects and mark destroyed objects when restart the controller
if (object.getS3ObjectState() == S3ObjectState.PREPARED) {
preparedObjects.add(object.getObjectId());
} else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
markDestroyedObjects.add(object.getObjectId());
} else {
preparedObjects.remove(object.getObjectId());
if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
markDestroyedObjects.add(object.getObjectId());
}
}
}

public void replay(RemoveS3ObjectRecord record) {
objectsMetadata.remove(record.objectId());
markDestroyedObjects.remove(record.objectId());
preparedObjects.remove(record.objectId());
}

/**
Expand All @@ -239,30 +244,30 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
List<ApiMessageAndVersion> records = new ArrayList<>();
// check the expired objects
this.preparedObjects.stream().
map(objectsMetadata::get).
filter(S3Object::isExpired).
forEach(obj -> {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(obj.getObjectId())
.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
.setObjectSize(obj.getObjectSize())
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
.setCommittedTimeInMs(obj.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
// generate the records which mark the expired objects as destroyed
records.add(new ApiMessageAndVersion(record, (short) 0));
// generate the records which listener reply for the object-destroy events
lifecycleListeners.forEach(listener -> {
ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
records.addAll(result.records());
map(objectsMetadata::get).
filter(S3Object::isExpired).
forEach(obj -> {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(obj.getObjectId())
.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
.setObjectSize(obj.getObjectSize())
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
.setCommittedTimeInMs(obj.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
// generate the records which mark the expired objects as destroyed
records.add(new ApiMessageAndVersion(record, (short) 0));
// generate the records which listener reply for the object-destroy events
lifecycleListeners.forEach(listener -> {
ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
records.addAll(result.records());
});
});
});
// check the mark destroyed objects
ObjectPair[] destroyedObjects = this.markDestroyedObjects.stream()
// must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata
.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey()))
.toArray(ObjectPair[]::new);
// must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata
.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey()))
.toArray(ObjectPair[]::new);
String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new);
if (destroyedObjectKeys == null || destroyedObjectKeys.length == 0) {
return ControllerResult.of(records, null);
Expand All @@ -272,16 +277,16 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
this.operator.delele(destroyedObjectKeys).whenCompleteAsync((success, e) -> {
if (e != null || !success) {
log.error("Failed to delete the S3Object from S3, objectKeys: {}",
String.join(",", destroyedObjectKeys), e);
String.join(",", destroyedObjectKeys), e);
return;
}
// notify the controller an objects deletion event to drive the removal of the objects
ControllerRequestContext ctx = new ControllerRequestContext(
null, null, OptionalLong.empty());
null, null, OptionalLong.empty());
this.quorumController.notifyS3ObjectDeleted(ctx, destroyedObjectIds).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to notify the controller the S3Object deletion event, objectIds: {}",
destroyedObjectIds, exp);
destroyedObjectIds, exp);
}
});
});
Expand All @@ -298,15 +303,11 @@ public ControllerResult<Void> notifyS3ObjectDeleted(Set<Long> deletedObjectIds)
List<ApiMessageAndVersion> records = new ArrayList<>();
deletedObjectIds.stream().filter(markDestroyedObjects::contains).forEach(objectId -> {
records.add(new ApiMessageAndVersion(
new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
});
return ControllerResult.of(records, null);
}

public Queue<Long> preparedObjects() {
return preparedObjects;
}

public Map<Long, S3Object> objectsMetadata() {
return objectsMetadata;
}
Expand Down Expand Up @@ -350,9 +351,9 @@ public String objectKey() {
@Override
public String toString() {
return "ObjectPair{" +
"objectId=" + objectId +
", objectKey='" + objectKey + '\'' +
'}';
"objectId=" + objectId +
", objectKey='" + objectKey + '\'' +
'}';
}
}

Expand Down