diff --git a/pkg/controllers/common/cond.go b/pkg/controllers/common/cond.go index a13f7a8f8d..e28f797864 100644 --- a/pkg/controllers/common/cond.go +++ b/pkg/controllers/common/cond.go @@ -14,7 +14,10 @@ package common -import "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +import ( + "github.com/pingcap/tidb-operator/pkg/runtime" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) func CondPDHasBeenDeleted(ctx PDState) task.Condition { return task.CondFunc(func() bool { @@ -40,26 +43,14 @@ func CondClusterIsPaused(ctx ClusterState) task.Condition { }) } -func CondPDGroupHasBeenDeleted(ctx PDGroupState) task.Condition { +func CondGroupIsDeleting[G runtime.Group](state GroupState[G]) task.Condition { return task.CondFunc(func() bool { - return ctx.PDGroup() == nil + return !state.Group().GetDeletionTimestamp().IsZero() }) } -func CondPDGroupIsDeleting(ctx PDGroupState) task.Condition { +func CondGroupHasBeenDeleted[RG runtime.GroupT[G], G runtime.GroupSet](state GroupState[RG]) task.Condition { return task.CondFunc(func() bool { - return !ctx.PDGroup().GetDeletionTimestamp().IsZero() - }) -} - -func CondTiKVGroupHasBeenDeleted(ctx TiKVGroupState) task.Condition { - return task.CondFunc(func() bool { - return ctx.TiKVGroup() == nil - }) -} - -func CondTiKVGroupIsDeleting(ctx TiKVGroupState) task.Condition { - return task.CondFunc(func() bool { - return !ctx.TiKVGroup().GetDeletionTimestamp().IsZero() + return state.Group() == nil }) } diff --git a/pkg/controllers/common/cond_test.go b/pkg/controllers/common/cond_test.go index 9561a4135d..5cf9be0e43 100644 --- a/pkg/controllers/common/cond_test.go +++ b/pkg/controllers/common/cond_test.go @@ -18,11 +18,106 @@ import ( "testing" "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/runtime" "github.com/pingcap/tidb-operator/pkg/utils/fake" ) +func TestCondGroupHasBeenDeleted(t *testing.T) { + t.Run("PDGroup", testCondGroupHasBeenDeleted[runtime.PDGroup]) + t.Run("TiDBGroup", testCondGroupHasBeenDeleted[runtime.TiDBGroup]) + t.Run("TiKVGroup", testCondGroupHasBeenDeleted[runtime.TiKVGroup]) + t.Run("TiFlashGroup", testCondGroupHasBeenDeleted[runtime.TiFlashGroup]) +} + +func testCondGroupHasBeenDeleted[ + G runtime.GroupSet, + RG runtime.GroupT[G], +](t *testing.T) { + cases := []struct { + desc string + state GroupState[RG] + expectedCond bool + }{ + { + desc: "cond is false", + state: FakeGroupState( + fake.Fake(func(obj RG) RG { + obj.SetName("test") + return obj + }), + ), + }, + { + desc: "cond is true", + state: FakeGroupState[RG](nil), + expectedCond: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + cond := CondGroupHasBeenDeleted(c.state) + assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc) + }) + } +} + +func TestCondGroupIsDeleting(t *testing.T) { + t.Run("PDGroup", testCondGroupIsDeleting[runtime.PDGroup]) + t.Run("TiDBGroup", testCondGroupIsDeleting[runtime.TiDBGroup]) + t.Run("TiKVGroup", testCondGroupIsDeleting[runtime.TiKVGroup]) + t.Run("TiFlashGroup", testCondGroupIsDeleting[runtime.TiFlashGroup]) +} + +func testCondGroupIsDeleting[ + G runtime.GroupSet, + RG runtime.GroupT[G], +](t *testing.T) { + cases := []struct { + desc string + state GroupState[RG] + expectedCond bool + }{ + { + desc: "cond is false", + state: FakeGroupState( + fake.Fake(func(obj RG) RG { + obj.SetName("test") + return obj + }), + ), + }, + { + desc: "cond is true", + state: FakeGroupState( + fake.Fake(func(obj RG) RG { + obj.SetName("test") + now := metav1.Now() + obj.SetDeletionTimestamp(&now) + return obj + }), + ), + expectedCond: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + cond := CondGroupIsDeleting(c.state) + assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc) + }) + } +} + func TestCondPDHasBeenDeleted(t *testing.T) { cases := []struct { desc string diff --git a/pkg/controllers/common/task.go b/pkg/controllers/common/task.go index 3260796a7d..71bb26691b 100644 --- a/pkg/controllers/common/task.go +++ b/pkg/controllers/common/task.go @@ -20,12 +20,15 @@ import ( "fmt" "slices" "strings" + "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kuberuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + utilerr "k8s.io/apimachinery/pkg/util/errors" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" @@ -121,6 +124,11 @@ func TaskContextTiKVGroup(state TiKVGroupStateInitializer, c client.Client) task return taskContextResource("TiKVGroup", w, c, false) } +func TaskContextTiDBGroup(state TiDBGroupStateInitializer, c client.Client) task.Task { + w := state.TiDBGroupInitializer() + return taskContextResource("TiDBGroup", w, c, false) +} + func TaskContextPDSlice(state PDSliceStateInitializer, c client.Client) task.Task { w := state.PDSliceInitializer() return taskContextResourceSlice[v1alpha1.PD, v1alpha1.PDList]("PDSlice", w, c) @@ -131,6 +139,11 @@ func TaskContextTiKVSlice(state TiKVSliceStateInitializer, c client.Client) task return taskContextResourceSlice[v1alpha1.TiKV, v1alpha1.TiKVList]("TiKVSlice", w, c) } +func TaskContextTiDBSlice(state TiDBSliceStateInitializer, c client.Client) task.Task { + w := state.TiDBSliceInitializer() + return taskContextResourceSlice[v1alpha1.TiDB, v1alpha1.TiDBList]("TiDBSlice", w, c) +} + func TaskSuspendPod(state PodState, c client.Client) task.Task { return task.NameTaskFunc("SuspendPod", func(ctx context.Context) task.Result { pod := state.Pod() @@ -165,6 +178,58 @@ func TaskGroupFinalizerAdd[ }) } +const defaultDelWaitTime = 10 * time.Second + +func TaskGroupFinalizerDel[ + GT runtime.GroupTuple[OG, RG], + IT runtime.InstanceTuple[OI, RI], + OG client.Object, + RG runtime.Group, + OI client.Object, + RI runtime.Instance, +](state GroupAndInstanceSliceState[RG, RI], c client.Client) task.Task { + var it IT + var gt GT + return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result { + var errList []error + var names []string + for _, peer := range state.Slice() { + names = append(names, peer.GetName()) + if peer.GetDeletionTimestamp().IsZero() { + if err := c.Delete(ctx, it.To(peer)); err != nil { + if errors.IsNotFound(err) { + continue + } + errList = append(errList, fmt.Errorf("try to delete the instance %v failed: %w", peer.GetName(), err)) + continue + } + } + } + + if len(errList) != 0 { + return task.Fail().With("failed to delete all instances: %v", utilerr.NewAggregate(errList)) + } + + if len(names) != 0 { + return task.Retry(defaultDelWaitTime).With("wait for all instances being removed, %v still exists", names) + } + + wait, err := k8s.DeleteGroupSubresource(ctx, c, state.Group(), &corev1.ServiceList{}) + if err != nil { + return task.Fail().With("cannot delete subresources: %w", err) + } + if wait { + return task.Wait().With("wait all subresources deleted") + } + + if err := k8s.RemoveFinalizer(ctx, c, gt.To(state.Group())); err != nil { + return task.Fail().With("failed to ensure finalizer has been removed: %w", err) + } + + return task.Complete().With("finalizer has been removed") + }) +} + func TaskGroupStatusSuspend[ GT runtime.GroupTuple[OG, RG], OG client.Object, diff --git a/pkg/controllers/common/task_test.go b/pkg/controllers/common/task_test.go index 13957eb5d5..a3544af1f8 100644 --- a/pkg/controllers/common/task_test.go +++ b/pkg/controllers/common/task_test.go @@ -519,6 +519,254 @@ func testTaskGroupFinalizerAdd[ } } +func TestTaskGroupFinalizerDel(t *testing.T) { + t.Run("TiKVGroup", testTaskGroupFinalizerDel[runtime.TiKVGroupTuple, runtime.TiKVTuple]) + t.Run("TiDBGroup", testTaskGroupFinalizerDel[runtime.TiDBGroupTuple, runtime.TiDBTuple]) +} + +func testTaskGroupFinalizerDel[ + GT runtime.GroupTuple[OG, RG], + IT runtime.InstanceTuple[OI, RI], + OG client.Object, + RG runtime.GroupT[G], + OI client.Object, + RI runtime.InstanceT[I], + G runtime.GroupSet, + I runtime.InstanceSet, +](t *testing.T) { + now := metav1.Now() + cases := []struct { + desc string + state GroupAndInstanceSliceState[RG, RI] + subresources []client.Object + unexpectedErr bool + + expectedStatus task.Status + expectedObj RG + }{ + { + desc: "no instances and no sub resources and no finalizer", + state: FakeGroupAndInstanceSliceState[RG, RI]( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + return obj + }), + ), + expectedStatus: task.SComplete, + expectedObj: fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + return obj + }), + }, + { + desc: "no instances and no sub resources", + state: FakeGroupAndInstanceSliceState[RG, RI]( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + expectedStatus: task.SComplete, + expectedObj: fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{}) + return obj + }), + }, + { + desc: "no instances and no sub resources but call api failed", + state: FakeGroupAndInstanceSliceState[RG, RI]( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "no instances but has sub resources", + state: FakeGroupAndInstanceSliceState[RG, RI]( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + subresources: []client.Object{ + fake.FakeObj("aaa", + fake.Label[corev1.Service](v1alpha1.LabelKeyManagedBy, v1alpha1.LabelValManagedByOperator), + fake.Label[corev1.Service](v1alpha1.LabelKeyCluster, ""), + fake.Label[corev1.Service](v1alpha1.LabelKeyComponent, runtime.Component[G, RG]()), + fake.Label[corev1.Service](v1alpha1.LabelKeyGroup, "aaa"), + ), + }, + + expectedStatus: task.SWait, + expectedObj: fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + }, + { + desc: "no instances but has sub resources and call api failed", + state: FakeGroupAndInstanceSliceState[RG, RI]( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + subresources: []client.Object{ + fake.FakeObj("aaa", + fake.Label[corev1.Service](v1alpha1.LabelKeyManagedBy, v1alpha1.LabelValManagedByOperator), + fake.Label[corev1.Service](v1alpha1.LabelKeyCluster, ""), + fake.Label[corev1.Service](v1alpha1.LabelKeyComponent, runtime.Component[G, RG]()), + fake.Label[corev1.Service](v1alpha1.LabelKeyGroup, "aaa"), + ), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "has instances with finalizer", + state: FakeGroupAndInstanceSliceState( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + fake.Fake(func(obj RI) RI { + obj.SetName("aaa") + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + expectedStatus: task.SRetry, + expectedObj: fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + }, + { + desc: "has instances with finalizer but call api failed", + state: FakeGroupAndInstanceSliceState( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + fake.Fake(func(obj RI) RI { + obj.SetName("aaa") + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "has deleting instances with finalizer but call api failed", + state: FakeGroupAndInstanceSliceState( + fake.Fake(func(obj RG) RG { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + fake.Fake(func(obj RI) RI { + obj.SetName("aaa") + obj.SetDeletionTimestamp(&now) + obj.SetFinalizers([]string{ + v1alpha1.Finalizer, + }) + return obj + }), + ), + unexpectedErr: true, + + expectedStatus: task.SRetry, + }, + } + + var gt GT + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + objs := []client.Object{ + gt.To(c.state.Group()), + } + + objs = append(objs, c.subresources...) + + fc := client.NewFakeClient(objs...) + if c.unexpectedErr { + // cannot remove finalizer + fc.WithError("update", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + // cannot delete sub resources + fc.WithError("delete", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + res, done := task.RunTask(ctx, TaskGroupFinalizerDel[GT, IT](c.state, fc)) + assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), c.desc) + assert.False(tt, done, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + obj := gt.To(new(G)) + require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, obj), c.desc) + assert.Equal(tt, c.expectedObj, gt.From(obj), c.desc) + }) + } +} + func TestTaskGroupStatusSuspend(t *testing.T) { t.Run("PDGroup", testTaskGroupStatusSuspend[runtime.PDGroupTuple, runtime.PD]) t.Run("TiKVGroup", testTaskGroupStatusSuspend[runtime.TiKVGroupTuple, runtime.TiKV]) diff --git a/pkg/controllers/pdgroup/builder.go b/pkg/controllers/pdgroup/builder.go index 03582879f6..c383fd8352 100644 --- a/pkg/controllers/pdgroup/builder.go +++ b/pkg/controllers/pdgroup/builder.go @@ -26,7 +26,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get pdgroup common.TaskContextPDGroup(state, r.Client), // if it's gone just return - task.IfBreak(common.CondPDGroupHasBeenDeleted(state)), + task.IfBreak(common.CondGroupHasBeenDeleted(state)), // get cluster common.TaskContextCluster(state, r.Client), @@ -36,7 +36,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get all pds common.TaskContextPDSlice(state, r.Client), - task.IfBreak(common.CondPDGroupIsDeleting(state), + task.IfBreak(common.CondGroupIsDeleting(state), tasks.TaskFinalizerDel(state, r.Client, r.PDClientManager), ), common.TaskGroupFinalizerAdd[runtime.PDGroupTuple](state, r.Client), diff --git a/pkg/controllers/pdgroup/tasks/finalizer.go b/pkg/controllers/pdgroup/tasks/finalizer.go index c2ea612198..ab85d6ee39 100644 --- a/pkg/controllers/pdgroup/tasks/finalizer.go +++ b/pkg/controllers/pdgroup/tasks/finalizer.go @@ -22,7 +22,6 @@ import ( utilerr "k8s.io/apimachinery/pkg/util/errors" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" "github.com/pingcap/tidb-operator/pkg/runtime" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/k8s" @@ -46,6 +45,12 @@ func TaskFinalizerDel(state State, c client.Client, m pdm.PDClientManager) task. // PD controller cannot clean up finalizer after quorum is lost // Forcely clean up all finalizers of pd instances + // NOTE: + // Now we forcely clean up pd finalizers in pd group controller when the pd group is deleted, + // but forcely clean up tikv finalizers in tikv controller when the cluster is deleted. + // We can all forcely clean up finalizers when the cluster is deleted and change this task to + // the common one. + // TODO(liubo02): refactor to use common task if err := k8s.RemoveFinalizer(ctx, c, peer); err != nil { errList = append(errList, err) } @@ -76,12 +81,3 @@ func TaskFinalizerDel(state State, c client.Client, m pdm.PDClientManager) task. return task.Complete().With("finalizer has been removed") }) } - -func TaskFinalizerAdd(state common.PDGroupState, c client.Client) task.Task { - return task.NameTaskFunc("FinalizerAdd", func(ctx context.Context) task.Result { - if err := k8s.EnsureFinalizer(ctx, c, state.PDGroup()); err != nil { - return task.Fail().With("failed to ensure finalizer has been added: %v", err) - } - return task.Complete().With("finalizer is added") - }) -} diff --git a/pkg/controllers/pdgroup/tasks/status.go b/pkg/controllers/pdgroup/tasks/status.go index e2ffc65246..f4ad731ef4 100644 --- a/pkg/controllers/pdgroup/tasks/status.go +++ b/pkg/controllers/pdgroup/tasks/status.go @@ -30,11 +30,11 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task { pdg := state.PDGroup() needUpdate := meta.SetStatusCondition(&pdg.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: pdg.Generation, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) replicas, readyReplicas, updateReplicas, currentReplicas := calcReplicas(state.PDSlice(), state.CurrentRevision, state.UpdateRevision) diff --git a/pkg/controllers/pdgroup/tasks/status_test.go b/pkg/controllers/pdgroup/tasks/status_test.go index b8e018f39d..674a73d4dc 100644 --- a/pkg/controllers/pdgroup/tasks/status_test.go +++ b/pkg/controllers/pdgroup/tasks/status_test.go @@ -55,11 +55,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 0 @@ -85,11 +85,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SWait, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 0 @@ -126,11 +126,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 @@ -167,11 +167,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 @@ -204,11 +204,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 @@ -247,11 +247,11 @@ func TestTaskStatus(t *testing.T) { State: &state{ pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.PDGroupSuspendReason, - Message: "pd group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 diff --git a/pkg/controllers/pdgroup/tasks/svc.go b/pkg/controllers/pdgroup/tasks/svc.go index 9b9313daab..2aa69ea3b6 100644 --- a/pkg/controllers/pdgroup/tasks/svc.go +++ b/pkg/controllers/pdgroup/tasks/svc.go @@ -95,7 +95,7 @@ func newInternalService(pdg *v1alpha1.PDGroup) *corev1.Service { ipFamilyPolicy := corev1.IPFamilyPolicyPreferDualStack return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-pd", pdg.Name), + Name: InternalServiceName(pdg.Name), Namespace: pdg.Namespace, Labels: map[string]string{ v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, diff --git a/pkg/controllers/pdgroup/tasks/util.go b/pkg/controllers/pdgroup/tasks/util.go index 4c08c1fe00..3965fb971d 100644 --- a/pkg/controllers/pdgroup/tasks/util.go +++ b/pkg/controllers/pdgroup/tasks/util.go @@ -27,6 +27,10 @@ func HeadlessServiceName(groupName string) string { return fmt.Sprintf("%s-pd-peer", groupName) } +func InternalServiceName(groupName string) string { + return fmt.Sprintf("%s-pd", groupName) +} + func NotLeaderPolicy() updater.PreferPolicy[*runtime.PD] { return updater.PreferPolicyFunc[*runtime.PD](func(pds []*runtime.PD) []*runtime.PD { var notLeader []*runtime.PD diff --git a/pkg/controllers/tidbgroup/builder.go b/pkg/controllers/tidbgroup/builder.go new file mode 100644 index 0000000000..c39bb84a69 --- /dev/null +++ b/pkg/controllers/tidbgroup/builder.go @@ -0,0 +1,54 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tidbgroup + +import ( + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/controllers/tidbgroup/tasks" + "github.com/pingcap/tidb-operator/pkg/runtime" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.TaskReporter) task.TaskRunner { + runner := task.NewTaskRunner(reporter, + // get tidbgroup + common.TaskContextTiDBGroup(state, r.Client), + // if it's gone just return + task.IfBreak(common.CondGroupHasBeenDeleted(state)), + + // get cluster + common.TaskContextCluster(state, r.Client), + // if it's paused just return + task.IfBreak(common.CondClusterIsPaused(state)), + + // get all tikvs + common.TaskContextTiDBSlice(state, r.Client), + + task.IfBreak(common.CondGroupIsDeleting(state), + common.TaskGroupFinalizerDel[runtime.TiDBGroupTuple, runtime.TiDBTuple](state, r.Client), + ), + common.TaskGroupFinalizerAdd[runtime.TiDBGroupTuple](state, r.Client), + + task.IfBreak( + common.CondClusterIsSuspending(state), + common.TaskGroupStatusSuspend[runtime.TiDBGroupTuple](state, r.Client), + ), + tasks.TaskService(state, r.Client), + tasks.TaskUpdater(state, r.Client), + tasks.TaskStatus(state, r.Client), + ) + + return runner +} diff --git a/pkg/controllers/tidbgroup/controller.go b/pkg/controllers/tidbgroup/controller.go index 16b52f2852..f695903907 100644 --- a/pkg/controllers/tidbgroup/controller.go +++ b/pkg/controllers/tidbgroup/controller.go @@ -35,7 +35,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/client" "github.com/pingcap/tidb-operator/pkg/controllers/tidbgroup/tasks" "github.com/pingcap/tidb-operator/pkg/utils/k8s" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) type Reconciler struct { @@ -60,7 +60,8 @@ func Setup(mgr manager.Manager, c client.Client) error { func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Object, reconcile.Request] { return handler.TypedFuncs[client.Object, reconcile.Request]{ UpdateFunc: func(ctx context.Context, event event.TypedUpdateEvent[client.Object], - queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { + queue workqueue.TypedRateLimitingInterface[reconcile.Request], + ) { cluster := event.ObjectNew.(*v1alpha1.Cluster) var list v1alpha1.TiDBGroupList @@ -98,19 +99,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu }() rtx := &tasks.ReconcileContext{ - // some fields will be set in the context task - Context: ctx, - Key: req.NamespacedName, + State: tasks.NewState(req.NamespacedName), } - runner := task.NewTaskRunner[tasks.ReconcileContext](reporter) - runner.AddTasks( - tasks.NewTaskContext(logger, r.Client), - tasks.NewTaskFinalizer(logger, r.Client), - tasks.NewTaskService(logger, r.Client), - tasks.NewTaskUpdater(logger, r.Client), - tasks.NewTaskStatus(logger, r.Client), - ) + runner := r.NewRunner(rtx, reporter) - return runner.Run(rtx) + return runner.Run(ctx) } diff --git a/pkg/controllers/tidbgroup/tasks/ctx.go b/pkg/controllers/tidbgroup/tasks/ctx.go index a6a8c086c2..d3eea8d278 100644 --- a/pkg/controllers/tidbgroup/tasks/ctx.go +++ b/pkg/controllers/tidbgroup/tasks/ctx.go @@ -14,117 +14,10 @@ package tasks -import ( - "cmp" - "context" - "slices" - - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/types" - - "github.com/pingcap/tidb-operator/apis/core/v1alpha1" - "github.com/pingcap/tidb-operator/pkg/action" - "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/tidbapi/v1" - "github.com/pingcap/tidb-operator/pkg/utils/task" -) - type ReconcileContext struct { - context.Context - - Key types.NamespacedName - - TiDBClient tidbapi.TiDBClient - - IsAvailable bool - Suspended bool - - TiDBGroup *v1alpha1.TiDBGroup - TiDBs []*v1alpha1.TiDB - Cluster *v1alpha1.Cluster - UpgradeChecker action.UpgradeChecker - - // Status fields - v1alpha1.CommonStatus -} - -func (ctx *ReconcileContext) Self() *ReconcileContext { - return ctx -} - -type TaskContext struct { - Logger logr.Logger - Client client.Client -} - -func NewTaskContext(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskContext{ - Logger: logger, - Client: c, - } -} - -func (*TaskContext) Name() string { - return "Context" -} - -func (t *TaskContext) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - var tidbg v1alpha1.TiDBGroup - if err := t.Client.Get(ctx, rtx.Key, &tidbg); err != nil { - if !errors.IsNotFound(err) { - return task.Fail().With("can't get tidb group: %w", err) - } - - return task.Complete().Break().With("tidb group has been deleted") - } - rtx.TiDBGroup = &tidbg - - var cluster v1alpha1.Cluster - if err := t.Client.Get(ctx, client.ObjectKey{ - Name: tidbg.Spec.Cluster.Name, - Namespace: tidbg.Namespace, - }, &cluster); err != nil { - return task.Fail().With("cannot find cluster %s: %w", tidbg.Spec.Cluster.Name, err) - } - rtx.Cluster = &cluster - - if cluster.ShouldPauseReconcile() { - return task.Complete().Break().With("cluster reconciliation is paused") - } - - var tidbList v1alpha1.TiDBList - if err := t.Client.List(ctx, &tidbList, client.InNamespace(tidbg.Namespace), client.MatchingLabels{ - v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, - v1alpha1.LabelKeyCluster: cluster.Name, - v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiDB, - v1alpha1.LabelKeyGroup: tidbg.Name, - }); err != nil { - return task.Fail().With("cannot list tidb instances: %w", err) - } - - rtx.TiDBs = make([]*v1alpha1.TiDB, len(tidbList.Items)) - rtx.Suspended = len(tidbList.Items) > 0 - for i := range tidbList.Items { - rtx.TiDBs[i] = &tidbList.Items[i] - if meta.IsStatusConditionTrue(tidbList.Items[i].Status.Conditions, v1alpha1.TiDBCondHealth) { - // TiDB Group is available if any of its members is available - rtx.IsAvailable = true - } - if !meta.IsStatusConditionTrue(tidbList.Items[i].Status.Conditions, v1alpha1.TiDBCondSuspended) { - // TiDB Group is not suspended if any of its members is not suspended - rtx.Suspended = false - } - } - - slices.SortFunc(rtx.TiDBs, func(a, b *v1alpha1.TiDB) int { - return cmp.Compare(a.Name, b.Name) - }) - - rtx.UpgradeChecker = action.NewUpgradeChecker(t.Client, rtx.Cluster, t.Logger) + State - return task.Complete().With("new context completed") + UpdateRevision string + CurrentRevision string + CollisionCount int32 } diff --git a/pkg/controllers/tidbgroup/tasks/finalizer.go b/pkg/controllers/tidbgroup/tasks/finalizer.go deleted file mode 100644 index dc103a65c6..0000000000 --- a/pkg/controllers/tidbgroup/tasks/finalizer.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tasks - -import ( - "fmt" - - "github.com/go-logr/logr" - utilerr "k8s.io/apimachinery/pkg/util/errors" - - "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/k8s" - "github.com/pingcap/tidb-operator/pkg/utils/task" -) - -type TaskFinalizer struct { - Client client.Client - Logger logr.Logger -} - -func NewTaskFinalizer(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskFinalizer{ - Client: c, - Logger: logger, - } -} - -func (*TaskFinalizer) Name() string { - return "Finalizer" -} - -func (t *TaskFinalizer) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - if !rtx.TiDBGroup.GetDeletionTimestamp().IsZero() { - errList := []error{} - names := []string{} - for _, tidb := range rtx.TiDBs { - names = append(names, tidb.Name) - if tidb.GetDeletionTimestamp().IsZero() { - if err := t.Client.Delete(ctx, tidb); err != nil { - errList = append(errList, fmt.Errorf("try to delete the tidb instance %v failed: %w", tidb.Name, err)) - } - } - } - - if len(errList) != 0 { - return task.Fail().With("failed to delete all tidb instances: %v", utilerr.NewAggregate(errList)) - } - - if len(rtx.TiDBs) != 0 { - return task.Fail().With("wait for all tidb instances being removed, %v still exists", names) - } - - if err := k8s.EnsureGroupSubResourceDeleted(ctx, t.Client, - rtx.TiDBGroup.Namespace, rtx.TiDBGroup.Name); err != nil { - return task.Fail().With("cannot delete subresources: %w", err) - } - if err := k8s.RemoveFinalizer(ctx, t.Client, rtx.TiDBGroup); err != nil { - return task.Fail().With("failed to ensure finalizer has been removed: %w", err) - } - } else { - if err := k8s.EnsureFinalizer(ctx, t.Client, rtx.TiDBGroup); err != nil { - return task.Fail().With("failed to ensure finalizer has been added: %w", err) - } - } - - return task.Complete().With("finalizer is synced") -} diff --git a/pkg/controllers/tidbgroup/tasks/state.go b/pkg/controllers/tidbgroup/tasks/state.go new file mode 100644 index 0000000000..2d9bb9dc2f --- /dev/null +++ b/pkg/controllers/tidbgroup/tasks/state.go @@ -0,0 +1,101 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/runtime" +) + +type state struct { + key types.NamespacedName + + cluster *v1alpha1.Cluster + dbg *v1alpha1.TiDBGroup + dbs []*v1alpha1.TiDB +} + +type State interface { + common.TiDBGroupStateInitializer + common.ClusterStateInitializer + common.TiDBSliceStateInitializer + + common.TiDBGroupState + common.ClusterState + common.TiDBSliceState + + common.GroupState[*runtime.TiDBGroup] + common.InstanceSliceState[*runtime.TiDB] +} + +func NewState(key types.NamespacedName) State { + s := &state{ + key: key, + } + return s +} + +func (s *state) TiDBGroup() *v1alpha1.TiDBGroup { + return s.dbg +} + +func (s *state) Group() *runtime.TiDBGroup { + return runtime.FromTiDBGroup(s.dbg) +} + +func (s *state) Cluster() *v1alpha1.Cluster { + return s.cluster +} + +func (s *state) TiDBSlice() []*v1alpha1.TiDB { + return s.dbs +} + +func (s *state) Slice() []*runtime.TiDB { + return runtime.FromTiDBSlice(s.dbs) +} + +func (s *state) TiDBGroupInitializer() common.TiDBGroupInitializer { + return common.NewResource(func(dbg *v1alpha1.TiDBGroup) { s.dbg = dbg }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithName(common.Name(s.key.Name)). + Initializer() +} + +func (s *state) ClusterInitializer() common.ClusterInitializer { + return common.NewResource(func(cluster *v1alpha1.Cluster) { s.cluster = cluster }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithName(common.NameFunc(func() string { + return s.dbg.Spec.Cluster.Name + })). + Initializer() +} + +func (s *state) TiDBSliceInitializer() common.TiDBSliceInitializer { + return common.NewResourceSlice(func(dbs []*v1alpha1.TiDB) { s.dbs = dbs }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithLabels(common.LabelsFunc(func() map[string]string { + return map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiDB, + v1alpha1.LabelKeyCluster: s.cluster.Name, + v1alpha1.LabelKeyGroup: s.dbg.Name, + } + })). + Initializer() +} diff --git a/pkg/controllers/tidbgroup/tasks/state_test.go b/pkg/controllers/tidbgroup/tasks/state_test.go new file mode 100644 index 0000000000..0341f8a8aa --- /dev/null +++ b/pkg/controllers/tidbgroup/tasks/state_test.go @@ -0,0 +1,105 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestState(t *testing.T) { + cases := []struct { + desc string + key types.NamespacedName + objs []client.Object + + expected State + }{ + { + desc: "normal", + key: types.NamespacedName{ + Name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Cluster.Name = "aaa" + return obj + }), + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Labels = map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiDB, + v1alpha1.LabelKeyCluster: "aaa", + v1alpha1.LabelKeyGroup: "aaa", + } + return obj + }), + }, + + expected: &state{ + key: types.NamespacedName{ + Name: "aaa", + }, + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Cluster.Name = "aaa" + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + dbs: []*v1alpha1.TiDB{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Labels = map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiDB, + v1alpha1.LabelKeyCluster: "aaa", + v1alpha1.LabelKeyGroup: "aaa", + } + return obj + }), + }, + }, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) + + s := NewState(c.key) + + ctx := context.Background() + res, done := task.RunTask(ctx, task.Block( + common.TaskContextTiDBGroup(s, fc), + common.TaskContextCluster(s, fc), + common.TaskContextTiDBSlice(s, fc), + )) + assert.Equal(tt, task.SComplete, res.Status(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.expected, s, c.desc) + }) + } +} diff --git a/pkg/controllers/tidbgroup/tasks/status.go b/pkg/controllers/tidbgroup/tasks/status.go index 2a2c1b4bb1..7b93e5d30e 100644 --- a/pkg/controllers/tidbgroup/tasks/status.go +++ b/pkg/controllers/tidbgroup/tasks/status.go @@ -15,97 +15,98 @@ package tasks import ( - "github.com/go-logr/logr" + "context" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type TaskStatus struct { - Client client.Client - Logger logr.Logger -} +// TODO(liubo02): extract to common task +func TaskStatus(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("Status", func(ctx context.Context) task.Result { + dbg := state.TiDBGroup() -func NewTaskStatus(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskStatus{ - Client: c, - Logger: logger, - } -} + needUpdate := meta.SetStatusCondition(&dbg.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: dbg.Generation, + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", + }) -func (*TaskStatus) Name() string { - return "Status" -} + replicas, readyReplicas, updateReplicas, currentReplicas := calcReplicas(state.TiDBSlice(), state.CurrentRevision, state.UpdateRevision) -//nolint:gocyclo // refactor if possible -func (t *TaskStatus) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() + // all instances are updated + if updateReplicas == replicas { + // update current revision + state.CurrentRevision = state.UpdateRevision + // update status of pdg version + // TODO(liubo02): version of a group is hard to understand + // We need to change it to a more meaningful field + needUpdate = SetIfChanged(&dbg.Status.Version, dbg.Spec.Version) || needUpdate + } - availStatus := metav1.ConditionFalse - availMessage := "tidb group is not available" - if rtx.IsAvailable { - availStatus = metav1.ConditionTrue - availMessage = "tidb group is available" - } - conditionChanged := meta.SetStatusCondition(&rtx.TiDBGroup.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiDBGroupCondAvailable, - Status: availStatus, - ObservedGeneration: rtx.TiDBGroup.Generation, - Reason: v1alpha1.TiDBGroupAvailableReason, - Message: availMessage, + needUpdate = SetIfChanged(&dbg.Status.ObservedGeneration, dbg.Generation) || needUpdate + needUpdate = SetIfChanged(&dbg.Status.Replicas, replicas) || needUpdate + needUpdate = SetIfChanged(&dbg.Status.ReadyReplicas, readyReplicas) || needUpdate + needUpdate = SetIfChanged(&dbg.Status.UpdatedReplicas, updateReplicas) || needUpdate + needUpdate = SetIfChanged(&dbg.Status.CurrentReplicas, currentReplicas) || needUpdate + needUpdate = SetIfChanged(&dbg.Status.UpdateRevision, state.UpdateRevision) || needUpdate + needUpdate = SetIfChanged(&dbg.Status.CurrentRevision, state.CurrentRevision) || needUpdate + needUpdate = NewAndSetIfChanged(&dbg.Status.CollisionCount, state.CollisionCount) || needUpdate + + if needUpdate { + if err := c.Status().Update(ctx, dbg); err != nil { + return task.Fail().With("cannot update status: %w", err) + } + } + + return task.Complete().With("status is synced") }) +} - suspendStatus := metav1.ConditionFalse - suspendMessage := "tidb group is not suspended" - if rtx.Suspended { - suspendStatus = metav1.ConditionTrue - suspendMessage = "tidb group is suspended" - } else if rtx.Cluster.ShouldSuspendCompute() { - suspendMessage = "tidb group is suspending" - } - conditionChanged = meta.SetStatusCondition(&rtx.TiDBGroup.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiDBGroupCondSuspended, - Status: suspendStatus, - ObservedGeneration: rtx.TiDBGroup.Generation, - Reason: v1alpha1.TiDBGroupSuspendReason, - Message: suspendMessage, - }) || conditionChanged - - // Update the current revision if all instances are synced. - if int(rtx.TiDBGroup.GetDesiredReplicas()) == len(rtx.TiDBs) && v1alpha1.AllInstancesSynced(rtx.TiDBs, rtx.UpdateRevision) { - conditionChanged = true - rtx.CurrentRevision = rtx.UpdateRevision - rtx.TiDBGroup.Status.Version = rtx.TiDBGroup.Spec.Version - } - var readyReplicas int32 - for _, tidb := range rtx.TiDBs { - if tidb.IsHealthy() { +func calcReplicas(dbs []*v1alpha1.TiDB, currentRevision, updateRevision string) ( + replicas, + readyReplicas, + updateReplicas, + currentReplicas int32, +) { + for _, peer := range dbs { + replicas++ + if peer.IsHealthy() { readyReplicas++ } + if peer.CurrentRevision() == currentRevision { + currentReplicas++ + } + if peer.CurrentRevision() == updateRevision { + updateReplicas++ + } } - if conditionChanged || rtx.TiDBGroup.Status.ReadyReplicas != readyReplicas || - rtx.TiDBGroup.Status.Replicas != int32(len(rtx.TiDBs)) || //nolint:gosec // expected type conversion - !v1alpha1.IsReconciled(rtx.TiDBGroup) || - v1alpha1.StatusChanged(rtx.TiDBGroup, rtx.CommonStatus) { - rtx.TiDBGroup.Status.ReadyReplicas = readyReplicas - rtx.TiDBGroup.Status.Replicas = int32(len(rtx.TiDBs)) //nolint:gosec// expected type conversion - rtx.TiDBGroup.Status.ObservedGeneration = rtx.TiDBGroup.Generation - rtx.TiDBGroup.Status.CurrentRevision = rtx.CurrentRevision - rtx.TiDBGroup.Status.UpdateRevision = rtx.UpdateRevision - rtx.TiDBGroup.Status.CollisionCount = rtx.CollisionCount - - if err := t.Client.Status().Update(ctx, rtx.TiDBGroup); err != nil { - return task.Fail().With("cannot update status: %w", err) + return +} + +func NewAndSetIfChanged[T comparable](dst **T, src T) bool { + if *dst == nil { + zero := new(T) + if *zero == src { + return false } + *dst = zero } + return SetIfChanged(*dst, src) +} - if !rtx.IsAvailable && !rtx.Suspended { - return task.Fail().With("tidb group may not be available, requeue to retry") +func SetIfChanged[T comparable](dst *T, src T) bool { + if *dst != src { + *dst = src + return true } - return task.Complete().With("status is synced") + return false } diff --git a/pkg/controllers/tidbgroup/tasks/status_test.go b/pkg/controllers/tidbgroup/tasks/status_test.go new file mode 100644 index 0000000000..f152fc65f0 --- /dev/null +++ b/pkg/controllers/tidbgroup/tasks/status_test.go @@ -0,0 +1,276 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestTaskStatus(t *testing.T) { + cases := []struct { + desc string + state *ReconcileContext + unexpectedErr bool + + expectedStatus task.Status + expectedObj *v1alpha1.TiDBGroup + }{ + { + desc: "no tidbs", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3)), + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + CollisionCount: 3, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3), func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 0 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 0 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = ptr.To[int32](3) + return obj + }), + }, + { + desc: "all tidbs are outdated and healthy", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3)), + dbs: []*v1alpha1.TiDB{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Status.CurrentRevision = oldRevision + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.TiDBCondHealth, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3), func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 1 + obj.Status.UpdatedReplicas = 0 + obj.Status.CurrentReplicas = 1 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = oldRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "all tidbs are updated and healthy", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3)), + dbs: []*v1alpha1.TiDB{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Status.CurrentRevision = newRevision + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.TiDBCondHealth, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3), func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 1 + obj.Status.UpdatedReplicas = 1 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "all tidbs are updated but not healthy", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3)), + dbs: []*v1alpha1.TiDB{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Status.CurrentRevision = newRevision + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3), func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 1 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "status changed but cannot call api", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3)), + dbs: []*v1alpha1.TiDB{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Status.CurrentRevision = newRevision + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "status is not changed and cannot call api", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiDBGroup](3), func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 1 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + dbs: []*v1alpha1.TiDB{ + fake.FakeObj("aaa", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Status.CurrentRevision = newRevision + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.state.TiDBGroup()) + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + ctx := context.Background() + res, done := task.RunTask(ctx, TaskStatus(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + dbg := &v1alpha1.TiDBGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, dbg), c.desc) + for i := range dbg.Status.Conditions { + cond := &dbg.Status.Conditions[i] + cond.LastTransitionTime = metav1.Time{} + } + assert.Equal(tt, c.expectedObj, dbg, c.desc) + }) + } +} diff --git a/pkg/controllers/tidbgroup/tasks/svc.go b/pkg/controllers/tidbgroup/tasks/svc.go index e79a44b37e..f051c6d9da 100644 --- a/pkg/controllers/tidbgroup/tasks/svc.go +++ b/pkg/controllers/tidbgroup/tasks/svc.go @@ -15,54 +15,35 @@ package tasks import ( + "context" "fmt" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type TaskService struct { - Logger logr.Logger - Client client.Client -} +func TaskService(state common.TiDBGroupState, c client.Client) task.Task { + return task.NameTaskFunc("Service", func(ctx context.Context) task.Result { + dbg := state.TiDBGroup() -func NewTaskService(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskService{ - Logger: logger, - Client: c, - } -} + headless := newHeadlessService(dbg) + if err := c.Apply(ctx, headless); err != nil { + return task.Fail().With(fmt.Sprintf("can't create headless service: %v", err)) + } -func (*TaskService) Name() string { - return "Service" -} - -func (t *TaskService) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - if rtx.Cluster.ShouldSuspendCompute() { - return task.Complete().With("skip service for suspension") - } + svc := newInternalService(dbg) + if err := c.Apply(ctx, svc); err != nil { + return task.Fail().With(fmt.Sprintf("can't create internal service: %v", err)) + } - tidbg := rtx.TiDBGroup - - svcHeadless := newHeadlessService(tidbg) - if err := t.Client.Apply(ctx, svcHeadless); err != nil { - return task.Fail().With(fmt.Sprintf("can't create headless service of tidb: %v", err)) - } - - svc := newService(tidbg) - if err := t.Client.Apply(ctx, svc); err != nil { - return task.Fail().With(fmt.Sprintf("can't create service of tidb: %v", err)) - } - - return task.Complete().With("service of tidb has been applied") + return task.Complete().With("services have been applied") + }) } func newHeadlessService(tidbg *v1alpha1.TiDBGroup) *corev1.Service { @@ -103,16 +84,12 @@ func newHeadlessService(tidbg *v1alpha1.TiDBGroup) *corev1.Service { } } -func newService(tidbg *v1alpha1.TiDBGroup) *corev1.Service { - svcType := corev1.ServiceTypeClusterIP - if tidbg.Spec.Service != nil && tidbg.Spec.Service.Type != "" { - svcType = tidbg.Spec.Service.Type - } +func newInternalService(tidbg *v1alpha1.TiDBGroup) *corev1.Service { ipFamilyPolicy := corev1.IPFamilyPolicyPreferDualStack return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: tidbg.Name + "-tidb", + Name: InternalServiceName(tidbg.Name), Namespace: tidbg.Namespace, Labels: map[string]string{ v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, @@ -145,7 +122,7 @@ func newService(tidbg *v1alpha1.TiDBGroup) *corev1.Service { TargetPort: intstr.FromString(v1alpha1.TiDBPortNameStatus), }, }, - Type: svcType, + Type: corev1.ServiceTypeClusterIP, IPFamilyPolicy: &ipFamilyPolicy, }, } diff --git a/pkg/controllers/tidbgroup/tasks/svc_test.go b/pkg/controllers/tidbgroup/tasks/svc_test.go new file mode 100644 index 0000000000..55648f3300 --- /dev/null +++ b/pkg/controllers/tidbgroup/tasks/svc_test.go @@ -0,0 +1,135 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestTaskService(t *testing.T) { + cases := []struct { + desc string + state State + objs []client.Object + unexpectedErr bool + + expectedStatus task.Status + }{ + { + desc: "no svc exists", + state: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + }, + + expectedStatus: task.SComplete, + }, + { + desc: "headless svc has exists", + state: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + }, + objs: []client.Object{ + fake.FakeObj[corev1.Service]("aaa-tidb-peer"), + }, + + expectedStatus: task.SComplete, + }, + { + desc: "internal svc has exists", + state: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + }, + objs: []client.Object{ + fake.FakeObj[corev1.Service]("aaa-tidb"), + }, + + expectedStatus: task.SComplete, + }, + { + desc: "apply headless svc with unexpected err", + state: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "apply internal svc with unexpected err", + state: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + }, + objs: []client.Object{ + newHeadlessService(fake.FakeObj[v1alpha1.TiDBGroup]("aaa")), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "all svcs are updated with unexpected err", + state: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + }, + objs: []client.Object{ + newHeadlessService(fake.FakeObj[v1alpha1.TiDBGroup]("aaa")), + newInternalService(fake.FakeObj[v1alpha1.TiDBGroup]("aaa")), + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + ctx := context.Background() + fc := client.NewFakeClient(c.state.TiDBGroup()) + for _, obj := range c.objs { + require.NoError(tt, fc.Apply(ctx, obj), c.desc) + } + + if c.unexpectedErr { + // cannot update svc + fc.WithError("patch", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + res, done := task.RunTask(ctx, TaskService(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + if !c.unexpectedErr { + svcs := corev1.ServiceList{} + require.NoError(tt, fc.List(ctx, &svcs), c.desc) + assert.Len(tt, svcs.Items, 2, c.desc) + } + }) + } +} diff --git a/pkg/controllers/tidbgroup/tasks/updater.go b/pkg/controllers/tidbgroup/tasks/updater.go index f566d0f1f0..1381c9eab0 100644 --- a/pkg/controllers/tidbgroup/tasks/updater.go +++ b/pkg/controllers/tidbgroup/tasks/updater.go @@ -15,12 +15,16 @@ package tasks import ( + "context" "fmt" + "time" "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/action" "github.com/pingcap/tidb-operator/pkg/client" "github.com/pingcap/tidb-operator/pkg/runtime" "github.com/pingcap/tidb-operator/pkg/updater" @@ -28,125 +32,107 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/k8s/revision" maputil "github.com/pingcap/tidb-operator/pkg/utils/map" "github.com/pingcap/tidb-operator/pkg/utils/random" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/third_party/kubernetes/pkg/controller/history" ) -// TaskUpdater is a task for updating TiDBGroup when its spec is changed. -type TaskUpdater struct { - Logger logr.Logger - Client client.Client - CRCli history.Interface -} +const ( + defaultUpdateWaitTime = time.Second * 30 +) -func NewTaskUpdater(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskUpdater{ - Logger: logger, - Client: c, - CRCli: history.NewClient(c), - } -} +// TaskUpdater is a task to scale or update PD when spec of TiDBGroup is changed. +func TaskUpdater(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("Updater", func(ctx context.Context) task.Result { + logger := logr.FromContextOrDiscard(ctx) + historyCli := history.NewClient(c) + dbg := state.TiDBGroup() + + selector := labels.SelectorFromSet(labels.Set{ + // TODO(liubo02): add label of managed by operator ? + v1alpha1.LabelKeyCluster: dbg.Spec.Cluster.Name, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, + v1alpha1.LabelKeyGroup: dbg.Name, + }) + + revisions, err := historyCli.ListControllerRevisions(dbg, selector) + if err != nil { + return task.Fail().With("cannot list controller revisions: %w", err) + } + history.SortControllerRevisions(revisions) -func (*TaskUpdater) Name() string { - return "Updater" -} + // Get the current(old) and update(new) ControllerRevisions. + currentRevision, updateRevision, collisionCount, err := revision.GetCurrentAndUpdate(dbg, revisions, historyCli, dbg) + if err != nil { + return task.Fail().With("cannot get revisions: %w", err) + } + state.CurrentRevision = currentRevision.Name + state.UpdateRevision = updateRevision.Name + state.CollisionCount = collisionCount + + // TODO(liubo02): add a controller to do it + if err = revision.TruncateHistory(historyCli, state.TiDBSlice(), revisions, + currentRevision, updateRevision, state.Cluster().Spec.RevisionHistoryLimit); err != nil { + logger.Error(err, "failed to truncate history") + } -func (t *TaskUpdater) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - // TODO: move to task v2 - if !rtx.TiDBGroup.GetDeletionTimestamp().IsZero() { - return task.Complete().With("tidb group has been deleted") - } - - if rtx.Cluster.ShouldSuspendCompute() { - return task.Complete().With("skip updating TiDBGroup for suspension") - } - - // List all controller revisions for the TiDBGroup - selector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: map[string]string{ - v1alpha1.LabelKeyCluster: rtx.Cluster.Name, - v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiDB, - v1alpha1.LabelKeyGroup: rtx.TiDBGroup.Name, - }, - }) - revisions, err := t.CRCli.ListControllerRevisions(rtx.TiDBGroup, selector) - if err != nil { - return task.Fail().With("cannot list controller revisions: %w", err) - } - history.SortControllerRevisions(revisions) - - // Get the current(old) and update(new) ControllerRevisions for TiDBGroup - currentRevision, updateRevision, collisionCount, err := revision.GetCurrentAndUpdate(rtx.TiDBGroup, revisions, t.CRCli, rtx.TiDBGroup) - if err != nil { - return task.Fail().With("cannot get revisions: %w", err) - } - rtx.CurrentRevision = currentRevision.Name - rtx.UpdateRevision = updateRevision.Name - rtx.CollisionCount = &collisionCount - - if err = revision.TruncateHistory(t.CRCli, rtx.TiDBs, revisions, - currentRevision, updateRevision, rtx.Cluster.Spec.RevisionHistoryLimit); err != nil { - t.Logger.Error(err, "failed to truncate history") - } - - if needVersionUpgrade(rtx.TiDBGroup) && !rtx.UpgradeChecker.CanUpgrade(ctx, rtx.TiDBGroup) { - return task.Fail().Continue().With( - "preconditions of upgrading the tidb group %s/%s are not met", - rtx.TiDBGroup.Namespace, rtx.TiDBGroup.Name) - } - - desired := 1 - if rtx.TiDBGroup.Spec.Replicas != nil { - desired = int(*rtx.TiDBGroup.Spec.Replicas) - } - - var topos []v1alpha1.ScheduleTopology - for _, p := range rtx.TiDBGroup.Spec.SchedulePolicies { - switch p.Type { - case v1alpha1.SchedulePolicyTypeEvenlySpread: - topos = p.EvenlySpread.Topologies - default: - // do nothing + checker := action.NewUpgradeChecker(c, state.Cluster(), logger) + + if needVersionUpgrade(dbg) && !checker.CanUpgrade(ctx, dbg) { + // TODO(liubo02): change to Wait + return task.Retry(defaultUpdateWaitTime).With("wait until preconditions of upgrading is met") + } + + desired := 1 + if dbg.Spec.Replicas != nil { + desired = int(*dbg.Spec.Replicas) + } + + var topos []v1alpha1.ScheduleTopology + for _, p := range dbg.Spec.SchedulePolicies { + switch p.Type { + case v1alpha1.SchedulePolicyTypeEvenlySpread: + topos = p.EvenlySpread.Topologies + default: + // do nothing + } } - } - - topoPolicy, err := policy.NewTopologyPolicy[*runtime.TiDB](topos) - if err != nil { - return task.Fail().With("invalid topo policy, it should be validated: %w", err) - } - - for _, tidb := range rtx.TiDBs { - topoPolicy.Add(runtime.FromTiDB(tidb)) - } - - wait, err := updater.New[runtime.TiDBTuple](). - WithInstances(runtime.FromTiDBSlice(rtx.TiDBs)...). - WithDesired(desired). - WithClient(t.Client). - WithMaxSurge(0). - WithMaxUnavailable(1). - WithRevision(rtx.UpdateRevision). - WithNewFactory(TiDBNewer(rtx.TiDBGroup, rtx.UpdateRevision)). - WithAddHooks(topoPolicy). - WithUpdateHooks( - policy.KeepName[*runtime.TiDB](), - policy.KeepTopology[*runtime.TiDB](), - ). - WithDelHooks(topoPolicy). - WithScaleInPreferPolicy( - topoPolicy, - ). - Build(). - Do(ctx) - if err != nil { - return task.Fail().With("cannot update instances: %w", err) - } - if wait { - return task.Complete().With("wait for all instances ready") - } - return task.Complete().With("all instances are synced") + + topoPolicy, err := policy.NewTopologyPolicy[*runtime.TiDB](topos) + if err != nil { + return task.Fail().With("invalid topo policy, it should be validated: %w", err) + } + + for _, tidb := range state.TiDBSlice() { + topoPolicy.Add(runtime.FromTiDB(tidb)) + } + + wait, err := updater.New[runtime.TiDBTuple](). + WithInstances(runtime.FromTiDBSlice(state.TiDBSlice())...). + WithDesired(desired). + WithClient(c). + WithMaxSurge(0). + WithMaxUnavailable(1). + WithRevision(state.UpdateRevision). + WithNewFactory(TiDBNewer(dbg, state.UpdateRevision)). + WithAddHooks(topoPolicy). + WithUpdateHooks( + policy.KeepName[*runtime.TiDB](), + policy.KeepTopology[*runtime.TiDB](), + ). + WithDelHooks(topoPolicy). + WithScaleInPreferPolicy( + topoPolicy, + ). + Build(). + Do(ctx) + if err != nil { + return task.Fail().With("cannot update instances: %w", err) + } + if wait { + return task.Wait().With("wait for all instances ready") + } + return task.Complete().With("all instances are synced") + }) } func needVersionUpgrade(dbg *v1alpha1.TiDBGroup) bool { diff --git a/pkg/controllers/tidbgroup/tasks/updater_test.go b/pkg/controllers/tidbgroup/tasks/updater_test.go index 12eca42fc6..8bffde1f7a 100644 --- a/pkg/controllers/tidbgroup/tasks/updater_test.go +++ b/pkg/controllers/tidbgroup/tasks/updater_test.go @@ -16,80 +16,290 @@ package tasks import ( "context" + "fmt" "testing" - "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/runtime" "github.com/pingcap/tidb-operator/pkg/utils/fake" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -func FakeContext(changes ...fake.ChangeFunc[ReconcileContext, *ReconcileContext]) *ReconcileContext { - ctx := fake.Fake(changes...) - ctx.Context = context.TODO() - return ctx -} - -func WithCluster(cluster *v1alpha1.Cluster) fake.ChangeFunc[ReconcileContext, *ReconcileContext] { - return func(obj *ReconcileContext) *ReconcileContext { - obj.Cluster = cluster - return obj - } -} +const ( + // TODO(liubo02): fake history client to avoid real revision calc + oldRevision = "aaa-7fff59c8" + newRevision = "aaa-69cf8bf4d9" +) -func WithTiDBGroup(dbg *v1alpha1.TiDBGroup) fake.ChangeFunc[ReconcileContext, *ReconcileContext] { - return func(obj *ReconcileContext) *ReconcileContext { - obj.TiDBGroup = dbg - return obj - } -} +func TestTaskUpdater(t *testing.T) { + cases := []struct { + desc string + state *ReconcileContext + unexpectedErr bool -func TestUpdater(t *testing.T) { - tests := []struct { - name string - ctx *ReconcileContext - objs []client.Object - expected task.Result - expectFunc func(t *testing.T, ctx *ReconcileContext, cli client.Client) + expectedStatus task.Status + expectedUpdateRevision string + expectedCurrentRevision string + expectedTiDBNum int }{ { - name: "first time to sync", - ctx: FakeContext( - WithCluster(fake.FakeObj[v1alpha1.Cluster]("test")), - WithTiDBGroup(fake.FakeObj("test-tidbgroup", - func(dbg *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { - dbg.Spec.Cluster = v1alpha1.ClusterReference{Name: "test"} - dbg.Spec.Replicas = ptr.To(int32(1)) - return dbg + desc: "no dbs with 1 replicas", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: oldRevision, + expectedCurrentRevision: oldRevision, + expectedTiDBNum: 1, + }, + { + desc: "version upgrade check", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + // use an wrong version to trigger version check + // TODO(liubo02): it's not happened actually. Maybe remove whole checking + obj.Spec.Version = "xxx" + obj.Status.Version = "yyy" + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SRetry, + expectedUpdateRevision: "aaa-7847fff478", + expectedCurrentRevision: "aaa-7847fff478", + }, + { + desc: "1 updated tidb with 1 replicas", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: []*v1alpha1.TiDB{ + fakeAvailableTiDB("aaa-xxx", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), oldRevision), }, - )), - ), - expected: task.Complete().With(""), - expectFunc: func(t *testing.T, _ *ReconcileContext, cli client.Client) { - var crList appsv1.ControllerRevisionList - require.NoError(t, cli.List(context.TODO(), &crList)) - assert.Len(t, crList.Items, 1) + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: oldRevision, + expectedCurrentRevision: oldRevision, + expectedTiDBNum: 1, + }, + { + desc: "no dbs with 2 replicas", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedTiDBNum: 2, + }, + { + desc: "no dbs with 2 replicas and call api failed", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + }, + { + desc: "1 outdated tidb with 2 replicas", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: []*v1alpha1.TiDB{ + fakeAvailableTiDB("aaa-xxx", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), oldRevision), + }, + }, + }, + + expectedStatus: task.SWait, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedTiDBNum: 2, + }, + { + desc: "1 outdated tidb with 2 replicas but cannot call api, will fail", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: []*v1alpha1.TiDB{ + fakeAvailableTiDB("aaa-xxx", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), oldRevision), + }, + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + }, + { + desc: "2 updated tidb with 2 replicas", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: []*v1alpha1.TiDB{ + fakeAvailableTiDB("aaa-xxx", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), newRevision), + fakeAvailableTiDB("aaa-yyy", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), newRevision), + }, + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedTiDBNum: 2, + }, + { + desc: "2 updated tidb with 2 replicas and cannot call api, can complete", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: []*v1alpha1.TiDB{ + fakeAvailableTiDB("aaa-xxx", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), newRevision), + fakeAvailableTiDB("aaa-yyy", fake.FakeObj[v1alpha1.TiDBGroup]("aaa"), newRevision), + }, + }, + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedTiDBNum: 2, + }, + { + // NOTE: it not really check whether the policy is worked + // It should be tested in /pkg/updater and /pkg/updater/policy package + desc: "topology evenly spread", + state: &ReconcileContext{ + State: &state{ + dbg: fake.FakeObj("aaa", func(obj *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + obj.Spec.Replicas = ptr.To[int32](3) + obj.Spec.SchedulePolicies = append(obj.Spec.SchedulePolicies, v1alpha1.SchedulePolicy{ + Type: v1alpha1.SchedulePolicyTypeEvenlySpread, + EvenlySpread: &v1alpha1.SchedulePolicyEvenlySpread{ + Topologies: []v1alpha1.ScheduleTopology{ + { + Topology: v1alpha1.Topology{ + "zone": "us-west-1a", + }, + }, + { + Topology: v1alpha1.Topology{ + "zone": "us-west-1b", + }, + }, + { + Topology: v1alpha1.Topology{ + "zone": "us-west-1c", + }, + }, + }, + }, + }) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: "aaa-655cc6bb8f", + expectedCurrentRevision: "aaa-655cc6bb8f", + expectedTiDBNum: 3, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - fc := client.NewFakeClient(tt.objs...) - updaterTask := NewTaskUpdater(logr.Discard(), fc) - got := updaterTask.Sync(tt.ctx) - assert.Equal(t, tt.expected.IsFailed(), got.IsFailed()) - assert.Equal(t, tt.expected.ShouldContinue(), got.ShouldContinue()) - assert.Equal(t, tt.expected.RequeueAfter(), got.RequeueAfter()) - - if tt.expectFunc != nil { - tt.expectFunc(t, tt.ctx, fc) + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + ctx := context.Background() + fc := client.NewFakeClient(c.state.TiDBGroup(), c.state.Cluster()) + for _, obj := range c.state.TiDBSlice() { + require.NoError(tt, fc.Apply(ctx, obj), c.desc) + } + + if c.unexpectedErr { + // cannot create or update tidb instance + fc.WithError("patch", "tidbs", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + res, done := task.RunTask(ctx, TaskUpdater(c.state, fc)) + assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), c.desc) + assert.False(tt, done, c.desc) + + assert.Equal(tt, c.expectedUpdateRevision, c.state.UpdateRevision, c.desc) + assert.Equal(tt, c.expectedCurrentRevision, c.state.CurrentRevision, c.desc) + if !c.unexpectedErr { + dbs := v1alpha1.TiDBList{} + require.NoError(tt, fc.List(ctx, &dbs), c.desc) + assert.Len(tt, dbs.Items, c.expectedTiDBNum, c.desc) } }) } } + +func fakeAvailableTiDB(name string, dbg *v1alpha1.TiDBGroup, rev string) *v1alpha1.TiDB { + return fake.FakeObj(name, func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + tidb := runtime.ToTiDB(TiDBNewer(dbg, rev).New()) + tidb.Name = "" + tidb.Status.Conditions = append(tidb.Status.Conditions, metav1.Condition{ + Type: v1alpha1.TiDBCondHealth, + Status: metav1.ConditionTrue, + }) + tidb.Status.CurrentRevision = rev + tidb.DeepCopyInto(obj) + return obj + }) +} diff --git a/pkg/controllers/tidbgroup/tasks/util.go b/pkg/controllers/tidbgroup/tasks/util.go index c30620815f..1f4230a586 100644 --- a/pkg/controllers/tidbgroup/tasks/util.go +++ b/pkg/controllers/tidbgroup/tasks/util.go @@ -21,3 +21,7 @@ import ( func HeadlessServiceName(groupName string) string { return fmt.Sprintf("%s-tidb-peer", groupName) } + +func InternalServiceName(groupName string) string { + return fmt.Sprintf("%s-tidb", groupName) +} diff --git a/pkg/controllers/tikvgroup/builder.go b/pkg/controllers/tikvgroup/builder.go index 96254bcfa2..3ea6394cb7 100644 --- a/pkg/controllers/tikvgroup/builder.go +++ b/pkg/controllers/tikvgroup/builder.go @@ -26,7 +26,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get tikvgroup common.TaskContextTiKVGroup(state, r.Client), // if it's gone just return - task.IfBreak(common.CondTiKVGroupHasBeenDeleted(state)), + task.IfBreak(common.CondGroupHasBeenDeleted(state)), // get cluster common.TaskContextCluster(state, r.Client), @@ -36,8 +36,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get all tikvs common.TaskContextTiKVSlice(state, r.Client), - task.IfBreak(common.CondTiKVGroupIsDeleting(state), - tasks.TaskFinalizerDel(state, r.Client), + task.IfBreak(common.CondGroupIsDeleting(state), + common.TaskGroupFinalizerDel[runtime.TiKVGroupTuple, runtime.TiKVTuple](state, r.Client), ), common.TaskGroupFinalizerAdd[runtime.TiKVGroupTuple](state, r.Client), diff --git a/pkg/controllers/tikvgroup/tasks/finalizer.go b/pkg/controllers/tikvgroup/tasks/finalizer.go deleted file mode 100644 index cb37a2615c..0000000000 --- a/pkg/controllers/tikvgroup/tasks/finalizer.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tasks - -import ( - "context" - "fmt" - "time" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - utilerr "k8s.io/apimachinery/pkg/util/errors" - - "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/runtime" - "github.com/pingcap/tidb-operator/pkg/utils/k8s" - "github.com/pingcap/tidb-operator/pkg/utils/task/v3" -) - -const defaultDelWaitTime = 10 * time.Second - -func TaskFinalizerDel(state State, c client.Client) task.Task { - return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result { - var errList []error - var names []string - for _, peer := range state.TiKVSlice() { - names = append(names, peer.Name) - if peer.GetDeletionTimestamp().IsZero() { - if err := c.Delete(ctx, peer); err != nil { - if errors.IsNotFound(err) { - continue - } - errList = append(errList, fmt.Errorf("try to delete the tikv instance %v failed: %w", peer.Name, err)) - continue - } - } - } - - if len(errList) != 0 { - return task.Fail().With("failed to delete all tikv instances: %v", utilerr.NewAggregate(errList)) - } - - if len(names) != 0 { - return task.Retry(defaultDelWaitTime).With("wait for all tikv instances being removed, %v still exists", names) - } - - wait, err := k8s.DeleteGroupSubresource(ctx, c, runtime.FromTiKVGroup(state.TiKVGroup()), &corev1.ServiceList{}) - if err != nil { - return task.Fail().With("cannot delete subresources: %w", err) - } - if wait { - return task.Wait().With("wait all subresources deleted") - } - - if err := k8s.RemoveFinalizer(ctx, c, state.TiKVGroup()); err != nil { - return task.Fail().With("failed to ensure finalizer has been removed: %w", err) - } - - return task.Complete().With("finalizer has been removed") - }) -} diff --git a/pkg/controllers/tikvgroup/tasks/finalizer_test.go b/pkg/controllers/tikvgroup/tasks/finalizer_test.go deleted file mode 100644 index 583f61135b..0000000000 --- a/pkg/controllers/tikvgroup/tasks/finalizer_test.go +++ /dev/null @@ -1,237 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tasks - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/pingcap/tidb-operator/apis/core/v1alpha1" - "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/fake" - "github.com/pingcap/tidb-operator/pkg/utils/task/v3" -) - -const ( - defaultTestClusterName = "cluster" -) - -func TestTaskFinalizerDel(t *testing.T) { - now := metav1.Now() - cases := []struct { - desc string - state State - subresources []client.Object - unexpectedErr bool - - expectedStatus task.Status - expectedObj *v1alpha1.TiKVGroup - }{ - { - desc: "no tikv and no sub resources and no finalizer", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - return obj - }), - }, - expectedStatus: task.SComplete, - expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - return obj - }), - }, - { - desc: "no tikv and no sub resources", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - - expectedStatus: task.SComplete, - expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = []string{} - return obj - }), - }, - { - desc: "no tikv and no sub resources but call api failed", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - unexpectedErr: true, - - expectedStatus: task.SFail, - }, - { - desc: "no tikv but has sub resources", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - subresources: []client.Object{ - fake.FakeObj("aaa", - fake.Label[corev1.Service](v1alpha1.LabelKeyManagedBy, v1alpha1.LabelValManagedByOperator), - fake.Label[corev1.Service](v1alpha1.LabelKeyCluster, defaultTestClusterName), - fake.Label[corev1.Service](v1alpha1.LabelKeyComponent, v1alpha1.LabelValComponentTiKV), - fake.Label[corev1.Service](v1alpha1.LabelKeyGroup, "aaa"), - ), - }, - - expectedStatus: task.SWait, - expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - { - desc: "no tikv but has sub resources and call api failed", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - subresources: []client.Object{ - fake.FakeObj("aaa", - fake.Label[corev1.Service](v1alpha1.LabelKeyManagedBy, v1alpha1.LabelValManagedByOperator), - fake.Label[corev1.Service](v1alpha1.LabelKeyCluster, defaultTestClusterName), - fake.Label[corev1.Service](v1alpha1.LabelKeyComponent, v1alpha1.LabelValComponentTiKV), - fake.Label[corev1.Service](v1alpha1.LabelKeyGroup, "aaa"), - ), - }, - unexpectedErr: true, - - expectedStatus: task.SFail, - }, - { - desc: "has tikv with finalizer", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - kvs: []*v1alpha1.TiKV{ - fake.FakeObj("aaa", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - }, - - expectedStatus: task.SRetry, - expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - { - desc: "has tikv with finalizer but call api failed", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - kvs: []*v1alpha1.TiKV{ - fake.FakeObj("aaa", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - }, - unexpectedErr: true, - - expectedStatus: task.SFail, - }, - { - desc: "has deleting tikv with finalizer but call api failed", - state: &state{ - kvg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.TiKVGroup](&now), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { - obj.Spec.Cluster.Name = defaultTestClusterName - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - kvs: []*v1alpha1.TiKV{ - fake.FakeObj("aaa", fake.DeleteNow[v1alpha1.TiKV](), func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { - obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) - return obj - }), - }, - }, - unexpectedErr: true, - - expectedStatus: task.SRetry, - }, - } - - for i := range cases { - c := &cases[i] - t.Run(c.desc, func(tt *testing.T) { - tt.Parallel() - - objs := []client.Object{ - c.state.TiKVGroup(), - } - - objs = append(objs, c.subresources...) - - fc := client.NewFakeClient(objs...) - if c.unexpectedErr { - // cannot remove finalizer - fc.WithError("update", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) - // cannot delete sub resources - fc.WithError("delete", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - res, done := task.RunTask(ctx, TaskFinalizerDel(c.state, fc)) - assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), c.desc) - assert.False(tt, done, c.desc) - - // no need to check update result - if c.unexpectedErr { - return - } - - kvg := &v1alpha1.TiKVGroup{} - require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, kvg), c.desc) - assert.Equal(tt, c.expectedObj, kvg, c.desc) - }) - } -} diff --git a/pkg/controllers/tikvgroup/tasks/status.go b/pkg/controllers/tikvgroup/tasks/status.go index 2342fcd498..0fa1c68a4b 100644 --- a/pkg/controllers/tikvgroup/tasks/status.go +++ b/pkg/controllers/tikvgroup/tasks/status.go @@ -29,11 +29,11 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task { return task.NameTaskFunc("Status", func(ctx context.Context) task.Result { kvg := state.TiKVGroup() needUpdate := meta.SetStatusCondition(&kvg.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: kvg.Generation, - Reason: v1alpha1.TiKVGroupSuspendReason, - Message: "tikv group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) replicas, readyReplicas, updateReplicas, currentReplicas := calcReplicas(state.TiKVSlice(), state.CurrentRevision, state.UpdateRevision) diff --git a/pkg/controllers/tikvgroup/tasks/status_test.go b/pkg/controllers/tikvgroup/tasks/status_test.go index fa94890762..2fd774e465 100644 --- a/pkg/controllers/tikvgroup/tasks/status_test.go +++ b/pkg/controllers/tikvgroup/tasks/status_test.go @@ -54,11 +54,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiKVGroup](3), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.TiKVGroupSuspendReason, - Message: "tikv group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 0 @@ -94,11 +94,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiKVGroup](3), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.TiKVGroupSuspendReason, - Message: "tikv group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 @@ -134,11 +134,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiKVGroup](3), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.TiKVGroupSuspendReason, - Message: "tikv group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 @@ -170,11 +170,11 @@ func TestTaskStatus(t *testing.T) { expectedStatus: task.SComplete, expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiKVGroup](3), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.TiKVGroupSuspendReason, - Message: "tikv group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 @@ -212,11 +212,11 @@ func TestTaskStatus(t *testing.T) { State: &state{ kvg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.TiKVGroup](3), func(obj *v1alpha1.TiKVGroup) *v1alpha1.TiKVGroup { obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVGroupCondSuspended, + Type: v1alpha1.CondSuspended, Status: metav1.ConditionFalse, ObservedGeneration: 3, - Reason: v1alpha1.TiKVGroupSuspendReason, - Message: "tikv group is not suspended", + Reason: v1alpha1.ReasonUnsuspended, + Message: "group is not suspended", }) obj.Status.ObservedGeneration = 3 obj.Status.Replicas = 1 diff --git a/pkg/runtime/object.go b/pkg/runtime/object.go index 4c92d77f52..06c746e5a7 100644 --- a/pkg/runtime/object.go +++ b/pkg/runtime/object.go @@ -52,3 +52,9 @@ type Tuple[T any, U any] interface { type ObjectTuple[PT client.Object, PU Object] interface { Tuple[PT, PU] } + +func Component[T ObjectSet, O ObjectT[T]]() string { + // TODO(liubo02): new only once, now it's ok because only used in test + var o O = new(T) + return o.Component() +}