Skip to content

Commit

Permalink
Add Consul-based locking (closes #17)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed May 23, 2017
1 parent 66a858b commit d5721c4
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ language: go
go:
- 1.8

before_script:
- wget https://releases.hashicorp.com/consul/0.8.3/consul_0.8.3_linux_amd64.zip
- unzip consul_0.8.3_linux_amd64.zip
- export PATH=$PATH:$PWD

script:
- make all
- make goveralls
Expand Down
48 changes: 46 additions & 2 deletions src/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package main

import (
"errors"
"os"
"path/filepath"

"github.com/hashicorp/consul/api"
"github.com/nightlyone/lockfile"
)

Expand All @@ -40,12 +42,54 @@ func InitFileLock(name string) (Lock, error) {
return &FileLock{lock: lock}, nil
}

// TryLock tries to lock the file
// TryLock tries to acquire a lock on a file
func (fl FileLock) TryLock() error {
return fl.lock.TryLock()
}

// Unlock tries to unlock the file
// Unlock tries to release the lock on a file
func (fl FileLock) Unlock() error {
return fl.lock.Unlock()
}

// ConsulLock is for Consul-based locks
type ConsulLock struct {
lock *api.Lock
}

// InitConsulLock builds a ConsulLock (a KV pair in Consul) with the name argument as key
func InitConsulLock(consulAddress, name string) (Lock, error) {
client, err := api.NewClient(&api.Config{Address: consulAddress})
if err != nil {
return nil, err
}

opts := &api.LockOptions{
Key: name,
LockTryOnce: true,
}

lock, err := client.LockOpts(opts)
if err != nil {
return nil, err
}
return &ConsulLock{lock: lock}, nil
}

// TryLock tries to acquire a lock from Consul
func (cl ConsulLock) TryLock() error {
stopCh := make(chan struct{})
leaderCh, err := cl.lock.Lock(stopCh)
if err != nil {
return err
}
if leaderCh == nil {
return errors.New("Lock held by another process")
}
return nil
}

// Unlock tries to release the lock from Consul
func (cl ConsulLock) Unlock() error {
return cl.lock.Unlock()
}
70 changes: 70 additions & 0 deletions src/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,80 @@ import (
"strconv"
"testing"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/nightlyone/lockfile"
"github.com/stretchr/testify/assert"
)

func makeClient(t *testing.T) (*api.Client, *testutil.TestServer) {
// Make client config
conf := api.DefaultConfig()
// Create server
server, err := testutil.NewTestServerConfigT(t, nil)
if err != nil {
t.Fatal(err)
}
conf.Address = server.HTTPAddr

// Create client
client, err := api.NewClient(conf)
if err != nil {
t.Fatalf("err: %v", err)
}

return client, server
}

func TestConsulLock(t *testing.T) {
assert := assert.New(t)

c, s := makeClient(t)
assert.NotNil(c)
assert.NotNil(s)
defer s.Stop()

lockName := "lock"

cl, err := InitConsulLock("some://faulty.address", lockName)
assert.NotNil(err)
assert.Nil(cl)
assert.Equal("Unknown protocol scheme: some", err.Error())

cl, err = InitConsulLock(s.HTTPAddr, "")
assert.NotNil(err)
assert.Nil(cl)
assert.Equal("missing key", err.Error())

cl, err = InitConsulLock(s.HTTPAddr, lockName)
assert.Nil(err)
assert.NotNil(cl)

err = cl.TryLock()
assert.Nil(err)

// fail if already locked
err = cl.TryLock()
assert.NotNil(err)
assert.Equal(api.ErrLockHeld, err)

// fail if already locked by another lock
ocl, err := InitConsulLock(s.HTTPAddr, lockName)
assert.Nil(err)
assert.NotNil(ocl)
err = ocl.TryLock()
assert.NotNil(err)
assert.Equal("Lock held by another process", err.Error())

// fail if already unlocked
err = cl.Unlock()
assert.Nil(err)

err = cl.Unlock()
assert.NotNil(err)
assert.Equal(api.ErrLockNotHeld, err)
}

func TestFileLock(t *testing.T) {
assert := assert.New(t)

Expand Down
58 changes: 56 additions & 2 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
fVars = "vars"
fAsync = "async"
fLogLevel = "log-level"
fLock = "lock"
fConsul = "consul"
)

func main() {
Expand Down Expand Up @@ -111,13 +113,17 @@ func main() {
getEmrPlaybookFlag(),
getEmrClusterFlag(),
getAsyncFlag(),
getLockFlag(),
getConsulFlag(),
getVarsFlag(),
},
Action: func(c *cli.Context) error {
err := run(
c.String(fEmrPlaybook),
c.String(fEmrCluster),
c.Bool(fAsync),
c.String(fLock),
c.String(fConsul),
c.String(fVars),
)
checkErr(err)
Expand Down Expand Up @@ -152,6 +158,8 @@ func main() {
Flags: []cli.Flag{
getEmrConfigFlag(),
getEmrPlaybookFlag(),
getLockFlag(),
getConsulFlag(),
getVarsFlag(),
},
Action: func(c *cli.Context) error {
Expand All @@ -163,7 +171,9 @@ func main() {
checkErr(err1)
log.Info("EMR cluster launched successfully; Jobflow ID: " + jobflowID)

err2 := run(emrPlaybook, jobflowID, false, vars)
lock := c.String(fLock)
consul := c.String(fConsul)
err2 := run(emrPlaybook, jobflowID, false, lock, consul, vars)
if err2 != nil {
log.Error(err2.Error())
} else {
Expand Down Expand Up @@ -211,6 +221,20 @@ func getAsyncFlag() cli.BoolFlag {
return cli.BoolFlag{Name: fAsync, Usage: "Asynchronous execution of the jobflow steps"}
}

func getLockFlag() cli.StringFlag {
return cli.StringFlag{
Name: fLock,
Usage: "Name of the lock held for the duration of the jobflow steps",
}
}

func getConsulFlag() cli.StringFlag {
return cli.StringFlag{
Name: fConsul,
Usage: "Address of the Consul server used for distributed locking for the duration of the jobflow steps",
}
}

// --- Commands

// up launches a new EMR cluster
Expand Down Expand Up @@ -244,19 +268,33 @@ func up(emrConfig string, vars string) (string, error) {
}

// run adds steps to an EMR cluster
func run(emrPlaybook string, emrCluster string, async bool, vars string) error {
func run(emrPlaybook string, emrCluster string, async bool, lock string, consul string, vars string) error {
if emrPlaybook == "" {
return flagToError(fEmrPlaybook)
}
if emrCluster == "" {
return flagToError(fEmrCluster)
}
if consul != "" && lock == "" {
return errors.New("--" + fLock + " is needed to make use of --" + fConsul)
}

varMap, err := varsToMap(vars)
if err != nil {
return err
}

if !async && lock != "" {
l, err := getLock(consul, lock)
err = l.TryLock()
if err != nil {
return err
}
defer l.Unlock()
} else if lock != "" {
return errors.New("--" + fAsync + " and --" + fLock + " are not compatible")
}

ar := getNewConfigResolver()

playbookRecord, err := ar.ParsePlaybookRecordFromFile(emrPlaybook, varMap)
Expand Down Expand Up @@ -341,10 +379,26 @@ func checkErr(err error) {
}
}

// getLogLevelKeys builds an array of the available log levels
func getLogLevelKeys(logLevels map[string]log.Level) []string {
keys := make([]string, 0, len(logLevels))
for k := range logLevels {
keys = append(keys, k)
}
return keys
}

// getLock builds a file-based or consul-based lock
func getLock(consul, lock string) (Lock, error) {
var l Lock
var err error
if consul != "" {
l, err = InitConsulLock(consul, lock)
} else {
l, err = InitFileLock(lock)
}
if err != nil {
return nil, err
}
return l, nil
}

0 comments on commit d5721c4

Please sign in to comment.