Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

[nginx-ingress-controller]: Use delayed queue #1253

Merged
merged 1 commit into from
Jun 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions ingress/controllers/nginx/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,10 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[stri
return namedPorts, nil
}

func (lbc *loadBalancerController) sync(key string) {
func (lbc *loadBalancerController) sync(key string) error {
if !lbc.controllersInSync() {
time.Sleep(podStoreSyncedPollPeriod)
lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
return
return fmt.Errorf("deferring sync till endpoints controller has synced")
}

var cfg *api.ConfigMap
Expand All @@ -435,29 +434,28 @@ func (lbc *loadBalancerController) sync(key string) {
ings := lbc.ingLister.Store.List()
upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings)

lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
return lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
Upstreams: upstreams,
Servers: servers,
TCPUpstreams: lbc.getTCPServices(),
UDPUpstreams: lbc.getUDPServices(),
})
}

func (lbc *loadBalancerController) updateIngressStatus(key string) {
func (lbc *loadBalancerController) updateIngressStatus(key string) error {
if !lbc.controllersInSync() {
time.Sleep(podStoreSyncedPollPeriod)
lbc.ingQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
return
return fmt.Errorf("deferring sync till endpoints controller has synced")
}

obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
lbc.ingQueue.requeue(key, err)
return
return err
}

if !ingExists {
return
// TODO: what's the correct behavior here?
return nil
}

ing := obj.(*extensions.Ingress)
Expand All @@ -466,8 +464,7 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) {

currIng, err := ingClient.Get(ing.Name)
if err != nil {
glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
return
return fmt.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}

lbIPs := ing.Status.LoadBalancer.Ingress
Expand All @@ -478,11 +475,13 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) {
})
if _, err := ingClient.UpdateStatus(currIng); err != nil {
lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err)
return
return err
}

lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", lbc.podInfo.NodeIP)
}

return nil
}

func (lbc *loadBalancerController) isStatusIPDefined(lbings []api.LoadBalancerIngress) bool {
Expand Down
13 changes: 8 additions & 5 deletions ingress/controllers/nginx/nginx/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (ngx *Manager) Start() {
// shut down, stop accepting new connections and continue to service current requests
// until all such requests are serviced. After that, the old worker processes exit.
// http://nginx.org/en/docs/beginners_guide.html#control
func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) {
func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) error {
ngx.reloadRateLimiter.Accept()

ngx.reloadLock.Lock()
Expand All @@ -65,15 +65,18 @@ func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressC
newCfg, err := ngx.writeCfg(cfg, ingressCfg)

if err != nil {
glog.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
return
return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
}

if newCfg {
if err := ngx.shellOut("nginx -s reload"); err == nil {
glog.Info("change in configuration detected. Reloading...")
if err := ngx.shellOut("nginx -s reload"); err != nil {
return fmt.Errorf("error reloading nginx: %v", err)
}

glog.Info("change in configuration detected. Reloading...")
}

return nil
}

// shellOut executes a command and returns its combined standard output and standard
Expand Down
21 changes: 13 additions & 8 deletions ingress/controllers/nginx/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type StoreToConfigmapLister struct {
// invokes the given sync function for every work item inserted.
type taskQueue struct {
// queue is the work queue the worker polls
queue *workqueue.Type
queue workqueue.RateLimitingInterface
// sync is called for each item in the queue
sync func(string)
sync func(string) error
// workerDone is closed when the worker exits
workerDone chan struct{}
}
Expand All @@ -72,9 +72,8 @@ func (t *taskQueue) enqueue(obj interface{}) {
t.queue.Add(key)
}

func (t *taskQueue) requeue(key string, err error) {
glog.V(3).Infof("requeuing %v, err %v", key, err)
t.queue.Add(key)
func (t *taskQueue) requeue(key string) {
t.queue.AddRateLimited(key)
}

// worker processes work in the queue through sync.
Expand All @@ -86,7 +85,13 @@ func (t *taskQueue) worker() {
return
}
glog.V(3).Infof("syncing %v", key)
t.sync(key.(string))
if err := t.sync(key.(string)); err != nil {
glog.V(3).Infof("requeuing %v, err %v", key, err)
t.requeue(key.(string))
} else {
t.queue.Forget(key)
}

t.queue.Done(key)
}
}
Expand All @@ -99,9 +104,9 @@ func (t *taskQueue) shutdown() {

// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
func NewTaskQueue(syncFn func(string)) *taskQueue {
func NewTaskQueue(syncFn func(string) error) *taskQueue {
return &taskQueue{
queue: workqueue.New(),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
sync: syncFn,
workerDone: make(chan struct{}),
}
Expand Down