From 252f9526ad9bfafedf0d468009ad74cf0a0d802a Mon Sep 17 00:00:00 2001
From: Tony Chen
Date: Wed, 3 Jul 2024 10:40:14 -0700
Subject: [PATCH] Implement `ishard` utility to archive objects according to subdirectory paths

Signed-off-by: Tony Chen
---
 cmd/ishard/main.go            |  20 +++
 tools/ishard/README.md        |  54 ++++++++
 tools/ishard/config/config.go |  66 ++++++++++
 tools/ishard/ishard.go        | 217 +++++++++++++++++++++++++++++++
 tools/ishard/ishard_test.go   | 234 ++++++++++++++++++++++++++++++++++
 5 files changed, 591 insertions(+)
 create mode 100644 cmd/ishard/main.go
 create mode 100644 tools/ishard/README.md
 create mode 100644 tools/ishard/config/config.go
 create mode 100644 tools/ishard/ishard.go
 create mode 100644 tools/ishard/ishard_test.go

diff --git a/cmd/ishard/main.go b/cmd/ishard/main.go
new file mode 100644
index 0000000000..4125adc3af
--- /dev/null
+++ b/cmd/ishard/main.go
@@ -0,0 +1,20 @@
+// Package main for the `ishard` executable.
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+ */
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/NVIDIA/aistore/tools/ishard"
+)
+
+func main() {
+	ishard.Init(nil)
+	if err := ishard.Start(); err != nil {
+		fmt.Fprintf(os.Stderr, "ishard exited with error: %v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/tools/ishard/README.md b/tools/ishard/README.md
new file mode 100644
index 0000000000..c01643f3a0
--- /dev/null
+++ b/tools/ishard/README.md
@@ -0,0 +1,54 @@
+# Initial Sharder
+
+This package provides a utility for archiving objects from a bucket into shard files. It ensures that objects from the same directory are not split across different output shards.
+
+## Assumptions
+
+**Do Not Split A Record Across Directories:** The utility assumes that objects within the same record should NOT be split across multiple subdirectories. This ensures that members of the same record are archived together. Users will be able to customize the **record key** and the **extensions** associated with the record through configuration (feature to be implemented).
+
+> This limitation will become a configurable user selection at a later point, with capabilities to customize record keys, etc.
+
+## Parameters
+
+- `-max_shard_size`: The desired size of each output shard in bytes. Default is `1024000`.
+- `-src_bck`: The source bucket name or URI. If empty, a bucket with a random name will be created.
+- `-dst_bck`: The destination bucket name or URI. If empty, a bucket with a random name will be created.
+- `-src_bck_provider`: The provider for the source bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`.
+- `-dst_bck_provider`: The provider for the destination bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`.
+- `-collapse`: If true, files in a subdirectory will be flattened and merged into their parent directory if their overall size doesn't reach the desired shard size.
+
+## Initial Setup
+
+**Build the Package:**
+
+```sh
+go build -o ishard .
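+
+# Alternatively (a sketch that assumes the cmd/ishard entry point added in this change),
+# the binary can also be built from the repository root:
+# go build -o ishard ./cmd/ishard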
+```
+
+## Sample Usage
+
+```sh
+./ishard -max_shard_size=10240 -src_bck=source_bucket -dst_bck=destination_bucket -src_bck_provider=ais -dst_bck_provider=ais
+```
+
+## Running the Tests
+
+```sh
+go test -v
+```
+
+## TODO List
+
+### MUST HAVE/DESIRABLE
+- [ ] Shard name patterns
+  - [ ] Utilize existing name template tools
+- [ ] goroutine
+- [ ] configurable record key, extensions
+- [ ] logging (timestamp, nlog)
+- [ ] reports missing member in a record
+- [ ] allow user to specify target directories to include/exclude
+- [ ] E2E testing from CLI
+
+### GOOD TO HAVE
+- [ ] progress bar (later)
+- [ ] integration into aistore (later)
diff --git a/tools/ishard/config/config.go b/tools/ishard/config/config.go
new file mode 100644
index 0000000000..bdf1768dd6
--- /dev/null
+++ b/tools/ishard/config/config.go
@@ -0,0 +1,66 @@
+// Package config provides types and functions to configure the ishard executable.
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+ */
+package config
+
+import (
+	"flag"
+
+	"github.com/NVIDIA/aistore/api/apc"
+	"github.com/NVIDIA/aistore/cmn"
+)
+
+type (
+	ClusterConfig struct {
+		URL string
+	}
+	IshardConfig struct {
+		MaxShardSize int64
+		StartIdx     int
+		IdxDigits    int
+		Ext          string
+		Prefix       string
+		Collapse     bool
+	}
+	Config struct {
+		ClusterConfig
+		IshardConfig
+		SrcBck cmn.Bck
+		DstBck cmn.Bck
+	}
+)
+
+const (
+	defaultClusterIPv4 = "127.0.0.1"
+	defaultProxyPort   = "8080"
+)
+
+var DefaultConfig = Config{
+	ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
+	IshardConfig:  IshardConfig{MaxShardSize: 102400, StartIdx: 0, IdxDigits: 4, Ext: ".tar", Prefix: "shard-", Collapse: false},
+	SrcBck:        cmn.Bck{Name: "src_bck", Provider: apc.AIS},
+	DstBck:        cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
+}
+
+// Load reads the ishard configuration from the CLI, or from spec files (TODO)
+func Load() (*Config, error) {
+	cfg := DefaultConfig
+	parseCliParams(&cfg)
+	return &cfg, nil
+}
+
+func parseCliParams(cfg *Config) {
+	flag.Int64Var(&cfg.MaxShardSize, "max_shard_size", 1024000, "desired size of each output shard")
+	flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "the source bucket name or URI. If empty, a bucket with a random name will be created")
+	flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "the destination bucket name or URI. If empty, a bucket with a random name will be created")
+	flag.BoolVar(&cfg.Collapse, "collapse", false, "if true, files in a subdirectory will be flattened and merged into their parent directory when their overall size doesn't reach the desired shard size")
+	flag.Parse()
+
+	if cfg.SrcBck.Provider, cfg.SrcBck.Name = cmn.ParseURLScheme(cfg.SrcBck.Name); cfg.SrcBck.Provider == "" {
+		cfg.SrcBck.Provider = apc.AIS
+	}
+	if cfg.DstBck.Provider, cfg.DstBck.Name = cmn.ParseURLScheme(cfg.DstBck.Name); cfg.DstBck.Provider == "" {
+		cfg.DstBck.Provider = apc.AIS
+	}
+}
diff --git a/tools/ishard/ishard.go b/tools/ishard/ishard.go
new file mode 100644
index 0000000000..7dcf8caf29
--- /dev/null
+++ b/tools/ishard/ishard.go
@@ -0,0 +1,217 @@
+// Package ishard provides a utility for sharding the initial dataset.
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
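+ *
+ * A minimal programmatic-usage sketch (for illustration only; it assumes a reachable
+ * AIS cluster and existing source/destination buckets):
+ *
+ *	cfg := config.DefaultConfig
+ *	cfg.SrcBck = cmn.Bck{Name: "src", Provider: apc.AIS}
+ *	cfg.DstBck = cmn.Bck{Name: "dst", Provider: apc.AIS}
+ *	if err := ishard.Init(&cfg); err != nil {
+ *		// handle error
+ *	}
+ *	if err := ishard.Start(); err != nil {
+ *		// handle error
+ *	}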
+ */
+package ishard
+
+import (
+	"fmt"
+	"path/filepath"
+	"strings"
+
+	"github.com/NVIDIA/aistore/api"
+	"github.com/NVIDIA/aistore/api/apc"
+	"github.com/NVIDIA/aistore/cmn"
+	"github.com/NVIDIA/aistore/cmn/nlog"
+	"github.com/NVIDIA/aistore/ext/dsort/shard"
+	"github.com/NVIDIA/aistore/tools/ishard/config"
+)
+
+var (
+	cfg        *config.Config
+	baseParams api.BaseParams
+)
+
+var (
+	shardIdx = 0
+)
+
+// Node represents the hierarchical structure of virtual directories within a bucket.
+type Node struct {
+	children map[string]*Node
+	records  *shard.Records
+}
+
+func NewNode() *Node {
+	return &Node{
+		children: make(map[string]*Node),
+		records:  shard.NewRecords(16),
+	}
+}
+
+func (n *Node) Insert(path string, size int64) {
+	parts := strings.Split(path, "/")
+	current := n
+
+	for i, part := range parts {
+		if _, exists := current.children[part]; !exists {
+			if i == len(parts)-1 {
+				ext := filepath.Ext(path)
+				base := strings.TrimSuffix(path, ext)
+				current.records.Insert(&shard.Record{
+					Key:  base,
+					Name: base,
+					Objects: []*shard.RecordObj{{
+						ContentPath:  path,
+						StoreType:    shard.SGLStoreType,
+						Offset:       0,
+						MetadataSize: 0,
+						Size:         size,
+						Extension:    ext,
+					}},
+				})
+			} else {
+				current.children[part] = NewNode()
+			}
+		}
+		current = current.children[part]
+	}
+}
+
+func (n *Node) Print(prefix string) {
+	for name, child := range n.children {
+		fmt.Printf("%s%s/", prefix, name)
+		names := []string{}
+		child.records.Lock()
+		for _, r := range child.records.All() {
+			names = append(names, r.Name)
+		}
+		child.records.Unlock()
+		fmt.Println(names)
+
+		child.Print(prefix + " ")
+	}
+}
+
+// Archive archives the objects under this node into shards, following the subdirectory structure.
+func (n *Node) Archive() error {
+	paths := []string{}
+	_, err := archiveNode(n, "", &paths)
+
+	if cfg.Collapse && len(paths) != 0 {
+		msg := cmn.ArchiveBckMsg{
+			ToBck: cfg.DstBck,
+			ArchiveMsg: apc.ArchiveMsg{
+				ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
+				ListRange: apc.ListRange{
+					ObjNames: paths,
+				},
+			},
+		}
+
+		if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
+			return fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
+		}
+	}
+	return err
+}
+
+func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error) {
+	totalSize := int64(0)
+	paths := []string{}
+
+	for name, child := range node.children {
+		fullPath := path + "/" + name
+		if path == "" {
+			fullPath = name
+		}
+		subtreeSize, err := archiveNode(child, fullPath, &paths)
+		if err != nil {
+			return 0, err
+		}
+		totalSize += subtreeSize
+	}
+
+	node.records.Lock()
+	for _, record := range node.records.All() {
+		totalSize += record.TotalSize()
+		for _, obj := range record.Objects {
+			paths = append(paths, record.MakeUniqueName(obj))
+		}
+		if totalSize > cfg.MaxShardSize {
+			msg := cmn.ArchiveBckMsg{
+				ToBck: cfg.DstBck,
+				ArchiveMsg: apc.ArchiveMsg{
+					ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
+					ListRange: apc.ListRange{
+						ObjNames: paths,
+					},
+				},
+			}
+
+			if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
+				return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
+			}
+
+			shardIdx++
+			totalSize = 0
+			paths = []string{}
+		}
+	}
+	node.records.Unlock()
+
+	if len(paths) == 0 {
+		return 0, nil
+	}
+
+	// Collapse is set: flatten the remaining objects into the parent directory
+	if cfg.Collapse {
+		*parentPaths = append(*parentPaths, paths...)
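+		// The leftover objects are handed to the caller (the parent directory's
+		// invocation), which either packs them into its own shard or passes them
+		// further up the tree.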
+		paths = nil
+		return totalSize, nil
+	}
+
+	// Otherwise, archive all remaining objects regardless of the current total size
+	msg := cmn.ArchiveBckMsg{
+		ToBck: cfg.DstBck,
+		ArchiveMsg: apc.ArchiveMsg{
+			ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
+			ListRange: apc.ListRange{
+				ObjNames: paths,
+			},
+		},
+	}
+
+	if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
+		return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
+	}
+	shardIdx++
+	return totalSize, nil
+}
+
+// Init sets the configuration for ishard. If a config is provided, it uses that;
+// otherwise, it loads the configuration from the CLI or falls back to the default config.
+func Init(cfgArg *config.Config) error {
+	var err error
+
+	// Use the provided config if given
+	if cfgArg != nil {
+		cfg = cfgArg
+	} else {
+		cfg, err = config.Load()
+		if err != nil {
+			nlog.Errorf("Error initializing config: %v. Using default config.", err)
+			defaultCfg := config.DefaultConfig
+			cfg = &defaultCfg
+		}
+	}
+
+	baseParams = api.BaseParams{URL: cfg.URL}
+	baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true})
+	return err
+}
+
+func Start() error {
+	msg := &apc.LsoMsg{}
+	objList, err := api.ListObjects(baseParams, cfg.SrcBck, msg, api.ListArgs{})
+	if err != nil {
+		return err
+	}
+
+	root := NewNode()
+	for _, en := range objList.Entries {
+		root.Insert(en.Name, en.Size)
+	}
+
+	return root.Archive()
+}
diff --git a/tools/ishard/ishard_test.go b/tools/ishard/ishard_test.go
new file mode 100644
index 0000000000..9585dcdf84
--- /dev/null
+++ b/tools/ishard/ishard_test.go
@@ -0,0 +1,234 @@
+// Package ishard_test contains tests for the `ishard` utility.
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+ */
+package ishard_test
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand/v2"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/NVIDIA/aistore/api"
+	"github.com/NVIDIA/aistore/api/apc"
+	"github.com/NVIDIA/aistore/cmn"
+	"github.com/NVIDIA/aistore/cmn/cos"
+	"github.com/NVIDIA/aistore/tools"
+	"github.com/NVIDIA/aistore/tools/ishard"
+	"github.com/NVIDIA/aistore/tools/ishard/config"
+	"github.com/NVIDIA/aistore/tools/readers"
+	"github.com/NVIDIA/aistore/tools/tarch"
+	"github.com/NVIDIA/aistore/tools/tassert"
+	"github.com/NVIDIA/aistore/tools/trand"
+)
+
+func TestNoRecordsSplit(t *testing.T) {
+	testCases := []struct {
+		numRecords    int
+		numExtensions int
+		fileSize      int64
+		collapse      bool
+	}{
+		{numRecords: 50, numExtensions: 2, fileSize: 32 * cos.KiB, collapse: false},
+		{numRecords: 200, numExtensions: 4, fileSize: 48 * cos.KiB, collapse: true},
+		{numRecords: 50, numExtensions: 2, fileSize: 32 * cos.KiB, collapse: false},
+		{numRecords: 200, numExtensions: 4, fileSize: 48 * cos.KiB, collapse: true},
+	}
+
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("Records:%d/Extensions:%d/FileSize:%d/Collapse:%v", tc.numRecords, tc.numExtensions, tc.fileSize, tc.collapse), func(t *testing.T) {
+			var (
+				cfg = &config.Config{
+					SrcBck: cmn.Bck{
+						Name:     trand.String(15),
+						Provider: apc.AIS,
+					},
+					DstBck: cmn.Bck{
+						Name:     trand.String(15),
+						Provider: apc.AIS,
+					},
+					IshardConfig: config.IshardConfig{
+						MaxShardSize: 102400,
+						Collapse:     tc.collapse,
+						StartIdx:     0,
+						IdxDigits:    4,
+						Ext:          ".tar",
+						Prefix:       "shard-",
+					},
+					ClusterConfig: config.DefaultConfig.ClusterConfig,
+				}
+				baseParams = api.BaseParams{
+					URL:    cfg.URL,
+					Client: cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}),
+				}
+			)
+
+			t.Logf("using %s as input bucket\n", cfg.SrcBck)
+			tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/)
+			t.Logf("using %s as output bucket\n", cfg.DstBck)
+			tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/)
+			tassert.CheckError(t, generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, tc.numExtensions, tc.fileSize))
+
+			tassert.CheckError(t, ishard.Init(cfg))
+			tassert.CheckError(t, ishard.Start())
+
+			time.Sleep(time.Second * 3) // wait for api.ArchiveMultiObj to complete
+
+			shardContents, err := getShardContents(baseParams, cfg.DstBck)
+			tassert.CheckError(t, err)
+
+			recordToTarballs := make(map[string]string)
+
+			totalFileNum := 0
+			for tarball, files := range shardContents {
+				for _, file := range files {
+					totalFileNum++
+					record := getRecordName(file)
+					existingTarball, exists := recordToTarballs[record]
+					tassert.Fatalf(t, !exists || existingTarball == tarball, "Found split records")
+					recordToTarballs[record] = tarball
+				}
+			}
+			tassert.Fatalf(t, totalFileNum == tc.numRecords*tc.numExtensions, "The total number of files in the output shards doesn't match the initially generated amount")
+		})
+	}
}
+
+func TestMaxShardSize(t *testing.T) {
+	testCases := []struct {
+		numRecords   int
+		fileSize     int64
+		maxShardSize int64
+	}{
+		{numRecords: 50, fileSize: 32 * cos.KiB, maxShardSize: 128 * cos.KiB},
+		{numRecords: 100, fileSize: 96 * cos.KiB, maxShardSize: 256 * cos.KiB},
+		{numRecords: 200, fileSize: 24 * cos.KiB, maxShardSize: 16 * cos.KiB},
+	}
+
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("Records:%d/FileSize:%d/MaxShardSize:%d", tc.numRecords, tc.fileSize, tc.maxShardSize), func(t *testing.T) {
+			var (
+				cfg = &config.Config{
+					SrcBck: cmn.Bck{
+						Name:     trand.String(15),
+						Provider: apc.AIS,
+					},
+					DstBck: cmn.Bck{
+						Name:     trand.String(15),
+						Provider: apc.AIS,
+					},
+					IshardConfig: config.IshardConfig{
+						MaxShardSize: tc.maxShardSize,
+						Collapse:     true,
+						StartIdx:     0,
+						IdxDigits:    4,
+						Ext:          ".tar",
+						Prefix:       "shard-",
+					},
+					ClusterConfig: config.DefaultConfig.ClusterConfig,
+				}
+				baseParams = api.BaseParams{
+					URL:    cfg.URL,
+					Client: cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}),
+				}
+				numExtensions = 3
+			)
+
+			t.Logf("using %s as input bucket\n", cfg.SrcBck)
+			tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/)
+			t.Logf("using %s as output bucket\n", cfg.DstBck)
+			tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/)
+			tassert.CheckError(t, generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize))
+
+			tassert.CheckError(t, ishard.Init(cfg))
+			tassert.CheckError(t, ishard.Start())
+			time.Sleep(time.Second * 3) // wait for api.ArchiveMultiObj to complete
+
+			tarballs, err := api.ListObjects(baseParams, cfg.DstBck, &apc.LsoMsg{}, api.ListArgs{})
+			tassert.CheckError(t, err)
+
+			for _, en := range tarballs.Entries {
+				tassert.Fatalf(t, en.Size > tc.maxShardSize, "The output shard size doesn't reach the desired amount")
+			}
+		})
+	}
+}
+
+// generateNestedStructure is a helper that generates a nested directory structure in the given bucket
+func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecords, numExtensions int, fileSize int64) error {
+	extensions := make([]string, 0, numExtensions)
+
+	for range numExtensions {
+		extensions = append(extensions, "."+trand.String(3))
+	}
+
+	randomFilePath := func() string {
+		levels := rand.IntN(3) + 1 // random number of subdirectory levels (1-3)
+		parts := make([]string, levels)
+		for i := range levels {
+			parts[i] = trand.String(3)
+		}
+		return filepath.Join(parts...)
+	}
+
+	basePath := randomFilePath()
+	for range numRecords {
+		dice := rand.IntN(5)
+		if dice == 0 { // 1/5 chance to change to a new directory
+			basePath = randomFilePath()
+		} else if dice < 2 { // 1/5 chance to extend the current directory
+			basePath += randomFilePath()
+		}
+		baseName := trand.String(5)
+		for _, ext := range extensions {
+			objectName := filepath.Join(basePath, baseName+ext)
+			size := rand.Int64N(fileSize)
+			r, _ := readers.NewRand(size, cos.ChecksumNone)
+			if _, err := api.PutObject(&api.PutArgs{
+				BaseParams: baseParams,
+				Bck:        bucket,
+				ObjName:    objectName,
+				Reader:     r,
+				Size:       uint64(size),
+			}); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func getShardContents(baseParams api.BaseParams, bucket cmn.Bck) (map[string][]string, error) {
+	msg := &apc.LsoMsg{}
+	objList, err := api.ListObjects(baseParams, bucket, msg, api.ListArgs{})
+	if err != nil {
+		return nil, err
+	}
+
+	shardContents := make(map[string][]string)
+	for _, en := range objList.Entries {
+		var buffer bytes.Buffer
+		_, err := api.GetObject(baseParams, bucket, en.Name, &api.GetArgs{Writer: &buffer})
+		if err != nil {
+			return nil, err
+		}
+		files, err := tarch.GetFilesFromArchBuffer(".tar", buffer, ".tar")
+		if err != nil {
+			return nil, err
+		}
+		for _, file := range files {
+			shardContents[en.Name] = append(shardContents[en.Name], file.Name)
+		}
+	}
+
+	return shardContents, nil
+}
+
+func getRecordName(filePath string) string {
+	base := filePath[strings.LastIndex(filePath, "/")+1:]
+	return base[:strings.LastIndex(base, ".")]
+}