Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor use of Informers in UO and SPS controller #10885

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
Expand All @@ -17,8 +16,6 @@
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.fabric8.kubernetes.client.readiness.Readiness;
import io.micrometer.core.instrument.Timer;
import io.strimzi.api.kafka.model.connect.KafkaConnect;
Expand All @@ -45,6 +42,7 @@
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.model.StatusUtils;
import io.strimzi.operator.common.operator.resource.concurrent.Informer;

import java.util.Collection;
import java.util.HashSet;
Expand All @@ -62,9 +60,7 @@ public class StrimziPodSetController implements Runnable {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(StrimziPodSetController.class);

private static final long DEFAULT_RESYNC_PERIOD_MS = 5 * 60 * 1_000L; // 5 minutes by default
private static final LabelSelector POD_LABEL_SELECTOR = new LabelSelectorBuilder()
.withMatchExpressions(new LabelSelectorRequirement(Labels.STRIMZI_KIND_LABEL, "Exists", null))
.build();
private static final LabelSelector POD_LABEL_SELECTOR = new LabelSelector(List.of(new LabelSelectorRequirement(Labels.STRIMZI_KIND_LABEL, "Exists", null)), null);

private final Thread controllerThread;

Expand All @@ -77,16 +73,11 @@ public class StrimziPodSetController implements Runnable {
private final String watchedNamespace;

private final BlockingQueue<SimplifiedReconciliation> workQueue;
private final SharedIndexInformer<Pod> podInformer;
private final SharedIndexInformer<StrimziPodSet> strimziPodSetInformer;
private final SharedIndexInformer<Kafka> kafkaInformer;
private final SharedIndexInformer<KafkaConnect> kafkaConnectInformer;
private final SharedIndexInformer<KafkaMirrorMaker2> kafkaMirrorMaker2Informer;
private final Lister<Pod> podLister;
private final Lister<StrimziPodSet> strimziPodSetLister;
private final Lister<Kafka> kafkaLister;
private final Lister<KafkaConnect> kafkaConnectLister;
private final Lister<KafkaMirrorMaker2> kafkaMirrorMaker2Lister;
private final Informer<Pod> podInformer;
private final Informer<StrimziPodSet> strimziPodSetInformer;
private final Informer<Kafka> kafkaInformer;
private final Informer<KafkaConnect> kafkaConnectInformer;
private final Informer<KafkaMirrorMaker2> kafkaMirrorMaker2Informer;

/**
* Creates the StrimziPodSet controller. The controller should normally exist once per operator for cluster-wide mode
Expand Down Expand Up @@ -117,7 +108,7 @@ public StrimziPodSetController(
) {
this.podOperator = podOperator;
this.strimziPodSetOperator = strimziPodSetOperator;
this.crSelector = (crSelectorLabels == null || crSelectorLabels.toMap().isEmpty()) ? null : new LabelSelector(null, crSelectorLabels.toMap());
this.crSelector = new LabelSelector(null, (crSelectorLabels == null || crSelectorLabels.toMap().isEmpty()) ? null : crSelectorLabels.toMap());
this.watchedNamespace = watchedNamespace;
this.workQueue = new ArrayBlockingQueue<>(podSetControllerWorkQueueSize);

Expand All @@ -126,20 +117,15 @@ public StrimziPodSetController(

// Kafka, KafkaConnect and KafkaMirrorMaker2 informers and listers are used to get the CRs quickly.
// This is needed for verification of the CR selector labels.
this.kafkaInformer = kafkaOperator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS);
this.kafkaLister = new Lister<>(kafkaInformer.getIndexer());
this.kafkaConnectInformer = kafkaConnectOperator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS);
this.kafkaConnectLister = new Lister<>(kafkaConnectInformer.getIndexer());
this.kafkaMirrorMaker2Informer = kafkaMirrorMaker2Operator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS);
this.kafkaMirrorMaker2Lister = new Lister<>(kafkaMirrorMaker2Informer.getIndexer());
this.kafkaInformer = kafkaOperator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS);
this.kafkaConnectInformer = kafkaConnectOperator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS);
this.kafkaMirrorMaker2Informer = kafkaMirrorMaker2Operator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS);

// StrimziPodSet informer and lister is used to get events about StrimziPodSet and get StrimziPodSet quickly
this.strimziPodSetInformer = strimziPodSetOperator.informer(watchedNamespace, DEFAULT_RESYNC_PERIOD_MS);
this.strimziPodSetLister = new Lister<>(strimziPodSetInformer.getIndexer());
this.strimziPodSetInformer = strimziPodSetOperator.informer(watchedNamespace, new LabelSelector(), DEFAULT_RESYNC_PERIOD_MS);

// Pod informer and lister is used to get events about pods and get pods quickly
this.podInformer = podOperator.informer(watchedNamespace, POD_LABEL_SELECTOR, DEFAULT_RESYNC_PERIOD_MS);
this.podLister = new Lister<>(podInformer.getIndexer());

this.controllerThread = new Thread(this, "StrimziPodSetController");
}
Expand All @@ -158,26 +144,13 @@ protected boolean isSynced() {

/**
 * Wires up and starts all informers used by this controller. The steps happen in a fixed order: handlers are
 * registered first, then the informers are started, and finally a callback is attached to each informer's
 * stopped-future so that every stop gets logged.
 */
protected void startController() {
    // StrimziPodSet and Pod informers drive the controller => they get an event handler as well as an exception handler
    strimziPodSetInformer.addEventHandler(new PodSetEventHandler());
    strimziPodSetInformer.exceptionHandler((started, error) -> InformerUtils.loggingExceptionHandler("StrimziPodSet", started, error));

    podInformer.addEventHandler(new PodEventHandler());
    podInformer.exceptionHandler((started, error) -> InformerUtils.loggingExceptionHandler("Pod", started, error));

    // The custom resource informers have no event handler here => only exceptions are logged for them
    kafkaInformer.exceptionHandler((started, error) -> InformerUtils.loggingExceptionHandler("Kafka", started, error));
    kafkaConnectInformer.exceptionHandler((started, error) -> InformerUtils.loggingExceptionHandler("KafkaConnect", started, error));
    kafkaMirrorMaker2Informer.exceptionHandler((started, error) -> InformerUtils.loggingExceptionHandler("KafkaMirrorMaker2", started, error));

    // Start everything only after all handlers are in place
    strimziPodSetInformer.start();
    podInformer.start();
    kafkaInformer.start();
    kafkaConnectInformer.start();
    kafkaMirrorMaker2Informer.start();

    // Report when an informer stops; the stop flag is handed over together with the failure cause (if any)
    strimziPodSetInformer.stopped().whenComplete((ignored, cause) -> InformerUtils.stoppedInformerHandler("StrimziPodSet", cause, stop));
    podInformer.stopped().whenComplete((ignored, cause) -> InformerUtils.stoppedInformerHandler("Pod", cause, stop));
    kafkaInformer.stopped().whenComplete((ignored, cause) -> InformerUtils.stoppedInformerHandler("Kafka", cause, stop));
    kafkaConnectInformer.stopped().whenComplete((ignored, cause) -> InformerUtils.stoppedInformerHandler("KafkaConnect", cause, stop));
    kafkaMirrorMaker2Informer.stopped().whenComplete((ignored, cause) -> InformerUtils.stoppedInformerHandler("KafkaMirrorMaker2", cause, stop));
}

protected void stopController() {
Expand Down Expand Up @@ -256,9 +229,8 @@ private StrimziPodSet findParentPodSetForPod(Pod pod) {
* @return The parent StrimziPodSet (or null if not found)
*/
private StrimziPodSet findParentPodSetForPodByLabels(Pod pod) {
return strimziPodSetLister
.namespace(pod.getMetadata().getNamespace())
.list()
return strimziPodSetInformer
.list(pod.getMetadata().getNamespace())
.stream()
.filter(podSet -> podSet.getSpec() != null
&& Util.matchesSelector(podSet.getSpec().getSelector(), pod))
Expand All @@ -283,9 +255,8 @@ private StrimziPodSet findParentPodSetForPodByOwnerReference(Pod pod) {
return null;
} else {
// We have owner reference => we find the StrimziPodSet based on it
return strimziPodSetLister
.namespace(pod.getMetadata().getNamespace())
.list()
return strimziPodSetInformer
.list(pod.getMetadata().getNamespace())
.stream()
.filter(podSet -> podSet.getMetadata().getName().equals(owner.getName()))
.findFirst()
Expand Down Expand Up @@ -327,9 +298,9 @@ private HasMetadata findCustomResource(StrimziPodSet podSet) {
HasMetadata cr = null;

switch (podSet.getMetadata().getLabels().get(Labels.STRIMZI_KIND_LABEL)) {
case Kafka.RESOURCE_KIND -> cr = kafkaLister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName);
case KafkaConnect.RESOURCE_KIND -> cr = kafkaConnectLister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName);
case KafkaMirrorMaker2.RESOURCE_KIND -> cr = kafkaMirrorMaker2Lister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName);
case Kafka.RESOURCE_KIND -> cr = kafkaInformer.get(podSet.getMetadata().getNamespace(), customResourceName);
case KafkaConnect.RESOURCE_KIND -> cr = kafkaConnectInformer.get(podSet.getMetadata().getNamespace(), customResourceName);
case KafkaMirrorMaker2.RESOURCE_KIND -> cr = kafkaMirrorMaker2Informer.get(podSet.getMetadata().getNamespace(), customResourceName);
default -> LOGGER.warnOp("StrimziPodSet {} belongs to unsupported custom resource kind {}", podSet.getMetadata().getName(), podSet.getMetadata().getLabels().get(Labels.STRIMZI_KIND_LABEL));
}

Expand Down Expand Up @@ -359,7 +330,7 @@ private void reconcile(Reconciliation reconciliation) {
try {
String name = reconciliation.name();
String namespace = reconciliation.namespace();
StrimziPodSet podSet = strimziPodSetLister.namespace(namespace).get(name);
StrimziPodSet podSet = strimziPodSetInformer.get(namespace, name);

if (podSet == null) {
LOGGER.debugCr(reconciliation, "StrimziPodSet is null => nothing to do");
Expand Down Expand Up @@ -431,7 +402,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, StrimziPodSet podS
if (!new StatusDiff(podSet.getStatus(), desiredStatus).isEmpty()) {
try {
LOGGER.debugCr(reconciliation, "Updating status of StrimziPodSet {} in namespace {}", reconciliation.name(), reconciliation.namespace());
StrimziPodSet latestPodSet = strimziPodSetLister.namespace(reconciliation.namespace()).get(reconciliation.name());
StrimziPodSet latestPodSet = strimziPodSetInformer.get(reconciliation.namespace(), reconciliation.name());
if (latestPodSet != null) {
StrimziPodSet updatedPodSet = new StrimziPodSetBuilder(latestPodSet)
.withStatus(desiredStatus)
Expand Down Expand Up @@ -461,7 +432,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, StrimziPodSet podS
* @param podCounter Pod Counter used to count pods for the status
*/
private void maybeCreateOrPatchPod(Reconciliation reconciliation, Pod pod, OwnerReference owner, PodCounter podCounter) {
Pod currentPod = podLister.namespace(reconciliation.namespace()).get(pod.getMetadata().getName());
Pod currentPod = podInformer.get(reconciliation.namespace(), pod.getMetadata().getName());

if (currentPod == null) {
// Pod does not exist => we create it
Expand Down Expand Up @@ -512,9 +483,8 @@ private void maybeCreateOrPatchPod(Reconciliation reconciliation, Pod pod, Owner
* @param podCounter Pod Counter used to count pods for the status
*/
private void removeDeletedPods(Reconciliation reconciliation, LabelSelector selector, Collection<String> desiredPodNames, PodCounter podCounter) {
Set<String> toBeDeleted = podLister
.namespace(reconciliation.namespace())
.list()
Set<String> toBeDeleted = podInformer
.list(reconciliation.namespace())
.stream()
.filter(pod -> Util.matchesSelector(selector, pod))
.map(pod -> pod.getMetadata().getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.strimzi.operator.common.config.ConfigParameter;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.operator.resource.ReconcileResult;
import io.strimzi.operator.common.operator.resource.concurrent.Informer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

Expand Down Expand Up @@ -403,34 +404,6 @@ public Future<Void> deleteAsync(Reconciliation reconciliation, String namespace,
return internalDelete(reconciliation, namespace, name, cascading).map((Void) null);
}

/**
 * Builds an informer covering all instances of the given resource type in the given namespace (or cluster-wide).
 * The informer is returned without being started; starting it is up to the calling code.
 *
 * @param namespace         Namespace on which to inform
 * @param resyncIntervalMs  Resync interval of the informer in milliseconds
 *
 * @return  Informer instance (not running yet)
 */
public SharedIndexInformer<T> informer(String namespace, long resyncIntervalMs) {
    var namespacedOperation = applyNamespace(namespace);
    return runnableInformer(namespacedOperation, resyncIntervalMs);
}

/**
 * Builds an informer covering the instances of the given resource type which match the label selector, in the
 * given namespace (or cluster-wide). The informer is returned without being started; starting it is up to the
 * calling code.
 *
 * @param namespace         Namespace on which to inform
 * @param selectorLabels    Labels which the resources have to match
 * @param resyncIntervalMs  Resync interval of the informer in milliseconds
 *
 * @return  Informer instance (not running yet)
 */
public SharedIndexInformer<T> informer(String namespace, Map<String, String> selectorLabels, long resyncIntervalMs) {
    var filteredOperation = applyNamespace(namespace).withLabels(selectorLabels);
    return runnableInformer(filteredOperation, resyncIntervalMs);
}

/**
* Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide)
* matching the selector. The informer returned by this method is not running and has to be started by the code
Expand All @@ -442,8 +415,8 @@ public SharedIndexInformer<T> informer(String namespace, Map<String, String> sel
*
* @return Informer instance
*/
public SharedIndexInformer<T> informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) {
return runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs);
public Informer<T> informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) {
return new Informer<>(runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void startStop(VertxTestContext context, String namespaces, boolean podS
return mockCmInformer;
});

when(mockNamespacedCms.withLabels(any())).thenReturn(mockNamespacedCms);
when(mockNamespacedCms.withLabelSelector(any(LabelSelector.class))).thenReturn(mockNamespacedCms);
when(mockCms.inNamespace(namespace)).thenReturn(mockNamespacedCms);

// Mock Pods
Expand Down Expand Up @@ -230,7 +230,7 @@ private void startStopAllNamespaces(VertxTestContext context, String namespaces,
when(mockCmInformer.stopped()).thenReturn(CompletableFuture.completedFuture(null));

AnyNamespaceOperation mockFilteredCms = mock(AnyNamespaceOperation.class);
when(mockFilteredCms.withLabels(any())).thenReturn(mockFilteredCms);
when(mockFilteredCms.withLabelSelector(any(LabelSelector.class))).thenReturn(mockFilteredCms);
when(mockFilteredCms.watch(any())).thenAnswer(invo -> {
numWatchers.incrementAndGet();
Watch mockWatch = mock(Watch.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
package io.strimzi.operator.common;

import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.strimzi.operator.common.operator.resource.concurrent.Informer;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -16,57 +16,21 @@
public class InformerUtils {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(InformerUtils.class);

/**
 * Exception handler for informers which only writes the exception into the log, so that we have a better
 * overview of what is happening, and then lets the informer carry on.
 *
 * @param type          Type of the informer (used in the log message)
 * @param isStarted     Flag indicating whether the informer is already started
 * @param throwable     Throwable describing the exception which occurred
 *
 * @return  Always {@code true} to indicate that the informer should retry.
 */
public static boolean loggingExceptionHandler(String type, boolean isStarted, Throwable throwable) {
    final String state = isStarted ? "started" : "not started";
    LOGGER.errorOp("Caught exception in the " + type + " informer which is " + state, throwable);

    // Logging is all we want from this handler => the informer is always told to retry
    return true;
}

/**
 * Logs that an informer has stopped. A stop during the controller shutdown is expected and is reported with an
 * info message. Any other stop is unexpected and is reported as an error, together with the reason when one is
 * available. This method only logs; it does not take any further action itself.
 *
 * @param type      Type of the informer (used in the log message)
 * @param reason    Reason why the informer stopped (might be null)
 * @param stopping  Flag indicating if the controller shutdown is in progress (in which case the informer is expected to stop)
 */
public static void stoppedInformerHandler(String type, Throwable reason, boolean stopping) {
    if (stopping) {
        // Shutdown is in progress => the informer is supposed to stop
        LOGGER.infoOp("{} informer stopped", type);
        return;
    }

    // Nobody asked the informer to stop, so this is unexpected!
    if (reason == null) {
        LOGGER.errorOp("{} informer stopped unexpectedly without a reason", type);
    } else {
        LOGGER.errorOp("{} informer stopped unexpectedly", type, reason);
    }
}

/**
* Synchronously stops one or more informers. It will stop them and then wait for up to the specified timeout for
* each of them to actually stop.
*
* @param timeoutMs Timeout in milliseconds for how long we will wait for each informer to stop
* @param informers Informers which should be stopped.
*/
public static void stopAll(long timeoutMs, SharedIndexInformer<?>... informers) {
public static void stopAll(long timeoutMs, Informer<?>... informers) {
LOGGER.infoOp("Stopping informers");
for (SharedIndexInformer<?> informer : informers) {
for (Informer<?> informer : informers) {
informer.stop();
}

try {
for (SharedIndexInformer<?> informer : informers) {
for (Informer<?> informer : informers) {
informer.stopped().toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException | TimeoutException | ExecutionException e) {
Expand Down
Loading