refactor: remove slow fetch hint; rename esUnit test #422

Merged 2 commits on Nov 6, 2023

2 changes: 1 addition & 1 deletion .github/workflows/build_kos.yml
@@ -85,4 +85,4 @@ jobs:
       - name: Setup Gradle
         uses: gradle/gradle-build-action@v2
       - name: Unit Test
-        run: ./gradlew --build-cache metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest
+        run: ./gradlew --build-cache metadata:S3UnitTest core:S3UnitTest
64 changes: 0 additions & 64 deletions build.gradle
@@ -466,36 +466,6 @@ subprojects {
}
}

-tasks.register('esUnitTest', Test) {
-  description = 'Runs unit tests for elastic stream storage.'
-  dependsOn compileJava
-
-  maxParallelForks = maxTestForks
-  ignoreFailures = userIgnoreFailures
-
-  maxHeapSize = defaultMaxHeapSize
-  jvmArgs = defaultJvmArgs
-
-  testLogging {
-    events = userTestLoggingEvents ?: testLoggingEvents
-    showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
-    exceptionFormat = testExceptionFormat
-    displayGranularity = 0
-  }
-  logTestStdout.rehydrate(delegate, owner, this)()
-
-  exclude testsToExclude
-
-  useJUnitPlatform {
-    includeTags 'esUnit'
-  }
-
-  retry {
-    maxRetries = userMaxTestRetries
-    maxFailures = userMaxTestRetryFailures
-  }
-}
-
tasks.register('S3UnitTest', Test) {
description = 'Runs unit tests for Kafka on S3.'
dependsOn compileJava
@@ -526,40 +496,6 @@ subprojects {
}
}

-tasks.register('esIntegrationTest', Test) {
-  description = 'Runs integration tests for elastic stream storage.'
-  dependsOn compileJava
-  shouldRunAfter test
-
-  // Increase heap size for integration tests
-  maxHeapSize = "2560m"
-  jvmArgs = defaultJvmArgs
-
-  testLogging {
-    events = userTestLoggingEvents ?: testLoggingEvents
-    showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
-    exceptionFormat = testExceptionFormat
-    displayGranularity = 0
-  }
-  logTestStdout.rehydrate(delegate, owner, this)()
-
-  exclude testsToExclude
-
-  useJUnitPlatform {
-    includeTags 'esIntegration'
-  }
-
-  retry {
-    maxRetries = userMaxTestRetries
-    maxFailures = userMaxTestRetryFailures
-  }
-
-  // Allows devs to run tests in a loop to debug flaky tests. See README.
-  if (project.hasProperty("rerun-tests")) {
-    outputs.upToDateWhen { false }
-  }
-}
-
task integrationTest(type: Test, dependsOn: compileJava) {
maxParallelForks = maxTestForks
ignoreFailures = userIgnoreFailures
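
The surviving S3UnitTest task (its body is collapsed above) selects tests the same way the deleted esUnitTest task did: through a JUnit Platform tag filter (includeTags 'esUnit' in the removed block; presumably an 'S3Unit'-style tag for the S3 task). For readers unfamiliar with tag filtering, here is a minimal sketch of a test class such a filter would pick up — the tag name "S3Unit" and the test class are illustrative assumptions, not code from this repository:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

// Only tests tagged "S3Unit" are selected when a Test task configures
// useJUnitPlatform { includeTags 'S3Unit' }; untagged tests are skipped.
@Tag("S3Unit")
class ObjectPathTest {

    @Test
    void joinsSegmentsWithSlash() {
        assertEquals("bucket/0/1", String.join("/", "bucket", "0", "1"));
    }
}
```

With that wiring, ./gradlew core:S3UnitTest runs only the tagged classes, which is why dropping the esUnitTest invocations from build_kos.yml above is enough to stop running the 'esUnit'-tagged tests in CI.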

This file was deleted.

AlwaysSuccessClient.java
@@ -30,7 +30,6 @@
import com.automq.stream.api.StreamClientException;
import com.automq.stream.utils.FutureUtil;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +57,6 @@ public class AlwaysSuccessClient implements Client {
ErrorCode.STREAM_ALREADY_CLOSED,
ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS
);
-public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10;
private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1,
@@ -75,36 +73,26 @@ public class AlwaysSuccessClient implements Client {
ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
private final ExecutorService fetchCallbackExecutors = Executors.newFixedThreadPool(4,
ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
-private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1,
-ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
private final Client innerClient;
private final StreamClient streamClient;
private final KVClient kvClient;
-private final Delayer delayer;
/**
* The flag to indicate if the callback of append is async.
* It is generally true, but for test cases, it is set to false. In test cases, we aim to ensure that
* the committed offset is promptly updated right after appending. Otherwise, the subsequent fetch request may fail
* due to the delay in updating the committed offset.
*/
private final boolean appendCallbackAsync;
-private final long slowFetchTimeoutMillis;

public AlwaysSuccessClient(Client client) {
-this(client, true, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
+this(client, true);
}

public AlwaysSuccessClient(Client client, boolean appendCallbackAsync) {
-this(client, appendCallbackAsync, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
-}
-
-public AlwaysSuccessClient(Client client, boolean appendCallbackAsync, long slowFetchTimeoutMillis) {
this.innerClient = client;
this.streamClient = new StreamClientImpl(client.streamClient());
this.kvClient = client.kvClient();
-this.delayer = new Delayer(delayFetchScheduler);
this.appendCallbackAsync = appendCallbackAsync;
-this.slowFetchTimeoutMillis = slowFetchTimeoutMillis;
}

@Override
@@ -133,7 +121,6 @@ public void shutdown() {
generalCallbackExecutors.shutdownNow();
appendCallbackExecutors.shutdownNow();
fetchCallbackExecutors.shutdownNow();
-delayFetchScheduler.shutdownNow();
}

/**
@@ -304,80 +291,10 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture<
}, LOGGER));
}

-/**
- * Get a new CompletableFuture with a {@link SlowFetchHintException} if not otherwise completed before the given timeout.
- *
- * @param id the id of rawFuture in holdUpFetchingFutureMap
- * @param rawFuture the raw future
- * @param timeout how long to wait before completing exceptionally with a SlowFetchHintException, in units of {@code unit}
- * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
- * @return a new CompletableFuture with completed results of the rawFuture if the raw future is done before timeout, otherwise a new
- * CompletableFuture with a {@link SlowFetchHintException}
- */
-private CompletableFuture<FetchResult> timeoutAndStoreFuture(String id,
-    CompletableFuture<FetchResult> rawFuture, long timeout,
-    TimeUnit unit) {
-    if (unit == null) {
-        throw new NullPointerException();
-    }
-
-    if (rawFuture.isDone()) {
-        return rawFuture;
-    }
-
-    final CompletableFuture<FetchResult> cf = new CompletableFuture<>();
-    rawFuture.whenComplete(new CompleteFetchingFutureAndCancelTimeoutCheck(delayer.delay(() -> {
-        if (rawFuture == null) {
-            return;
-        }
-
-        // If rawFuture is done, then complete the cf with the result of rawFuture.
-        if (rawFuture.isDone()) {
-            rawFuture.whenComplete((result, exception) -> {
-                if (exception != null) {
-                    cf.completeExceptionally(exception);
-                } else {
-                    cf.complete(result);
-                }
-            });
-        } else { // else, complete the cf with a SlowFetchHintException and store the rawFuture for slow fetching.
-            holdUpFetchingFutureMap.putIfAbsent(id, rawFuture);
-            cf.completeExceptionally(new SlowFetchHintException());
-        }
-    }, timeout, unit), cf));
-    return cf;
-}
-
@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
-String holdUpKey = startOffset + "-" + endOffset + "-" + maxBytesHint;
CompletableFuture<FetchResult> cf = new CompletableFuture<>();
-// If this thread is not marked, then just fetch data.
-if (!SeparateSlowAndQuickFetchHint.isMarked()) {
-    CompletableFuture<FetchResult> holdUpCf = holdUpFetchingFutureMap.remove(holdUpKey);
-    if (holdUpCf != null) {
-        holdUpCf.thenAccept(cf::complete);
-    } else {
-        fetch0(startOffset, endOffset, maxBytesHint, cf);
-    }
-} else {
-    CompletableFuture<FetchResult> firstFetchFuture = new CompletableFuture<>();
-    fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture);
-    // Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException.
-    timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS)
-        .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
-            if (ex != null) {
-                if (ex instanceof SlowFetchHintException) {
-                    LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
-                    cf.completeExceptionally(ex);
-                } else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
-                    cf.completeExceptionally(ex);
-                }
-            } else {
-                cf.complete(rst);
-            }
-        }, LOGGER));
-}
+fetch0(startOffset, endOffset, maxBytesHint, cf);
return cf;
}


This file was deleted.
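
The timeoutAndStoreFuture method removed above implemented a "complete fast with SlowFetchHintException, keep the real fetch running for later" pattern on top of a dedicated scheduler (Delayer plus delayFetchScheduler). For context only, here is a minimal self-contained sketch of the same idea using the JDK's built-in CompletableFuture timeout support; the fetchAsync method, the 10 ms budget, and the class name are assumptions for illustration, not part of this PR, which removes the hint mechanism rather than replacing it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FetchTimeoutSketch {

    // Stand-in for a storage read: completes whenever the data arrives.
    static CompletableFuture<String> fetchAsync() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulate a slow read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "records";
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> fetch = fetchAsync();

        // Put a deadline on a copy of the future only: the copy fails fast
        // with TimeoutException after 10 ms, while the original fetch keeps
        // running and can still be awaited later.
        fetch.copy()
            .orTimeout(10, TimeUnit.MILLISECONDS)
            .whenComplete((rst, ex) -> {
                if (ex instanceof TimeoutException) {
                    System.out.println("slow fetch: fall back to a retry/async path");
                } else if (ex == null) {
                    System.out.println("fast path result: " + rst);
                }
            });

        // The underlying read still completes and is not lost.
        System.out.println("eventual result: " + fetch.join());
    }
}
```

The copy()/orTimeout() pair gives the caller a fast-failing view of the fetch without cancelling the underlying read, which is essentially what the deleted delayer-based code did by hand.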

3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1102,10 +1102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchingExecutors.submit(new Runnable {
override def run(): Unit = {
ReadManualReleaseHint.mark()
-// FIXME: Buggy, replace quick slow fetch to async fetch
-// SeparateSlowAndQuickFetchHint.mark()
doFetchingRecords()
-// SeparateSlowAndQuickFetchHint.reset()
ReadManualReleaseHint.reset()
}
})
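
ReadManualReleaseHint.mark()/reset() (kept) and SeparateSlowAndQuickFetchHint (now unused) follow a per-thread marker pattern: the request handler flags the current thread around a call so that lower layers can branch on isMarked(). The hint classes themselves are not part of this diff; the sketch below is only a guess at their general shape, using a ThreadLocal flag, and is not the project's implementation:

```java
// Illustrative only: a per-thread flag in the style of the mark()/isMarked()/
// reset() calls seen in KafkaApis and AlwaysSuccessClient.
public final class FetchHintSketch {

    private static final ThreadLocal<Boolean> MARKED = ThreadLocal.withInitial(() -> false);

    private FetchHintSketch() {
    }

    public static void mark() {
        MARKED.set(true);
    }

    public static void reset() {
        MARKED.set(false);
    }

    public static boolean isMarked() {
        return MARKED.get();
    }

    public static void main(String[] args) {
        mark();
        try {
            // Code running on this thread can branch on the hint.
            System.out.println("marked: " + isMarked()); // true
        } finally {
            reset();
        }
        System.out.println("after reset: " + isMarked()); // false
    }
}
```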