Skip to content

Commit

Permalink
Address review comments by @ishan16696
Browse files Browse the repository at this point in the history
  • Loading branch information
anveshreddy18 committed Mar 14, 2024
1 parent 6e3a8eb commit e133c02
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 69 deletions.
5 changes: 0 additions & 5 deletions pkg/miscellaneous/miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,3 @@ func RemoveDir(dir string) error {
}
return nil
}

// IsUnitTestEnv checks whether the environment is unit test or not.
func IsUnitTestEnv() bool {
return os.Getenv("UNIT_TEST") == "true"
}
13 changes: 4 additions & 9 deletions pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

// RenewFullSnapshotLeasePeriodically has a timer and will periodically call FullSnapshotCaseLeaseUpdate to renew the fullsnapshot lease until it is updated or stopped.
// The timer starts upon snapshotter initialization and is reset after every full snapshot is taken.
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically() {
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}) {
logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater")
fullSnapshotLeaseUpdateInterval := ssr.healthConfig.FullSnapshotLeaseUpdateInterval.Duration
fullSnapshotLeaseUpdateInterval := ssr.HealthConfig.FullSnapshotLeaseUpdateInterval.Duration
ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(fullSnapshotLeaseUpdateInterval)
fullSnapshotLeaseUpdateCtx, fullSnapshotLeaseUpdateCancel := context.WithCancel(context.TODO())
defer func() {
Expand All @@ -35,7 +35,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically() {
if err := func() error {
ctx, cancel := context.WithTimeout(fullSnapshotLeaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.healthConfig.FullSnapshotLeaseName)
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName)
}(); err != nil {
//FullSnapshot lease update failed. Retry after interval
logger.Warnf("FullSnapshot lease update failed with error: %v", err)
Expand All @@ -53,14 +53,9 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically() {
ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
}

case <-ssr.FullSnapshotLeaseStopCh:
case <-FullSnapshotLeaseStopCh:
logger.Info("Closing the full snapshot lease renewal")
return
}
}
}

// SetFullSnapshotLeaseUpdatePeriod sets the interval for updating the full snapshot lease.
func (ssr *Snapshotter) SetFullSnapshotLeaseUpdatePeriod(period time.Duration) {
ssr.healthConfig.FullSnapshotLeaseUpdateInterval.Duration = period
}
75 changes: 37 additions & 38 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Snapshotter struct {
store brtypes.SnapStore
config *brtypes.SnapshotterConfig
compressionConfig *compressor.CompressionConfig
healthConfig *brtypes.HealthConfig
HealthConfig *brtypes.HealthConfig
schedule cron.Schedule
PrevSnapshot *brtypes.Snapshot
PrevFullSnapshot *brtypes.Snapshot
Expand All @@ -87,7 +87,6 @@ type Snapshotter struct {
fullSnapshotAckCh chan result
deltaSnapshotAckCh chan result
FullSnapshotLeaseUpdateTimer *time.Timer
FullSnapshotLeaseStopCh chan struct{}
fullSnapshotTimer *time.Timer
deltaSnapshotTimer *time.Timer
events []byte
Expand Down Expand Up @@ -130,44 +129,44 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto

metrics.LatestSnapshotRevision.With(prometheus.Labels{metrics.LabelKind: prevSnapshot.Kind}).Set(float64(prevSnapshot.LastRevision))

//Attempt to create clientset only if `enable-snapshot-lease-renewal` flag of healthConfig is set & when not running in unit test environment
//Attempt to create clientset only if `enable-snapshot-lease-renewal` flag of healthConfig is set
var clientSet client.Client
if healthConfig.SnapshotLeaseRenewalEnabled && !miscellaneous.IsUnitTestEnv() {
if healthConfig.SnapshotLeaseRenewalEnabled {
clientSet, err = miscellaneous.GetKubernetesClientSetOrError()
if err != nil {
return nil, err
}
}

return &Snapshotter{
logger: logger.WithField("actor", "snapshotter"),
store: store,
config: config,
etcdConnectionConfig: etcdConnectionConfig,
compressionConfig: compressionConfig,
healthConfig: healthConfig,
schedule: sdl,
PrevSnapshot: prevSnapshot,
PrevFullSnapshot: fullSnap,
PrevDeltaSnapshots: deltaSnapList,
SsrState: brtypes.SnapshotterInactive,
SsrStateMutex: &sync.Mutex{},
fullSnapshotReqCh: make(chan bool),
deltaSnapshotReqCh: make(chan struct{}),
fullSnapshotAckCh: make(chan result),
deltaSnapshotAckCh: make(chan result),
FullSnapshotLeaseStopCh: make(chan struct{}),
cancelWatch: func() {},
K8sClientset: clientSet,
snapstoreConfig: storeConfig,
logger: logger.WithField("actor", "snapshotter"),
store: store,
config: config,
etcdConnectionConfig: etcdConnectionConfig,
compressionConfig: compressionConfig,
HealthConfig: healthConfig,
schedule: sdl,
PrevSnapshot: prevSnapshot,
PrevFullSnapshot: fullSnap,
PrevDeltaSnapshots: deltaSnapList,
SsrState: brtypes.SnapshotterInactive,
SsrStateMutex: &sync.Mutex{},
fullSnapshotReqCh: make(chan bool),
deltaSnapshotReqCh: make(chan struct{}),
fullSnapshotAckCh: make(chan result),
deltaSnapshotAckCh: make(chan result),
cancelWatch: func() {},
K8sClientset: clientSet,
snapstoreConfig: storeConfig,
}, nil
}

// Run process loop for scheduled backup
// Setting startWithFullSnapshot to false will start the snapshotter without
// taking the first full snapshot.
func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) error {
defer ssr.stop()
FullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(FullSnapshotLeaseStopCh)
if startWithFullSnapshot {
ssr.fullSnapshotTimer = time.NewTimer(0)
} else {
Expand All @@ -188,8 +187,8 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool)
return fmt.Errorf("failed to reset full snapshot timer: %v", err)
}
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
go ssr.RenewFullSnapshotLeasePeriodically()
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh)
}
ssr.deltaSnapshotTimer = time.NewTimer(brtypes.DefaultDeltaSnapshotInterval)
if ssr.config.DeltaSnapshotPeriod.Duration >= brtypes.DeltaSnapshotIntervalThreshold {
Expand Down Expand Up @@ -235,7 +234,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {

// stop stops the snapshotter. Once stopped any subsequent calls will
// not have any effect.
func (ssr *Snapshotter) stop() {
func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
ssr.logger.Info("Closing the Snapshotter...")

if ssr.fullSnapshotTimer != nil {
Expand All @@ -246,8 +245,8 @@ func (ssr *Snapshotter) stop() {
ssr.deltaSnapshotTimer.Stop()
ssr.deltaSnapshotTimer = nil
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseStopCh <- emptyStruct
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
FullSnapshotLeaseStopCh <- emptyStruct
}
ssr.SetSnapshotterInactive()
ssr.closeEtcdClient()
Expand Down Expand Up @@ -641,7 +640,7 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(time.Nanosecond)
}
Expand All @@ -656,9 +655,9 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err = heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err = heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand All @@ -668,7 +667,7 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if _, err := ssr.TakeFullSnapshotAndResetTimer(false); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(time.Nanosecond)
}
Expand All @@ -678,9 +677,9 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if _, err := ssr.takeDeltaSnapshotAndResetTimer(); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand All @@ -695,11 +694,11 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err := ssr.handleDeltaWatchEvents(wr); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
//Call UpdateDeltaSnapshotLease only if new delta snapshot taken
if snapshots < len(ssr.PrevDeltaSnapshots) {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
43 changes: 26 additions & 17 deletions pkg/snapshot/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,15 +898,15 @@ var _ = Describe("Snapshotter", func() {
})
})

Describe("Scenarios to update full snapshot lease", func() {
FDescribe("Scenarios to update full snapshot lease", func() {
var (
ssr *Snapshotter
lease *v1.Lease
ctx context.Context
cancel context.CancelFunc
ssr *Snapshotter
lease *v1.Lease
FullSnapshotLeaseStopCh chan struct{}
ctx context.Context
cancel context.CancelFunc
)
BeforeEach(func() {
healthConfig.SnapshotLeaseRenewalEnabled = true
snapstoreConfig = &brtypes.SnapstoreConfig{Container: path.Join(outputDir, "default.bkp")}
store, err = snapstore.GetSnapstore(snapstoreConfig)
Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -931,8 +931,8 @@ var _ = Describe("Snapshotter", func() {
Expect(err).ShouldNot(HaveOccurred())
ssr.PrevFullSnapshot = nil
ssr.K8sClientset = fake.NewClientBuilder().Build()
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
ssr.HealthConfig.SnapshotLeaseRenewalEnabled = true
FullSnapshotLeaseStopCh = make(chan struct{})
})
AfterEach(func() {
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expand All @@ -941,13 +941,16 @@ var _ = Describe("Snapshotter", func() {
})
Context("Without previous full snapshot", func() {
It("should not update the lease", func() {
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

err := ssr.K8sClientset.Create(ctx, lease)
Expect(err).ShouldNot(HaveOccurred())

ssr.SetFullSnapshotLeaseUpdatePeriod(2 * time.Second)
go ssr.RenewFullSnapshotLeasePeriodically()
ssr.HealthConfig.FullSnapshotLeaseUpdateInterval.Duration = 2 * time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh)
time.Sleep(2 * time.Second)
close(ssr.FullSnapshotLeaseStopCh)
close(FullSnapshotLeaseStopCh)

l := &v1.Lease{}
Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{
Expand All @@ -959,6 +962,9 @@ var _ = Describe("Snapshotter", func() {
})
Context("With previous full snapshot", func() {
It("should be able to fetch and update the full snapshot lease", func() {
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

prevFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
Expand All @@ -970,10 +976,10 @@ var _ = Describe("Snapshotter", func() {
err := ssr.K8sClientset.Create(ctx, lease)
Expect(err).ShouldNot(HaveOccurred())

ssr.SetFullSnapshotLeaseUpdatePeriod(time.Second)
go ssr.RenewFullSnapshotLeasePeriodically()
ssr.HealthConfig.FullSnapshotLeaseUpdateInterval.Duration = time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh)
time.Sleep(2 * time.Second)
close(ssr.FullSnapshotLeaseStopCh)
close(FullSnapshotLeaseStopCh)

l := &v1.Lease{}
Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{
Expand All @@ -983,6 +989,9 @@ var _ = Describe("Snapshotter", func() {
Expect(*l.Spec.HolderIdentity).To(Equal(strconv.FormatInt(prevFullSnap.LastRevision, 10)))
})
It("should not be able to fetch the lease at first but should be able to update the lease in the next attempt", func() {
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

prevFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
Expand All @@ -991,8 +1000,8 @@ var _ = Describe("Snapshotter", func() {
}
prevFullSnap.GenerateSnapshotName()
ssr.PrevFullSnapshot = prevFullSnap
ssr.SetFullSnapshotLeaseUpdatePeriod(3 * time.Second)
go ssr.RenewFullSnapshotLeasePeriodically()
ssr.HealthConfig.FullSnapshotLeaseUpdateInterval.Duration = 3 * time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh)
time.Sleep(time.Second)
err := ssr.K8sClientset.Create(ctx, lease)
Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -1010,7 +1019,7 @@ var _ = Describe("Snapshotter", func() {
Name: lease.Name,
}, l)).To(Succeed())
Expect(*l.Spec.HolderIdentity).To(Equal(strconv.FormatInt(prevFullSnap.LastRevision, 10)))
close(ssr.FullSnapshotLeaseStopCh)
close(FullSnapshotLeaseStopCh)
})
})
})
Expand Down

0 comments on commit e133c02

Please sign in to comment.