feat(s3): support retry to-controller request (#87)
1. support retry to-controller request

Signed-off-by: TheR1sing3un <[email protected]>
TheR1sing3un authored Sep 11, 2023
1 parent 1a82c99 commit 1c01164
Showing 12 changed files with 559 additions and 98 deletions.
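The change in a nutshell: ControllerRequestSender no longer hands callers a CompletableFuture directly. Callers instead wrap the request in a RequestTask whose response handler inspects the error code and either completes the future (ResponseHandleResult.withSuccess) or asks the sender to re-send the task after an exponential-backoff delay (ResponseHandleResult.withRetry). The sketch below is an illustration of that caller-side pattern, not code from this commit; ExampleResponseData, someField() and buildRequest() are placeholders for the concrete types used in the ControllerKVClient changes further down.

// Illustrative caller of the new ControllerRequestSender API; the placeholders
// (ExampleResponseData, buildRequest, requestSender field) are not part of this commit.
CompletableFuture<Long> future = new CompletableFuture<>();
RequestTask<ExampleResponseData, Long> task = new RequestTask<>(
    future,
    buildRequest(),                       // an AbstractRequest.Builder targeting the controller
    ExampleResponseData.class,
    resp -> {
        Errors code = Errors.forCode(resp.errorCode());
        if (code == Errors.NONE) {
            // Terminal success: complete the caller's future with the mapped value.
            return ResponseHandleResult.withSuccess(resp.someField());
        }
        // Anything else: ask the sender to re-send this task after a backoff delay.
        return ResponseHandleResult.withRetry();
    });
requestSender.send(task);
return future;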
New file: CompactedObjectsNotFoundException.java (package org.apache.kafka.common.errors.s3)
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

public class CompactedObjectsNotFoundException extends ApiException {
public CompactedObjectsNotFoundException(String message) {
super(message);
}
}
Errors.java (the Errors enum, org.apache.kafka.common.protocol)
@@ -127,6 +127,7 @@
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.s3.CompactedObjectsNotFoundException;
import org.apache.kafka.common.errors.s3.ObjectNotExistException;
import org.apache.kafka.common.errors.s3.RedundantOperationException;
import org.apache.kafka.common.errors.s3.StreamExistException;
@@ -389,7 +390,7 @@ public enum Errors {
STREAM_NOT_OPENED(505, "The stream is not opened.", StreamNotOpenedException::new),
STREAM_NOT_CLOSED(506, "The stream is not closed.", StreamNotClosedException::new),
REDUNDANT_OPERATION(507, "The operation is redundant.", RedundantOperationException::new),

COMPACTED_OBJECTS_NOT_FOUND(508, "The compacted objects are not found.", CompactedObjectsNotFoundException::new),


STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new);
89 changes: 49 additions & 40 deletions core/src/main/scala/kafka/log/s3/ControllerKVClient.java
@@ -20,7 +20,10 @@
import kafka.log.es.api.KVClient;
import kafka.log.es.api.KeyValue;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.network.ControllerRequestSender.RequestTask;
import kafka.log.s3.network.ControllerRequestSender.ResponseHandleResult;
import org.apache.kafka.common.message.DeleteKVRequestData;
import org.apache.kafka.common.message.DeleteKVResponseData;
import org.apache.kafka.common.message.GetKVRequestData;
import org.apache.kafka.common.message.GetKVResponseData;
import org.apache.kafka.common.message.PutKVRequestData;
@@ -57,18 +60,20 @@ public CompletableFuture<Void> putKV(List<KeyValue> list) {
.setValue(kv.value().array())
).collect(Collectors.toList()))
);
return this.requestSender.send(requestBuilder, PutKVResponseData.class)
.thenApply(resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp);
return null;
default:
LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}", list, code);
throw code.exception();
}
});
CompletableFuture<Void> future = new CompletableFuture<>();
RequestTask<PutKVResponseData, Void> task = new RequestTask<>(future, requestBuilder, PutKVResponseData.class, resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp);
return ResponseHandleResult.withSuccess(null);
default:
LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}, retry later", list, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}

@Override
@@ -78,22 +83,24 @@ public CompletableFuture<List<KeyValue>> getKV(List<String> list) {
new GetKVRequestData()
.setKeys(list)
);
return this.requestSender.send(requestBuilder, GetKVResponseData.class)
.thenApply(resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
List<KeyValue> keyValues = resp.keyValues()
.stream()
.map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null))
.collect(Collectors.toList());
LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues);
return keyValues;
default:
LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}", String.join(",", list), code);
throw code.exception();
}
});
CompletableFuture<List<KeyValue>> future = new CompletableFuture<>();
RequestTask<GetKVResponseData, List<KeyValue>> task = new RequestTask<>(future, requestBuilder, GetKVResponseData.class, resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
List<KeyValue> keyValues = resp.keyValues()
.stream()
.map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null))
.collect(Collectors.toList());
LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues);
return ResponseHandleResult.withSuccess(keyValues);
default:
LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}, retry later", list, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}

@Override
@@ -103,17 +110,19 @@ public CompletableFuture<Void> delKV(List<String> list) {
new DeleteKVRequestData()
.setKeys(list)
);
return this.requestSender.send(requestBuilder, PutKVResponseData.class)
.thenApply(resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp);
return null;
default:
LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}", String.join(",", list), code);
throw code.exception();
}
});
CompletableFuture<Void> future = new CompletableFuture<>();
RequestTask<DeleteKVResponseData, Void> task = new RequestTask<>(future, requestBuilder, DeleteKVResponseData.class, resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp);
return ResponseHandleResult.withSuccess(null);
default:
LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}, retry later", String.join(",", list), code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}
}
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -24,6 +24,7 @@
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.metadata.StreamMetadataManager;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.network.ControllerRequestSender.RetryPolicyContext;
import kafka.log.s3.objects.ControllerObjectManager;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.operator.S3Operator;
@@ -58,7 +59,9 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
this.config = config;
this.metadataManager = new StreamMetadataManager(brokerServer, config);
this.operator = operator;
this.requestSender = new ControllerRequestSender(brokerServer);
RetryPolicyContext retryPolicyContext = new RetryPolicyContext(config.s3ControllerRequestRetryMaxCount(),
config.s3ControllerRequestRetryBaseDelayMs());
this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
this.streamManager = new ControllerStreamManager(this.requestSender, config);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator);
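The retry policy that DefaultS3Client now wires in is driven by the two KafkaConfig accessors shown above, s3ControllerRequestRetryMaxCount() and s3ControllerRequestRetryBaseDelayMs(). A standalone sketch of the resulting backoff schedule follows, using assumed example values rather than the commit's actual defaults.

// Illustrates ControllerRequestSender#retryTask's schedule:
// delay = retryBaseDelayMs * 2^(sendCount - 1), until sendCount exceeds maxRetryCount.
public class BackoffScheduleSketch {
    public static void main(String[] args) {
        int maxRetryCount = 3;        // assumed; the real value comes from s3ControllerRequestRetryMaxCount()
        long retryBaseDelayMs = 500L; // assumed; the real value comes from s3ControllerRequestRetryBaseDelayMs()
        for (int sendCount = 1; sendCount <= maxRetryCount; sendCount++) {
            long delay = retryBaseDelayMs * (1L << (sendCount - 1));
            System.out.printf("attempt %d asked to retry -> re-sent after %d ms%n", sendCount, delay);
        }
        // The next retry request (sendCount = maxRetryCount + 1) exceeds the limit,
        // so the task's future is completed exceptionally instead of being re-scheduled.
    }
}

With these numbers, a task that keeps returning withRetry() is re-sent after 500 ms, 1000 ms and 2000 ms, and the fourth failure completes the caller's future exceptionally.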
166 changes: 152 additions & 14 deletions core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java
@@ -17,61 +17,199 @@

package kafka.log.s3.network;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import kafka.server.BrokerServer;
import kafka.server.BrokerToControllerChannelManager;
import kafka.server.ControllerRequestCompletionHandler;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequest.Builder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class ControllerRequestSender {
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class);

private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class);
private final RetryPolicyContext retryPolicyContext;
private final BrokerServer brokerServer;
private BrokerToControllerChannelManager channelManager;
private final BrokerToControllerChannelManager channelManager;

private final ScheduledExecutorService retryService;

public ControllerRequestSender(BrokerServer brokerServer) {
public ControllerRequestSender(BrokerServer brokerServer, RetryPolicyContext retryPolicyContext) {
this.retryPolicyContext = retryPolicyContext;
this.brokerServer = brokerServer;
this.channelManager = brokerServer.clientToControllerChannelManager();
this.retryService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("controller-request-retry-sender"));
}

public <T extends AbstractRequest, R extends ApiMessage> CompletableFuture<R> send(AbstractRequest.Builder<T> requestBuilder,
Class<R> responseDataType) {
CompletableFuture<R> cf = new CompletableFuture<>();
LOGGER.debug("Sending request {}", requestBuilder);
public void send(RequestTask task) {
Builder requestBuilder = task.requestBuilder;
Class responseDataType = task.responseDataType;
task.sendHit();
LOGGER.trace("Sending task: {}", task);
channelManager.sendRequest(requestBuilder, new ControllerRequestCompletionHandler() {
@Override
public void onTimeout() {
// TODO: add timeout retry policy
LOGGER.error("Timeout while creating stream");
cf.completeExceptionally(new TimeoutException("Timeout while creating stream"));
task.completeExceptionally(new TimeoutException("Timeout while creating stream"));
}

@Override
public void onComplete(ClientResponse response) {
if (response.authenticationException() != null) {
LOGGER.error("Authentication error while sending request: {}", requestBuilder, response.authenticationException());
cf.completeExceptionally(response.authenticationException());
task.completeExceptionally(response.authenticationException());
return;
}
if (response.versionMismatch() != null) {
LOGGER.error("Version mismatch while sending request: {}", requestBuilder, response.versionMismatch());
cf.completeExceptionally(response.versionMismatch());
task.completeExceptionally(response.versionMismatch());
return;
}
if (!responseDataType.isInstance(response.responseBody().data())) {
LOGGER.error("Unexpected response type: {} while sending request: {}",
response.responseBody().data().getClass().getSimpleName(), requestBuilder);
cf.completeExceptionally(new RuntimeException("Unexpected response type while sending request"));
response.responseBody().data().getClass().getSimpleName(), requestBuilder);
task.completeExceptionally(new RuntimeException("Unexpected response type while sending request"));
}
ApiMessage data = response.responseBody().data();
try {
ResponseHandleResult result = (ResponseHandleResult) task.responseHandler.apply(data);
if (result.retry()) {
retryTask(task);
return;
}
task.complete(result.getResponse());
} catch (Exception e) {
task.completeExceptionally(e);
}
cf.complete((R) response.responseBody().data());
}
});
return cf;
}

private void retryTask(RequestTask task) {
if (task.sendCount() > retryPolicyContext.maxRetryCount) {
LOGGER.error("Task: {}, retry count exceed max retry count: {}", task, retryPolicyContext.maxRetryCount);
task.completeExceptionally(new RuntimeException("Retry count exceed max retry count: "));
return;
}
long delay = retryPolicyContext.retryBaseDelayMs * (1 << (task.sendCount() - 1));
LOGGER.warn("Retry task: {}, delay : {} ms", task, delay);
retryService.schedule(() -> send(task), delay, TimeUnit.MILLISECONDS);
}

public static class RequestTask<T/*controller response data type*/, Z/*upper response data type*/> {

private final CompletableFuture<Z> cf;

private final AbstractRequest.Builder requestBuilder;

private final Class<T> responseDataType;

/**
* The response handler is used to determine whether the response is valid or retryable.
*/
private final Function<T, ResponseHandleResult<Z>> responseHandler;

private int sendCount;

public RequestTask(CompletableFuture<Z> future, AbstractRequest.Builder requestBuilder, Class<T> responseDataType,
Function<T, ResponseHandleResult<Z>> responseHandler) {
this.cf = future;
this.requestBuilder = requestBuilder;
this.responseDataType = responseDataType;
this.responseHandler = responseHandler;
}

public CompletableFuture<Z> cf() {
return cf;
}

public void sendHit() {
sendCount++;
}

public int sendCount() {
return sendCount;
}

public void complete(Z result) {
cf.complete(result);
}

public void completeExceptionally(Throwable throwable) {
cf.completeExceptionally(throwable);
}

@Override
public String toString() {
return "RequestTask{" +
"requestBuilder=" + requestBuilder +
", responseDataType=" + responseDataType +
", sendCount=" + sendCount +
'}';
}
}

public static class ResponseHandleResult<R> {

private final boolean retry;
private final R response;

private ResponseHandleResult(boolean retry, R response) {
this.retry = retry;
this.response = response;
}

public static ResponseHandleResult withRetry() {
return new ResponseHandleResult(true, null);
}

public static <R> ResponseHandleResult withSuccess(R response) {
return new ResponseHandleResult(false, response);
}

public boolean retry() {
return retry;
}

public R getResponse() {
return response;
}
}

public static class RetryPolicyContext {
private int maxRetryCount;
private long retryBaseDelayMs;

public RetryPolicyContext(int maxRetryCount, long retryBaseDelayMs) {
this.maxRetryCount = maxRetryCount;
this.retryBaseDelayMs = retryBaseDelayMs;
}

public int maxRetryCount() {
return maxRetryCount;
}

public long retryBaseDelayMs() {
return retryBaseDelayMs;
}

public void setMaxRetryCount(int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
}

public void setRetryBaseDelayMs(long retryBaseDelayMs) {
this.retryBaseDelayMs = retryBaseDelayMs;
}
}
}
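One behavioural detail of the new send() worth noting: the response handler runs inside a try/catch, so a handler that throws fails the caller's future immediately, whereas returning ResponseHandleResult.withRetry() re-queues the task on the backoff schedule. Below is an illustrative handler for a RequestTask<PutKVResponseData, Void>; the split between terminal and retriable codes here is a hypothetical choice, not taken from this commit, and the imports match those already used in ControllerKVClient above.

Function<PutKVResponseData, ResponseHandleResult<Void>> handler = resp -> {
    Errors code = Errors.forCode(resp.errorCode());
    switch (code) {
        case NONE:
            return ResponseHandleResult.withSuccess(null); // complete the caller's future
        case STREAM_INNER_ERROR:
            throw code.exception();                        // caught in onComplete -> future fails now
        default:
            return ResponseHandleResult.withRetry();       // re-sent after the backoff delay
    }
};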
(The remaining 7 changed files in this commit are not shown here.)
