Skip to content

Commit

Permalink
Feature/move lockers to sub module (#466)
Browse files Browse the repository at this point in the history
* move redis locker into sub module to keep gocron deps clean

* remove example that imports dependency
  • Loading branch information
JohnRoesler authored May 5, 2023
1 parent 019f6e1 commit 1f411e2
Show file tree
Hide file tree
Showing 10 changed files with 541 additions and 477 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ There are several options available to restrict how jobs run:
| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler |
| Distributed locking (BETA) | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler |

## Distributed Locker Implementations

- Redis: [redis.go](lockers/redislock/redislock.go) `go get github.com/go-co-op/gocron/lockers/redislock`

## Tags

Jobs may have arbitrary tags added which can be useful when tracking many jobs.
Expand Down
31 changes: 15 additions & 16 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/go-co-op/gocron"
"github.com/redis/go-redis/v9"
)

var task = func() {}
Expand Down Expand Up @@ -925,21 +924,21 @@ func ExampleScheduler_Weeks() {
}

func ExampleScheduler_WithDistributedLocker() {
redisOptions := &redis.Options{
Addr: "localhost:6379",
}
redisClient := redis.NewClient(redisOptions)
locker, err := gocron.NewRedisLocker(redisClient)
if err != nil {
// handle the error
}

s := gocron.NewScheduler(time.UTC)
s.WithDistributedLocker(locker)
_, err = s.Every("500ms").Do(task)
if err != nil {
// handle the error
}
//redisOptions := &redis.Options{
// Addr: "localhost:6379",
//}
//redisClient := redis.NewClient(redisOptions)
//locker, err := redislock.NewRedisLocker(redisClient)
//if err != nil {
// // handle the error
//}
//
//s := gocron.NewScheduler(time.UTC)
//s.WithDistributedLocker(locker)
//_, err = s.Every("500ms").Do(task)
//if err != nil {
// // handle the error
//}
}

// ---------------------------------------------------------------------
Expand Down
43 changes: 3 additions & 40 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,15 @@ module github.com/go-co-op/gocron
go 1.20

require (
github.com/go-redsync/redsync/v4 v4.8.1
github.com/redis/go-redis/v9 v9.0.2
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go/modules/redis v0.0.0-20230424150504-5185956fa1de
)

replace github.com/testcontainers/testcontainers-go v0.19.0 => github.com/testcontainers/testcontainers-go v0.0.0-20230424150504-5185956fa1de

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.19 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v23.0.3+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.0.0-20221128092401-c43b287e0e0f // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/testcontainers/testcontainers-go v0.19.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
309 changes: 7 additions & 302 deletions go.sum

Large diffs are not rendered by default.

52 changes: 0 additions & 52 deletions locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ package gocron
import (
"context"
"errors"
"fmt"

"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
)

var (
Expand All @@ -26,50 +21,3 @@ type Locker interface {
type Lock interface {
Unlock(ctx context.Context) error
}

// NewRedisLocker provides an implementation of the Locker interface using
// redis for storage.
func NewRedisLocker(r *redis.Client) (Locker, error) {
if err := r.Ping(context.Background()).Err(); err != nil {
return nil, fmt.Errorf("%s: %w", ErrFailedToConnectToRedis, err)
}
pool := goredis.NewPool(r)
rs := redsync.New(pool)
return &redisLocker{rs: rs}, nil
}

var _ Locker = (*redisLocker)(nil)

type redisLocker struct {
rs *redsync.Redsync
}

func (r *redisLocker) Lock(ctx context.Context, key string) (Lock, error) {
mu := r.rs.NewMutex(key, redsync.WithTries(1))
err := mu.LockContext(ctx)
if err != nil {
return nil, ErrFailedToObtainLock
}
rl := &redisLock{
mu: mu,
}
return rl, nil
}

var _ Lock = (*redisLock)(nil)

type redisLock struct {
mu *redsync.Mutex
}

func (r *redisLock) Unlock(ctx context.Context) error {
unlocked, err := r.mu.UnlockContext(ctx)
if err != nil {
return ErrFailedToReleaseLock
}
if !unlocked {
return ErrFailedToReleaseLock
}

return nil
}
67 changes: 0 additions & 67 deletions locker_test.go
Original file line number Diff line number Diff line change
@@ -1,68 +1 @@
package gocron

import (
"context"
"strings"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
testcontainersredis "github.com/testcontainers/testcontainers-go/modules/redis"
)

func TestEnableDistributedLocking(t *testing.T) {
ctx := context.Background()
redisContainer, err := testcontainersredis.RunContainer(ctx)
require.NoError(t, err)
t.Cleanup(func() {
if err := redisContainer.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

uri, err := redisContainer.ConnectionString(ctx)
require.NoError(t, err)

resultChan := make(chan int, 10)
f := func(schedulerInstance int) {
resultChan <- schedulerInstance
}

redisClient := redis.NewClient(&redis.Options{Addr: strings.TrimPrefix(uri, "redis://")})
l, err := NewRedisLocker(redisClient)
require.NoError(t, err)

s1 := NewScheduler(time.UTC)
s1.WithDistributedLocker(l)
_, err = s1.Every("500ms").Do(f, 1)
require.NoError(t, err)

s2 := NewScheduler(time.UTC)
s2.WithDistributedLocker(l)
_, err = s2.Every("500ms").Do(f, 2)
require.NoError(t, err)

s3 := NewScheduler(time.UTC)
s3.WithDistributedLocker(l)
_, err = s3.Every("500ms").Do(f, 3)
require.NoError(t, err)

s1.StartAsync()
s2.StartAsync()
s3.StartAsync()

time.Sleep(1700 * time.Millisecond)

s1.Stop()
s2.Stop()
s3.Stop()
close(resultChan)

var results []int
for r := range resultChan {
results = append(results, r)
}
assert.Len(t, results, 4)
}
54 changes: 54 additions & 0 deletions lockers/redislock/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
module github.com/go-co-op/gocron/lockers/redislock

go 1.20

require (
github.com/go-co-op/gocron v1.24.0
github.com/go-redsync/redsync/v4 v4.8.1
github.com/redis/go-redis/v9 v9.0.4
github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go/modules/redis v0.0.0-20230503220718-0df60de8ccd8
)

replace github.com/testcontainers/testcontainers-go v0.19.0 => github.com/testcontainers/testcontainers-go v0.0.0-20230503220718-0df60de8ccd8

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.19 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v23.0.3+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.0.0-20221128092401-c43b287e0e0f // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/testcontainers/testcontainers-go v0.19.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 1f411e2

Please sign in to comment.