Skip to content

Commit

Permalink
fix(informer): fix potential data race
Browse files Browse the repository at this point in the history
  • Loading branch information
wfxr committed Nov 10, 2021
1 parent 459deca commit 3db1c3d
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions internal/database/metadatav2/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"sync/atomic"
"time"

"github.com/oom-ai/oomstore/internal/database/metadatav2"
Expand Down Expand Up @@ -39,24 +40,22 @@ func (c *Cache) enrich() {
}

type Informer struct {
*Cache
cache atomic.Value
lister func() (*Cache, error)
ticker *time.Ticker
quit chan bool
}

func New(listInterval time.Duration, lister func() (*Cache, error)) (*Informer, error) {
cache, err := lister()
if err != nil {
return nil, err
}

informer := &Informer{
Cache: cache,
cache: atomic.Value{},
lister: lister,
ticker: time.NewTicker(listInterval),
quit: make(chan bool),
}
if err := informer.Refresh(); err != nil {
return nil, err
}

go func() {
for {
Expand Down Expand Up @@ -86,64 +85,68 @@ func (f *Informer) Refresh() error {
return err
}
cache.enrich()
f.Cache = cache
f.cache.Store(cache)
return nil
}

func (f *Informer) Cache() *Cache {
return f.cache.Load().(*Cache)
}

func (f *Informer) GetEntity(ctx context.Context, name string) *typesv2.Entity {
return f.Entities.Find(func(e *typesv2.Entity) bool {
return f.Cache().Entities.Find(func(e *typesv2.Entity) bool {
return e.Name == name
})
}

func (f *Informer) ListEntity(ctx context.Context) typesv2.EntityList {
return f.Entities.List()
return f.Cache().Entities.List()
}

func (f *Informer) GetFeature(ctx context.Context, name string) *typesv2.Feature {
return f.Features.Find(func(f *typesv2.Feature) bool {
return f.Cache().Features.Find(func(f *typesv2.Feature) bool {
return f.Name == name
})
}

func (f *Informer) ListFeature(ctx context.Context, opt types.ListFeatureOpt) typesv2.FeatureList {
return f.Features.List(opt)
return f.Cache().Features.List(opt)
}

func (f *Informer) GetFeatureGroup(ctx context.Context, name string) *typesv2.FeatureGroup {
return f.Groups.Find(func(g *typesv2.FeatureGroup) bool {
return f.Cache().Groups.Find(func(g *typesv2.FeatureGroup) bool {
return g.Name == name
})
}

func (f *Informer) ListFeatureGroup(ctx context.Context, entityName *string) typesv2.FeatureGroupList {
return f.Groups.List(entityName)
return f.Cache().Groups.List(entityName)
}

// TODO: refactor this
func (f *Informer) GetRevision(ctx context.Context, opt metadatav2.GetRevisionOpt) (*typesv2.Revision, error) {
if opt.RevisionId != nil {
return f.Revisions.Find(func(r *typesv2.Revision) bool {
return f.Cache().Revisions.Find(func(r *typesv2.Revision) bool {
return r.ID == *opt.RevisionId
}), nil
} else if opt.GroupName != nil && opt.Revision != nil {
return f.Revisions.Find(func(r *typesv2.Revision) bool {
return f.Cache().Revisions.Find(func(r *typesv2.Revision) bool {
return r.Group.Name == *opt.GroupName && r.Revision == *opt.Revision
}), nil
}
return nil, fmt.Errorf("invalid GetRevisionOpt: %+v", opt)
}

func (f *Informer) ListRevision(ctx context.Context, opt metadatav2.ListRevisionOpt) typesv2.RevisionList {
return f.Revisions.List(opt)
return f.Cache().Revisions.List(opt)
}

// TODO: not necessary anymore ?
func (f *Informer) GetLatestRevision(ctx context.Context, groupName string) *typesv2.Revision {
return f.Revisions.MaxRevision(groupName)
return f.Cache().Revisions.MaxRevision(groupName)
}

// TODO: refactor this into a private function of OomStore
func (f *Informer) BuildRevisionRanges(ctx context.Context, groupName string) []*types.RevisionRange {
return f.Revisions.BuildRevisionRanges(groupName)
return f.Cache().Revisions.BuildRevisionRanges(groupName)
}

0 comments on commit 3db1c3d

Please sign in to comment.