From ce42c745a6bfb5728b2aacaff81d1926085cb2ae Mon Sep 17 00:00:00 2001 From: Gergely Brautigam Date: Mon, 24 Sep 2018 07:18:01 +0200 Subject: [PATCH 1/4] Fixed scheduling ID of pipelines --- scheduler/scheduler.go | 3 --- store/pipeline.go | 30 +++++++----------------------- store/store_test.go | 4 ++-- 3 files changed, 9 insertions(+), 28 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index d44ca63a..179287c7 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -253,9 +253,6 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*g return nil, err } - // increment by one - highestID++ - // Get jobs jobs, err := s.getPipelineJobs(p) if err != nil { diff --git a/store/pipeline.go b/store/pipeline.go index 6ad7bef7..d69ab3eb 100644 --- a/store/pipeline.go +++ b/store/pipeline.go @@ -141,31 +141,15 @@ func (s *BoltStore) PipelineGetByName(n string) (*gaia.Pipeline, error) { func (s *BoltStore) PipelineGetRunHighestID(p *gaia.Pipeline) (int, error) { var highestID int - return highestID, s.db.View(func(tx *bolt.Tx) error { + return highestID, s.db.Update(func(tx *bolt.Tx) error { // Get Bucket b := tx.Bucket(pipelineRunBucket) - - // Iterate all pipeline runs. - return b.ForEach(func(k, v []byte) error { - // create single run object - r := &gaia.PipelineRun{} - - // Unmarshal - err := json.Unmarshal(v, r) - if err != nil { - return err - } - - // Is this a run from our pipeline? - if r.PipelineID == p.ID { - // Check if the id is higher than what we found before? 
- if r.ID > highestID { - highestID = r.ID - } - } - - return nil - }) + id, err := b.NextSequence() + if err != nil { + return err + } + highestID = int(id) + return nil }) } diff --git a/store/store_test.go b/store/store_test.go index 5c416b34..d9858bec 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -320,7 +320,7 @@ func TestPipelineGetRunHighestID(t *testing.T) { } pipelineRun1 := &gaia.PipelineRun{ - ID: 1, + ID: 0, PipelineID: 1, Status: gaia.RunRunning, UniqueID: uuid.Must(uuid.NewV4(), nil).String(), @@ -332,7 +332,7 @@ func TestPipelineGetRunHighestID(t *testing.T) { } pipelineRun2 := &gaia.PipelineRun{ - ID: 2, + ID: 1, PipelineID: 1, Status: gaia.RunRunning, UniqueID: uuid.Must(uuid.NewV4(), nil).String(), From 5c267a3b0c3cd618f6caeb608b0739d1ba8ef553 Mon Sep 17 00:00:00 2001 From: Gergely Brautigam Date: Mon, 24 Sep 2018 22:29:06 +0200 Subject: [PATCH 2/4] Implemented a semaphore and added some explanations. --- scheduler/scheduler.go | 28 ++++++++++++++++++++++++++++ store/pipeline.go | 30 +++++++++++++++++++++++------- store/store_test.go | 4 ++-- 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 179287c7..92ff270a 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -243,9 +243,34 @@ func (s *Scheduler) schedule() { } } +var schedulerRunningSemaphore = make(chan bool, 0) +var schedulerRunning = false + // SchedulePipeline schedules a pipeline. We create a new schedule object // and save it in our store. The scheduler will later pick this up and will continue the work. func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*gaia.PipelineRun, error) { + + // Introduce a semaphore locking here because this function can be called + // in parallel if multiple users happen to trigger a pipeline run at the same time. + // (or someone is just simply eager and presses (Start Pipeline) in quick succession). 
+ // This means that one of the calls will take slightly longer (a couple of nanoseconds) + // while the other finishes to save the pipelinerun. + // This is to ensure that the highest ID for the next pipeline is calculated properly. + if schedulerRunning { + for schedulerRunning { + <-schedulerRunningSemaphore + schedulerRunning = false + } + } + schedulerRunning = true + defer func() { + select { + case schedulerRunningSemaphore <- true: + default: + } + schedulerRunning = false + }() + // Get highest public id used for this pipeline highestID, err := s.storeService.PipelineGetRunHighestID(p) if err != nil { @@ -253,6 +278,9 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*g return nil, err } + // increment by one + highestID++ + // Get jobs jobs, err := s.getPipelineJobs(p) if err != nil { diff --git a/store/pipeline.go b/store/pipeline.go index d69ab3eb..6ad7bef7 100644 --- a/store/pipeline.go +++ b/store/pipeline.go @@ -141,15 +141,31 @@ func (s *BoltStore) PipelineGetByName(n string) (*gaia.Pipeline, error) { func (s *BoltStore) PipelineGetRunHighestID(p *gaia.Pipeline) (int, error) { var highestID int - return highestID, s.db.Update(func(tx *bolt.Tx) error { + return highestID, s.db.View(func(tx *bolt.Tx) error { // Get Bucket b := tx.Bucket(pipelineRunBucket) - id, err := b.NextSequence() - if err != nil { - return err - } - highestID = int(id) - return nil + + // Iterate all pipeline runs. + return b.ForEach(func(k, v []byte) error { + // create single run object + r := &gaia.PipelineRun{} + + // Unmarshal + err := json.Unmarshal(v, r) + if err != nil { + return err + } + + // Is this a run from our pipeline? + if r.PipelineID == p.ID { + // Check if the id is higher than what we found before? 
+ if r.ID > highestID { + highestID = r.ID + } + } + + return nil + }) }) } diff --git a/store/store_test.go b/store/store_test.go index d9858bec..5c416b34 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -320,7 +320,7 @@ func TestPipelineGetRunHighestID(t *testing.T) { } pipelineRun1 := &gaia.PipelineRun{ - ID: 0, + ID: 1, PipelineID: 1, Status: gaia.RunRunning, UniqueID: uuid.Must(uuid.NewV4(), nil).String(), @@ -332,7 +332,7 @@ func TestPipelineGetRunHighestID(t *testing.T) { } pipelineRun2 := &gaia.PipelineRun{ - ID: 1, + ID: 2, PipelineID: 1, Status: gaia.RunRunning, UniqueID: uuid.Must(uuid.NewV4(), nil).String(), From 926f2b71d1f50ca90c7fca970e291d88e4f4209d Mon Sep 17 00:00:00 2001 From: Gergely Brautigam Date: Tue, 25 Sep 2018 06:40:28 +0200 Subject: [PATCH 3/4] Added unit test to semaphore logic. --- scheduler/scheduler_test.go | 54 +++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index cae6183c..0d60d2f4 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os/exec" "path/filepath" + "sync" "testing" "github.com/gaia-pipeline/gaia" @@ -260,6 +261,59 @@ func TestSchedulePipeline(t *testing.T) { } } +func TestSchedulePipelineParallel(t *testing.T) { + gaia.Cfg = &gaia.Config{} + storeInstance := store.NewBoltStore() + tmp, _ := ioutil.TempDir("", "TestSchedulePipeline") + gaia.Cfg.DataPath = tmp + gaia.Cfg.WorkspacePath = filepath.Join(tmp, "tmp") + gaia.Cfg.Bolt.Mode = 0600 + gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: hclog.DefaultOutput, + Name: "Gaia", + }) + gaia.Cfg.Worker = "2" + if err := storeInstance.Init(); err != nil { + t.Fatal(err) + } + p1 := gaia.Pipeline{ + ID: 0, + Name: "Test Pipeline 1", + Type: gaia.PTypeGolang, + Jobs: prepareJobs(), + } + p2 := gaia.Pipeline{ + ID: 1, + Name: "Test Pipeline 2", + Type: gaia.PTypeGolang, + Jobs: 
prepareJobs(), + } + storeInstance.PipelinePut(&p1) + storeInstance.PipelinePut(&p2) + s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{}) + err := s.Init() + if err != nil { + t.Fatal(err) + } + var run1 *gaia.PipelineRun + var run2 *gaia.PipelineRun + var wg sync.WaitGroup + wg.Add(2) + go func() { + run1, _ = s.SchedulePipeline(&p1, prepareArgs()) + wg.Done() + }() + go func() { + run2, _ = s.SchedulePipeline(&p2, prepareArgs()) + wg.Done() + }() + wg.Wait() + if run1.ID == run2.ID { + t.Fatal("the two run jobs id should not have equalled. was: ", run1.ID, run2.ID) + } +} + func TestSchedule(t *testing.T) { gaia.Cfg = &gaia.Config{} storeInstance := store.NewBoltStore() From 2c82d8cdc924a7a53628019d18f310d3e6e104ab Mon Sep 17 00:00:00 2001 From: Gergely Brautigam Date: Tue, 25 Sep 2018 07:59:48 +0200 Subject: [PATCH 4/4] Using an RWMutex --- scheduler/scheduler.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 92ff270a..f8401621 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -7,6 +7,7 @@ import ( "os/exec" "path/filepath" "strconv" + "sync" "time" "github.com/gaia-pipeline/gaia" @@ -243,8 +244,7 @@ func (s *Scheduler) schedule() { } } -var schedulerRunningSemaphore = make(chan bool, 0) -var schedulerRunning = false +var schedulerLock = sync.RWMutex{} // SchedulePipeline schedules a pipeline. We create a new schedule object // and save it in our store. The scheduler will later pick this up and will continue the work. @@ -256,20 +256,8 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*g // This means that one of the calls will take slightly longer (a couple of nanoseconds) // while the other finishes to save the pipelinerun. // This is to ensure that the highest ID for the next pipeline is calculated properly. 
- if schedulerRunning { - for schedulerRunning { - <-schedulerRunningSemaphore - schedulerRunning = false - } - } - schedulerRunning = true - defer func() { - select { - case schedulerRunningSemaphore <- true: - default: - } - schedulerRunning = false - }() + schedulerLock.Lock() + defer schedulerLock.Unlock() // Get highest public id used for this pipeline highestID, err := s.storeService.PipelineGetRunHighestID(p)