Skip to content

Commit

Permalink
next try
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Nov 23, 2023
1 parent bbf7448 commit d760247
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 83 deletions.
4 changes: 2 additions & 2 deletions e2e/cluster/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func withTimeout(f env.Func, timeout time.Duration) env.Func {
if time.Since(start) > timeout {
return ctx, fmt.Errorf("timeout (%s) trying to execute function: %w", timeout, err)
}
tlog.WithError(err).Debug("function did not succeed. Retrying after 1s")
time.Sleep(time.Second)
tlog.WithError(err).Debug("function did not succeed. Retrying after 5s")
time.Sleep(5 * time.Second)
}
}
}
Expand Down
171 changes: 90 additions & 81 deletions e2e/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/e2e-framework/klient"
"sigs.k8s.io/e2e-framework/klient/k8s"
"sigs.k8s.io/e2e-framework/klient/wait"
"sigs.k8s.io/e2e-framework/klient/wait/conditions"
Expand All @@ -38,11 +36,11 @@ var (

func TestMain(m *testing.M) {
logrus.StandardLogger().SetLevel(logrus.DebugLevel)
scheme.Scheme.AddKnownTypeWithName(schema.GroupVersionKind{
Group: "kafka.strimzi.io",
Version: "v1beta2",
Kind: "Kafka",
}, &Kafka{})
// scheme.Scheme.AddKnownTypeWithName(schema.GroupVersionKind{
// Group: "kafka.strimzi.io",
// Version: "v1beta2",
// Kind: "Kafka",
// }, &Kafka{})

testCluster = cluster.NewKind(
clusterNamePrefix+time.Now().Format("20060102-150405"),
Expand All @@ -54,54 +52,9 @@ func TestMain(m *testing.M) {
cluster.Deploy(cluster.Deployment{
Order: cluster.ExternalServices, ManifestFile: path.Join("manifests", "11-kafka-cluster.yml"),
ReadyFunction: func(cfg *envconf.Config) error {
client, err := cfg.NewClient()
if err != nil {
return fmt.Errorf("can't create k8s client: %w", err)
}
// wait for kafka to be ready
kfk := Kafka{ObjectMeta: metav1.ObjectMeta{
Namespace: namespace, Name: "kafka-cluster",
}}
var depl appsv1.DeploymentList
err = cfg.Client().Resources(namespace).List(context.TODO(), &depl)
if err != nil {
return fmt.Errorf("can't list depls: %w", err)
}
deplInfo := []string{}
for _, p := range depl.Items {
deplInfo = append(deplInfo, fmt.Sprintf("%s (%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.Replicas))
}
klog.Infof("Deployments: " + strings.Join(deplInfo, " ,,,,, "))
var sfs appsv1.StatefulSetList
err = cfg.Client().Resources(namespace).List(context.TODO(), &sfs)
if err != nil {
return fmt.Errorf("can't list sfs: %w", err)
}
sfsInfo := []string{}
for _, p := range sfs.Items {
sfsInfo = append(sfsInfo, fmt.Sprintf("%s (%d/%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.AvailableReplicas, p.Status.Replicas))
}
klog.Infof("StatefulSets: " + strings.Join(sfsInfo, " ,,,,, "))
if err := wait.For(conditions.New(client.Resources()).
ResourceMatch(&kfk, func(object k8s.Object) bool {
kafka, ok := object.(*Kafka)
if !ok {
klog.Errorf("could not cast Kafka obj: %v", object)
return false
}
for _, cond := range kafka.Status.Conditions {
klog.WithFields(logrus.Fields{
"reason": cond.Reason,
"msg": cond.Message,
"type": cond.Type,
"status": cond.Status,
}).Info("Waiting for kafka to be up and running")
if cond.Type == conditionReady {
return cond.Status == metav1.ConditionTrue
}
}
return kafka.Status.Ready()
}), wait.WithTimeout(time.Minute*1)); err != nil {
if err := waitForKafka(cfg.Client()); err != nil {
debugListResources(cfg.Client())
return fmt.Errorf("waiting for kafka cluster to be ready: %w", err)
}
return nil
Expand Down Expand Up @@ -132,38 +85,94 @@ func TestBasicFlowCapture(t *testing.T) {
bt.DoTest(t)
}

const conditionReady = "Ready"

// Kafka meta object for its usage within the API
type Kafka struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Status *KafkaStatus `json:"status,omitempty"`
func waitForKafka(client klient.Client) error {
if err := waitForStatefulSet(client, "kafka-cluster-zookeeper"); err != nil {
return err
}
if err := waitForStatefulSet(client, "kafka-cluster-kafka"); err != nil {
return err
}
if err := waitForDeployment(client, "strimzi-cluster-operator"); err != nil {
return err
}
if err := waitForDeployment(client, "kafka-cluster-entity-operator"); err != nil {
return err
}
return nil
}

type KafkaStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
func waitForDeployment(client klient.Client, name string) error {
depl := appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}
if err := wait.For(conditions.New(client.Resources(namespace)).ResourceMatch(&depl, func(object k8s.Object) bool {
d := object.(*appsv1.Deployment)
return d.Status.ReadyReplicas == 1
}), wait.WithTimeout(time.Second*5)); err != nil {
return fmt.Errorf("deployment %s not ready: %w", name, err)
}
return nil
}

func (k *Kafka) DeepCopyObject() runtime.Object {
return &(*k)
func waitForStatefulSet(client klient.Client, name string) error {
sfs := appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
if err := wait.For(conditions.New(client.Resources(namespace)).ResourceMatch(&sfs, func(object k8s.Object) bool {
klog.Infof("got obj: %v", object)
s, ok := object.(*appsv1.StatefulSet)
if !ok {
klog.Errorf("could not cast %v", object)
}
klog.Infof("Status: %v", s.Status)
return s.Status.AvailableReplicas == 1 && s.Status.ReadyReplicas == 1
}), wait.WithTimeout(time.Second*5)); err != nil {
klog.Errorf("statefulset %s not ready: %v, trying 2", name, err)
sfs = appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
if err := wait.For(conditions.New(client.Resources()).ResourceMatch(&sfs, func(object k8s.Object) bool {
klog.Infof("2: got obj: %v", object)
s, ok := object.(*appsv1.StatefulSet)
if !ok {
klog.Errorf("2: could not cast %v", object)
}
klog.Infof("2: Status: %v", s.Status)
return s.Status.AvailableReplicas == 1 && s.Status.ReadyReplicas == 1
}), wait.WithTimeout(time.Second*5)); err != nil {
klog.Errorf("statefulset %s not ready: %v, trying 3", name, err)
sfs = appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: name}}
if err := wait.For(conditions.New(client.Resources()).ResourceMatch(&sfs, func(object k8s.Object) bool {
klog.Infof("3: got obj: %v", object)
s, ok := object.(*appsv1.StatefulSet)
if !ok {
klog.Errorf("3: could not cast %v", object)
}
klog.Infof("3: Status: %v", s.Status)
return s.Status.AvailableReplicas == 1 && s.Status.ReadyReplicas == 1
}), wait.WithTimeout(time.Second*5)); err != nil {
return fmt.Errorf("3: statefulset %s not ready: %w", name, err)
}
}
}
return nil
}

func (ks *KafkaStatus) Ready() bool {
if ks == nil {
return false
func debugListResources(client klient.Client) {
var depl appsv1.DeploymentList
err := client.Resources(namespace).List(context.TODO(), &depl)
if err != nil {
klog.Errorf("Can't list deployments: %v", err)
return
}
klog.Infof("Kafka len of conditions: %d", len(ks.Conditions))
for _, cond := range ks.Conditions {
klog.WithFields(logrus.Fields{
"reason": cond.Reason,
"msg": cond.Message,
"type": cond.Type,
"status": cond.Status,
}).Info("Waiting for kafka to be up and running")
if cond.Type == conditionReady {
return cond.Status == metav1.ConditionTrue
}
deplInfo := []string{}
for _, p := range depl.Items {
deplInfo = append(deplInfo, fmt.Sprintf("%s (%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.Replicas))
}
klog.Infof("Deployments: " + strings.Join(deplInfo, ", "))
var sfs appsv1.StatefulSetList
err = client.Resources(namespace).List(context.TODO(), &sfs)
if err != nil {
klog.Errorf("Can't list stateful sets: %v", err)
return
}
sfsInfo := []string{}
for _, p := range sfs.Items {
sfsInfo = append(sfsInfo, fmt.Sprintf("%s (%d/%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.AvailableReplicas, p.Status.Replicas))
}
return false
klog.Infof("StatefulSets: " + strings.Join(sfsInfo, ", "))
}

0 comments on commit d760247

Please sign in to comment.