Skip to content

Commit

Permalink
avoid concurrent map writes
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Oct 18, 2023
1 parent bc05fb0 commit b377da7
Showing 1 changed file with 33 additions and 16 deletions.
49 changes: 33 additions & 16 deletions v4/store/nats-js/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,8 @@ func (n *natsStore) Options() store.Options {

// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error.
func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
if n.conn == nil {
if err := n.Init(); err != nil {
return nil, err
}
if err := n.initConn(); err != nil {
return nil, err
}

opt := store.ReadOptions{}
Expand Down Expand Up @@ -276,10 +274,8 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,

// Write writes a record to the store, and returns an error if the record was not written.
func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error {
if n.conn == nil {
if err := n.Init(); err != nil {
return err
}
if err := n.initConn(); err != nil {
return err
}

opt := store.WriteOptions{}
Expand Down Expand Up @@ -330,10 +326,8 @@ func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error {

// Delete removes the record with the corresponding key from the store.
func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
if n.conn == nil {
if err := n.Init(); err != nil {
return err
}
if err := n.initConn(); err != nil {
return err
}

opt := store.DeleteOptions{}
Expand Down Expand Up @@ -370,10 +364,8 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {

// List returns any keys that match, or an empty list with no error if none matched.
func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
if n.conn == nil {
if err := n.Init(); err != nil {
return nil, err
}
if err := n.initConn(); err != nil {
return nil, err
}

opt := store.ListOptions{}
Expand Down Expand Up @@ -456,3 +448,28 @@ func (n *natsStore) createNewBucket(name string) (nats.ObjectStore, error) {
n.buckets[name] = store
return store, err
}

// thread safe way to initialize the connection.
func (n *natsStore) initConn() error {
if n.hasConn() {
return nil
}

n.Lock()
defer n.Unlock()

// check if conn was initialized meanwhile
if n.conn != nil {
return nil
}

return n.Init()
}

// thread safe way to check if n is initialized.
func (n *natsStore) hasConn() bool {
n.RLock()
defer n.Unlock()

return n.conn != nil
}

0 comments on commit b377da7

Please sign in to comment.