Skip to content

Commit

Permalink
Read a single key without listing all keys
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Aug 2, 2023
1 parent 1ada012 commit ea6298b
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions v4/store/nats-js/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,45 +186,63 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
}

var keys []string
objects, err := bucket.List()
if err == nats.ErrNoObjectsFound {
return []*store.Record{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "Failed to list objects")

var keyPrefix, keySuffix string

switch {
case opt.Prefix:
keyPrefix = getKey(key, opt.Table)
case opt.Suffix:
keySuffix = key
default:
keys = []string{getKey(key, opt.Table)}
}

for _, obj := range objects {
name := obj.Name
if (!opt.Prefix && !opt.Suffix) && getKey(key, opt.Table) != name {
continue
}
if opt.Prefix && !strings.HasPrefix(name, getKey(key, opt.Table)) {
continue
if len(keys) == 0 {
objects, err := bucket.List()
if err == nats.ErrNoObjectsFound {
return []*store.Record{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "Failed to list objects")
}

if opt.Suffix && !strings.HasSuffix(name, key) {
continue
for _, obj := range objects {
name := obj.Name
if !strings.HasPrefix(name, opt.Table) {
continue
}
if (!opt.Prefix && !opt.Suffix) && key != name {
continue
}
if opt.Prefix && !strings.HasPrefix(name, keyPrefix) {
continue
}
if opt.Suffix && !strings.HasSuffix(name, keySuffix) {
continue
}
keys = append(keys, name)
}
keys = append(keys, name)
}

records := []*store.Record{}
for _, key := range keys {
obj, err := bucket.Get(key)
if err != nil {
if err == nats.ErrObjectNotFound {
return []*store.Record{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "Failed to get object from bucket")
}

b, err := io.ReadAll(obj)
if err != nil {
return nil, errors.Wrap(err, "Failed to read returned bytes")
}
defer obj.Close()

info, err := obj.Info()
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch record info")
}

if info.Deleted {
continue
}

metadata := map[string]interface{}{}
for key, value := range info.Headers {
var val interface{}
Expand All @@ -234,14 +252,17 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
metadata[key] = val
}

b, err := io.ReadAll(obj)
if err != nil {
return nil, errors.Wrap(err, "Failed to read returned bytes")
}

records = append(records, &store.Record{
Key: key,
Value: b,
Metadata: metadata,
})

// Why is there a close method?
obj.Close()
}

if opt.Limit > 0 {
Expand Down

0 comments on commit ea6298b

Please sign in to comment.