Skip to content

Commit

Permalink
add migration logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 21, 2017
1 parent 3850c33 commit 1521a8c
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 134 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ kapacitor define-handler system aggregate_by_1m.yaml
The change is completely breaking for the technical preview alerting service, a.k.a. the new alert topic handler features.
The change boils down to simplifying how you define and interact with topics.
Alert handlers now only ever have a single action and belong to a single topic.
An automatic migration from old to new handler definitions will be performed during startup.
See the updated API docs.

### Bugfixes
Expand Down
38 changes: 35 additions & 3 deletions services/alert/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,29 @@ var (
type HandlerSpecDAO interface {
// Retrieve a handler
Get(topic, id string) (HandlerSpec, error)
GetTx(tx storage.ReadOnlyTx, topic, id string) (HandlerSpec, error)

// Create a handler.
// ErrHandlerSpecExists is returned if a handler already exists with the same ID.
Create(h HandlerSpec) error
CreateTx(tx storage.Tx, h HandlerSpec) error

// Replace an existing handler.
// ErrNoHandlerSpecExists is returned if the handler does not exist.
Replace(h HandlerSpec) error
ReplaceTx(tx storage.Tx, h HandlerSpec) error

// Delete a handler.
// It is not an error to delete an non-existent handler.
Delete(topic, id string) error
DeleteTx(tx storage.Tx, topic, id string) error

// List handlers matching a pattern.
// The pattern is shell/glob matching see https://golang.org/pkg/path/#Match
// Offset and limit are pagination bounds. Offset is inclusive starting at index 0.
// More results may exist while the number of returned items is equal to limit.
List(topic, pattern string, offset, limit int) ([]HandlerSpec, error)
ListTx(tx storage.ReadOnlyTx, topic, pattern string, offset, limit int) ([]HandlerSpec, error)
}

//--------------------------------------------------------------------
Expand Down Expand Up @@ -113,8 +118,12 @@ type handlerSpecKV struct {
store *storage.IndexedStore
}

const (
handlerPrefix = "handlers"
)

func newHandlerSpecKV(store storage.Interface) (*handlerSpecKV, error) {
c := storage.DefaultIndexedStoreConfig("handlers", func() storage.BinaryObject {
c := storage.DefaultIndexedStoreConfig(handlerPrefix, func() storage.BinaryObject {
return new(HandlerSpec)
})
istore, err := storage.NewIndexedStore(store, c)
Expand All @@ -136,7 +145,13 @@ func (kv *handlerSpecKV) error(err error) error {
}

func (kv *handlerSpecKV) Get(topic, id string) (HandlerSpec, error) {
o, err := kv.store.Get(fullID(topic, id))
return kv.getHelper(kv.store.Get(fullID(topic, id)))
}
func (kv *handlerSpecKV) GetTx(tx storage.ReadOnlyTx, topic, id string) (HandlerSpec, error) {
return kv.getHelper(kv.store.GetTx(tx, fullID(topic, id)))
}

func (kv *handlerSpecKV) getHelper(o storage.BinaryObject, err error) (HandlerSpec, error) {
if err != nil {
return HandlerSpec{}, kv.error(err)
}
Expand All @@ -150,20 +165,37 @@ func (kv *handlerSpecKV) Get(topic, id string) (HandlerSpec, error) {
func (kv *handlerSpecKV) Create(h HandlerSpec) error {
return kv.store.Create(&h)
}
func (kv *handlerSpecKV) CreateTx(tx storage.Tx, h HandlerSpec) error {
return kv.store.CreateTx(tx, &h)
}

func (kv *handlerSpecKV) Replace(h HandlerSpec) error {
return kv.store.Replace(&h)
}
func (kv *handlerSpecKV) ReplaceTx(tx storage.Tx, h HandlerSpec) error {
return kv.store.ReplaceTx(tx, &h)
}

func (kv *handlerSpecKV) Delete(topic, id string) error {
return kv.store.Delete(fullID(topic, id))
}
func (kv *handlerSpecKV) DeleteTx(tx storage.Tx, topic, id string) error {
return kv.store.DeleteTx(tx, fullID(topic, id))
}

func (kv *handlerSpecKV) List(topic, pattern string, offset, limit int) ([]HandlerSpec, error) {
if pattern == "" {
pattern = "*"
}
objects, err := kv.store.List(storage.DefaultIDIndex, fullID(topic, pattern), offset, limit)
return kv.listHelper(kv.store.List(storage.DefaultIDIndex, fullID(topic, pattern), offset, limit))
}
func (kv *handlerSpecKV) ListTx(tx storage.ReadOnlyTx, topic, pattern string, offset, limit int) ([]HandlerSpec, error) {
if pattern == "" {
pattern = "*"
}
return kv.listHelper(kv.store.ListTx(tx, storage.DefaultIDIndex, fullID(topic, pattern), offset, limit))
}
func (kv *handlerSpecKV) listHelper(objects []storage.BinaryObject, err error) ([]HandlerSpec, error) {
if err != nil {
return nil, err
}
Expand Down
133 changes: 133 additions & 0 deletions services/alert/service.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package alert

import (
"encoding/json"
"fmt"
"log"
"path"
"regexp"
"sync"

"github.com/influxdata/kapacitor/alert"
Expand Down Expand Up @@ -46,6 +48,7 @@ type Service struct {

StorageService interface {
Store(namespace string) storage.Interface
Versions() storage.Versions
}

Commander command.Commander
Expand Down Expand Up @@ -128,6 +131,11 @@ func (s *Service) Open() error {
}
s.topicsDAO = topicsDAO

// Migrate v1.2 handlers
if err := s.migrateHandlerSpecs(store); err != nil {
return err
}

// Load saved handlers
if err := s.loadSavedHandlerSpecs(); err != nil {
return err
Expand All @@ -153,6 +161,131 @@ func (s *Service) Close() error {
return s.APIServer.Close()
}

const (
handlerSpecsStoreVersion = "alert_topic_handler_specs"
handlerSpecsStoreVersion1 = "1"
)

func (s *Service) migrateHandlerSpecs(store storage.Interface) error {
specVersion, err := s.StorageService.Versions().Get(handlerSpecsStoreVersion)
if err != nil && err != storage.ErrNoKeyExists {
return err
}
if specVersion == handlerSpecsStoreVersion1 {
// Already migrated
return nil
}
s.logger.Println("D! migrating old v1.2 handler specs")

// v1.2 HandlerActionSpec
type oldHandlerActionSpec struct {
Kind string `json:"kind"`
Options map[string]interface{} `json:"options"`
}

// v1.2 HandlerSpec
type oldHandlerSpec struct {
ID string `json:"id"`
Topics []string `json:"topics"`
Actions []oldHandlerActionSpec `json:"actions"`
}
oldDataPrefix := "/" + handlerPrefix + "/data"
oldKeyPattern := regexp.MustCompile(fmt.Sprintf(`^%s/[-\._\p{L}0-9]+$`, oldDataPrefix))

// Process to migrate to new handler specs:
// 1. Gather all old handlers
// 2. Define new handlers that are equivalent
// 3. Check that there are no ID conflicts
// 4. Save the new handlers
// 5. Delete the old specs
//
// All steps are performed in a single transaction,
// so it can be rolledback in case of an error.
err = store.Update(func(tx storage.Tx) error {
var newHandlers []HandlerSpec
kvs, err := tx.List(oldDataPrefix)
if err != nil {
return err
}
s.logger.Printf("D! found %d handler rows", len(kvs))

var oldKeys []string
for _, kv := range kvs {
if !oldKeyPattern.MatchString(kv.Key) {
s.logger.Println("D! found new handler skipping:", kv.Key)
continue
}
oldKeys = append(oldKeys, kv.Key)
var old oldHandlerSpec
err := storage.VersionJSONDecode(kv.Value, func(version int, dec *json.Decoder) error {
if version != 1 {
return fmt.Errorf("old handler specs should all be version 1, got version %d", version)
}
return dec.Decode(&old)
})
if err != nil {
return errors.Wrapf(err, "failed to read old handler spec data for %s", kv.Key)
}

s.logger.Println("D! migrating old handler spec", old.ID)

// Create new handlers from the old
hasStateChangesOnly := false
aggregatePrefix := ""
for i, action := range old.Actions {
new := HandlerSpec{
ID: fmt.Sprintf("%s-%s-%d", old.ID, action.Kind, i),
Kind: action.Kind,
Options: action.Options,
}
if hasStateChangesOnly {
new.Match = "changed() == TRUE"
}
switch action.Kind {
case "stateChangesOnly":
hasStateChangesOnly = true
// No need to add a handler for this one
case "aggregate":
newPrefix := aggregatePrefix + "aggregate_topic-" + old.ID + "-"
for _, topic := range old.Topics {
new.Topic = aggregatePrefix + topic
new.Options["topic"] = newPrefix + topic
newHandlers = append(newHandlers, new)
}
aggregatePrefix = newPrefix
default:
for _, topic := range old.Topics {
new.Topic = aggregatePrefix + topic
newHandlers = append(newHandlers, new)
}
}
}
}

// Check that all new handlers are unique
for _, handler := range newHandlers {
if _, err := s.specsDAO.GetTx(tx, handler.Topic, handler.ID); err != ErrNoHandlerSpecExists {
return fmt.Errorf("handler %q for topic %q already exists", handler.ID, handler.Topic)
}
}

s.logger.Printf("D! creating %d new handlers in place of old handlers", len(newHandlers))

// Create new handlers
for _, handler := range newHandlers {
if err := s.specsDAO.CreateTx(tx, handler); err != nil {
return errors.Wrap(err, "failed to create new handler during migration")
}
}
return nil
})
if err != nil {
return err
}
// Save version
return s.StorageService.Versions().Set(handlerSpecsStoreVersion, handlerSpecsStoreVersion1)
}

func (s *Service) loadSavedHandlerSpecs() error {
offset := 0
limit := 100
Expand Down
Loading

0 comments on commit 1521a8c

Please sign in to comment.