From 93a328f2a56676809bfe5c5efe903bea6f6c6035 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 23 Nov 2023 12:26:18 +0100 Subject: [PATCH] next try --- e2e/cluster/kind.go | 4 +- e2e/kafka/kafka_test.go | 147 ++++++++++++++++++---------------------- 2 files changed, 68 insertions(+), 83 deletions(-) diff --git a/e2e/cluster/kind.go b/e2e/cluster/kind.go index b5975da34..2ea91e9df 100644 --- a/e2e/cluster/kind.go +++ b/e2e/cluster/kind.go @@ -356,8 +356,8 @@ func withTimeout(f env.Func, timeout time.Duration) env.Func { if time.Since(start) > timeout { return ctx, fmt.Errorf("timeout (%s) trying to execute function: %w", timeout, err) } - tlog.WithError(err).Debug("function did not succeed. Retrying after 1s") - time.Sleep(time.Second) + tlog.WithError(err).Debug("function did not succeed. Retrying after 5s") + time.Sleep(5 * time.Second) } } } diff --git a/e2e/kafka/kafka_test.go b/e2e/kafka/kafka_test.go index eff037b42..2e2925e05 100644 --- a/e2e/kafka/kafka_test.go +++ b/e2e/kafka/kafka_test.go @@ -16,9 +16,7 @@ import ( "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/e2e-framework/klient" "sigs.k8s.io/e2e-framework/klient/k8s" "sigs.k8s.io/e2e-framework/klient/wait" "sigs.k8s.io/e2e-framework/klient/wait/conditions" @@ -38,11 +36,11 @@ var ( func TestMain(m *testing.M) { logrus.StandardLogger().SetLevel(logrus.DebugLevel) - scheme.Scheme.AddKnownTypeWithName(schema.GroupVersionKind{ - Group: "kafka.strimzi.io", - Version: "v1beta2", - Kind: "Kafka", - }, &Kafka{}) + // scheme.Scheme.AddKnownTypeWithName(schema.GroupVersionKind{ + // Group: "kafka.strimzi.io", + // Version: "v1beta2", + // Kind: "Kafka", + // }, &Kafka{}) testCluster = cluster.NewKind( clusterNamePrefix+time.Now().Format("20060102-150405"), @@ -54,54 +52,9 @@ func TestMain(m *testing.M) { cluster.Deploy(cluster.Deployment{ Order: cluster.ExternalServices, ManifestFile: path.Join("manifests", "11-kafka-cluster.yml"), ReadyFunction: func(cfg *envconf.Config) error { - client, err := cfg.NewClient() - if err != nil { - return fmt.Errorf("can't create k8s client: %w", err) - } // wait for kafka to be ready - kfk := Kafka{ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, Name: "kafka-cluster", - }} - var depl appsv1.DeploymentList - err = cfg.Client().Resources(namespace).List(context.TODO(), &depl) - if err != nil { - return fmt.Errorf("can't list depls: %w", err) - } - deplInfo := []string{} - for _, p := range depl.Items { - deplInfo = append(deplInfo, fmt.Sprintf("%s (%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.Replicas)) - } - klog.Infof("Deployments: " + strings.Join(deplInfo, " ,,,,, ")) - var sfs appsv1.StatefulSetList - err = cfg.Client().Resources(namespace).List(context.TODO(), &sfs) - if err != nil { - return fmt.Errorf("can't list sfs: %w", err) - } - sfsInfo := []string{} - for _, p := range sfs.Items { - sfsInfo = append(sfsInfo, fmt.Sprintf("%s (%d/%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.AvailableReplicas, p.Status.Replicas)) - } - klog.Infof("StatefulSets: " + strings.Join(sfsInfo, " ,,,,, ")) - if err := wait.For(conditions.New(client.Resources()). - ResourceMatch(&kfk, func(object k8s.Object) bool { - kafka, ok := object.(*Kafka) - if !ok { - klog.Errorf("could not cast Kafka obj: %v", object) - return false - } - for _, cond := range kafka.Status.Conditions { - klog.WithFields(logrus.Fields{ - "reason": cond.Reason, - "msg": cond.Message, - "type": cond.Type, - "status": cond.Status, - }).Info("Waiting for kafka to be up and running") - if cond.Type == conditionReady { - return cond.Status == metav1.ConditionTrue - } - } - return kafka.Status.Ready() - }), wait.WithTimeout(time.Minute*1)); err != nil { + if err := waitForKafka(cfg.Client()); err != nil { + debugListResources(cfg.Client()) return fmt.Errorf("waiting for kafka cluster to be ready: %w", err) } return nil @@ -132,38 +85,70 @@ func TestBasicFlowCapture(t *testing.T) { bt.DoTest(t) } -const conditionReady = "Ready" - -// Kafka meta object for its usage within the API -type Kafka struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty"` - Status *KafkaStatus `json:"status,omitempty"` +func waitForKafka(client klient.Client) error { + if err := waitForStatefulSet(client, "kafka-cluster-zookeeper"); err != nil { + return err + } + if err := waitForStatefulSet(client, "kafka-cluster-kafka"); err != nil { + return err + } + if err := waitForDeployment(client, "strimzi-cluster-operator"); err != nil { + return err + } + if err := waitForDeployment(client, "kafka-cluster-entity-operator"); err != nil { + return err + } + return nil } -type KafkaStatus struct { - Conditions []metav1.Condition `json:"conditions,omitempty"` +func waitForDeployment(client klient.Client, name string) error { + depl := appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}} + if err := wait.For(conditions.New(client.Resources(namespace)).ResourceMatch(&depl, func(object k8s.Object) bool { + d := object.(*appsv1.Deployment) + return d.Status.ReadyReplicas == 1 + }), wait.WithTimeout(time.Second*5)); err != nil { + return fmt.Errorf("deployment %s not ready: %w", name, err) + } + return nil } -func (k *Kafka) DeepCopyObject() runtime.Object { - return &(*k) +func waitForStatefulSet(client klient.Client, name string) error { + sfs := appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: name}} + if err := wait.For(conditions.New(client.Resources(namespace)).ResourceMatch(&sfs, func(object k8s.Object) bool { + klog.Infof("got obj: %v", object) + s, ok := object.(*appsv1.StatefulSet) + if !ok { + klog.Errorf("could not cast %v", object) + } + klog.Infof("Status: %v", s.Status) + return s.Status.AvailableReplicas == 1 && s.Status.ReadyReplicas == 1 + }), wait.WithTimeout(time.Second*5)); err != nil { + return fmt.Errorf("statefulset %s not ready: %w", name, err) + } + return nil } -func (ks *KafkaStatus) Ready() bool { - if ks == nil { - return false +func debugListResources(client klient.Client) { + var depl appsv1.DeploymentList + err := client.Resources(namespace).List(context.TODO(), &depl) + if err != nil { + klog.Errorf("Can't list deployments: %v", err) + return } - klog.Infof("Kafka len of conditions: %d", len(ks.Conditions)) - for _, cond := range ks.Conditions { - klog.WithFields(logrus.Fields{ - "reason": cond.Reason, - "msg": cond.Message, - "type": cond.Type, - "status": cond.Status, - }).Info("Waiting for kafka to be up and running") - if cond.Type == conditionReady { - return cond.Status == metav1.ConditionTrue - } + deplInfo := []string{} + for _, p := range depl.Items { + deplInfo = append(deplInfo, fmt.Sprintf("%s (%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.Replicas)) + } + klog.Infof("Deployments: " + strings.Join(deplInfo, ", ")) + var sfs appsv1.StatefulSetList + err = client.Resources(namespace).List(context.TODO(), &sfs) + if err != nil { + klog.Errorf("Can't list stateful sets: %v", err) + return + } + sfsInfo := []string{} + for _, p := range sfs.Items { + sfsInfo = append(sfsInfo, fmt.Sprintf("%s (%d/%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.AvailableReplicas, p.Status.Replicas)) } - return false + klog.Infof("StatefulSets: " + strings.Join(sfsInfo, ", ")) }