Skip to content

Commit

Permalink
feat: implement informer
Browse files Browse the repository at this point in the history
  • Loading branch information
wfxr committed Nov 10, 2021
1 parent 13f721a commit c1ac08b
Show file tree
Hide file tree
Showing 19 changed files with 1,402 additions and 0 deletions.
11 changes: 11 additions & 0 deletions internal/database/metadatav2/informer/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package informer

import "github.com/oom-ai/oomstore/pkg/oomstore/typesv2"

type EntityCache struct {
typesv2.EntityList
}

func (c *EntityCache) List() typesv2.EntityList {
return c.EntityList
}
50 changes: 50 additions & 0 deletions internal/database/metadatav2/informer/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package informer

import (
"github.com/oom-ai/oomstore/pkg/oomstore/types"
"github.com/oom-ai/oomstore/pkg/oomstore/typesv2"
)

type FeatureCache struct {
typesv2.FeatureList
}

func (c *FeatureCache) Enrich(groupCache *GroupCache) {
for _, f := range c.FeatureList {
f.Group = groupCache.Find(func(g *typesv2.FeatureGroup) bool {
return g.ID == f.GroupID
})
}
}

func (c *FeatureCache) List(opt types.ListFeatureOpt) typesv2.FeatureList {
var features typesv2.FeatureList

// filter names
if opt.FeatureNames != nil {
for _, name := range opt.FeatureNames {
if f := c.Find(func(f *typesv2.Feature) bool {
return f.Name != name
}); f != nil {
features = append(features, f)
}
}
} else {
features = c.FeatureList
}

// filter entity
if opt.EntityName != nil {
features = features.Filter(func(f *typesv2.Feature) bool {
return f.Entity().Name == *opt.EntityName
})
}

// filter group
if opt.GroupName != nil {
features = features.Filter(func(f *typesv2.Feature) bool {
return f.Group.Name == *opt.GroupName
})
}
return features
}
32 changes: 32 additions & 0 deletions internal/database/metadatav2/informer/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package informer

import (
"github.com/oom-ai/oomstore/pkg/oomstore/typesv2"
)

type GroupCache struct {
typesv2.FeatureGroupList
}

func (c *GroupCache) Enrich(entityCache *EntityCache, revisionCache *RevisionCache) {
for _, g := range c.FeatureGroupList {
g.Entity = entityCache.Find(func(e *typesv2.Entity) bool {
return e.ID == g.EntityID
})

if g.OnlineRevisionID != nil {
g.OnlineRevision = revisionCache.Find(func(r *typesv2.Revision) bool {
return r.ID == *g.OnlineRevisionID
})
}
}
}

func (c *GroupCache) List(entityName *string) []*typesv2.FeatureGroup {
if entityName == nil {
return c.FeatureGroupList
}
return c.FeatureGroupList.Filter(func(g *typesv2.FeatureGroup) bool {
return g.Entity.Name == *entityName
})
}
149 changes: 149 additions & 0 deletions internal/database/metadatav2/informer/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package informer

import (
"context"
"fmt"
"log"
"time"

"github.com/oom-ai/oomstore/internal/database/metadatav2"
"github.com/oom-ai/oomstore/pkg/oomstore/types"
"github.com/oom-ai/oomstore/pkg/oomstore/typesv2"
)

type Cache struct {
Entities *EntityCache
Features *FeatureCache
Groups *GroupCache
Revisions *RevisionCache
}

func NewCache(
entities typesv2.EntityList,
features typesv2.FeatureList,
groups typesv2.FeatureGroupList,
revisions typesv2.RevisionList) *Cache {
return &Cache{
Entities: &EntityCache{entities},
Features: &FeatureCache{features},
Groups: &GroupCache{groups},
Revisions: &RevisionCache{revisions},
}
}

func (c *Cache) enrich() {
c.Groups.Enrich(c.Entities, c.Revisions)
c.Features.Enrich(c.Groups)
// TODO: caching revision data is not necessary, but currently we do it for simplicity
c.Revisions.Enrich(c.Groups)
}

type Informer struct {
*Cache
lister func() (*Cache, error)
ticker *time.Ticker
quit chan bool
}

func New(listInterval time.Duration, lister func() (*Cache, error)) (*Informer, error) {
cache, err := lister()
if err != nil {
return nil, err
}

informer := &Informer{
Cache: cache,
lister: lister,
ticker: time.NewTicker(listInterval),
quit: make(chan bool),
}

go func() {
for {
select {
case <-informer.quit:
return
case <-informer.ticker.C:
if err := informer.Refresh(); err != nil {
log.Printf("failed refreshing informer: %v\n", err)
}
informer.ticker.Reset(listInterval)
}
}
}()
return informer, nil
}

func (f *Informer) Close() error {
f.ticker.Stop()
f.quit <- true
return nil
}

func (f *Informer) Refresh() error {
cache, err := f.lister()
if err != nil {
return err
}
cache.enrich()
f.Cache = cache
return nil
}

func (f *Informer) GetEntity(ctx context.Context, name string) *typesv2.Entity {
return f.Entities.Find(func(e *typesv2.Entity) bool {
return e.Name == name
})
}

func (f *Informer) ListEntity(ctx context.Context) typesv2.EntityList {
return f.Entities.List()
}

func (f *Informer) GetFeature(ctx context.Context, name string) *typesv2.Feature {
return f.Features.Find(func(f *typesv2.Feature) bool {
return f.Name == name
})
}

func (f *Informer) ListFeature(ctx context.Context, opt types.ListFeatureOpt) typesv2.FeatureList {
return f.Features.List(opt)
}

func (f *Informer) GetFeatureGroup(ctx context.Context, name string) *typesv2.FeatureGroup {
return f.Groups.Find(func(g *typesv2.FeatureGroup) bool {
return g.Name == name
})
}

func (f *Informer) ListFeatureGroup(ctx context.Context, entityName *string) typesv2.FeatureGroupList {
return f.Groups.List(entityName)
}

// TODO: refactor this
func (f *Informer) GetRevision(ctx context.Context, opt metadatav2.GetRevisionOpt) (*typesv2.Revision, error) {
if opt.RevisionId != nil {
return f.Revisions.Find(func(r *typesv2.Revision) bool {
return r.ID == *opt.RevisionId
}), nil
} else if opt.GroupName != nil && opt.Revision != nil {
return f.Revisions.Find(func(r *typesv2.Revision) bool {
return r.Group.Name == *opt.GroupName && r.Revision == *opt.Revision
}), nil
}
return nil, fmt.Errorf("invalid GetRevisionOpt: %+v", opt)
}

func (f *Informer) ListRevision(ctx context.Context, opt metadatav2.ListRevisionOpt) typesv2.RevisionList {
return f.Revisions.List(opt)
}

// TODO: not necessary anymore ?
func (f *Informer) GetLatestRevision(ctx context.Context, groupName string) *typesv2.Revision {
return f.Revisions.MaxRevision(groupName)
}

// TODO: refactor this into a private function of OomStore
func (f *Informer) BuildRevisionRanges(ctx context.Context, groupName string) []*types.RevisionRange {
return f.Revisions.BuildRevisionRanges(groupName)
}
93 changes: 93 additions & 0 deletions internal/database/metadatav2/informer/revision.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package informer

import (
"math"
"sort"

"github.com/oom-ai/oomstore/internal/database/metadatav2"
"github.com/oom-ai/oomstore/pkg/oomstore/types"
"github.com/oom-ai/oomstore/pkg/oomstore/typesv2"
)

type RevisionCache struct {
typesv2.RevisionList
}

func (c *RevisionCache) Enrich(groupCache *GroupCache) {
for _, r := range c.RevisionList {
r.Group = groupCache.Find(func(g *typesv2.FeatureGroup) bool {
return g.ID == r.GroupID
})
}
}

func (c *RevisionCache) List(opt metadatav2.ListRevisionOpt) typesv2.RevisionList {
var revisions typesv2.RevisionList
if opt.DataTables != nil {
for _, table := range opt.DataTables {
if r := c.Find(func(r *typesv2.Revision) bool {
return r.DataTable == table
}); r != nil {
revisions = append(revisions, r)
}
}
} else {
revisions = c.RevisionList
}

if opt.GroupName != nil {
revisions = revisions.Filter(func(r *typesv2.Revision) bool {
return r.Group.Name == *opt.GroupName
})
}
return revisions
}

func (c *RevisionCache) GetGroup(groupName string) typesv2.RevisionList {
return c.Filter(func(r *typesv2.Revision) bool {
return r.Group.Name == groupName
})
}

func (c *RevisionCache) MaxRevision(groupName string) *typesv2.Revision {
revisions := c.GetGroup(groupName)
if revisions == nil {
return nil
}

var max *typesv2.Revision
for _, r := range revisions {
if max == nil || max.Revision < r.Revision {
max = r
}
}
return max
}

func (c *RevisionCache) BuildRevisionRanges(groupName string) []*types.RevisionRange {
revisionIndex := c.GetGroup(groupName)
if len(revisionIndex) == 0 {
return nil
}

var revisions typesv2.RevisionList
revisions = append(revisions, revisionIndex...)
sort.Slice(revisions, func(i, j int) bool {
return revisions[i].Revision < revisions[j].Revision
})

var ranges []*types.RevisionRange
for i := 1; i < len(revisions); i++ {
ranges = append(ranges, &types.RevisionRange{
MinRevision: revisions[i-1].Revision,
MaxRevision: revisions[i].Revision,
DataTable: revisions[i-1].DataTable,
})
}

return append(ranges, &types.RevisionRange{
MinRevision: revisions[len(revisions)-1].Revision,
MaxRevision: revisions[math.MaxInt64].Revision,
DataTable: revisions[len(revisions)-1].DataTable,
})
}
Loading

0 comments on commit c1ac08b

Please sign in to comment.