From 68d1499c675940a384af9bd660bae18df2e9426e Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Fri, 3 Nov 2023 15:36:02 +0800 Subject: [PATCH] refactor: remove slow fetch hint; rename esUnit test Signed-off-by: Curtis Wan --- .github/workflows/build_kos.yml | 2 +- build.gradle | 64 ------------- .../errors/es/SlowFetchHintException.java | 44 --------- config/kraft/broker.properties | 2 +- config/kraft/controller.properties | 2 +- config/kraft/server.properties | 2 +- .../log/streamaspect/AlwaysSuccessClient.java | 87 +---------------- .../SeparateSlowAndQuickFetchHint.java | 47 --------- .../main/scala/kafka/server/KafkaApis.scala | 3 - .../scala/kafka/server/ReplicaManager.scala | 28 +----- .../autobalancer/AutoBalancerManagerTest.java | 2 +- .../autobalancer/ExecutionManagerTest.java | 2 +- .../kafka/autobalancer/LoadRetrieverTest.java | 2 +- .../autobalancer/goals/AbstractGoalTest.java | 2 +- .../AbstractResourceCapacityGoalTest.java | 2 +- .../AbstractResourceDistributionGoalTest.java | 2 +- .../autobalancer/goals/GoalTestBase.java | 2 +- .../AutoBalancerMetricsReporterTest.java | 2 +- .../metric/MetricSerdeTest.java | 2 +- .../metric/MetricsUtilsTest.java | 2 +- .../autobalancer/model/ClusterModelTest.java | 2 +- .../streamaspect/AlwaysSuccessClientTest.java | 96 +------------------ .../log/streamaspect/cache/FileCacheTest.java | 2 + .../ElasticLogSegmentTest.scala | 5 +- .../{es => streamaspect}/ElasticLogTest.scala | 5 +- .../ElasticUnifiedLogTest.scala | 5 +- ...java => PartitionChangeBuilderS3Test.java} | 4 +- .../services/kafka/templates/kafka.properties | 2 +- 28 files changed, 33 insertions(+), 389 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java delete mode 100644 core/src/main/scala/kafka/log/streamaspect/SeparateSlowAndQuickFetchHint.java rename core/src/test/scala/unit/kafka/log/{es => streamaspect}/ElasticLogSegmentTest.scala (99%) rename core/src/test/scala/unit/kafka/log/{es => streamaspect}/ElasticLogTest.scala (99%) rename core/src/test/scala/unit/kafka/log/{es => streamaspect}/ElasticUnifiedLogTest.scala (99%) rename metadata/src/test/java/org/apache/kafka/controller/{PartitionChangeBuilderESTest.java => PartitionChangeBuilderS3Test.java} (98%) diff --git a/.github/workflows/build_kos.yml b/.github/workflows/build_kos.yml index 9605aa9b35..f5441d8de7 100644 --- a/.github/workflows/build_kos.yml +++ b/.github/workflows/build_kos.yml @@ -85,4 +85,4 @@ jobs: - name: Setup Gradle uses: gradle/gradle-build-action@v2 - name: Unit Test - run: ./gradlew --build-cache metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest + run: ./gradlew --build-cache metadata:S3UnitTest core:S3UnitTest diff --git a/build.gradle b/build.gradle index 0d220c092e..5b11ce5d5a 100644 --- a/build.gradle +++ b/build.gradle @@ -466,36 +466,6 @@ subprojects { } } - tasks.register('esUnitTest', Test) { - description = 'Runs unit tests for elastic stream storage.' - dependsOn compileJava - - maxParallelForks = maxTestForks - ignoreFailures = userIgnoreFailures - - maxHeapSize = defaultMaxHeapSize - jvmArgs = defaultJvmArgs - - testLogging { - events = userTestLoggingEvents ?: testLoggingEvents - showStandardStreams = userShowStandardStreams ?: testShowStandardStreams - exceptionFormat = testExceptionFormat - displayGranularity = 0 - } - logTestStdout.rehydrate(delegate, owner, this)() - - exclude testsToExclude - - useJUnitPlatform { - includeTags 'esUnit' - } - - retry { - maxRetries = userMaxTestRetries - maxFailures = userMaxTestRetryFailures - } - } - tasks.register('S3UnitTest', Test) { description = 'Runs unit tests for Kafka on S3.' dependsOn compileJava @@ -526,40 +496,6 @@ subprojects { } } - tasks.register('esIntegrationTest', Test) { - description = 'Runs integration tests for elastic stream storage.' - dependsOn compileJava - shouldRunAfter test - - // Increase heap size for integration tests - maxHeapSize = "2560m" - jvmArgs = defaultJvmArgs - - testLogging { - events = userTestLoggingEvents ?: testLoggingEvents - showStandardStreams = userShowStandardStreams ?: testShowStandardStreams - exceptionFormat = testExceptionFormat - displayGranularity = 0 - } - logTestStdout.rehydrate(delegate, owner, this)() - - exclude testsToExclude - - useJUnitPlatform { - includeTags 'esIntegration' - } - - retry { - maxRetries = userMaxTestRetries - maxFailures = userMaxTestRetryFailures - } - - // Allows devs to run tests in a loop to debug flaky tests. See README. - if (project.hasProperty("rerun-tests")) { - outputs.upToDateWhen { false } - } - } - task integrationTest(type: Test, dependsOn: compileJava) { maxParallelForks = maxTestForks ignoreFailures = userIgnoreFailures diff --git a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java deleted file mode 100644 index 69b057307e..0000000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.errors.es; - -import org.apache.kafka.common.errors.RetriableException; - -/** - * Indicates that the fetch request was too slow to be served. The request should be served in separated thread pool. - */ -public class SlowFetchHintException extends RetriableException { - - private static final long serialVersionUID = 1L; - - public SlowFetchHintException() { - super(); - } - - public SlowFetchHintException(String message) { - super(message); - } - - public SlowFetchHintException(Throwable cause) { - super(cause); - } - - public SlowFetchHintException(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index d7b6e3a731..53db546f8f 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -129,7 +129,7 @@ log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Settings for KoS ############################# -# enable store data in elastic stream layer +# enable store data in object storage elasticstream.enable=true # The stream namespace, default is _kafka_$clusterId. diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index abf7b742bc..d1c3eb136c 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -122,7 +122,7 @@ log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Settings for KoS ############################# -# enable store data in elastic stream layer +# enable store data in object storage elasticstream.enable=true # The stream namespace, default is _kafka_$clusterId. diff --git a/config/kraft/server.properties b/config/kraft/server.properties index 66b32bea12..fc55235e54 100644 --- a/config/kraft/server.properties +++ b/config/kraft/server.properties @@ -132,7 +132,7 @@ log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Settings for KoS ############################# -# enable store data in elastic stream layer +# enable store data in object storage elasticstream.enable=true # The stream namespace, default is _kafka_$clusterId. diff --git a/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java index 548aedb901..6794b0d93a 100644 --- a/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java @@ -30,7 +30,6 @@ import com.automq.stream.api.StreamClientException; import com.automq.stream.utils.FutureUtil; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +57,6 @@ public class AlwaysSuccessClient implements Client { ErrorCode.STREAM_ALREADY_CLOSED, ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS ); - public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10; private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1, @@ -75,12 +73,9 @@ public class AlwaysSuccessClient implements Client { ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true)); private final ExecutorService fetchCallbackExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true)); - private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("fetch-delayer-%d", true)); private final Client innerClient; private final StreamClient streamClient; private final KVClient kvClient; - private final Delayer delayer; /** * The flag to indicate if the callback of append is async. * It is generally true, but for test cases, it is set to false. In test cases, we aim to ensure that @@ -88,23 +83,16 @@ public class AlwaysSuccessClient implements Client { * due to the delay in updating the committed offset. */ private final boolean appendCallbackAsync; - private final long slowFetchTimeoutMillis; public AlwaysSuccessClient(Client client) { - this(client, true, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); + this(client, true); } public AlwaysSuccessClient(Client client, boolean appendCallbackAsync) { - this(client, appendCallbackAsync, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); - } - - public AlwaysSuccessClient(Client client, boolean appendCallbackAsync, long slowFetchTimeoutMillis) { this.innerClient = client; this.streamClient = new StreamClientImpl(client.streamClient()); this.kvClient = client.kvClient(); - this.delayer = new Delayer(delayFetchScheduler); this.appendCallbackAsync = appendCallbackAsync; - this.slowFetchTimeoutMillis = slowFetchTimeoutMillis; } @Override @@ -133,7 +121,6 @@ public void shutdown() { generalCallbackExecutors.shutdownNow(); appendCallbackExecutors.shutdownNow(); fetchCallbackExecutors.shutdownNow(); - delayFetchScheduler.shutdownNow(); } /** @@ -304,80 +291,10 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture< }, LOGGER)); } - /** - * Get a new CompletableFuture with a {@link SlowFetchHintException} if not otherwise completed before the given timeout. - * - * @param id the id of rawFuture in holdUpFetchingFutureMap - * @param rawFuture the raw future - * @param timeout how long to wait before completing exceptionally with a SlowFetchHintException, in units of {@code unit} - * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter - * @return a new CompletableFuture with completed results of the rawFuture if the raw future is done before timeout, otherwise a new - * CompletableFuture with a {@link SlowFetchHintException} - */ - private CompletableFuture timeoutAndStoreFuture(String id, - CompletableFuture rawFuture, long timeout, - TimeUnit unit) { - if (unit == null) { - throw new NullPointerException(); - } - - if (rawFuture.isDone()) { - return rawFuture; - } - - final CompletableFuture cf = new CompletableFuture<>(); - rawFuture.whenComplete(new CompleteFetchingFutureAndCancelTimeoutCheck(delayer.delay(() -> { - if (rawFuture == null) { - return; - } - - // If rawFuture is done, then complete the cf with the result of rawFuture. - if (rawFuture.isDone()) { - rawFuture.whenComplete((result, exception) -> { - if (exception != null) { - cf.completeExceptionally(exception); - } else { - cf.complete(result); - } - }); - } else { // else, complete the cf with a SlowFetchHintException and store the rawFuture for slow fetching. - holdUpFetchingFutureMap.putIfAbsent(id, rawFuture); - cf.completeExceptionally(new SlowFetchHintException()); - } - }, timeout, unit), cf)); - return cf; - } - @Override public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint) { - String holdUpKey = startOffset + "-" + endOffset + "-" + maxBytesHint; CompletableFuture cf = new CompletableFuture<>(); - // If this thread is not marked, then just fetch data. - if (!SeparateSlowAndQuickFetchHint.isMarked()) { - CompletableFuture holdUpCf = holdUpFetchingFutureMap.remove(holdUpKey); - if (holdUpCf != null) { - holdUpCf.thenAccept(cf::complete); - } else { - fetch0(startOffset, endOffset, maxBytesHint, cf); - } - } else { - CompletableFuture firstFetchFuture = new CompletableFuture<>(); - fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture); - // Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException. - timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS) - .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { - if (ex != null) { - if (ex instanceof SlowFetchHintException) { - LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); - cf.completeExceptionally(ex); - } else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { - cf.completeExceptionally(ex); - } - } else { - cf.complete(rst); - } - }, LOGGER)); - } + fetch0(startOffset, endOffset, maxBytesHint, cf); return cf; } diff --git a/core/src/main/scala/kafka/log/streamaspect/SeparateSlowAndQuickFetchHint.java b/core/src/main/scala/kafka/log/streamaspect/SeparateSlowAndQuickFetchHint.java deleted file mode 100644 index 054a9cff0a..0000000000 --- a/core/src/main/scala/kafka/log/streamaspect/SeparateSlowAndQuickFetchHint.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.streamaspect; - -import io.netty.util.concurrent.FastThreadLocal; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class is used to mark if it is needed to diff quick fetch from slow fetch in current thread. - * If marked, data should be fetched within a short time, otherwise, the request should be satisfied in a separated slow-fetch thread pool. - */ -public class SeparateSlowAndQuickFetchHint { - private static final FastThreadLocal MANUAL_RELEASE = new FastThreadLocal<>() { - @Override - protected AtomicBoolean initialValue() { - return new AtomicBoolean(false); - } - }; - - public static boolean isMarked() { - return MANUAL_RELEASE.get().get(); - } - - public static void mark() { - MANUAL_RELEASE.get().set(true); - } - - public static void reset() { - MANUAL_RELEASE.get().set(false); - } -} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index aadfbf3c48..e4bf0e2c87 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1102,10 +1102,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchingExecutors.submit(new Runnable { override def run(): Unit = { ReadManualReleaseHint.mark() - // FIXME: Buggy, replace quick slow fetch to async fetch -// SeparateSlowAndQuickFetchHint.mark() doFetchingRecords() -// SeparateSlowAndQuickFetchHint.reset() ReadManualReleaseHint.reset() } }) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a9493b738b..edba628b47 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -23,7 +23,7 @@ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ -import kafka.log.streamaspect.{ElasticLogManager, ReadManualReleaseHint} +import kafka.log.streamaspect.ElasticLogManager import kafka.metrics.KafkaMetricsGroup import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers @@ -33,7 +33,6 @@ import kafka.utils.Implicits._ import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.common.errors._ -import org.apache.kafka.common.errors.es.SlowFetchHintException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -1075,14 +1074,10 @@ class ReplicaManager(val config: KafkaConfig, var hasPreferredReadReplica = false val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult] - var containsSlowFetchHint = false logReadResults.foreach { case (topicIdPartition, logReadResult) => brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() - if (logReadResult.exception.isDefined && logReadResult.exception.get.isInstanceOf[SlowFetchHintException]) { - containsSlowFetchHint = true - } if (logReadResult.error != Errors.NONE) errorReadingData = true if (logReadResult.divergingEpoch.nonEmpty) @@ -1093,25 +1088,6 @@ class ReplicaManager(val config: KafkaConfig, logReadResultMap.put(topicIdPartition, logReadResult) } - // AutoMQ for Kafka inject start - // If there is any slow fetch hint, we will read from local log in a separate thread. - if (containsSlowFetchHint) { - slowFetchExecutors.submit(new Runnable { - override def run(): Unit = { - ReadManualReleaseHint.mark() - val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false) - ReadManualReleaseHint.reset() - val fetchPartitionData = logReadResults.map { case (tp, result) => - val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) - tp -> result.toFetchPartitionData(isReassignmentFetch) - } - responseCallback(fetchPartitionData) - } - }) - return - } - // AutoMQ for Kafka inject end - // respond immediately if 1) fetch request does not want to wait // 2) fetch request does not require any data // 3) has enough data to respond @@ -1181,7 +1157,6 @@ class ReplicaManager(val config: KafkaConfig, _: ReplicaNotAvailableException | _: KafkaStorageException | _: OffsetOutOfRangeException | - _: SlowFetchHintException | _: InconsistentTopicIdException) => LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), divergingEpoch = None, @@ -1428,7 +1403,6 @@ class ReplicaManager(val config: KafkaConfig, _: ReplicaNotAvailableException | _: KafkaStorageException | _: OffsetOutOfRangeException | - _: SlowFetchHintException | _: InconsistentTopicIdException) => LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), divergingEpoch = None, diff --git a/core/src/test/java/kafka/autobalancer/AutoBalancerManagerTest.java b/core/src/test/java/kafka/autobalancer/AutoBalancerManagerTest.java index d4d5fbe84e..196971eeb4 100644 --- a/core/src/test/java/kafka/autobalancer/AutoBalancerManagerTest.java +++ b/core/src/test/java/kafka/autobalancer/AutoBalancerManagerTest.java @@ -55,7 +55,7 @@ import java.util.Properties; import java.util.StringJoiner; -@Tag("esUnit") +@Tag("S3Unit") public class AutoBalancerManagerTest extends AutoBalancerClientsIntegrationTestHarness { /** * Set up the unit test. diff --git a/core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java b/core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java index c6ca31640c..e69b26399c 100644 --- a/core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java +++ b/core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java @@ -37,7 +37,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -@Tag("esUnit") +@Tag("S3Unit") public class ExecutionManagerTest { private boolean checkTopicPartition(AlterPartitionReassignmentsRequestData.ReassignableTopic topic, diff --git a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java index 0a1791be62..cd37562de5 100644 --- a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java +++ b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java @@ -49,7 +49,7 @@ import java.util.List; import java.util.Map; -@Tag("esUnit") +@Tag("S3Unit") public class LoadRetrieverTest extends AutoBalancerClientsIntegrationTestHarness { protected static final String METRIC_TOPIC = "AutoBalancerMetricsReporterTest"; diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java index 43dd74aba6..e22146e49e 100644 --- a/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java +++ b/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java @@ -33,7 +33,7 @@ import java.util.Map; import java.util.StringJoiner; -@Tag("esUnit") +@Tag("S3Unit") public class AbstractGoalTest extends GoalTestBase { private final Map goalMap = new HashMap<>(); diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java index e77f075743..0e616e60f3 100644 --- a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java +++ b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.StringJoiner; -@Tag("esUnit") +@Tag("S3Unit") public class AbstractResourceCapacityGoalTest extends GoalTestBase { private final Map goalMap = new HashMap<>(); diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java index d431786f2e..5f53387b75 100644 --- a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java +++ b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.StringJoiner; -@Tag("esUnit") +@Tag("S3Unit") public class AbstractResourceDistributionGoalTest extends GoalTestBase { private final Map goalMap = new HashMap<>(); diff --git a/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java b/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java index 8950619ff6..134c7a20cf 100644 --- a/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java +++ b/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Tag; -@Tag("esUnit") +@Tag("S3Unit") public class GoalTestBase { protected static final String RACK = "default"; protected static final String TOPIC_0 = "TestTopic0"; diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java index c56f17056e..1cadea43e5 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java @@ -55,7 +55,7 @@ import static kafka.autobalancer.metricsreporter.metric.RawMetricType.TOPIC_PARTITION_BYTES_IN; import static kafka.autobalancer.metricsreporter.metric.RawMetricType.TOPIC_PARTITION_BYTES_OUT; -@Tag("esUnit") +@Tag("S3Unit") public class AutoBalancerMetricsReporterTest extends AutoBalancerClientsIntegrationTestHarness { /** diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java index 6578109591..4d3d4c8dab 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java @@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -@Tag("esUnit") +@Tag("S3Unit") public class MetricSerdeTest { private static final long TIME = 123L; private static final int BROKER_ID = 0; diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java index 41c802b1ba..d36a28bb0f 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -@Tag("esUnit") +@Tag("S3Unit") public class MetricsUtilsTest { @Test diff --git a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java index 295b0dd367..2e2daa5e51 100644 --- a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java +++ b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java @@ -36,7 +36,7 @@ import java.util.Collections; import java.util.List; -@Tag("esUnit") +@Tag("S3Unit") public class ClusterModelTest { @Test diff --git a/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java b/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java index 606b0b0c1b..3258ccdcef 100644 --- a/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java +++ b/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java @@ -27,9 +27,7 @@ import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; import com.automq.stream.api.StreamClient; -import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -41,10 +39,8 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -53,18 +49,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -@Tag("esUnit") +@Tag("S3Unit") class AlwaysSuccessClientTest { private AlwaysSuccessClient client; - @BeforeEach - public void setup() { - SeparateSlowAndQuickFetchHint.mark(); - } - @AfterEach public void teardown() { - SeparateSlowAndQuickFetchHint.reset(); client.shutdown(); } @@ -89,77 +79,6 @@ public void basicAppendAndFetch() throws ExecutionException, InterruptedExceptio stream.destroy(); } - @Test - public void testQuickFetch() throws ExecutionException, InterruptedException { - MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay(); - long slowFetchTimeoutMillis = 1000 * 2; - client = new AlwaysSuccessClient(memoryClientWithDelay, false, slowFetchTimeoutMillis); - client.start(); - List quickFetchDelayMillisList = List.of(1L, slowFetchTimeoutMillis / 2); - List payloads = List.of("hello".getBytes(), "world".getBytes()); - - // test quick fetch - for (Long delay : quickFetchDelayMillisList) { - memoryClientWithDelay.setDelayMillis(delay); - Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); - CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) - ).get(); - FetchResult fetched = stream.fetch(0, 100, 1000) - .orTimeout(delay + slowFetchTimeoutMillis / 2, TimeUnit.MILLISECONDS) - .get(); - checkAppendAndFetch(payloads, fetched); - stream.destroy(); - } - } - - @Test - public void testSlowFetch() throws ExecutionException, InterruptedException { - MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay(); - long slowFetchTimeoutMillis = 1000 * 2; - client = new AlwaysSuccessClient(memoryClientWithDelay, false, slowFetchTimeoutMillis); - client.start(); - List payloads = List.of("hello".getBytes(), "world".getBytes()); - - long slowFetchDelay = slowFetchTimeoutMillis * 3 / 2; - memoryClientWithDelay.setDelayMillis(slowFetchDelay); - Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); - CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) - ).get(); - - FetchResult fetched = null; - AtomicBoolean gotSlowFetchHintException = new AtomicBoolean(false); - try { - fetched = stream.fetch(0, 100, 1000) - .orTimeout(slowFetchDelay, TimeUnit.MILLISECONDS) - .get(); - checkAppendAndFetch(payloads, fetched); - } catch (ExecutionException e) { - // should throw SlowFetchHintException after SLOW_FETCH_TIMEOUT_MILLIS ms - assertEquals(SlowFetchHintException.class, e.getCause().getClass()); - gotSlowFetchHintException.set(true); - SeparateSlowAndQuickFetchHint.reset(); - // It should reuse the fetching future above, therefore only (SLOW_FETCH_TIMEOUT_MILLIS / 2) ms is tolerable. - fetched = stream.fetch(0, 100, 1000) - .orTimeout(slowFetchTimeoutMillis - 200, TimeUnit.MILLISECONDS) - .get(); - } - checkAppendAndFetch(payloads, fetched); - assertTrue(gotSlowFetchHintException.get(), "should throw SlowFetchHintException"); - stream.destroy(); - } - @Test public void testOpenStream() { MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay(); @@ -237,16 +156,9 @@ public void testNormalExceptionHandling() { stream.destroy(); stream = openStream(1).join(); - Stream finalStream = stream; - try { - finalStream.fetch(0, 100, 1000).join(); - } catch (CompletionException ex) { - if (ex.getCause() instanceof SlowFetchHintException) { - // expected - } else { - throw ex; - } - } + + stream.fetch(0, 100, 1000).join(); + stream.destroy(); stream = openStream(1).join(); diff --git a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java index 455910602d..7802b921e8 100644 --- a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java +++ b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -29,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +@Tag("S3Unit") public class FileCacheTest { @Test diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala similarity index 99% rename from core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala rename to core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala index a4b4511e94..3fbfb866f1 100644 --- a/core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala @@ -15,10 +15,9 @@ * limitations under the License. */ -package kafka.log.es +package kafka.log.streamaspect import kafka.log._ -import kafka.log.streamaspect.{ElasticLogManager, ElasticLogSegment} import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.{KafkaConfig, LogDirFailureChannel} @@ -37,7 +36,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.{Map, Seq, mutable} import scala.jdk.CollectionConverters.MapHasAsJava -@Tag("esUnit") +@Tag("S3Unit") class ElasticLogSegmentTest { private val topicPartition = new TopicPartition("topic", 0) private val segments = mutable.Map[Long, ElasticLogSegment]() diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala similarity index 99% rename from core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala rename to core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala index c4355259ad..e2fbe8ec7e 100644 --- a/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala @@ -15,11 +15,10 @@ * limitations under the License. */ -package kafka.log.es +package kafka.log.streamaspect import kafka.log._ import kafka.log.streamaspect.client.Context -import kafka.log.streamaspect.{ElasticLog, ElasticLogManager, MemoryClient} import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils.{MockTime, Scheduler, TestUtils} import org.apache.kafka.common.errors.KafkaStorageException @@ -35,7 +34,7 @@ import java.util.regex.Pattern import java.util.{Collections, Properties} import scala.jdk.CollectionConverters.IterableHasAsScala -@Tag("esUnit") +@Tag("S3Unit") class ElasticLogTest { val properties: Properties = TestUtils.createSimpleEsBrokerConfig() val kafkaConfig: KafkaConfig = KafkaConfig.fromProps(properties) diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala similarity index 99% rename from core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala rename to core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala index 1279b36bb7..2464d513e2 100755 --- a/core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package kafka.log.es +package kafka.log.streamaspect import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException} import kafka.log._ import kafka.log.streamaspect.client.Context -import kafka.log.streamaspect.{ElasticLeaderEpochCheckpoint, ElasticLogManager, ElasticLogSegment, ElasticUnifiedLog} import kafka.server._ import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} @@ -47,7 +46,7 @@ import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ -@Tag("esUnit") +@Tag("S3Unit") class ElasticUnifiedLogTest { var config: KafkaConfig = _ val brokerTopicStats = new BrokerTopicStats diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderS3Test.java similarity index 98% rename from metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java rename to metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderS3Test.java index 5302029f0a..dc957a27aa 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderS3Test.java @@ -29,8 +29,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -@Tag("esUnit") -public class PartitionChangeBuilderESTest { +@Tag("S3Unit") +public class PartitionChangeBuilderS3Test { @BeforeEach public void setUp() { ElasticStreamSwitch.setSwitch(true); diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 73aed1513b..cba3b59743 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -146,7 +146,7 @@ log.cleaner.dedupe.buffer.size=33554432 ############################# Settings for es ############################# create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy -# enable store data in elastic stream layer +# enable store data in object storage elasticstream.enable=true elasticstream.endpoint=s3://