Skip to content

Commit

Permalink
list-objects: amend listing virtual ("synthetic") dirs
Browse files Browse the repository at this point in the history
* aws and gcp backends to handle virtual dirs, set 'is-dir' bit
* (azure TBD)
* always return virtual directories (if any) - unless
  explicitly disallowed via '--no-dirs' switch
* move '--no-dirs' logic to the backends
* part three, prev. commits: 0b14d0e, a977325

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 27, 2024
1 parent 0b14d0e commit da0606f
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 94 deletions.
54 changes: 27 additions & 27 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,36 +320,36 @@ func (*s3bp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode

var (
custom cos.StrKVs
l = len(resp.Contents)
wantCustom = msg.WantProp(apc.GetPropsCustom)
)
for i := len(lst.Entries); i < l; i++ {
lst.Entries = append(lst.Entries, &cmn.LsoEnt{}) // add missing empty
}
if wantCustom {
custom = make(cos.StrKVs, 2) // reuse
}
for i, obj := range resp.Contents {
entry := lst.Entries[i]
entry.Name = *obj.Key
entry.Size = *obj.Size
if msg.IsFlagSet(apc.LsNameOnly) || msg.IsFlagSet(apc.LsNameSize) {
continue
}
if v, ok := h.EncodeCksum(obj.ETag); ok {
entry.Checksum = v
}
if wantCustom {
custom[cmn.ETag] = entry.Checksum
mtime := *(obj.LastModified)
custom[cmn.LastModified] = fmtTime(mtime)
entry.Custom = cmn.CustomMD2S(custom)
lst.Entries = lst.Entries[:0]
for _, obj := range resp.Contents {
en := cmn.LsoEnt{Name: *obj.Key, Size: *obj.Size}
// rarely
if en.Size == 0 && cos.IsLastB(en.Name, '/') {
if msg.IsFlagSet(apc.LsNoDirs) {
continue
}
en.Flags = apc.EntryIsDir
} else if !msg.IsFlagSet(apc.LsNameOnly) && !msg.IsFlagSet(apc.LsNameSize) {
if v, ok := h.EncodeCksum(obj.ETag); ok {
en.Checksum = v
}
if wantCustom {
custom[cmn.ETag] = en.Checksum
mtime := *(obj.LastModified)
custom[cmn.LastModified] = fmtTime(mtime)
en.Custom = cmn.CustomMD2S(custom)
}
}
lst.Entries = append(lst.Entries, &en)
}
lst.Entries = lst.Entries[:l]

// append virtual directories if:
if msg.IsFlagSet(apc.LsNoRecursion) {
// append virtual directories unless '--no-dirs'
if !msg.IsFlagSet(apc.LsNoDirs) {
for _, dir := range resp.CommonPrefixes {
lst.Entries = append(lst.Entries, &cmn.LsoEnt{Name: *dir.Prefix, Flags: apc.EntryIsDir})
}
Expand All @@ -373,8 +373,8 @@ func (*s3bp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode
verParams = &s3.ListObjectVersionsInput{Bucket: aws.String(cloudBck.Name)}
num int
)
for _, entry := range lst.Entries {
verParams.Prefix = aws.String(entry.Name)
for _, en := range lst.Entries {
verParams.Prefix = aws.String(en.Name)
verResp, err := svc.ListObjectVersions(context.Background(), verParams)
if err != nil {
return awsErrorToAISError(err, cloudBck, "")
Expand All @@ -383,10 +383,10 @@ func (*s3bp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode
if latest := *(vers.IsLatest); !latest {
continue
}
if key := *(vers.Key); key == entry.Name {
if key := *(vers.Key); key == en.Name {
v, ok := h.EncodeVersion(vers.VersionId)
debug.Assert(ok, entry.Name+": "+*(vers.VersionId))
entry.Version = v
debug.Assert(ok, en.Name+": "+*(vers.VersionId))
en.Version = v
num++
}
}
Expand Down
31 changes: 13 additions & 18 deletions ais/backend/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (azbp *azbp) HeadBucket(ctx context.Context, bck *meta.Bck) (cos.StrKVs, in

// TODO: support non-recursive (apc.LsNoRecursion) operation, as in:
// $ az storage blob list -c abc --prefix sub/ --delimiter /
// TODO: research "hierarchical namespaces"
// See also: aws.go, gcp.go
func (azbp *azbp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (int, error) {
msg.PageSize = calcPageSize(msg.PageSize, bck.MaxPageSize())
Expand Down Expand Up @@ -275,32 +276,26 @@ func (azbp *azbp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (

var (
custom cos.StrKVs
l = len(resp.Segment.BlobItems)
wantCustom = msg.WantProp(apc.GetPropsCustom)
)
for i := len(lst.Entries); i < l; i++ {
lst.Entries = append(lst.Entries, &cmn.LsoEnt{}) // add missing empty
}
if wantCustom {
custom = make(cos.StrKVs, 4) // reuse
}
for idx := range resp.Segment.BlobItems {
var (
blob = resp.Segment.BlobItems[idx]
entry = lst.Entries[idx]
)
entry.Name = *blob.Name
entry.Size = *blob.Properties.ContentLength
lst.Entries = lst.Entries[:0]
for _, blob := range resp.Segment.BlobItems {
en := cmn.LsoEnt{Name: *blob.Name, Size: *blob.Properties.ContentLength}

// not expecting directories
debug.Assert(en.Name != "" && !cos.IsLastB(en.Name, '/'), en.Name)

if msg.IsFlagSet(apc.LsNameOnly) || msg.IsFlagSet(apc.LsNameSize) {
lst.Entries = append(lst.Entries, &en)
continue
}

entry.Checksum = azEncodeChecksum(blob.Properties.ContentMD5)

en.Checksum = azEncodeChecksum(blob.Properties.ContentMD5)
etag := azEncodeEtag(*blob.Properties.ETag)
entry.Version = etag // (TODO a the top)

// custom
en.Version = etag // (TODO a the top)
if wantCustom {
clear(custom)
custom[cmn.ETag] = etag
Expand All @@ -313,10 +308,10 @@ func (azbp *azbp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (
if blob.VersionID != nil {
custom[cmn.VersionObjMD] = *blob.VersionID
}
entry.Custom = cmn.CustomMD2S(custom)
en.Custom = cmn.CustomMD2S(custom)
}
lst.Entries = append(lst.Entries, &en)
}
lst.Entries = lst.Entries[:l]

if resp.NextMarker != nil {
lst.ContinuationToken = *resp.NextMarker
Expand Down
68 changes: 31 additions & 37 deletions ais/backend/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
Expand Down Expand Up @@ -157,14 +158,12 @@ func (*gsbp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode
msg.PageSize = calcPageSize(msg.PageSize, bck.MaxPageSize())

if prefix := msg.Prefix; prefix != "" {
var delim string
query = &storage.Query{Prefix: prefix}
if msg.IsFlagSet(apc.LsNoRecursion) {
// NOTE: important to indicate subdirectory with trailing '/'
if cos.IsLastB(prefix, '/') {
delim = "/"
}
query.Delimiter = "/"
}
query = &storage.Query{Prefix: prefix, Delimiter: delim}
} else if msg.IsFlagSet(apc.LsNoRecursion) {
query = &storage.Query{Delimiter: "/"}
}

var (
Expand All @@ -185,47 +184,42 @@ func (*gsbp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode

var (
custom cos.StrKVs
i int
l = len(objs)
wantCustom = msg.WantProp(apc.GetPropsCustom)
)
for j := len(lst.Entries); j < l; j++ {
lst.Entries = append(lst.Entries, &cmn.LsoEnt{}) // add missing empty
}
if wantCustom {
custom = make(cos.StrKVs, 3) // reuse
}
lst.Entries = lst.Entries[:0]
for _, attrs := range objs {
if msg.IsFlagSet(apc.LsNoRecursion) {
if attrs.Name == "" {
entry := lst.Entries[i]
entry.Name = attrs.Prefix
entry.Flags = apc.EntryIsDir
i++
en := cmn.LsoEnt{Name: attrs.Name, Size: attrs.Size}
if attrs.Prefix != "" {
// see "Prefix"
// ref: https://github.com/googleapis/google-cloud-go/blob/main/storage/storage.go#L1407-L1411
debug.Assert(attrs.Name == "", attrs.Prefix, " vs ", attrs.Name)
debug.Assert(query != nil && query.Delimiter != "")

if msg.IsFlagSet(apc.LsNoDirs) {
continue
}
en.Name = attrs.Prefix
en.Flags = apc.EntryIsDir
} else if !msg.IsFlagSet(apc.LsNameOnly) && !msg.IsFlagSet(apc.LsNameSize) {
if v, ok := h.EncodeCksum(attrs.MD5); ok {
en.Checksum = v
}
if v, ok := h.EncodeVersion(attrs.Generation); ok {
en.Version = v
}
// custom
if wantCustom {
custom[cmn.ETag], _ = h.EncodeCksum(attrs.Etag)
custom[cmn.LastModified] = fmtTime(attrs.Updated)
custom[cos.HdrContentType] = attrs.ContentType
en.Custom = cmn.CustomMD2S(custom)
}
}
entry := lst.Entries[i]
i++
entry.Name, entry.Size = attrs.Name, attrs.Size
if msg.IsFlagSet(apc.LsNameOnly) || msg.IsFlagSet(apc.LsNameSize) {
continue
}
if v, ok := h.EncodeCksum(attrs.MD5); ok {
entry.Checksum = v
}
if v, ok := h.EncodeVersion(attrs.Generation); ok {
entry.Version = v
}
// custom
if wantCustom {
custom[cmn.ETag], _ = h.EncodeCksum(attrs.Etag)
custom[cmn.LastModified] = fmtTime(attrs.Updated)
custom[cos.HdrContentType] = attrs.ContentType
entry.Custom = cmn.CustomMD2S(custom)
}
lst.Entries = append(lst.Entries, &en)
}
lst.Entries = lst.Entries[:i]

if cmn.Rom.FastV(4, cos.SmoduleBackend) {
nlog.Infof("[list_objects] count %d", len(lst.Entries))
Expand Down
6 changes: 3 additions & 3 deletions ais/plstcx.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *lstcx) do() (string, error) {
Props: apc.GetPropsName,
PageSize: 0, // i.e., backend.MaxPageSize()
}
c.lsmsg.SetFlag(apc.LsNameOnly)
c.lsmsg.SetFlag(apc.LsNameOnly | apc.LsNoDirs)
c.smap = c.p.owner.smap.get()
tsi, err := c.smap.HrwTargetTask(c.lsmsg.UUID)
if err != nil {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *lstcx) do() (string, error) {
lr, cnt := &c.tcomsg.ListRange, len(lst.Entries)
lr.ObjNames = make([]string, 0, cnt)
for _, e := range lst.Entries {
if e.IsDir() || cos.IsLastB(e.Name, '/') { // skip virtual dir (apc.EntryIsDir)
if e.IsDir() { // NOTE: always skip virtual dir (apc.EntryIsDir)
continue
}
lr.ObjNames = append(lr.ObjNames, e.Name)
Expand Down Expand Up @@ -190,7 +190,7 @@ func (c *lstcx) _page() (int, error) {
clear(lr.ObjNames)
lr.ObjNames = lr.ObjNames[:0]
for _, e := range lst.Entries {
if e.IsDir() || cos.IsLastB(e.Name, '/') { // skip virtual dir (apc.EntryIsDir)
if e.IsDir() { // NOTE: always skip virtual dir (apc.EntryIsDir)
continue
}
lr.ObjNames = append(lr.ObjNames, e.Name)
Expand Down
15 changes: 8 additions & 7 deletions cmn/objlist_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func DedupLso(entries LsoEntries, maxSize int, noDirs bool) []*LsoEnt {
if j > 0 && entries[j-1].Name == en.Name {
continue
}
if noDirs && cos.IsLastB(en.Name, '/') {
continue
}

debug.Assert(!(noDirs && en.IsDir())) // expecting backends for filter out accordingly

entries[j] = en
j++

Expand Down Expand Up @@ -156,10 +156,9 @@ func MergeLso(lists []*LsoRes, lsmsg *apc.LsoMsg, maxSize int) *LsoRes {
token = l.ContinuationToken
}
for _, en := range l.Entries {
// drop
if noDirs && cos.IsLastB(en.Name, '/') {
continue
}
// expecting backends for filter out
debug.Assert(!(noDirs && en.IsDir()))

// add new
entry, exists := tmp[en.Name]
if !exists {
Expand Down Expand Up @@ -188,6 +187,8 @@ func MergeLso(lists []*LsoRes, lsmsg *apc.LsoMsg, maxSize int) *LsoRes {
resList.Entries = resList.Entries[:0]
resList.ContinuationToken = token

// TODO -- FIXME: sort virtual dirs separately =====================

for _, entry := range tmp {
resList.Entries = appSorted(resList.Entries, entry)
}
Expand Down
2 changes: 1 addition & 1 deletion xact/xs/lrit.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (r *lriterator) _prefix(wi lrwi, smap *meta.Smap) error {
if !be.IsStatusOK() {
continue
}
if be.IsDir() || cos.IsLastB(be.Name, '/') { // skip virtual dir (apc.EntryIsDir)
if be.IsDir() { // NOTE: always skip virtual dirs (apc.EntryIsDir)
continue
}
if r.done() {
Expand Down
2 changes: 1 addition & 1 deletion xact/xs/nsumm.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (r *XactNsumm) visitObj(lom *core.LOM, _ []byte) error {

func (r *XactNsumm) runCloudBck(bck *meta.Bck, res *cmn.BsummResult) {
lsmsg := &apc.LsoMsg{Props: apc.GetPropsSize, Prefix: r.p.msg.Prefix}
lsmsg.SetFlag(apc.LsNameSize)
lsmsg.SetFlag(apc.LsNameSize | apc.LsNoDirs)
for !r.IsAborted() {
npg := newNpgCtx(bck, lsmsg, noopCb, nil) // TODO -- FIXME: inventory offset
nentries := allocLsoEntries()
Expand Down

0 comments on commit da0606f

Please sign in to comment.