Skip to content

Commit

Permalink
refactor acquireDistributedLock
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed Jun 10, 2016
1 parent 8921c5c commit 900373f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 37 deletions.
4 changes: 1 addition & 3 deletions distributed_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ type distributedLock struct {
func (lock *distributedLock) addNodeRunner(
runner runcmd.Runner,
address address,
) error {
) {
lock.nodes = append(lock.nodes, &distributedLockNode{
address: address,
runner: runner,
})

return nil
}

func (lock *distributedLock) acquire(filename string) error {
Expand Down
67 changes: 33 additions & 34 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func acquireDistributedLock(
noLockFail bool,
) (*distributedLock, error) {
var (
lock = &distributedLock{
cluster = &distributedLock{
noFail: noLockFail,
}

Expand All @@ -31,43 +31,17 @@ func acquireDistributedLock(

for _, nodeAddress := range addresses {
go func(nodeAddress address) {
tracef(`connecting to address: '%s'`,
nodeAddress,
)
err := connectToNode(cluster, runnerFactory, nodeAddress, mutex)

runner, err := runnerFactory(nodeAddress)
if err != nil {
errors <- hierr.Errorf(
err,
`can't create runner for address: %s`,
debugf(`%4d/%d connection established: %s`,
atomic.AddInt64(&nodeIndex, 1),
len(addresses),
nodeAddress,
)

return
}

debugf(`%4d/%d connection established: %s`,
atomic.AddInt64(&nodeIndex, 1),
len(addresses),
nodeAddress,
)

mutex.Lock()
{
err = lock.addNodeRunner(runner, nodeAddress)
}
mutex.Unlock()
if err != nil {
errors <- hierr.Errorf(
err,
`can't add host to the global cluster lock: %s`,
nodeAddress,
)

return
}

errors <- nil
errors <- err
}(nodeAddress)
}

Expand All @@ -81,7 +55,7 @@ func acquireDistributedLock(
}
}

err := lock.acquire(lockFile)
err := cluster.acquire(lockFile)
if err != nil {
return nil, hierr.Errorf(
err,
Expand All @@ -90,5 +64,30 @@ func acquireDistributedLock(
)
}

return lock, nil
return cluster, nil
}

func connectToNode(
cluster *distributedLock,
runnerFactory runnerFactory,
address address,
nodeAddMutex sync.Locker,
) error {
tracef(`connecting to address: '%s'`, address)

runner, err := runnerFactory(address)
if err != nil {
return hierr.Errorf(
err,
`can't create runner for address: %s`,
address,
)
}

nodeAddMutex.Lock()
defer nodeAddMutex.Unlock()

cluster.addNodeRunner(runner, address)

return nil
}

0 comments on commit 900373f

Please sign in to comment.