Skip to content

Commit

Permalink
use a sync.Map for storing buckets
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Oct 20, 2023
1 parent 3882f06 commit 9463c82
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions v4/store/nats-js/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type natsStore struct {

conn *nats.Conn
js nats.JetStreamContext
buckets map[string]nats.ObjectStore
buckets *sync.Map
}

func init() {
Expand All @@ -55,7 +55,7 @@ func NewStore(opts ...store.Option) store.Store {
opts: options,
jsopts: []nats.JSOpt{},
objStoreConfigs: []*nats.ObjectStoreConfig{},
buckets: map[string]nats.ObjectStore{},
buckets: &sync.Map{},
storageType: nats.FileStorage,
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func (n *natsStore) Init(opts ...store.Option) error {
if err != nil {
return errors.Wrapf(err, "Failed to create bucket (%s)", cfg.Bucket)
}
n.buckets[cfg.Bucket] = store
n.buckets.Store(cfg.Bucket, store)
}

return nil
Expand Down Expand Up @@ -178,10 +178,11 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
opt.Table = n.opts.Table
}

bucket, ok := n.buckets[opt.Database]
b, ok := n.buckets.Load(opt.Database)
if !ok {
return nil, ErrBucketNotFound
}
bucket := b.(nats.ObjectStore)

var keys []string

Expand Down Expand Up @@ -291,8 +292,8 @@ func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error {
opt.Table = n.opts.Table
}

store, ok := n.buckets[opt.Database]

s, ok := n.buckets.Load(opt.Database)
store, _ := s.(nats.ObjectStore)
// Create new bucket if not exists
if !ok {
var err error
Expand Down Expand Up @@ -344,18 +345,19 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
}

if opt.Table == "DELETE_BUCKET" {
delete(n.buckets, key)
n.buckets.Delete(key)
if err := n.js.DeleteObjectStore(key); err != nil {
return errors.Wrap(err, "Failed to delete bucket")
}
return nil
}

store, ok := n.buckets[opt.Database]
s, ok := n.buckets.Load(opt.Database)
if !ok {
return ErrBucketNotFound
}

store := s.(nats.ObjectStore)
if err := store.Delete(getKey(key, opt.Table)); err != nil {
return errors.Wrap(err, "Failed to delete data")
}
Expand All @@ -381,10 +383,11 @@ func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
opt.Table = n.opts.Table
}

store, ok := n.buckets[opt.Database]
s, ok := n.buckets.Load(opt.Database)
if !ok {
return nil, ErrBucketNotFound
}
store := s.(nats.ObjectStore)

objects, err := store.List()
if err != nil {
Expand Down Expand Up @@ -445,7 +448,7 @@ func (n *natsStore) createNewBucket(name string) (nats.ObjectStore, error) {
if err != nil {
return nil, errors.Wrapf(err, "Failed to create new bucket (%s)", name)
}
n.buckets[name] = store
n.buckets.Store(name, store)
return store, err
}

Expand Down

0 comments on commit 9463c82

Please sign in to comment.