Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full snapshot lease update retry on failure #711

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
if opts.EnabledLeaseRenewal {
// Update revisions in holder identity of full snapshot lease.
ctx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName, opts.DeltaSnapshotLeaseName); err != nil {
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName); err != nil {
cp.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
14 changes: 6 additions & 8 deletions pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error {
return nil
}

// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot
// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot.
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if k8sClientset == nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -185,7 +185,7 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
if err != nil {
return err
}
if rev > fullSnapshot.LastRevision {
if rev >= fullSnapshot.LastRevision {
return nil
}
}
Expand All @@ -211,7 +211,6 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
}

return nil
}

Expand Down Expand Up @@ -278,18 +277,17 @@ func UpdateDeltaSnapshotLease(ctx context.Context, logger *logrus.Entry, prevDel
return nil
}

// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease and the deltasnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, deltaSnapshotLeaseName string) error {
// FullSnapshotCaseLeaseUpdate updates the full snapshot lease after a full snapshot has been taken.
// It delegates to UpdateFullSnapshotLease and, if that call fails, returns an *errors.EtcdError
// whose message embeds the underlying error. It returns nil on success.
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
	updateErr := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName)
	if updateErr == nil {
		return nil
	}

	// Wrap the failure so callers receive the package's EtcdError type.
	return &errors.EtcdError{
		Message: fmt.Sprintf("Failed to update full snapshot lease: %v", updateErr),
	}
}

// DeltaSnapshotCaseLeaseUpdate Updates the fullsnapshot lease and the deltasnapshot lease as needed when a delta snapshot is taken
// DeltaSnapshotCaseLeaseUpdate Updates the deltasnapshot lease as needed when a delta snapshot is taken
func DeltaSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, k8sClientset client.Client, deltaSnapshotLeaseName string, store brtypes.SnapStore) error {
_, latestDeltaSnapshotList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err == nil {
Expand Down Expand Up @@ -343,7 +341,7 @@ func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hco
hb.logger.Info("Stopped member lease renewal timer")
return nil
case <-stopCh:
hb.logger.Info("Stoping the member lease renewal")
hb.logger.Info("Stopping the member lease renewal")
return nil
}
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,46 @@ var _ = Describe("Heartbeat", func() {
Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod"))
Expect(os.Getenv("POD_NAMESPACE")).Should(Equal("test_namespace"))

snap := &brtypes.Snapshot{
prevFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
StartRevision: 0,
LastRevision: 980,
}
prevFullSnap.GenerateSnapshotName()

latestFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
StartRevision: 0,
LastRevision: 989,
}
snap.GenerateSnapshotName()
latestFullSnap.GenerateSnapshotName()
err := k8sClientset.Create(context.TODO(), lease)
Expect(err).ShouldNot(HaveOccurred())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, snap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
// Update full snapshot lease with the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)).To(Succeed())
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("980")))

Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))
// Trigger full snapshot lease update with latest full snapshot which is not the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).ShouldNot(HaveOccurred())

l = &v1.Lease{}
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)).To(Succeed())
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))

err = k8sClientset.Delete(context.TODO(), l)
Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -132,7 +150,7 @@ var _ = Describe("Heartbeat", func() {

Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).Should(HaveOccurred())

err = k8sClientset.Delete(context.TODO(), lease)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
if b.config.HealthConfig.SnapshotLeaseRenewalEnabled {
leaseUpdatectx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName, b.config.HealthConfig.DeltaSnapshotLeaseName); err != nil {
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName); err != nil {
b.logger.Warnf("Snapshot lease update failed : %v", err)
}
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package snapshotter

import (
"context"
"time"

"github.com/gardener/etcd-backup-restore/pkg/health/heartbeat"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
)

// RenewFullSnapshotLeasePeriodically runs a timer-driven loop that calls
// heartbeat.FullSnapshotCaseLeaseUpdate with ssr.PrevFullSnapshot to renew the full snapshot lease.
//
// On each timer tick:
//   - if no full snapshot is recorded yet (ssr.PrevFullSnapshot is nil), the timer is re-armed;
//   - if the lease update fails, the failure is logged and the timer is re-armed to retry;
//   - if the lease update succeeds, the timer is stopped and the loop keeps waiting.
//
// The loop itself never re-arms the timer after a success; presumably the snapshotter resets
// ssr.FullSnapshotLeaseUpdateTimer after each new full snapshot (verify against the caller).
// The function returns when a value is received on FullSnapshotLeaseStopCh or the channel is
// closed; on return it stops the timer and sets ssr.FullSnapshotLeaseUpdateTimer to nil.
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}) {
	logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater")
	interval := ssr.HealthConfig.FullSnapshotLeaseUpdateInterval.Duration
	ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(interval)

	// Parent context for every update attempt; cancelled when this loop exits.
	loopCtx, cancelLoop := context.WithCancel(context.TODO())
	defer func() {
		cancelLoop()
		if ssr.FullSnapshotLeaseUpdateTimer != nil {
			ssr.FullSnapshotLeaseUpdateTimer.Stop()
			ssr.FullSnapshotLeaseUpdateTimer = nil
		}
	}()

	// rearm schedules the next tick one interval from now.
	rearm := func() {
		ssr.FullSnapshotLeaseUpdateTimer.Stop()
		ssr.FullSnapshotLeaseUpdateTimer.Reset(interval)
	}

	// tryUpdate performs a single lease update attempt bounded by LeaseUpdateTimeoutDuration.
	tryUpdate := func() error {
		attemptCtx, cancelAttempt := context.WithTimeout(loopCtx, brtypes.LeaseUpdateTimeoutDuration)
		defer cancelAttempt()
		return heartbeat.FullSnapshotCaseLeaseUpdate(attemptCtx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName)
	}

	logger.Infof("Starting the FullSnapshot lease renewal with interval %v", interval)
	for {
		select {
		case <-FullSnapshotLeaseStopCh:
			logger.Info("Closing the full snapshot lease renewal")
			return

		case <-ssr.FullSnapshotLeaseUpdateTimer.C:
			// No full snapshot has been taken yet: nothing to publish, try again after the interval.
			if ssr.PrevFullSnapshot == nil {
				rearm()
				continue
			}

			updateErr := tryUpdate()
			if updateErr == nil {
				// Lease is up to date; leave the timer stopped until it is reset elsewhere.
				logger.Infof("Stopping the FullSnapshot lease update")
				ssr.FullSnapshotLeaseUpdateTimer.Stop()
				continue
			}

			// Lease update failed: log and retry after the interval.
			logger.Warnf("FullSnapshot lease update failed with error: %v", updateErr)
			logger.Infof("Resetting the FullSnapshot lease to retry updating with revision %d after %v", ssr.PrevFullSnapshot.LastRevision, interval)
			rearm()
		}
	}
}
Loading