diff --git a/CHANGELOG.md b/CHANGELOG.md
index 39d82fd1c..270ce0760 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -54,6 +54,7 @@ kapacitor define-handler system aggregate_by_1m.yaml
 The change is completely breaking for the technical preview alerting service, a.k.a. the new alert topic handler features.
 The change boils down to simplifying how you define and interact with topics.
 Alert handlers now only ever have a single action and belong to a single topic.
+An automatic migration from old to new handler definitions will be performed during startup.
 See the updated API docs.
 
 ### Bugfixes
diff --git a/services/alert/dao.go b/services/alert/dao.go
index 2ec6f6ff5..8b952c91f 100644
--- a/services/alert/dao.go
+++ b/services/alert/dao.go
@@ -21,24 +21,29 @@ var (
 type HandlerSpecDAO interface {
 	// Retrieve a handler
 	Get(topic, id string) (HandlerSpec, error)
+	GetTx(tx storage.ReadOnlyTx, topic, id string) (HandlerSpec, error)
 
 	// Create a handler.
 	// ErrHandlerSpecExists is returned if a handler already exists with the same ID.
 	Create(h HandlerSpec) error
+	CreateTx(tx storage.Tx, h HandlerSpec) error
 
 	// Replace an existing handler.
 	// ErrNoHandlerSpecExists is returned if the handler does not exist.
 	Replace(h HandlerSpec) error
+	ReplaceTx(tx storage.Tx, h HandlerSpec) error
 
 	// Delete a handler.
 	// It is not an error to delete an non-existent handler.
 	Delete(topic, id string) error
+	DeleteTx(tx storage.Tx, topic, id string) error
 
 	// List handlers matching a pattern.
 	// The pattern is shell/glob matching see https://golang.org/pkg/path/#Match
 	// Offset and limit are pagination bounds. Offset is inclusive starting at index 0.
 	// More results may exist while the number of returned items is equal to limit.
 	List(topic, pattern string, offset, limit int) ([]HandlerSpec, error)
+	ListTx(tx storage.ReadOnlyTx, topic, pattern string, offset, limit int) ([]HandlerSpec, error)
 }
 
 //--------------------------------------------------------------------
@@ -113,8 +118,12 @@ type handlerSpecKV struct {
 	store *storage.IndexedStore
 }
 
+const (
+	handlerPrefix = "handlers"
+)
+
 func newHandlerSpecKV(store storage.Interface) (*handlerSpecKV, error) {
-	c := storage.DefaultIndexedStoreConfig("handlers", func() storage.BinaryObject {
+	c := storage.DefaultIndexedStoreConfig(handlerPrefix, func() storage.BinaryObject {
 		return new(HandlerSpec)
 	})
 	istore, err := storage.NewIndexedStore(store, c)
@@ -136,7 +145,13 @@ func (kv *handlerSpecKV) error(err error) error {
 }
 
 func (kv *handlerSpecKV) Get(topic, id string) (HandlerSpec, error) {
-	o, err := kv.store.Get(fullID(topic, id))
+	return kv.getHelper(kv.store.Get(fullID(topic, id)))
+}
+func (kv *handlerSpecKV) GetTx(tx storage.ReadOnlyTx, topic, id string) (HandlerSpec, error) {
+	return kv.getHelper(kv.store.GetTx(tx, fullID(topic, id)))
+}
+
+func (kv *handlerSpecKV) getHelper(o storage.BinaryObject, err error) (HandlerSpec, error) {
 	if err != nil {
 		return HandlerSpec{}, kv.error(err)
 	}
@@ -150,20 +165,37 @@
 func (kv *handlerSpecKV) Create(h HandlerSpec) error {
 	return kv.store.Create(&h)
 }
+func (kv *handlerSpecKV) CreateTx(tx storage.Tx, h HandlerSpec) error {
+	return kv.store.CreateTx(tx, &h)
+}
 
 func (kv *handlerSpecKV) Replace(h HandlerSpec) error {
 	return kv.store.Replace(&h)
 }
+func (kv *handlerSpecKV) ReplaceTx(tx storage.Tx, h HandlerSpec) error {
+	return kv.store.ReplaceTx(tx, &h)
+}
 
 func (kv *handlerSpecKV) Delete(topic, id string) error {
 	return kv.store.Delete(fullID(topic, id))
 }
+func (kv *handlerSpecKV) DeleteTx(tx storage.Tx, topic, id string) error {
+	return kv.store.DeleteTx(tx, fullID(topic, id))
+}
 
 func (kv *handlerSpecKV) List(topic, pattern string, offset, limit int) ([]HandlerSpec, error) {
 	if pattern == "" {
 		pattern = "*"
 	}
-	objects, err := kv.store.List(storage.DefaultIDIndex, fullID(topic, pattern), offset, limit)
+	return kv.listHelper(kv.store.List(storage.DefaultIDIndex, fullID(topic, pattern), offset, limit))
+}
+func (kv *handlerSpecKV) ListTx(tx storage.ReadOnlyTx, topic, pattern string, offset, limit int) ([]HandlerSpec, error) {
+	if pattern == "" {
+		pattern = "*"
+	}
+	return kv.listHelper(kv.store.ListTx(tx, storage.DefaultIDIndex, fullID(topic, pattern), offset, limit))
+}
+func (kv *handlerSpecKV) listHelper(objects []storage.BinaryObject, err error) ([]HandlerSpec, error) {
 	if err != nil {
 		return nil, err
 	}
diff --git a/services/alert/service.go b/services/alert/service.go
index bf8e47b0a..ff80117d9 100644
--- a/services/alert/service.go
+++ b/services/alert/service.go
@@ -1,9 +1,11 @@
 package alert
 
 import (
+	"encoding/json"
 	"fmt"
 	"log"
 	"path"
+	"regexp"
 	"sync"
 
 	"github.com/influxdata/kapacitor/alert"
@@ -46,6 +48,7 @@ type Service struct {
 
 	StorageService interface {
 		Store(namespace string) storage.Interface
+		Versions() storage.Versions
 	}
 
 	Commander command.Commander
@@ -128,6 +131,11 @@ func (s *Service) Open() error {
 	}
 	s.topicsDAO = topicsDAO
 
+	// Migrate v1.2 handlers
+	if err := s.migrateHandlerSpecs(store); err != nil {
+		return err
+	}
+
 	// Load saved handlers
 	if err := s.loadSavedHandlerSpecs(); err != nil {
 		return err
@@ -153,6 +161,131 @@ func (s *Service) Close() error {
 	return s.APIServer.Close()
 }
 
+const (
+	handlerSpecsStoreVersion  = "alert_topic_handler_specs"
+	handlerSpecsStoreVersion1 = "1"
+)
+
+func (s *Service) migrateHandlerSpecs(store storage.Interface) error {
+	specVersion, err := s.StorageService.Versions().Get(handlerSpecsStoreVersion)
+	if err != nil && err != storage.ErrNoKeyExists {
+		return err
+	}
+	if specVersion == handlerSpecsStoreVersion1 {
+		// Already migrated
+		return nil
+	}
+	s.logger.Println("D! migrating old v1.2 handler specs")
+
+	// v1.2 HandlerActionSpec
+	type oldHandlerActionSpec struct {
+		Kind    string                 `json:"kind"`
+		Options map[string]interface{} `json:"options"`
+	}
+
+	// v1.2 HandlerSpec
+	type oldHandlerSpec struct {
+		ID      string                 `json:"id"`
+		Topics  []string               `json:"topics"`
+		Actions []oldHandlerActionSpec `json:"actions"`
+	}
+	oldDataPrefix := "/" + handlerPrefix + "/data"
+	oldKeyPattern := regexp.MustCompile(fmt.Sprintf(`^%s/[-\._\p{L}0-9]+$`, oldDataPrefix))
+
+	// Process to migrate to new handler specs:
+	// 1. Gather all old handlers
+	// 2. Define new handlers that are equivalent
+	// 3. Check that there are no ID conflicts
+	// 4. Save the new handlers
+	// 5. Delete the old specs
+	//
+	// All steps are performed in a single transaction,
+	// so it can be rolled back in case of an error.
+	err = store.Update(func(tx storage.Tx) error {
+		var newHandlers []HandlerSpec
+		kvs, err := tx.List(oldDataPrefix)
+		if err != nil {
+			return err
+		}
+		s.logger.Printf("D! found %d handler rows", len(kvs))
+
+		var oldKeys []string
+		for _, kv := range kvs {
+			if !oldKeyPattern.MatchString(kv.Key) {
+				s.logger.Println("D! found new handler, skipping:", kv.Key)
+				continue
+			}
+			oldKeys = append(oldKeys, kv.Key)
+			var old oldHandlerSpec
+			err := storage.VersionJSONDecode(kv.Value, func(version int, dec *json.Decoder) error {
+				if version != 1 {
+					return fmt.Errorf("old handler specs should all be version 1, got version %d", version)
+				}
+				return dec.Decode(&old)
+			})
+			if err != nil {
+				return errors.Wrapf(err, "failed to read old handler spec data for %s", kv.Key)
+			}
+
+			s.logger.Println("D! migrating old handler spec", old.ID)
+
+			// Create new handlers from the old
+			hasStateChangesOnly := false
+			aggregatePrefix := ""
+			for i, action := range old.Actions {
+				new := HandlerSpec{
+					ID:      fmt.Sprintf("%s-%s-%d", old.ID, action.Kind, i),
+					Kind:    action.Kind,
+					Options: action.Options,
+				}
+				if hasStateChangesOnly {
+					new.Match = "changed() == TRUE"
+				}
+				switch action.Kind {
+				case "stateChangesOnly":
+					hasStateChangesOnly = true
+					// No need to add a handler for this one
+				case "aggregate":
+					newPrefix := aggregatePrefix + "aggregate_topic-" + old.ID + "-"
+					for _, topic := range old.Topics {
+						new.Topic = aggregatePrefix + topic
+						new.Options["topic"] = newPrefix + topic
+						newHandlers = append(newHandlers, new)
+					}
+					aggregatePrefix = newPrefix
+				default:
+					for _, topic := range old.Topics {
+						new.Topic = aggregatePrefix + topic
+						newHandlers = append(newHandlers, new)
+					}
+				}
+			}
+		}
+
+		// Check that all new handlers are unique
+		for _, handler := range newHandlers {
+			if _, err := s.specsDAO.GetTx(tx, handler.Topic, handler.ID); err != ErrNoHandlerSpecExists {
+				return fmt.Errorf("handler %q for topic %q already exists", handler.ID, handler.Topic)
+			}
+		}
+
+		s.logger.Printf("D! creating %d new handlers in place of old handlers", len(newHandlers))
+
+		// Create new handlers
+		for _, handler := range newHandlers {
+			if err := s.specsDAO.CreateTx(tx, handler); err != nil {
+				return errors.Wrap(err, "failed to create new handler during migration")
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	// Save version
+	return s.StorageService.Versions().Set(handlerSpecsStoreVersion, handlerSpecsStoreVersion1)
+}
+
 func (s *Service) loadSavedHandlerSpecs() error {
 	offset := 0
 	limit := 100
diff --git a/services/storage/indexed.go b/services/storage/indexed.go
index b56428bf4..599274a17 100644
--- a/services/storage/indexed.go
+++ b/services/storage/indexed.go
@@ -146,7 +146,15 @@ func (s *IndexedStore) indexKey(index, value string) string {
 	return path.Join(s.indexesPrefix, index, value)
 }
 
-func (s *IndexedStore) get(tx ReadOnlyTx, id string) (BinaryObject, error) {
+func (s *IndexedStore) Get(id string) (o BinaryObject, err error) {
+	err = s.store.View(func(tx ReadOnlyTx) error {
+		o, err = s.GetTx(tx, id)
+		return err
+	})
+	return
+}
+
+func (s *IndexedStore) GetTx(tx ReadOnlyTx, id string) (BinaryObject, error) {
 	key := s.dataKey(id)
 	if exists, err := tx.Exists(key); err != nil {
 		return nil, err
@@ -160,189 +168,207 @@ func (s *IndexedStore) get(tx ReadOnlyTx, id string) (BinaryObject, error) {
 	o := s.newObject()
 	err = o.UnmarshalBinary(kv.Value)
 	return o, err
-
-}
-
-func (s *IndexedStore) Get(id string) (o BinaryObject, err error) {
-	err = s.store.View(func(tx ReadOnlyTx) error {
-		o, err = s.get(tx, id)
-		return err
-	})
-	return
 }
 
 func (s *IndexedStore) Create(o BinaryObject) error {
 	return s.put(o, false, false)
 }
+func (s *IndexedStore) CreateTx(tx Tx, o BinaryObject) error {
+	return s.putTx(tx, o, false, false)
+}
 
 func (s *IndexedStore) Put(o BinaryObject) error {
 	return s.put(o, true, false)
 }
+func (s *IndexedStore) PutTx(tx Tx, o BinaryObject) error {
+	return s.putTx(tx, o, true, false)
+}
 
 func (s *IndexedStore) Replace(o BinaryObject) error {
 	return s.put(o, true, true)
 }
+func (s *IndexedStore) ReplaceTx(tx Tx, o BinaryObject) error {
+	return s.putTx(tx, o, true, true)
+}
 
 func (s *IndexedStore) put(o BinaryObject, allowReplace, requireReplace bool) error {
 	return s.store.Update(func(tx Tx) error {
-		key := s.dataKey(o.ObjectID())
+		return s.putTx(tx, o, allowReplace, requireReplace)
+	})
+}
 
-		replacing := false
-		old, err := s.get(tx, o.ObjectID())
-		if err != nil {
-			if err != ErrNoObjectExists || (requireReplace && err == ErrNoObjectExists) {
-				return err
-			}
-		} else if !allowReplace {
-			return ErrObjectExists
-		} else {
-			replacing = true
-		}
+func (s *IndexedStore) putTx(tx Tx, o BinaryObject, allowReplace, requireReplace bool) error {
+	key := s.dataKey(o.ObjectID())
 
-		data, err := o.MarshalBinary()
-		if err != nil {
+	replacing := false
+	old, err := s.GetTx(tx, o.ObjectID())
+	if err != nil {
+		if err != ErrNoObjectExists || (requireReplace && err == ErrNoObjectExists) {
 			return err
 		}
+	} else if !allowReplace {
+		return ErrObjectExists
+	} else {
+		replacing = true
+	}
 
-		// Put data
-		err = tx.Put(key, data)
+	data, err := o.MarshalBinary()
+	if err != nil {
+		return err
+	}
+
+	// Put data
+	err = tx.Put(key, data)
+	if err != nil {
+		return err
+	}
+	// Put all indexes
+	for _, idx := range s.indexes {
+		// Get new index key
+		newValue, err := idx.ValueOf(o)
 		if err != nil {
 			return err
 		}
-		// Put all indexes
-		for _, idx := range s.indexes {
-			// Get new index key
-			newValue, err := idx.ValueOf(o)
+		newIndexKey := s.indexKey(idx.Name, newValue)
+
+		// Get old index key, if we are replacing
+		var oldValue string
+		if replacing {
+			var err error
+			oldValue, err = idx.ValueOf(old)
 			if err != nil {
 				return err
 			}
-			newIndexKey := s.indexKey(idx.Name, newValue)
+		}
+		oldIndexKey := s.indexKey(idx.Name, oldValue)
 
-			// Get old index key, if we are replacing
-			var oldValue string
-			if replacing {
-				var err error
-				oldValue, err = idx.ValueOf(old)
-				if err != nil {
-					return err
-				}
+		if !replacing || (replacing && oldIndexKey != newIndexKey) {
+			// Update new key
+			err := tx.Put(newIndexKey, []byte(o.ObjectID()))
+			if err != nil {
+				return err
 			}
-			oldIndexKey := s.indexKey(idx.Name, oldValue)
-
-			if !replacing || (replacing && oldIndexKey != newIndexKey) {
-				// Update new key
-				err := tx.Put(newIndexKey, []byte(o.ObjectID()))
+			if replacing {
+				// Remove old key
+				err = tx.Delete(oldIndexKey)
 				if err != nil {
 					return err
 				}
-				if replacing {
-					// Remove old key
-					err = tx.Delete(oldIndexKey)
-					if err != nil {
-						return err
-					}
-				}
 			}
 		}
-		return nil
-	})
+	}
+	return nil
 }
 
 func (s *IndexedStore) Delete(id string) error {
 	return s.store.Update(func(tx Tx) error {
-		o, err := s.get(tx, id)
-		if err == ErrNoObjectExists {
-			// Nothing to do
-			return nil
-		} else if err != nil {
-			return err
-		}
+		return s.DeleteTx(tx, id)
+	})
+}
+func (s *IndexedStore) DeleteTx(tx Tx, id string) error {
+	o, err := s.GetTx(tx, id)
+	if err == ErrNoObjectExists {
+		// Nothing to do
+		return nil
+	} else if err != nil {
+		return err
+	}
 
-		// Delete object
-		key := s.dataKey(id)
-		err = tx.Delete(key)
+	// Delete object
+	key := s.dataKey(id)
+	err = tx.Delete(key)
+	if err != nil {
+		return err
+	}
+
+	// Delete all indexes
+	for _, idx := range s.indexes {
+		value, err := idx.ValueOf(o)
 		if err != nil {
 			return err
 		}
-
-		// Delete all indexes
-		for _, idx := range s.indexes {
-			value, err := idx.ValueOf(o)
-			if err != nil {
-				return err
-			}
-			indexKey := s.indexKey(idx.Name, value)
-			err = tx.Delete(indexKey)
-			if err != nil {
-				return err
-			}
+		indexKey := s.indexKey(idx.Name, value)
+		err = tx.Delete(indexKey)
+		if err != nil {
+			return err
 		}
-		return nil
-	})
+	}
+	return nil
 }
 
 // List returns a list of objects that match a given pattern.
 // If limit < 0, then no limit is enforced.
-func (s *IndexedStore) List(index, pattern string, offset, limit int) ([]BinaryObject, error) {
-	return s.list(index, pattern, offset, limit, false)
+func (s *IndexedStore) List(index, pattern string, offset, limit int) (objects []BinaryObject, err error) {
+	err = s.store.View(func(tx ReadOnlyTx) error {
+		objects, err = s.list(tx, index, pattern, offset, limit, false)
+		return err
+	})
+	return
+}
+func (s *IndexedStore) ListTx(tx ReadOnlyTx, index, pattern string, offset, limit int) ([]BinaryObject, error) {
+	return s.list(tx, index, pattern, offset, limit, false)
 }
 
 // ReverseList returns a list of objects that match a given pattern, using reverse sort.
 // If limit < 0, then no limit is enforced.
-func (s *IndexedStore) ReverseList(index, pattern string, offset, limit int) ([]BinaryObject, error) {
-	return s.list(index, pattern, offset, limit, true)
+func (s *IndexedStore) ReverseList(index, pattern string, offset, limit int) (objects []BinaryObject, err error) {
+	err = s.store.View(func(tx ReadOnlyTx) error {
+		objects, err = s.list(tx, index, pattern, offset, limit, true)
+		return err
+	})
+	return
+}
+func (s *IndexedStore) ReverseListTx(tx ReadOnlyTx, index, pattern string, offset, limit int) ([]BinaryObject, error) {
+	return s.list(tx, index, pattern, offset, limit, true)
 }
 
-func (s *IndexedStore) list(index, pattern string, offset, limit int, reverse bool) (objects []BinaryObject, err error) {
-	err = s.store.View(func(tx ReadOnlyTx) error {
-		// List all object ids sorted by index
-		ids, err := tx.List(s.indexKey(index, "") + "/")
-		if err != nil {
-			return err
-		}
-		if reverse {
-			// Reverse to sort
-			for i, j := 0, len(ids)-1; i < j; i, j = i+1, j-1 {
-				ids[i], ids[j] = ids[j], ids[i]
-			}
+func (s *IndexedStore) list(tx ReadOnlyTx, index, pattern string, offset, limit int, reverse bool) ([]BinaryObject, error) {
+	// List all object ids sorted by index
+	ids, err := tx.List(s.indexKey(index, "") + "/")
+	if err != nil {
+		return nil, err
+	}
+	if reverse {
+		// Reverse to sort
+		for i, j := 0, len(ids)-1; i < j; i, j = i+1, j-1 {
+			ids[i], ids[j] = ids[j], ids[i]
 		}
+	}
 
-		var match func([]byte) bool
-		if pattern != "" {
-			match = func(value []byte) bool {
-				id := string(value)
-				matched, _ := path.Match(pattern, id)
-				return matched
-			}
-		} else {
-			match = func([]byte) bool { return true }
+	var match func([]byte) bool
+	if pattern != "" {
+		match = func(value []byte) bool {
+			id := string(value)
+			matched, _ := path.Match(pattern, id)
+			return matched
 		}
-		var matches []string
-		if limit >= 0 {
-			matches = DoListFunc(ids, match, offset, limit)
-		} else {
-			matches = make([]string, len(ids))
-			for i := range ids {
-				matches[i] = string(ids[i].Value)
-			}
+	} else {
+		match = func([]byte) bool { return true }
+	}
+	var matches []string
+	if limit >= 0 {
+		matches = DoListFunc(ids, match, offset, limit)
+	} else {
+		matches = make([]string, len(ids))
+		for i := range ids {
+			matches[i] = string(ids[i].Value)
 		}
+	}
 
-		objects = make([]BinaryObject, len(matches))
-		for i, id := range matches {
-			data, err := tx.Get(s.dataKey(id))
-			if err != nil {
-				return err
-			}
-			o := s.newObject()
-			err = o.UnmarshalBinary(data.Value)
-			if err != nil {
-				return err
-			}
-			objects[i] = o
+	objects := make([]BinaryObject, len(matches))
+	for i, id := range matches {
+		data, err := tx.Get(s.dataKey(id))
+		if err != nil {
+			return nil, err
 		}
-		return nil
-	})
-	return
+		o := s.newObject()
+		err = o.UnmarshalBinary(data.Value)
+		if err != nil {
+			return nil, err
+		}
+		objects[i] = o
+	}
+	return objects, nil
 }
 
 func ImpossibleTypeErr(exp interface{}, got interface{}) error {
diff --git a/services/storage/service.go b/services/storage/service.go
index 6cbab1bf1..8437ae3dd 100644
--- a/services/storage/service.go
+++ b/services/storage/service.go
@@ -17,6 +17,8 @@ type Service struct {
 	stores map[string]Interface
 	mu     sync.Mutex
 
+	versions Versions
+
 	logger *log.Logger
 }
 
@@ -28,6 +30,10 @@ func NewService(conf Config, l *log.Logger) *Service {
 	}
 }
 
+const (
+	versionsNamespace = "versions"
+)
+
 func (s *Service) Open() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -40,6 +46,9 @@ func (s *Service) Open() error {
 		return errors.Wrapf(err, "open boltdb @ %q", s.dbpath)
 	}
 	s.boltdb = db
+
+	s.versions = NewVersions(s.store(versionsNamespace))
+
 	return nil
 }
 
@@ -57,6 +66,10 @@ func (s *Service) Close() error {
 func (s *Service) Store(name string) Interface {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	return s.store(name)
+}
+
+func (s *Service) store(name string) Interface {
 	if store, ok := s.stores[name]; ok {
 		return store
 	} else {
@@ -65,3 +78,7 @@ func (s *Service) Store(name string) Interface {
 		return store
 	}
 }
+
+func (s *Service) Versions() Versions {
+	return s.versions
+}
diff --git a/services/storage/storagetest/storage.go b/services/storage/storagetest/storage.go
index f2230e379..93b55c786 100644
--- a/services/storage/storagetest/storage.go
+++ b/services/storage/storagetest/storage.go
@@ -2,12 +2,20 @@ package storagetest
 
 import "github.com/influxdata/kapacitor/services/storage"
 
-type TestStore struct{}
+type TestStore struct {
+	versions storage.Versions
+}
 
 func New() TestStore {
-	return TestStore{}
+	return TestStore{
+		versions: storage.NewVersions(storage.NewMemStore("versions")),
+	}
 }
 
 func (s TestStore) Store(name string) storage.Interface {
 	return storage.NewMemStore(name)
 }
+
+func (s TestStore) Versions() storage.Versions {
+	return s.versions
+}
diff --git a/services/storage/versions.go b/services/storage/versions.go
new file mode 100644
index 000000000..5e0a96f8c
--- /dev/null
+++ b/services/storage/versions.go
@@ -0,0 +1,34 @@
+package storage
+
+type Versions interface {
+	Get(id string) (string, error)
+	Set(id, version string) error
+}
+
+type versions struct {
+	store Interface
+}
+
+func NewVersions(store Interface) Versions {
+	return &versions{
+		store: store,
+	}
+}
+
+func (v versions) Get(id string) (version string, err error) {
+	err = v.store.View(func(tx ReadOnlyTx) error {
+		kv, err := tx.Get(id)
+		if err != nil {
+			return err
+		}
+		version = string(kv.Value)
+		return nil
+	})
+	return
+}
+
+func (v versions) Set(id, version string) error {
+	return v.store.Update(func(tx Tx) error {
+		return tx.Put(id, []byte(version))
+	})
+}
diff --git a/usr/share/bash-completion/completions/kapacitor b/usr/share/bash-completion/completions/kapacitor
index f35b2e4b6..198de172e 100644
--- a/usr/share/bash-completion/completions/kapacitor
+++ b/usr/share/bash-completion/completions/kapacitor
@@ -83,7 +83,7 @@ _kapacitor()
 				esac
 			fi
 			;;
-		define-handler)
+		define-topic-handler)
 			if [[ -z "${COMP_WORDS[2]}" || ("$cur" = "${COMP_WORDS[2]}" && -z "${COMP_WORDS[3]}") ]]
 			then
 				words=$(_kapacitor_list topics "$cur")
@@ -175,7 +175,7 @@ _kapacitor()
 		show-template)
 			words=$(_kapacitor_list templates "$cur")
 			;;
-		show-handler)
+		show-topic-handler)
 			if [[ -z "${COMP_WORDS[2]}" || ("$cur" = "${COMP_WORDS[2]}" && -z "${COMP_WORDS[3]}") ]]
 			then
 				words=$(_kapacitor_list topics "$cur")
@@ -199,8 +199,8 @@ _kapacitor()
 			esac
 			;;
 		*)
-			words='record define define-template define-handler replay replay-live enable disable \
-				reload delete list show show-template show-handler show-topic level stats version vars service-tests help'
+			words='record define define-template define-topic-handler replay replay-live enable disable \
+				reload delete list show show-template show-topic-handler show-topic level stats version vars service-tests help'
 			;;
 	esac
 	if [ -z "$COMPREPLY" ]
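
The handler migration above follows a simple version-gated pattern built on the new storage.Versions interface. A minimal sketch of how another service could reuse that pattern is shown below; the ExampleService type, its field layout, and the version key are hypothetical and not part of this patch, while the Versions, Interface, Tx, and ErrNoKeyExists names come from the storage package changes above.

package example

import (
	"github.com/influxdata/kapacitor/services/storage"
)

// ExampleService is a hypothetical consumer of the storage service,
// sketching the version-gated migration pattern introduced by this change.
type ExampleService struct {
	StorageService interface {
		Store(namespace string) storage.Interface
		Versions() storage.Versions
	}
}

const (
	exampleStoreVersionKey = "example_specs" // hypothetical version key
	exampleStoreVersion1   = "1"
)

func (s *ExampleService) migrate(store storage.Interface) error {
	// Skip the migration if it has already run.
	version, err := s.StorageService.Versions().Get(exampleStoreVersionKey)
	if err != nil && err != storage.ErrNoKeyExists {
		return err
	}
	if version == exampleStoreVersion1 {
		return nil
	}
	// Perform every read and write in a single transaction so a failure rolls back cleanly.
	if err := store.Update(func(tx storage.Tx) error {
		// ... read old keys with tx.List and write new objects with the *Tx DAO methods ...
		return nil
	}); err != nil {
		return err
	}
	// Record the version only after the transaction has committed.
	return s.StorageService.Versions().Set(exampleStoreVersionKey, exampleStoreVersion1)
}

Because Set is only called after Update returns successfully, a crash mid-migration leaves the version key unset and the migration is retried on the next startup, mirroring how migrateHandlerSpecs records handlerSpecsStoreVersion1 only after its transaction commits.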