Skip to content

Commit

Permalink
[ST] Fix and improve QuotasST (#9813)
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Kral <[email protected]>
  • Loading branch information
im-konge authored Mar 13, 2024
1 parent cbae4d4 commit a1afa58
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,16 @@ public interface TestConstants {
*/
String ROUTE = "route";


/**
* Tag for tests that focus on migration from ZK to KRaft
*/
String MIGRATION = "migration";

/**
* Tag for tests that use the Strimzi quotas plugin
*/
String QUOTAS_PLUGIN = "quotasplugin";

/**
* Tag for tests without ARM/AARCH64 support
*/
Expand Down
42 changes: 25 additions & 17 deletions systemtest/src/test/java/io/strimzi/systemtest/kafka/QuotasST.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,19 @@
import java.util.Collections;

import static io.strimzi.systemtest.TestConstants.INTERNAL_CLIENTS_USED;
import static io.strimzi.systemtest.TestConstants.QUOTAS_PLUGIN;
import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

/**
* NOTE: STs in this class will not work properly on `minikube` clusters (and maybe not on other clusters that use local
* storage), because the calculation of currently used storage is based
* on the local storage, which can be shared across multiple Docker containers.
* To properly run this suite, you should use a cluster with proper storage.
*/
@Tag(QUOTAS_PLUGIN)
public class QuotasST extends AbstractST {

/**
Expand All @@ -39,9 +48,9 @@ public class QuotasST extends AbstractST {
@ParallelNamespaceTest
@Tag(INTERNAL_CLIENTS_USED)
void testKafkaQuotasPluginIntegration() {
assumeFalse(cluster.isMinikube() || cluster.isMicroShift());

final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
final String producerName = "quotas-producer";
final String consumerName = "quotas-consumer";

resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
Expand All @@ -54,32 +63,31 @@ void testKafkaQuotasPluginIntegration() {
.editKafka()
.addToConfig("client.quota.callback.class", "io.strimzi.kafka.quotas.StaticQuotaCallback")
.addToConfig("client.quota.callback.static.storage.hard", "55000000")
.addToConfig("client.quota.callback.static.storage.soft", "50000000")
.addToConfig("client.quota.callback.static.storage.soft", "20000000")
.addToConfig("client.quota.callback.static.storage.check-interval", "5")
.withNewPersistentClaimStorage()
.withSize("1Gi")
.endPersistentClaimStorage()
.endKafka()
.endSpec()
.build());
resourceManager.createResourceWithWait(KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), testStorage.getNamespaceName()).build());

// Send more messages than disk can store to see if the integration works
KafkaClients basicClients = new KafkaClientsBuilder()
.withProducerName(producerName)
.withConsumerName(consumerName)
final KafkaClients clients = new KafkaClientsBuilder()
.withProducerName(testStorage.getProducerName())
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getClusterName()))
.withTopicName(testStorage.getTopicName())
.withMessageCount(100000000)
.withMessageCount(100000)
.withDelayMs(0)
.withMessage(String.join("", Collections.nCopies(1000, "#")))
.withNamespaceName(testStorage.getNamespaceName())
.build();

resourceManager.createResourceWithWait(basicClients.producerStrimzi());

resourceManager.createResourceWithWait(clients.producerStrimzi());
// Kafka Quotas Plugin should stop producer in around 10-20 seconds with configured throughput
assertThrows(WaitException.class, () -> JobUtils.waitForJobFailure(producerName, Environment.TEST_SUITE_NAMESPACE, 120_000));
assertThrows(WaitException.class, () -> JobUtils.waitForJobFailure(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, 120_000));

String kafkaLog = kubeClient(testStorage.getNamespaceName()).logs(KafkaResources.kafkaPodName(testStorage.getClusterName(), 0));
String brokerPodName = kubeClient().listPods(testStorage.getNamespaceName(), testStorage.getBrokerSelector()).get(0).getMetadata().getName();
String kafkaLog = kubeClient().logsInSpecificNamespace(testStorage.getNamespaceName(), brokerPodName);
String softLimitLog = "disk is beyond soft limit";
String hardLimitLog = "disk is full";
assertThat("Kafka log doesn't contain '" + softLimitLog + "' log", kafkaLog, CoreMatchers.containsString(softLimitLog));
Expand All @@ -95,8 +103,8 @@ void afterEach() {
@BeforeAll
void setup() {
this.clusterOperator = this.clusterOperator
.defaultInstallation()
.createInstallation()
.runInstallation();
.defaultInstallation()
.createInstallation()
.runInstallation();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.strimzi.test.k8s.cluster.Kind;
import io.strimzi.test.k8s.cluster.KubeCluster;
import io.strimzi.test.k8s.cluster.Microshift;
import io.strimzi.test.k8s.cluster.Minikube;
import io.strimzi.test.k8s.cluster.OpenShift;
import io.strimzi.test.k8s.cmdClient.KubeCmdClient;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -333,6 +334,10 @@ public boolean isMicroShift() {
return kubeClusterResource.cluster() instanceof Microshift;
}

/** Returns {@code true} when the currently used cluster is a Minikube cluster */
public boolean isMinikube() {
    final KubeCluster currentCluster = kubeClusterResource.cluster();
    return currentCluster instanceof Minikube;
}

/** Returns list of currently deployed resources */
public List<String> getListOfDeployedResources() {
return deploymentResources;
Expand Down

0 comments on commit a1afa58

Please sign in to comment.