Commit

refactor: remove slow fetch hint; rename esUnit test (#422)

Signed-off-by: Curtis Wan <[email protected]>
mooc9988 authored Nov 6, 2023
1 parent b17928e commit 0e5def2
Showing 25 changed files with 30 additions and 386 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_kos.yml
@@ -85,4 +85,4 @@ jobs:
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Unit Test
run: ./gradlew --build-cache metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest
run: ./gradlew --build-cache metadata:S3UnitTest core:S3UnitTest
64 changes: 0 additions & 64 deletions build.gradle
@@ -466,36 +466,6 @@ subprojects {
}
}

tasks.register('esUnitTest', Test) {
description = 'Runs unit tests for elastic stream storage.'
dependsOn compileJava

maxParallelForks = maxTestForks
ignoreFailures = userIgnoreFailures

maxHeapSize = defaultMaxHeapSize
jvmArgs = defaultJvmArgs

testLogging {
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
displayGranularity = 0
}
logTestStdout.rehydrate(delegate, owner, this)()

exclude testsToExclude

useJUnitPlatform {
includeTags 'esUnit'
}

retry {
maxRetries = userMaxTestRetries
maxFailures = userMaxTestRetryFailures
}
}

tasks.register('S3UnitTest', Test) {
description = 'Runs unit tests for Kafka on S3.'
dependsOn compileJava
@@ -526,40 +496,6 @@ subprojects {
}
}

tasks.register('esIntegrationTest', Test) {
description = 'Runs integration tests for elastic stream storage.'
dependsOn compileJava
shouldRunAfter test

// Increase heap size for integration tests
maxHeapSize = "2560m"
jvmArgs = defaultJvmArgs

testLogging {
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
displayGranularity = 0
}
logTestStdout.rehydrate(delegate, owner, this)()

exclude testsToExclude

useJUnitPlatform {
includeTags 'esIntegration'
}

retry {
maxRetries = userMaxTestRetries
maxFailures = userMaxTestRetryFailures
}

// Allows devs to run tests in a loop to debug flaky tests. See README.
if (project.hasProperty("rerun-tests")) {
outputs.upToDateWhen { false }
}
}

task integrationTest(type: Test, dependsOn: compileJava) {
maxParallelForks = maxTestForks
ignoreFailures = userIgnoreFailures
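The S3UnitTest registration that survives mirrors the removed esUnitTest block, which selected tests through useJUnitPlatform { includeTags 'esUnit' }. A minimal sketch of a test class such a tag-filtered task would pick up, assuming the surviving task filters on a "S3Unit" JUnit 5 tag (the task body is collapsed in this diff, so the exact tag name is an assumption):

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical test class; the @Tag value is assumed to match the includeTags
// filter of the S3UnitTest Gradle task, mirroring the removed 'esUnit' tag.
@Tag("S3Unit")
public class ExampleS3UnitTest {
    @Test
    public void parsesAnOffset() {
        assertEquals(42L, Long.parseLong("42"));
    }
}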

This file was deleted.

@@ -30,7 +30,6 @@
import com.automq.stream.api.StreamClientException;
import com.automq.stream.utils.FutureUtil;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +57,6 @@ public class AlwaysSuccessClient implements Client {
ErrorCode.STREAM_ALREADY_CLOSED,
ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS
);
public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10;
private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1,
@@ -75,36 +73,26 @@ public class AlwaysSuccessClient implements Client {
ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
private final ExecutorService fetchCallbackExecutors = Executors.newFixedThreadPool(4,
ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
private final Client innerClient;
private final StreamClient streamClient;
private final KVClient kvClient;
private final Delayer delayer;
/**
* The flag to indicate if the callback of append is async.
* It is generally true, but for test cases, it is set to false. In test cases, we aim to ensure that
* the committed offset is promptly updated right after appending. Otherwise, the subsequent fetch request may fail
* due to the delay in updating the committed offset.
*/
private final boolean appendCallbackAsync;
private final long slowFetchTimeoutMillis;

public AlwaysSuccessClient(Client client) {
this(client, true, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
this(client, true);
}

public AlwaysSuccessClient(Client client, boolean appendCallbackAsync) {
this(client, appendCallbackAsync, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
}

public AlwaysSuccessClient(Client client, boolean appendCallbackAsync, long slowFetchTimeoutMillis) {
this.innerClient = client;
this.streamClient = new StreamClientImpl(client.streamClient());
this.kvClient = client.kvClient();
this.delayer = new Delayer(delayFetchScheduler);
this.appendCallbackAsync = appendCallbackAsync;
this.slowFetchTimeoutMillis = slowFetchTimeoutMillis;
}
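The appendCallbackAsync flag documented above is now the only tuning knob these constructors expose, since the slowFetchTimeoutMillis parameter is removed. A hypothetical test-side helper showing why a test would pass false (the helper name is illustrative, not project API):

// Sketch only: synchronous append callbacks make the committed offset visible
// to a test immediately after append() completes, as the Javadoc above explains.
static AlwaysSuccessClient newSyncAppendClientForTests(Client innerClient) {
    return new AlwaysSuccessClient(innerClient, false);
}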

@Override
@@ -133,7 +121,6 @@ public void shutdown() {
generalCallbackExecutors.shutdownNow();
appendCallbackExecutors.shutdownNow();
fetchCallbackExecutors.shutdownNow();
delayFetchScheduler.shutdownNow();
}

/**
@@ -304,80 +291,10 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture<
}, LOGGER));
}

/**
* Get a new CompletableFuture with a {@link SlowFetchHintException} if not otherwise completed before the given timeout.
*
* @param id the id of rawFuture in holdUpFetchingFutureMap
* @param rawFuture the raw future
* @param timeout how long to wait before completing exceptionally with a SlowFetchHintException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return a new CompletableFuture with completed results of the rawFuture if the raw future is done before timeout, otherwise a new
* CompletableFuture with a {@link SlowFetchHintException}
*/
private CompletableFuture<FetchResult> timeoutAndStoreFuture(String id,
CompletableFuture<FetchResult> rawFuture, long timeout,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException();
}

if (rawFuture.isDone()) {
return rawFuture;
}

final CompletableFuture<FetchResult> cf = new CompletableFuture<>();
rawFuture.whenComplete(new CompleteFetchingFutureAndCancelTimeoutCheck(delayer.delay(() -> {
if (rawFuture == null) {
return;
}

// If rawFuture is done, then complete the cf with the result of rawFuture.
if (rawFuture.isDone()) {
rawFuture.whenComplete((result, exception) -> {
if (exception != null) {
cf.completeExceptionally(exception);
} else {
cf.complete(result);
}
});
} else { // else, complete the cf with a SlowFetchHintException and store the rawFuture for slow fetching.
holdUpFetchingFutureMap.putIfAbsent(id, rawFuture);
cf.completeExceptionally(new SlowFetchHintException());
}
}, timeout, unit), cf));
return cf;
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
String holdUpKey = startOffset + "-" + endOffset + "-" + maxBytesHint;
CompletableFuture<FetchResult> cf = new CompletableFuture<>();
// If this thread is not marked, then just fetch data.
if (!SeparateSlowAndQuickFetchHint.isMarked()) {
CompletableFuture<FetchResult> holdUpCf = holdUpFetchingFutureMap.remove(holdUpKey);
if (holdUpCf != null) {
holdUpCf.thenAccept(cf::complete);
} else {
fetch0(startOffset, endOffset, maxBytesHint, cf);
}
} else {
CompletableFuture<FetchResult> firstFetchFuture = new CompletableFuture<>();
fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture);
// Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException.
timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (ex instanceof SlowFetchHintException) {
LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
cf.completeExceptionally(ex);
} else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
cf.completeExceptionally(ex);
}
} else {
cf.complete(rst);
}
}, LOGGER));
}
fetch0(startOffset, endOffset, maxBytesHint, cf);
return cf;
}

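For context on what this file loses: timeoutAndStoreFuture implemented a quick-fetch deadline, failing the caller's future fast with SlowFetchHintException when the first fetch did not finish within slowFetchTimeoutMillis and parking the in-flight future for a later slow-path retry, whereas the new fetch() simply delegates to fetch0. A minimal, hypothetical sketch of that removed pattern using only JDK types (the class, the map, and the scheduler wiring are illustrative, not the project's API):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the removed quick/slow fetch split: complete the returned future with
// the raw result if it arrives in time, otherwise fail fast and remember the
// in-flight future so a slow-path retry can pick it up later.
final class QuickFetchDeadlineSketch<T> {
    private final ScheduledExecutorService delayer = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, CompletableFuture<T>> parked = new ConcurrentHashMap<>();

    CompletableFuture<T> withDeadline(String key, CompletableFuture<T> raw, long timeoutMs) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        // Propagate the real result whenever it arrives; this is a no-op if cf already timed out.
        raw.whenComplete((result, error) -> {
            if (error != null) {
                cf.completeExceptionally(error);
            } else {
                cf.complete(result);
            }
        });
        // If the deadline fires first, fail fast and park the raw future for the retry path.
        delayer.schedule(() -> {
            if (!cf.isDone()) {
                parked.putIfAbsent(key, raw);
                cf.completeExceptionally(new TimeoutException("slow fetch"));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return cf;
    }

    // The retry path removes and awaits the parked future instead of issuing a new fetch.
    CompletableFuture<T> takeParked(String key) {
        return parked.remove(key);
    }
}

With the hint gone, the wrapper no longer needs the delayFetchScheduler, the Delayer, or the parked-future map; callers simply wait on the single fetch0 future.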

This file was deleted.

3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1102,10 +1102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchingExecutors.submit(new Runnable {
override def run(): Unit = {
ReadManualReleaseHint.mark()
// FIXME: Buggy, replace quick slow fetch to async fetch
// SeparateSlowAndQuickFetchHint.mark()
doFetchingRecords()
// SeparateSlowAndQuickFetchHint.reset()
ReadManualReleaseHint.reset()
}
})
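ReadManualReleaseHint, whose mark/reset pair stays around doFetchingRecords(), and the removed SeparateSlowAndQuickFetchHint are per-thread markers that downstream fetch code can check. A hypothetical sketch of the general shape of such a hint as a plain ThreadLocal flag (an illustration of the pattern, not the project's implementation):

// Sketch of a thread-local hint: mark() sets a per-thread flag before an operation,
// isMarked() lets downstream code branch on it, and reset() clears it afterwards.
public final class FetchHintSketch {
    private static final ThreadLocal<Boolean> MARKED = ThreadLocal.withInitial(() -> Boolean.FALSE);

    private FetchHintSketch() {
    }

    public static void mark() {
        MARKED.set(Boolean.TRUE);
    }

    public static boolean isMarked() {
        return MARKED.get();
    }

    public static void reset() {
        MARKED.remove();
    }
}

With the slow/quick split dropped, only the ReadManualReleaseHint pair remains in this fetch path, and the FIXME about replacing it with async fetch goes away along with the commented-out calls.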