Skip to content

Commit

Permalink
set isPoolCacheSynced as true if lease updated
Browse files Browse the repository at this point in the history
Signed-off-by: Congrool <[email protected]>
  • Loading branch information
Congrool committed Jan 9, 2023
1 parent b81bff6 commit 7b8c52e
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 57 deletions.
156 changes: 99 additions & 57 deletions pkg/yurthub/poolcoordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ type coordinator struct {
isPoolCacheSynced bool
certMgr *certmanager.CertManager
needUploadLocalCache bool
// poolScopeCacheSyncManager is used to sync pool-scoped resources from cloud to poolcoordinator.
poolScopeCacheSyncManager *poolScopedCacheSyncManager
// informerSyncLeaseManager is used to detect the leader-informer-sync lease
// to check its RenewTime. If its renewTime is not updated after defaultInformerLeaseRenewDuration
// we can think that the poolcoordinator cache is stale and the poolcoordinator is not ready.
// It will start if yurthub becomes leader or follower.
informerSyncLeaseManager *coordinatorLeaseInformerManager
// poolCacheSyncManager is used to sync pool-scoped resources from cloud to poolcoordinator.
poolCacheSyncManager *poolScopedCacheSyncManager
// poolCacheSyncedDetector is used to detect if pool cache is synced and ready for use.
// It will list/watch the informer sync lease, and if it's renewed by leader yurthub, isPoolCacheSynced will
// be set as true which means the pool cache is ready for use. It also starts a routine which will set
// isPoolCacheSynced as false if the informer sync lease has not been updated for a duration.
poolCacheSyncedDetector *poolCacheSyncedDetector
// delegateNodeLeaseManager is used to list/watch kube-node-lease from poolcoordinator. If the
// node lease contains DelegateHeartBeat label, it will trigger the eventhandler which will
// use cloud client to send it to cloud APIServer.
Expand Down Expand Up @@ -144,9 +144,19 @@ func NewCoordinator(
hubElector: elector,
}

informerSyncLeaseManager := &coordinatorLeaseInformerManager{
ctx: ctx,
coordinatorClient: coordinatorClient,
poolCacheSyncedDetector := &poolCacheSyncedDetector{
ctx: ctx,
updateNotifyCh: make(chan struct{}),
syncLeaseManager: &coordinatorLeaseInformerManager{
ctx: ctx,
coordinatorClient: coordinatorClient,
},
staleTimeout: defaultPoolCacheStaleDuration,
isPoolCacheSyncSetter: func(value bool) {
coordinator.Lock()
defer coordinator.Unlock()
coordinator.isPoolCacheSynced = value
},
}

delegateNodeLeaseManager := &coordinatorLeaseInformerManager{
Expand All @@ -166,9 +176,9 @@ func NewCoordinator(
getEtcdStore: coordinator.getEtcdStore,
}

coordinator.informerSyncLeaseManager = informerSyncLeaseManager
coordinator.poolCacheSyncedDetector = poolCacheSyncedDetector
coordinator.delegateNodeLeaseManager = delegateNodeLeaseManager
coordinator.poolScopeCacheSyncManager = poolScopedCacheSyncManager
coordinator.poolCacheSyncManager = poolScopedCacheSyncManager

return coordinator, nil
}
Expand All @@ -185,9 +195,9 @@ func (coordinator *coordinator) Run() {

select {
case <-coordinator.ctx.Done():
coordinator.poolScopeCacheSyncManager.EnsureStop()
coordinator.poolCacheSyncManager.EnsureStop()
coordinator.delegateNodeLeaseManager.EnsureStop()
coordinator.informerSyncLeaseManager.EnsureStop()
coordinator.poolCacheSyncedDetector.EnsureStop()
klog.Info("exit normally in coordinator loop.")
return
case electorStatus, ok := <-coordinator.hubElector.StatusChan():
Expand All @@ -197,9 +207,9 @@ func (coordinator *coordinator) Run() {

switch electorStatus {
case PendingHub:
coordinator.poolScopeCacheSyncManager.EnsureStop()
coordinator.poolCacheSyncManager.EnsureStop()
coordinator.delegateNodeLeaseManager.EnsureStop()
coordinator.informerSyncLeaseManager.EnsureStop()
coordinator.poolCacheSyncedDetector.EnsureStop()
needUploadLocalCache = true
needCancelEtcdStorage = true
isPoolCacheSynced = false
Expand All @@ -217,11 +227,10 @@ func (coordinator *coordinator) Run() {
klog.Errorf("cloud not get cloud lease client when becoming leader yurthub, %v", err)
continue
}
if err := coordinator.poolScopeCacheSyncManager.EnsureStart(); err != nil {
if err := coordinator.poolCacheSyncManager.EnsureStart(); err != nil {
klog.Errorf("failed to sync pool-scoped resource, %v", err)
continue
}

coordinator.delegateNodeLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
FilterFunc: ifDelegateHeartBeat,
Handler: cache.ResourceEventHandlerFuncs{
Expand All @@ -233,20 +242,7 @@ func (coordinator *coordinator) Run() {
},
},
})
coordinator.informerSyncLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
FilterFunc: ifInformerSyncLease,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: coordinator.detectPoolCacheSynced,
UpdateFunc: func(_, newObj interface{}) {
coordinator.detectPoolCacheSynced(newObj)
},
DeleteFunc: func(_ interface{}) {
coordinator.Lock()
defer coordinator.Unlock()
coordinator.isPoolCacheSynced = false
},
},
})
coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
Expand All @@ -262,22 +258,9 @@ func (coordinator *coordinator) Run() {
continue
}

coordinator.poolScopeCacheSyncManager.EnsureStop()
coordinator.poolCacheSyncManager.EnsureStop()
coordinator.delegateNodeLeaseManager.EnsureStop()
coordinator.informerSyncLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
FilterFunc: ifInformerSyncLease,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: coordinator.detectPoolCacheSynced,
UpdateFunc: func(_, newObj interface{}) {
coordinator.detectPoolCacheSynced(newObj)
},
DeleteFunc: func(_ interface{}) {
coordinator.Lock()
defer coordinator.Unlock()
coordinator.isPoolCacheSynced = false
},
},
})
coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
Expand Down Expand Up @@ -399,16 +382,6 @@ func (coordinator *coordinator) delegateNodeLease(cloudLeaseClient coordclientse
}
}

func (coordinator *coordinator) detectPoolCacheSynced(obj interface{}) {
lease := obj.(*coordinationv1.Lease)
renewTime := lease.Spec.RenewTime
if time.Now().After(renewTime.Add(defaultPoolCacheStaleDuration)) {
coordinator.Lock()
defer coordinator.Unlock()
coordinator.isPoolCacheSynced = false
}
}

// poolScopedCacheSyncManager will continuously sync pool-scoped resources from cloud to pool-coordinator.
// After resource sync is completed, it will periodically renew the informer synced lease, which is used by
// other yurthub to determine if pool-coordinator is ready to handle requests of pool-scoped resources.
Expand Down Expand Up @@ -600,6 +573,75 @@ func (l *localCacheUploader) resourcesToUpload() map[storage.Key][]byte {
return objBytes
}

// poolCacheSyncedDetector will list/watch informer-sync-lease to detect if pool cache can be used.
// The leader yurthub should periodically renew the lease. If the lease is not updated for staleTimeout
// duration, it will think the pool cache cannot be used.
type poolCacheSyncedDetector struct {
	// ctx bounds the lifetime of the background staleness loop started by EnsureStart.
	ctx context.Context
	// updateNotifyCh is signaled by detectPoolCacheSynced whenever a sufficiently
	// recent renewal of the informer-sync-lease is observed.
	updateNotifyCh chan struct{}
	// isRunning guards EnsureStart/EnsureStop against redundant starts and stops.
	isRunning bool
	// staleTimeout is how long the lease may go without renewal before the
	// pool cache is considered stale.
	staleTimeout time.Duration
	// syncLeaseManager is used to list/watch the informer-sync-lease, and set
	// isPoolCacheSynced as true when it is renewed.
	syncLeaseManager *coordinatorLeaseInformerManager
	// isPoolCacheSyncSetter publishes the synced/stale verdict to the owner
	// (the coordinator sets its isPoolCacheSynced field under its lock).
	isPoolCacheSyncSetter func(value bool)
	// cancelLoop cancels the context passed to loopForChange, stopping the
	// staleness-detection goroutine.
	cancelLoop func()
}

// EnsureStart starts the detector if it is not already running. It registers
// handlers on the informer-sync-lease informer to observe lease renewals and
// deletions, and launches a background loop that marks the pool cache as
// stale when no renewal arrives within staleTimeout. Calling EnsureStart on
// an already-running detector is a no-op.
func (p *poolCacheSyncedDetector) EnsureStart() {
	if p.isRunning {
		return
	}

	p.syncLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
		FilterFunc: ifInformerSyncLease,
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: p.detectPoolCacheSynced,
			UpdateFunc: func(_, newObj interface{}) {
				p.detectPoolCacheSynced(newObj)
			},
			DeleteFunc: func(_ interface{}) {
				// The lease is gone, so the pool cache can no longer
				// be trusted as synced.
				p.isPoolCacheSyncSetter(false)
			},
		},
	})

	ctx, cancel := context.WithCancel(p.ctx)
	p.cancelLoop = cancel
	go p.loopForChange(ctx)

	// BUGFIX: mark the detector as running; previously isRunning was never
	// set, so repeated EnsureStart calls registered duplicate handlers and
	// leaked loopForChange goroutines, and EnsureStop could never fire.
	p.isRunning = true
}

// EnsureStop stops the detector if it is running: it stops the lease
// informer and cancels the background staleness loop. Calling EnsureStop on
// a stopped detector is a no-op.
func (p *poolCacheSyncedDetector) EnsureStop() {
	if p.isRunning {
		p.syncLeaseManager.EnsureStop()
		p.cancelLoop()
		// BUGFIX: reset the running flag; previously it was left as-is,
		// which would have made a later EnsureStart a silent no-op once
		// the flag was ever set.
		p.isRunning = false
	}
}

// loopForChange runs the staleness countdown for the pool cache. Each
// notification on updateNotifyCh means the informer-sync-lease was renewed
// in time: the pool cache is marked as synced and the countdown restarts.
// If staleTimeout elapses with no notification, the pool cache is marked as
// stale. The loop exits when ctx is canceled.
func (p *poolCacheSyncedDetector) loopForChange(ctx context.Context) {
	t := time.NewTicker(p.staleTimeout)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-p.updateNotifyCh:
			// Lease renewed before going stale: re-arm the timer and
			// declare the pool cache usable.
			t.Reset(p.staleTimeout)
			p.isPoolCacheSyncSetter(true)
		case <-t.C:
			// Typo fix in the log message: "waitting" -> "waiting".
			klog.V(4).Infof("timeout waiting for pool cache sync lease being updated, do not use pool cache")
			p.isPoolCacheSyncSetter(false)
		}
	}
}

// detectPoolCacheSynced inspects an object delivered by the informer-sync-lease
// informer. If it is a Lease that was renewed recently enough (within
// staleTimeout of now), the staleness loop is notified so the pool cache is
// marked as synced. Objects that are not leases, or leases that were never
// renewed, are ignored.
func (p *poolCacheSyncedDetector) detectPoolCacheSynced(obj interface{}) {
	// BUGFIX: guard the type assertion; informers can deliver tombstone
	// objects, and a bare assertion would panic on them.
	lease, ok := obj.(*coordinationv1.Lease)
	if !ok {
		return
	}
	renewTime := lease.Spec.RenewTime
	// BUGFIX: Spec.RenewTime is a pointer and is nil until the lease is
	// first renewed; dereferencing it via Add would panic.
	if renewTime == nil {
		return
	}
	if time.Now().Before(renewTime.Add(p.staleTimeout)) {
		// The lease is updated before pool cache being considered as stale.
		p.updateNotifyCh <- struct{}{}
	}
}

func getRv(objBytes []byte) (uint64, error) {
obj := &unstructured.Unstructured{}
if err := json.Unmarshal(objBytes, obj); err != nil {
Expand Down
Loading

0 comments on commit 7b8c52e

Please sign in to comment.