From b9d188d7bf8ab838a88fd981e4c2c8081699974a Mon Sep 17 00:00:00 2001 From: Yosi Attias Date: Sat, 26 Mar 2016 19:18:35 +0300 Subject: [PATCH] TaskStore changes: * Enabling and disabling tasks by glob, for example: `kapacitor enable window_*` * Deleting tasks by glob * Adding FindTasks method for finding tasks by predicate --- CHANGELOG.md | 1 + cmd/kapacitor/main.go | 45 +++++++++- services/task_store/service.go | 152 +++++++++++++++++++++++++++++---- 3 files changed, 180 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 940aece5b..180080bdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ format a TICKscript file according to a common standard. - [#299](https://github.com/influxdata/kapacitor/issues/299): Changes TICKscript chaining method operators and adds `tickfmt` binary. - [#389](https://github.com/influxdata/kapacitor/pull/389): Adds benchmarks to Kapacitor for basic use cases. - [#390](https://github.com/influxdata/kapacitor/issues/390): BREAKING: Remove old `.mapReduce` functions. +- [#381](https://github.com/influxdata/kapacitor/pull/381): Adding enable/disable/delete/reload tasks by glob. ### Bugfixes diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index a5598956a..d1d21acff 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -639,6 +639,16 @@ func enableUsage() { var u = `Usage: kapacitor enable [task name...] Enable and start a task running from the live data. + +For example: + + You can enable by specific task name. + + $ kapacitor enable cpu_alert + + Or, you can enable by glob: + + $ kapacitor enable *_alert ` fmt.Fprintln(os.Stderr, u) } @@ -678,6 +688,16 @@ func disableUsage() { var u = `Usage: kapacitor disable [task name...] Disable and stop a task running. + +For example: + + You can disable by specific task name. + + $ kapacitor disable cpu_alert + + Or, you can disable by glob: + + $ kapacitor disable *_alert ` fmt.Fprintln(os.Stderr, u) } @@ -717,6 +737,16 @@ func reloadUsage() { var u = `Usage: kapacitor reload [task name...] Disable then enable a running task. + +For example: + + You can reload by specific task name. + + $ kapacitor reload cpu_alert + + Or, you can reload by glob: + + $ kapacitor reload *_alert ` fmt.Fprintln(os.Stderr, u) } @@ -874,7 +904,20 @@ func deleteUsage() { Delete a task or recording. - If a task is enabled it will be disabled and then deleted. + If a task is enabled it will be disabled and then deleted, +For example: + + You can delete task: + + $ kapacitor delete tasks my_task + + Or you can delete tasks by glob: + + $ kapacitor delete tasks *_alert + + You can delete recordings: + + $ kapacitor delete recordings b0a2ba8a-aeeb-45ec-bef9-1a2939963586 ` fmt.Fprintln(os.Stderr, u) } diff --git a/services/task_store/service.go b/services/task_store/service.go index 65ca5a2c0..a017e37a5 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "path" + "path/filepath" "strconv" "strings" "time" @@ -523,9 +524,9 @@ func (ts *Service) Save(task *rawTask) error { return err } -func (ts *Service) Delete(name string) error { - +func (ts *Service) deleteTask(name string) error { ts.TaskMaster.StopTask(name) + return ts.db.Update(func(tx *bolt.Tx) error { tb := tx.Bucket(tasksBucket) if tb != nil { @@ -543,6 +544,30 @@ func (ts *Service) Delete(name string) error { }) } +func (ts *Service) Delete(pattern string) error { + rawTasks, err := ts.FindTasks(func(taskName string) (bool, error) { + matched, err := filepath.Match(pattern, taskName) + if err != nil { + return false, err + } + + return matched, nil + }) + + if err != nil { + return nil + } + + for _, rawTask := range rawTasks { + err = ts.deleteTask(rawTask.Name) + if err != nil { + return err + } + } + + return nil +} + func (ts *Service) LoadRaw(name string) (*rawTask, error) { var data []byte err := ts.db.View(func(tx *bolt.Tx) error { @@ -577,6 +602,11 @@ func (ts *Service) Load(name string) (*kapacitor.Task, error) { if err != nil { return nil, err } + + return ts.CreateTaskFromRaw(task) +} + +func (ts *Service) CreateTaskFromRaw(task *rawTask) (*kapacitor.Task, error) { return ts.TaskMaster.NewTask(task.Name, task.TICKscript, task.Type, @@ -585,22 +615,22 @@ func (ts *Service) Load(name string) (*kapacitor.Task, error) { ) } -func (ts *Service) Enable(name string) error { - // Load the task - t, err := ts.Load(name) +func (ts *Service) enableRawTask(rawTask *rawTask) error { + t, err := ts.CreateTaskFromRaw(rawTask) if err != nil { return err } - var enabled bool // Save the enabled state + var enabled bool + err = ts.db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucketIfNotExists(enabledBucket) if err != nil { return err } - enabled = b.Get([]byte(name)) != nil - err = b.Put([]byte(name), []byte{}) + enabled = b.Get([]byte(t.Name)) != nil + err = b.Put([]byte(t.Name), []byte{}) if err != nil { return err } @@ -619,6 +649,31 @@ func (ts *Service) Enable(name string) error { return nil } +func (ts *Service) Enable(pattern string) error { + // Find the matching tasks + rawTasks, err := ts.FindTasks(func(taskName string) (bool, error) { + matched, err := filepath.Match(pattern, taskName) + if err != nil { + return false, err + } + + return matched, nil + }) + + if err != nil { + return nil + } + + for _, rawTask := range rawTasks { + err = ts.enableRawTask(rawTask) + if err != nil { + return nil + } + } + + return nil +} + func (ts *Service) StartTask(t *kapacitor.Task) error { // Starting task, remove last error ts.SaveLastError(t.Name, "") @@ -673,27 +728,52 @@ func (ts *Service) SaveLastError(name string, errStr string) error { return nil } -func (ts *Service) Disable(name string) error { +func (ts *Service) Disable(pattern string) error { + // Find the matching tasks + rawTasks, err := ts.FindTasks(func(taskName string) (bool, error) { + matched, err := filepath.Match(pattern, taskName) + if err != nil { + return false, err + } + + return matched, nil + }) + + if err != nil { + return nil + } + // Delete the enabled state - err := ts.db.Update(func(tx *bolt.Tx) error { + err = ts.db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucketIfNotExists(enabledBucket) if err != nil { return err } - enabled := b.Get([]byte(name)) != nil - if enabled { - err = b.Delete([]byte(name)) - if err != nil { - return err + for _, rawTask := range rawTasks { + enabled := b.Get([]byte(rawTask.Name)) != nil + if enabled { + err = b.Delete([]byte(rawTask.Name)) + if err != nil { + return err + } + kapacitor.NumEnabledTasksVar.Add(-1) } - kapacitor.NumEnabledTasksVar.Add(-1) } return nil }) + if err != nil { return err } - return ts.TaskMaster.StopTask(name) + + for _, rawTask := range rawTasks { + err = ts.TaskMaster.StopTask(rawTask.Name) + if err != nil { + return err + } + } + + return nil } type TaskSummaryInfo struct { @@ -714,6 +794,44 @@ func (ts *Service) IsEnabled(name string) (e bool) { return } +// Returns all taskInfo of task name that matches the predicate +func (ts *Service) FindTasks(predicate func(string) (bool, error)) ([]*rawTask, error) { + rawTasks := make([]*rawTask, 0) + + err := ts.db.View(func(tx *bolt.Tx) error { + tb := tx.Bucket([]byte(tasksBucket)) + if tb == nil { + return nil + } + + return tb.ForEach(func(k, v []byte) error { + taskName := string(k) + isMatched, err := predicate(taskName) + if err != nil { + return err + } + if !isMatched { + return nil + } + + // Grab task info + t, err := ts.LoadRaw(taskName) + if err != nil { + return fmt.Errorf("found invalid task in db. name: %s, err: %s", string(k), err) + } + + rawTasks = append(rawTasks, t) + return nil + }) + + }) + if err != nil { + return nil, err + } + + return rawTasks, nil +} + func (ts *Service) GetTaskSummaryInfo(tasks []string) ([]TaskSummaryInfo, error) { taskInfos := make([]TaskSummaryInfo, 0)