Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

physical/zookeeper: Re-try to release lock in case of failure #33

Merged
merged 1 commit into from
Jun 5, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 54 additions & 10 deletions physical/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,10 @@ func (c *ZooKeeperBackend) List(prefix string) ([]string, error) {
// LockWith is used for mutual exclusion based on the given key.
func (c *ZooKeeperBackend) LockWith(key, value string) (physical.Lock, error) {
l := &ZooKeeperHALock{
in: c,
key: key,
value: value,
in: c,
key: key,
value: value,
logger: c.logger,
}
return l, nil
}
Expand All @@ -328,13 +329,15 @@ func (c *ZooKeeperBackend) HAEnabled() bool {

// ZooKeeperHALock is a ZooKeeper Lock implementation for the HABackend
type ZooKeeperHALock struct {
in *ZooKeeperBackend
key string
value string
in *ZooKeeperBackend
key string
value string
logger log.Logger

held bool
localLock sync.Mutex
leaderCh chan struct{}
stopCh <-chan struct{}
zkLock *zk.Lock
}

Expand Down Expand Up @@ -377,6 +380,8 @@ func (i *ZooKeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
}
go i.monitorLock(lockeventCh, i.leaderCh)

i.stopCh = stopCh

return i.leaderCh, nil
}

Expand Down Expand Up @@ -433,16 +438,55 @@ func (i *ZooKeeperHALock) monitorLock(lockeventCh <-chan zk.Event, leaderCh chan
}
}

func (i *ZooKeeperHALock) Unlock() error {
func (i *ZooKeeperHALock) unlockInternal() error {
i.localLock.Lock()
defer i.localLock.Unlock()
if !i.held {
return nil
}

i.held = false
i.zkLock.Unlock()
return nil
err := i.zkLock.Unlock()

if err == nil {
i.held = false
return nil
}

return err
}

func (i *ZooKeeperHALock) Unlock() error {
var err error

if err = i.unlockInternal(); err != nil {
i.logger.Error("zookeeper: failed to release distributed lock", "error", err)

go func(i *ZooKeeperHALock) {
attempts := 0
i.logger.Info("zookeeper: launching automated distributed lock release")

for {
if err := i.unlockInternal(); err == nil {
i.logger.Info("zookeeper: distributed lock released")
return
}

select {
case <-time.After(time.Second):
attempts := attempts + 1
if attempts >= 10 {
i.logger.Error("zookeeper: release lock max attempts reached. Lock may not be released", "error", err)
return
}
continue
case <-i.stopCh:
return
}
}
}(i)
}

return err
}

func (i *ZooKeeperHALock) Value() (bool, string, error) {
Expand Down