From 900373fccaa6f010115d17fc9db876e7d94e2c05 Mon Sep 17 00:00:00 2001 From: Stanislav Seletskiy Date: Fri, 10 Jun 2016 12:56:33 +0600 Subject: [PATCH] refactor acquireDistributedLock --- distributed_lock.go | 4 +-- lock.go | 67 ++++++++++++++++++++++----------------------- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/distributed_lock.go b/distributed_lock.go index e3ca322..841f65c 100644 --- a/distributed_lock.go +++ b/distributed_lock.go @@ -18,13 +18,11 @@ type distributedLock struct { func (lock *distributedLock) addNodeRunner( runner runcmd.Runner, address address, -) error { +) { lock.nodes = append(lock.nodes, &distributedLockNode{ address: address, runner: runner, }) - - return nil } func (lock *distributedLock) acquire(filename string) error { diff --git a/lock.go b/lock.go index 016b013..b00e920 100644 --- a/lock.go +++ b/lock.go @@ -19,7 +19,7 @@ func acquireDistributedLock( noLockFail bool, ) (*distributedLock, error) { var ( - lock = &distributedLock{ + cluster = &distributedLock{ noFail: noLockFail, } @@ -31,43 +31,17 @@ func acquireDistributedLock( for _, nodeAddress := range addresses { go func(nodeAddress address) { - tracef(`connecting to address: '%s'`, - nodeAddress, - ) + err := connectToNode(cluster, runnerFactory, nodeAddress, mutex) - runner, err := runnerFactory(nodeAddress) if err != nil { - errors <- hierr.Errorf( - err, - `can't create runner for address: %s`, + debugf(`%4d/%d connection established: %s`, + atomic.AddInt64(&nodeIndex, 1), + len(addresses), nodeAddress, ) - - return - } - - debugf(`%4d/%d connection established: %s`, - atomic.AddInt64(&nodeIndex, 1), - len(addresses), - nodeAddress, - ) - - mutex.Lock() - { - err = lock.addNodeRunner(runner, nodeAddress) - } - mutex.Unlock() - if err != nil { - errors <- hierr.Errorf( - err, - `can't add host to the global cluster lock: %s`, - nodeAddress, - ) - - return } - errors <- nil + errors <- err }(nodeAddress) } @@ -81,7 +55,7 @@ func acquireDistributedLock( } } - err := lock.acquire(lockFile) + err := cluster.acquire(lockFile) if err != nil { return nil, hierr.Errorf( err, @@ -90,5 +64,30 @@ func acquireDistributedLock( ) } - return lock, nil + return cluster, nil +} + +func connectToNode( + cluster *distributedLock, + runnerFactory runnerFactory, + address address, + nodeAddMutex sync.Locker, +) error { + tracef(`connecting to address: '%s'`, address) + + runner, err := runnerFactory(address) + if err != nil { + return hierr.Errorf( + err, + `can't create runner for address: %s`, + address, + ) + } + + nodeAddMutex.Lock() + defer nodeAddMutex.Unlock() + + cluster.addNodeRunner(runner, address) + + return nil }