From 1c011645a9fe4661d6ff96955f34823e5bd0d01b Mon Sep 17 00:00:00 2001 From: TheR1sing3un <87409330+TheR1sing3un@users.noreply.github.com> Date: Sun, 10 Sep 2023 21:23:26 -0500 Subject: [PATCH] feat(s3): support retry to-controller request (#87) 1. support retry to-controller request Signed-off-by: TheR1sing3un --- .../s3/CompactedObjectsNotFoundException.java | 26 +++ .../apache/kafka/common/protocol/Errors.java | 3 +- .../kafka/log/s3/ControllerKVClient.java | 89 +++++----- .../scala/kafka/log/s3/DefaultS3Client.java | 5 +- .../s3/network/ControllerRequestSender.java | 166 ++++++++++++++++-- .../s3/objects/ControllerObjectManager.java | 44 +++-- .../s3/streams/ControllerStreamManager.java | 55 +++--- .../main/scala/kafka/server/KafkaConfig.scala | 8 + .../log/s3/ControllerRequestSenderTest.java | 153 ++++++++++++++++ .../stream/S3ObjectControlManager.java | 1 - .../stream/StreamControlManager.java | 103 ++++++++++- .../controller/StreamControlManagerTest.java | 4 +- 12 files changed, 559 insertions(+), 98 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/s3/CompactedObjectsNotFoundException.java create mode 100644 core/src/test/java/kafka/log/s3/ControllerRequestSenderTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/CompactedObjectsNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/CompactedObjectsNotFoundException.java new file mode 100644 index 0000000000..9d8c0a7025 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/CompactedObjectsNotFoundException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.errors.s3; + +import org.apache.kafka.common.errors.ApiException; + +public class CompactedObjectsNotFoundException extends ApiException { + public CompactedObjectsNotFoundException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index ee8c9978fe..613d8e4913 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -127,6 +127,7 @@ import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.s3.CompactedObjectsNotFoundException; import org.apache.kafka.common.errors.s3.ObjectNotExistException; import org.apache.kafka.common.errors.s3.RedundantOperationException; import org.apache.kafka.common.errors.s3.StreamExistException; @@ -389,7 +390,7 @@ public enum Errors { STREAM_NOT_OPENED(505, "The stream is not opened.", StreamNotOpenedException::new), STREAM_NOT_CLOSED(506, "The stream is not closed.", StreamNotClosedException::new), REDUNDANT_OPERATION(507, "The operation is redundant.", RedundantOperationException::new), - + COMPACTED_OBJECTS_NOT_FOUND(508, "The compacted objects are not found.", CompactedObjectsNotFoundException::new), STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new); diff --git a/core/src/main/scala/kafka/log/s3/ControllerKVClient.java b/core/src/main/scala/kafka/log/s3/ControllerKVClient.java index 46d74e7802..c6dd29f653 100644 --- a/core/src/main/scala/kafka/log/s3/ControllerKVClient.java +++ b/core/src/main/scala/kafka/log/s3/ControllerKVClient.java @@ -20,7 +20,10 @@ import kafka.log.es.api.KVClient; import kafka.log.es.api.KeyValue; import kafka.log.s3.network.ControllerRequestSender; +import kafka.log.s3.network.ControllerRequestSender.RequestTask; +import kafka.log.s3.network.ControllerRequestSender.ResponseHandleResult; import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.DeleteKVResponseData; import org.apache.kafka.common.message.GetKVRequestData; import org.apache.kafka.common.message.GetKVResponseData; import org.apache.kafka.common.message.PutKVRequestData; @@ -57,18 +60,20 @@ public CompletableFuture putKV(List list) { .setValue(kv.value().array()) ).collect(Collectors.toList())) ); - return this.requestSender.send(requestBuilder, PutKVResponseData.class) - .thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp); - return null; - default: - LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}", list, code); - throw code.exception(); - } - }); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, requestBuilder, PutKVResponseData.class, resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp); + return ResponseHandleResult.withSuccess(null); + default: + LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}, retry later", list, code); + return ResponseHandleResult.withRetry(); + } + }); + 
this.requestSender.send(task); + return future; } @Override @@ -78,22 +83,24 @@ public CompletableFuture> getKV(List list) { new GetKVRequestData() .setKeys(list) ); - return this.requestSender.send(requestBuilder, GetKVResponseData.class) - .thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - List keyValues = resp.keyValues() - .stream() - .map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null)) - .collect(Collectors.toList()); - LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues); - return keyValues; - default: - LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}", String.join(",", list), code); - throw code.exception(); - } - }); + CompletableFuture> future = new CompletableFuture<>(); + RequestTask> task = new RequestTask<>(future, requestBuilder, GetKVResponseData.class, resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + List keyValues = resp.keyValues() + .stream() + .map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null)) + .collect(Collectors.toList()); + LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues); + return ResponseHandleResult.withSuccess(keyValues); + default: + LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}, retry later", list, code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; } @Override @@ -103,17 +110,19 @@ public CompletableFuture delKV(List list) { new DeleteKVRequestData() .setKeys(list) ); - return this.requestSender.send(requestBuilder, PutKVResponseData.class) - .thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp); - return null; - default: - LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}", String.join(",", list), code); - throw code.exception(); - } - }); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, requestBuilder, DeleteKVResponseData.class, resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp); + return ResponseHandleResult.withSuccess(null); + default: + LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}, retry later", String.join(",", list), code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; } } diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index 42d9cdcd6f..2fcb597984 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -24,6 +24,7 @@ import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.network.ControllerRequestSender; +import kafka.log.s3.network.ControllerRequestSender.RetryPolicyContext; import kafka.log.s3.objects.ControllerObjectManager; import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.operator.S3Operator; @@ -58,7 +59,9 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator this.config = config; this.metadataManager = new StreamMetadataManager(brokerServer, config); this.operator = operator; - 
this.requestSender = new ControllerRequestSender(brokerServer); + RetryPolicyContext retryPolicyContext = new RetryPolicyContext(config.s3ControllerRequestRetryMaxCount(), + config.s3ControllerRequestRetryBaseDelayMs()); + this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext); this.streamManager = new ControllerStreamManager(this.requestSender, config); this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config); this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator); diff --git a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java index 643f685a90..10f4451df5 100644 --- a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java +++ b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java @@ -17,6 +17,11 @@ package kafka.log.s3.network; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import kafka.server.BrokerServer; import kafka.server.BrokerToControllerChannelManager; import kafka.server.ControllerRequestCompletionHandler; @@ -24,54 +29,187 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; public class ControllerRequestSender { - private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class); + private final RetryPolicyContext retryPolicyContext; private final BrokerServer brokerServer; - private BrokerToControllerChannelManager channelManager; + private final BrokerToControllerChannelManager channelManager; + + private final ScheduledExecutorService retryService; - public ControllerRequestSender(BrokerServer brokerServer) { + public ControllerRequestSender(BrokerServer brokerServer, RetryPolicyContext retryPolicyContext) { + this.retryPolicyContext = retryPolicyContext; this.brokerServer = brokerServer; this.channelManager = brokerServer.clientToControllerChannelManager(); + this.retryService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("controller-request-retry-sender")); } - public CompletableFuture send(AbstractRequest.Builder requestBuilder, - Class responseDataType) { - CompletableFuture cf = new CompletableFuture<>(); - LOGGER.debug("Sending request {}", requestBuilder); + public void send(RequestTask task) { + Builder requestBuilder = task.requestBuilder; + Class responseDataType = task.responseDataType; + task.sendHit(); + LOGGER.trace("Sending task: {}", task); channelManager.sendRequest(requestBuilder, new ControllerRequestCompletionHandler() { @Override public void onTimeout() { // TODO: add timeout retry policy LOGGER.error("Timeout while creating stream"); - cf.completeExceptionally(new TimeoutException("Timeout while creating stream")); + task.completeExceptionally(new TimeoutException("Timeout while creating stream")); } @Override public void onComplete(ClientResponse response) { if (response.authenticationException() != null) { 
LOGGER.error("Authentication error while sending request: {}", requestBuilder, response.authenticationException()); - cf.completeExceptionally(response.authenticationException()); + task.completeExceptionally(response.authenticationException()); return; } if (response.versionMismatch() != null) { LOGGER.error("Version mismatch while sending request: {}", requestBuilder, response.versionMismatch()); - cf.completeExceptionally(response.versionMismatch()); + task.completeExceptionally(response.versionMismatch()); return; } if (!responseDataType.isInstance(response.responseBody().data())) { LOGGER.error("Unexpected response type: {} while sending request: {}", - response.responseBody().data().getClass().getSimpleName(), requestBuilder); - cf.completeExceptionally(new RuntimeException("Unexpected response type while sending request")); + response.responseBody().data().getClass().getSimpleName(), requestBuilder); + task.completeExceptionally(new RuntimeException("Unexpected response type while sending request")); + } + ApiMessage data = response.responseBody().data(); + try { + ResponseHandleResult result = (ResponseHandleResult) task.responseHandler.apply(data); + if (result.retry()) { + retryTask(task); + return; + } + task.complete(result.getResponse()); + } catch (Exception e) { + task.completeExceptionally(e); } - cf.complete((R) response.responseBody().data()); } }); - return cf; + } + + private void retryTask(RequestTask task) { + if (task.sendCount() > retryPolicyContext.maxRetryCount) { + LOGGER.error("Task: {}, retry count exceed max retry count: {}", task, retryPolicyContext.maxRetryCount); + task.completeExceptionally(new RuntimeException("Retry count exceed max retry count: ")); + return; + } + long delay = retryPolicyContext.retryBaseDelayMs * (1 << (task.sendCount() - 1)); + LOGGER.warn("Retry task: {}, delay : {} ms", task, delay); + retryService.schedule(() -> send(task), delay, TimeUnit.MILLISECONDS); + } + + public static class RequestTask { + + private final CompletableFuture cf; + + private final AbstractRequest.Builder requestBuilder; + + private final Class responseDataType; + + /** + * The response handler is used to determine whether the response is valid or retryable. 
+ */ + private final Function> responseHandler; + + private int sendCount; + + public RequestTask(CompletableFuture future, AbstractRequest.Builder requestBuilder, Class responseDataType, + Function> responseHandler) { + this.cf = future; + this.requestBuilder = requestBuilder; + this.responseDataType = responseDataType; + this.responseHandler = responseHandler; + } + + public CompletableFuture cf() { + return cf; + } + + public void sendHit() { + sendCount++; + } + + public int sendCount() { + return sendCount; + } + + public void complete(Z result) { + cf.complete(result); + } + + public void completeExceptionally(Throwable throwable) { + cf.completeExceptionally(throwable); + } + + @Override + public String toString() { + return "RequestTask{" + + "requestBuilder=" + requestBuilder + + ", responseDataType=" + responseDataType + + ", sendCount=" + sendCount + + '}'; + } + } + + public static class ResponseHandleResult { + + private final boolean retry; + private final R response; + + private ResponseHandleResult(boolean retry, R response) { + this.retry = retry; + this.response = response; + } + + public static ResponseHandleResult withRetry() { + return new ResponseHandleResult(true, null); + } + + public static ResponseHandleResult withSuccess(R response) { + return new ResponseHandleResult(false, response); + } + + public boolean retry() { + return retry; + } + + public R getResponse() { + return response; + } + } + + public static class RetryPolicyContext { + private int maxRetryCount; + private long retryBaseDelayMs; + + public RetryPolicyContext(int maxRetryCount, long retryBaseDelayMs) { + this.maxRetryCount = maxRetryCount; + this.retryBaseDelayMs = retryBaseDelayMs; + } + + public int maxRetryCount() { + return maxRetryCount; + } + + public long retryBaseDelayMs() { + return retryBaseDelayMs; + } + + public void setMaxRetryCount(int maxRetryCount) { + this.maxRetryCount = maxRetryCount; + } + + public void setRetryBaseDelayMs(long retryBaseDelayMs) { + this.retryBaseDelayMs = retryBaseDelayMs; + } } } diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index f406a0887b..a1b4a107d0 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -20,6 +20,8 @@ import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.network.ControllerRequestSender; +import kafka.log.s3.network.ControllerRequestSender.RequestTask; +import kafka.log.s3.network.ControllerRequestSender.ResponseHandleResult; import kafka.server.KafkaConfig; import org.apache.kafka.common.message.CommitWALObjectRequestData; import org.apache.kafka.common.message.CommitWALObjectResponseData; @@ -60,16 +62,18 @@ public CompletableFuture prepareObject(int count, long ttl) { .setPreparedCount(count) .setTimeToLiveInMs(ttl) ); - return requestSender.send(request, PrepareS3ObjectResponseData.class).thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, request, PrepareS3ObjectResponseData.class, resp -> { + switch (Errors.forCode(resp.errorCode())) { case NONE: - return resp.firstS3ObjectId(); + return ResponseHandleResult.withSuccess(resp.firstS3ObjectId()); default: - LOGGER.error("Error while preparing {} object, code: {}", count, code); - throw code.exception(); + 
LOGGER.error("Error while preparing {} object, code: {}, retry later", count, Errors.forCode(resp.errorCode())); + return ResponseHandleResult.withRetry(); } }); + this.requestSender.send(task); + return future; } @Override @@ -87,16 +91,22 @@ public CompletableFuture commitWALObject(CommitWALObjec .stream() .map(StreamObject::toStreamObjectInRequest).collect(Collectors.toList())) .setCompactedObjectIds(request.getCompactedObjectIds())); - return requestSender.send(wrapRequestBuilder, CommitWALObjectResponseData.class).thenApply(resp -> { + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, wrapRequestBuilder, CommitWALObjectResponseData.class, resp -> { Errors code = Errors.forCode(resp.errorCode()); switch (code) { case NONE: - return new CommitWALObjectResponse(); - default: - LOGGER.error("Error while committing WAL object: {}, code: {}", request, code); + return ResponseHandleResult.withSuccess(new CommitWALObjectResponse()); + case OBJECT_NOT_EXIST: + case COMPACTED_OBJECTS_NOT_FOUND: throw code.exception(); + default: + LOGGER.error("Error while committing WAL object: {}, code: {}, retry later", request, code); + return ResponseHandleResult.withRetry(); } }); + this.requestSender.send(task); + return future; } @Override @@ -109,16 +119,22 @@ public CompletableFuture commitStreamObject(CommitStreamObjectRequest requ .setStartOffset(request.getStartOffset()) .setEndOffset(request.getEndOffset()) .setSourceObjectIds(request.getSourceObjectIds())); - return requestSender.send(wrapRequestBuilder, CommitWALObjectResponseData.class).thenApply(resp -> { + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, wrapRequestBuilder, CommitWALObjectResponseData.class, resp -> { Errors code = Errors.forCode(resp.errorCode()); switch (code) { case NONE: - return null; - default: - LOGGER.error("Error while committing stream object: {}, code: {}", request, code); + return ResponseHandleResult.withSuccess(null); + case OBJECT_NOT_EXIST: + case COMPACTED_OBJECTS_NOT_FOUND: throw code.exception(); + default: + LOGGER.error("Error while committing stream object: {}, code: {}, retry later", request, code); + return ResponseHandleResult.withRetry(); } }); + this.requestSender.send(task); + return future; } @Override diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java index a108e4ebcd..a42f974960 100644 --- a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java @@ -18,6 +18,8 @@ package kafka.log.s3.streams; import kafka.log.s3.network.ControllerRequestSender; +import kafka.log.s3.network.ControllerRequestSender.RequestTask; +import kafka.log.s3.network.ControllerRequestSender.ResponseHandleResult; import kafka.log.s3.objects.OpenStreamMetadata; import kafka.server.KafkaConfig; import org.apache.kafka.common.message.CloseStreamRequestData; @@ -56,17 +58,20 @@ public ControllerStreamManager(ControllerRequestSender requestSender, KafkaConfi public CompletableFuture> getOpeningStreams() { GetOpeningStreamsRequest.Builder request = new GetOpeningStreamsRequest.Builder( new GetOpeningStreamsRequestData().setBrokerId(config.brokerId()).setBrokerEpoch(config.brokerEpoch())); - return this.requestSender.send(request, GetOpeningStreamsResponseData.class).thenApply(resp -> { + CompletableFuture> future = new 
CompletableFuture<>(); + RequestTask> task = new RequestTask<>(future, request, GetOpeningStreamsResponseData.class, resp -> { switch (Errors.forCode(resp.errorCode())) { case NONE: - return resp.streamsOffset().stream() + return ResponseHandleResult.withSuccess(resp.streamsOffset().stream() .map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset())) - .collect(Collectors.toList()); + .collect(Collectors.toList())); default: - LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); + LOGGER.error("Error while getting streams offset: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode())); + return ResponseHandleResult.withRetry(); } }); + this.requestSender.send(task); + return future; } @Override @@ -74,15 +79,18 @@ public CompletableFuture createStream() { CreateStreamRequest.Builder request = new CreateStreamRequest.Builder( new CreateStreamRequestData() ); - return this.requestSender.send(request, CreateStreamResponseData.class).thenApply(resp -> { + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, request, CreateStreamResponseData.class, resp -> { switch (Errors.forCode(resp.errorCode())) { case NONE: - return resp.streamId(); + return ResponseHandleResult.withSuccess(resp.streamId()); default: - LOGGER.error("Error while creating stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); + LOGGER.error("Error while creating stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode())); + return ResponseHandleResult.withRetry(); } }); + this.requestSender.send(task); + return future; } @Override @@ -93,24 +101,24 @@ public CompletableFuture openStream(long streamId, long epoc .setStreamEpoch(epoch) .setBrokerId(config.brokerId()) ); - return this.requestSender.send(request, OpenStreamResponseData.class).thenApply(resp -> { + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, request, OpenStreamResponseData.class, resp -> { switch (Errors.forCode(resp.errorCode())) { case NONE: - return new OpenStreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset()); + return ResponseHandleResult.withSuccess(new OpenStreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset())); case STREAM_NOT_EXIST: - LOGGER.error("Stream not exist while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); case STREAM_FENCED: - LOGGER.error("Stream fenced while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); case STREAM_NOT_CLOSED: - LOGGER.error("Stream not closed while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); + case STREAM_INNER_ERROR: + LOGGER.error("Unexpected error while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); throw Errors.forCode(resp.errorCode()).exception(); default: - LOGGER.error("Error while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); + LOGGER.error("Error while opening stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode())); + return ResponseHandleResult.withRetry(); } }); + 
this.requestSender.send(task); + return future; } @Override @@ -127,22 +135,23 @@ public CompletableFuture closeStream(long streamId, long epoch) { .setStreamEpoch(epoch) .setBrokerId(config.brokerId()) ); - return this.requestSender.send(request, CloseStreamResponseData.class).thenApply(resp -> { - LOGGER.info("close stream {} response: {}", streamId, resp); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, request, CloseStreamResponseData.class, resp -> { switch (Errors.forCode(resp.errorCode())) { case NONE: - return null; + return ResponseHandleResult.withSuccess(null); case STREAM_NOT_EXIST: case STREAM_FENCED: case STREAM_INNER_ERROR: LOGGER.error("Unexpected error while closing stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); throw Errors.forCode(resp.errorCode()).exception(); default: - // TODO: retry recoverable error LOGGER.warn("Error while closing stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); + return ResponseHandleResult.withRetry(); } }); + this.requestSender.send(task); + return future; } @Override diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b559293d7d..2994d54046 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -689,6 +689,8 @@ object KafkaConfig { val S3StreamObjectCompactionTaskIntervalProp = "s3.stream.object.compaction.task.interval" val S3StreamObjectCompactionMaxSizeProp = "s3.stream.object.compaction.max.size" val S3StreamObjectCompactionLivingTimeThresholdProp = "s3.stream.object.compaction.living.time.threshold" + val S3ControllerRequestRetryMaxCountProp = "s3.controller.request.retry.max.count" + val S3ControllerRequestRetryBaseDelayMsProp = "s3.controller.request.retry.base.delay.ms" val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com." val S3RegionDoc = "The S3 region, ex. us-east-1." @@ -704,6 +706,8 @@ object KafkaConfig { val S3StreamObjectCompactionTaskIntervalDoc = "The S3 stream object compaction task interval in minutes." val S3StreamObjectCompactionMaxSizeDoc = "The S3 stream object compaction max size in GiB." val S3StreamObjectCompactionLivingTimeThresholdDoc = "The S3 stream object compaction living time threshold in hours." + val S3ControllerRequestRetryMaxCountDoc = "The S3 controller request retry max count." + val S3ControllerRequestRetryBaseDelayMsDoc = "The S3 controller request retry base delay in milliseconds." 
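+ // With the defaults defined below (s3.controller.request.retry.max.count=5, s3.controller.request.retry.base.delay.ms=500),
+ // a controller request is sent at most 6 times, with exponential backoff delays of 500, 1000, 2000, 4000 and 8000 ms between attempts.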
// Kafka on S3 inject end @@ -1517,6 +1521,8 @@ object KafkaConfig { .define(S3StreamObjectCompactionTaskIntervalProp, INT, 60, MEDIUM, S3StreamObjectCompactionTaskIntervalDoc) .define(S3StreamObjectCompactionMaxSizeProp, INT, 10, MEDIUM, S3StreamObjectCompactionMaxSizeDoc) .define(S3StreamObjectCompactionLivingTimeThresholdProp, INT, 1, MEDIUM, S3StreamObjectCompactionLivingTimeThresholdDoc) + .define(S3ControllerRequestRetryMaxCountProp, INT, 5, MEDIUM, S3ControllerRequestRetryMaxCountDoc) + .define(S3ControllerRequestRetryBaseDelayMsProp, LONG, 500, MEDIUM, S3ControllerRequestRetryBaseDelayMsDoc) // Kafka on S3 inject end } @@ -2065,6 +2071,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3StreamObjectCompactionTaskInterval = getInt(KafkaConfig.S3StreamObjectCompactionTaskIntervalProp) val s3StreamObjectCompactionMaxSize = getInt(KafkaConfig.S3StreamObjectCompactionMaxSizeProp) val s3StreamObjectCompactionLivingTimeThreshold = getInt(KafkaConfig.S3StreamObjectCompactionLivingTimeThresholdProp) + val s3ControllerRequestRetryMaxCount = getInt(KafkaConfig.S3ControllerRequestRetryMaxCountProp) + val s3ControllerRequestRetryBaseDelayMs = getLong(KafkaConfig.S3ControllerRequestRetryBaseDelayMsProp) // TODO: ensure incremental epoch => Store epoch in disk, if timestamp flip back, we could use disk epoch to keep the incremental epoch. val brokerEpoch = System.currentTimeMillis() // Kafka on S3 inject end diff --git a/core/src/test/java/kafka/log/s3/ControllerRequestSenderTest.java b/core/src/test/java/kafka/log/s3/ControllerRequestSenderTest.java new file mode 100644 index 0000000000..0b5f8e24ea --- /dev/null +++ b/core/src/test/java/kafka/log/s3/ControllerRequestSenderTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import kafka.log.s3.network.ControllerRequestSender; +import kafka.log.s3.network.ControllerRequestSender.RequestTask; +import kafka.log.s3.network.ControllerRequestSender.ResponseHandleResult; +import kafka.log.s3.network.ControllerRequestSender.RetryPolicyContext; +import kafka.server.BrokerServer; +import kafka.server.BrokerToControllerChannelManager; +import kafka.server.ControllerRequestCompletionHandler; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.CreateStreamRequestData; +import org.apache.kafka.common.message.CreateStreamResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.s3.CreateStreamRequest; +import org.apache.kafka.common.requests.s3.CreateStreamResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +@Timeout(40) +@Tag("S3Unit") +public class ControllerRequestSenderTest { + + private BrokerServer brokerServer; + private BrokerToControllerChannelManager channelManager; + private ControllerRequestSender requestSender; + private RetryPolicyContext retryPolicyContext; + + @BeforeEach + public void setUp() { + brokerServer = Mockito.mock(BrokerServer.class); + channelManager = Mockito.mock(BrokerToControllerChannelManager.class); + Mockito.when(brokerServer.clientToControllerChannelManager()).thenReturn(channelManager); + retryPolicyContext = new RetryPolicyContext(2, 100L); + requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext); + } + + @Test + public void testBasic() { + // first time inner-error, second time success + Mockito.doAnswer(ink -> { + ControllerRequestCompletionHandler handler = ink.getArgument(1); + CreateStreamResponse response = new CreateStreamResponse(new CreateStreamResponseData() + .setErrorCode(Errors.STREAM_INNER_ERROR.code())); + ClientResponse clientResponse = new ClientResponse( + null, null, null, -1, -1, false, null, null, response); + handler.onComplete(clientResponse); + return null; + }).doAnswer(ink -> { + ControllerRequestCompletionHandler handler = ink.getArgument(1); + CreateStreamResponse response = new CreateStreamResponse(new CreateStreamResponseData() + .setErrorCode(Errors.NONE.code()) + .setStreamId(13L)); + ClientResponse clientResponse = new ClientResponse( + null, null, null, -1, -1, false, null, null, response); + handler.onComplete(clientResponse); + return null; + }).when(channelManager).sendRequest(any(AbstractRequest.Builder.class), any(ControllerRequestCompletionHandler.class)); + + CreateStreamRequest.Builder request = new CreateStreamRequest.Builder( + new CreateStreamRequestData() + ); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, request, CreateStreamResponseData.class, resp -> { + switch (Errors.forCode(resp.errorCode())) { + case NONE: + return ResponseHandleResult.withSuccess(resp.streamId()); + default: + return 
ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + assertDoesNotThrow(() -> { + Long streamId = future.get(1, TimeUnit.SECONDS); + assertEquals(13L, streamId); + }); + Mockito.verify(channelManager, Mockito.times(2)) + .sendRequest(any(AbstractRequest.Builder.class), any(ControllerRequestCompletionHandler.class)); + } + + @Test + public void testMaxRetry() { + Answer failedAns = ink -> { + ControllerRequestCompletionHandler handler = ink.getArgument(1); + CreateStreamResponse response = new CreateStreamResponse(new CreateStreamResponseData() + .setErrorCode(Errors.STREAM_INNER_ERROR.code())); + ClientResponse clientResponse = new ClientResponse( + null, null, null, -1, -1, false, null, null, response); + handler.onComplete(clientResponse); + return null; + }; + Answer successAns = ink -> { + ControllerRequestCompletionHandler handler = ink.getArgument(1); + CreateStreamResponse response = new CreateStreamResponse(new CreateStreamResponseData() + .setErrorCode(Errors.NONE.code()) + .setStreamId(13L)); + ClientResponse clientResponse = new ClientResponse( + null, null, null, -1, -1, false, null, null, response); + handler.onComplete(clientResponse); + return null; + }; + // failed 3 times, success 1 time + Mockito.doAnswer(failedAns).doAnswer(failedAns).doAnswer(failedAns).doAnswer(successAns).when(channelManager) + .sendRequest(any(AbstractRequest.Builder.class), any(ControllerRequestCompletionHandler.class)); + CreateStreamRequest.Builder request = new CreateStreamRequest.Builder( + new CreateStreamRequestData() + ); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(future, request, CreateStreamResponseData.class, resp -> { + switch (Errors.forCode(resp.errorCode())) { + case NONE: + return ResponseHandleResult.withSuccess(resp.streamId()); + default: + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + assertThrows(ExecutionException.class, () -> { + future.get(1, TimeUnit.SECONDS); + }); + Mockito.verify(channelManager, Mockito.times(3)) + .sendRequest(any(AbstractRequest.Builder.class), any(ControllerRequestCompletionHandler.class)); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 292e57676a..e490911e78 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -187,7 +187,6 @@ public ControllerResult markDestroyObjects(List objects) { S3Object object = this.objectsMetadata.get(objectId); if (object == null || object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) { log.error("object {} not exist when mark destroy object", objectId); - // TODO: Maybe we can ignore this situation, because this object is already destroyed ? 
return ControllerResult.of(Collections.emptyList(), false); } S3ObjectRecord record = new S3ObjectRecord() diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 1dc8de3a51..3d5c6e207c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -211,6 +211,39 @@ public ControllerResult createStream(CreateStreamReque return ControllerResult.atomicOf(Arrays.asList(record0, record), resp); } + /** + * Open stream. + *
+ * <p>
+ * Response Errors Enum:
+ * <ul>
+ * <li>STREAM_FENCED:
+ * <ol>
+ * <li>stream's epoch is larger than request epoch</li>
+ * <li>stream's current range's broker is not equal to request broker</li>
+ * <li>stream's epoch matched, but stream's state is CLOSED</li>
+ * </ol>
+ * </li>
+ * <li>STREAM_NOT_EXIST:
+ * <ol>
+ * <li>stream's id not exist in current stream-metadata</li>
+ * </ol>
+ * </li>
+ * <li>STREAM_NOT_CLOSED:
+ * <ol>
+ * <li>request with higher epoch but stream's state is OPENED</li>
+ * </ol>
+ * </li>
+ * <li>STREAM_INNER_ERROR:
+ * <ol>
+ * <li>stream's current range not exist when stream has been opened</li>
+ * </ol>
+ * </li>
+ * </ul>
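+ * <p>
+ * On the broker client side ({@code ControllerStreamManager#openStream}), the four codes above fail the
+ * request immediately, while any other error code is retried by {@code ControllerRequestSender}.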
+ */ public ControllerResult openStream(OpenStreamRequestData data) { OpenStreamResponseData resp = new OpenStreamResponseData(); long streamId = data.streamId(); @@ -294,6 +327,33 @@ public ControllerResult openStream(OpenStreamRequestData return ControllerResult.atomicOf(records, resp); } + /** + * Close stream. + *
+ * <p>
+ * Response Errors Enum:
+ * <ul>
+ * <li>STREAM_FENCED:
+ * <ol>
+ * <li>stream's epoch is larger than request epoch</li>
+ * <li>stream's current range's broker is not equal to request broker</li>
+ * </ol>
+ * </li>
+ * <li>STREAM_NOT_EXIST:
+ * <ol>
+ * <li>stream's id not exist in current stream-metadata</li>
+ * </ol>
+ * </li>
+ * <li>STREAM_INNER_ERROR:
+ * <ol>
+ * <li>stream's current range not exist when stream has been opened</li>
+ * <li>close stream with higher epoch</li>
+ * </ol>
+ * </li>
+ * </ul>
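+ * <p>
+ * On the broker client side ({@code ControllerStreamManager#closeStream}), STREAM_NOT_EXIST, STREAM_FENCED
+ * and STREAM_INNER_ERROR fail the request immediately, while any other error code is retried.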
+ */ public ControllerResult closeStream(CloseStreamRequestData data) { CloseStreamResponseData resp = new CloseStreamResponseData(); long streamId = data.streamId(); @@ -354,6 +414,26 @@ public ControllerResult deleteStream(DeleteStreamReque throw new UnsupportedOperationException(); } + /** + * Commit wal object. + *
+ * <p>
+ * Response Errors Enum:
+ * <ul>
+ * <li>OBJECT_NOT_EXIST:
+ * <ol>
+ * <li>wal object not exist when commit</li>
+ * <li>stream object not exist when commit</li>
+ * </ol>
+ * </li>
+ * <li>COMPACTED_OBJECTS_NOT_FOUND:
+ * <ol>
+ * <li>compacted objects not found when mark destroy</li>
+ * </ol>
+ * </li>
+ * </ul>
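+ * <p>
+ * On the broker client side ({@code ControllerObjectManager#commitWALObject}), OBJECT_NOT_EXIST and
+ * COMPACTED_OBJECTS_NOT_FOUND fail the request immediately, while any other error code is retried.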
+ */ public ControllerResult commitWALObject(CommitWALObjectRequestData data) { CommitWALObjectResponseData resp = new CommitWALObjectResponseData(); List records = new ArrayList<>(); @@ -385,7 +465,7 @@ public ControllerResult commitWALObject(CommitWALOb ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(compactedObjectIds); if (!destroyResult.response()) { log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", compactedObjectIds); - resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + resp.setErrorCode(Errors.COMPACTED_OBJECTS_NOT_FOUND.code()); return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(destroyResult.records()); @@ -447,6 +527,25 @@ public ControllerResult commitWALObject(CommitWALOb return ControllerResult.atomicOf(records, resp); } + /** + * Commit stream object. + *
+ * <p>
+ * Response Errors Enum:
+ * <ul>
+ * <li>OBJECT_NOT_EXIST:
+ * <ol>
+ * <li>stream object not exist when commit</li>
+ * </ol>
+ * </li>
+ * <li>COMPACTED_OBJECTS_NOT_FOUND:
+ * <ol>
+ * <li>compacted objects not found when mark destroy</li>
+ * </ol>
+ * </li>
+ * </ul>
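+ * <p>
+ * On the broker client side ({@code ControllerObjectManager#commitStreamObject}), OBJECT_NOT_EXIST and
+ * COMPACTED_OBJECTS_NOT_FOUND fail the request immediately, while any other error code is retried.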
+ */ public ControllerResult commitStreamObject(CommitStreamObjectRequestData data) { long streamObjectId = data.objectId(); long streamId = data.streamId(); @@ -478,7 +577,7 @@ public ControllerResult commitStreamObject(Commi ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds); if (!destroyResult.response()) { log.error("[CommitStreamObject]: Mark destroy compacted objects: {} failed", sourceObjectIds); - resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + resp.setErrorCode(Errors.COMPACTED_OBJECTS_NOT_FOUND.code()); return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(destroyResult.records()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 8f6302e6ab..0701fc125d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -475,7 +475,7 @@ public void testCommitWalCompacted() { .setObjectStreamRanges(streamRanges2) .setCompactedObjectIds(List.of(0L, 1L, 10L)); ControllerResult result6 = manager.commitWALObject(commitRequest2); - assertEquals(Errors.STREAM_INNER_ERROR.code(), result6.response().errorCode()); + assertEquals(Errors.COMPACTED_OBJECTS_NOT_FOUND.code(), result6.response().errorCode()); assertEquals(0, result6.records().size()); Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); @@ -652,7 +652,7 @@ public void testCommitStreamObject() { .setObjectSize(999) .setSourceObjectIds(List.of(10L)); result2 = manager.commitStreamObject(streamObjectRequest); - assertEquals(Errors.STREAM_INNER_ERROR.code(), result2.response().errorCode()); + assertEquals(Errors.COMPACTED_OBJECTS_NOT_FOUND.code(), result2.response().errorCode()); replay(manager, result2.records()); // 7. verify stream objects