diff --git a/tools/ishard/README.md b/tools/ishard/README.md
index c01643f3a0..878f46c712 100644
--- a/tools/ishard/README.md
+++ b/tools/ishard/README.md
@@ -13,8 +13,8 @@ This package provides a utility for archiving objects from a bucket into shard f
-- `-shard_size`: The desired size of each output shard in bytes. Default is `1024000`.
+- `-max_shard_size`: The desired size of each output shard in bytes. Default is `1024000`.
 - `-src_bck`: The source bucket name or URI. If empty, a bucket with a random name will be created.
 - `-dst_bck`: The destination bucket name or URI. If empty, a bucket with a random name will be created.
-- `-src_bck_provider`: The provider for the source bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`.
-- `-dst_bck_provider`: The provider for the destination bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`.
+- `-shard_template`: The template used for generating output shard names. Accepts Bash (`prefix{0001..0010}suffix`), Fmt (`prefix-%06d-suffix`), or At (`prefix-@00001-gap-@100-suffix`) formats. Default is `shard-%d`.
+- `-ext`: The extension used for the output shards. Default is `.tar`.
 - `-collapse`: If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.
 
 ## Initial Setup
@@ -22,13 +22,45 @@ This package provides a utility for archiving objects from a bucket into shard f
 **Build the Package:**
 
 ```sh
-go build -o ishard .
+$ cd tools/ishard
+$ go build -o ishard .
 ```
 
 ## Sample Usage
 
+### Correct Usage
+
 ```sh
-./ishard -shard_size=10240 -src_bck=source_bucket -dst_bck=destination_bucket -src_bck_provider=ais -dst_bck_provider=ais
+$ ./ishard -max_shard_size=1024000 -src_bck=source_bucket -dst_bck=destination_bucket -collapse -shard_template="prefix-{0000..1023..8}-suffix"
+$ ais archive ls ais://destination_bucket
+
+NAME                     SIZE
+prefix-0000-suffix.tar   1.01MiB
+prefix-0008-suffix.tar   1.41MiB
+prefix-0016-suffix.tar   1.00MiB
+prefix-0024-suffix.tar   1.00MiB
+prefix-0032-suffix.tar   1.05MiB
+prefix-0040-suffix.tar   1.09MiB
+prefix-0048-suffix.tar   1.04MiB
+prefix-0056-suffix.tar   1.02MiB
+prefix-0064-suffix.tar   1.02MiB
+prefix-0072-suffix.tar   1.04MiB
+prefix-0080-suffix.tar   1.03MiB
+prefix-0088-suffix.tar   1.01MiB
+prefix-0096-suffix.tar   1.02MiB
+prefix-0104-suffix.tar   1.06MiB
+...
+```
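+
+The Bash-style range `{0000..1023..8}` above expands to 128 shard names (`prefix-0000-suffix`, `prefix-0008-suffix`, ..., `prefix-1016-suffix`), and each shard is closed out once its accumulated size exceeds `-max_shard_size`. The run succeeds because the template can name at least as many shards as the input requires.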
+
+### Incorrect Usage
+
+```sh
+$ ./ishard -max_shard_size=1024000 -src_bck=source_bucket -dst_bck=destination_bucket -collapse -shard_template="prefix-{0000..0050..8}-suffix"
+Error: number of shards to be created exceeds expected number of shards (7)
 ```
+
+Here the template expands to only 7 names (`prefix-0000-suffix` through `prefix-0048-suffix`), fewer than the input requires, so ishard reports an error once the template runs out of names.
 
 ## Running the Tests
@@ -40,8 +72,8 @@ go test -v
 
 ## TODO List
 
 ### MUST HAVE/DESIRABLE
-- [ ] Shard name patterns
-  - [ ] Utilize existing name template tools
+- [X] Shard name patterns
+  - [X] Utilize existing name template tools
 - [ ] goroutine
 - [ ] configurable record key, extensions
 - [ ] logging (timestamp, nlog)
diff --git a/tools/ishard/config/config.go b/tools/ishard/config/config.go
index bdf1768dd6..33a43f7d2f 100644
--- a/tools/ishard/config/config.go
+++ b/tools/ishard/config/config.go
@@ -16,12 +16,10 @@ type (
 		URL string
 	}
 	IshardConfig struct {
-		MaxShardSize int64
-		StartIdx     int
-		IdxDigits    int
-		Ext          string
-		Prefix       string
-		Collapse     bool
+		MaxShardSize  int64
+		Ext           string
+		ShardTemplate string
+		Collapse      bool
 	}
 	Config struct {
 		ClusterConfig
@@ -38,7 +36,7 @@ const (
 
 var DefaultConfig = Config{
 	ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
-	IshardConfig:  IshardConfig{MaxShardSize: 102400, StartIdx: 0, IdxDigits: 4, Ext: ".tar", Prefix: "shard-", Collapse: false},
+	IshardConfig:  IshardConfig{MaxShardSize: 102400, Ext: ".tar", ShardTemplate: "shard-%d", Collapse: false},
 	SrcBck:        cmn.Bck{Name: "src_bck", Provider: apc.AIS},
 	DstBck:        cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
 }
@@ -54,6 +52,9 @@ func parseCliParams(cfg *Config) {
 	flag.Int64Var(&cfg.MaxShardSize, "max_shard_size", 1024000, "desired size of each output shard")
 	flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "the source bucket name or URI. If empty, a bucket with random name will be created")
 	flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "the destination bucket name or URI. If empty, a bucket with random name will be created")
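+	// the template string is parsed by cos.NewParsedTemplate in ishard.Init, which also determines the maximum number of shards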
+	flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%d", "the template used for generating output shards. Accepts Bash (prefix{0001..0010}suffix), Fmt (prefix-%06d-suffix), or At (prefix-@00001-gap-@100-suffix) templates")
+	flag.StringVar(&cfg.Ext, "ext", ".tar", "the extension used for generating output shards")
 	flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.")
 
 	flag.Parse()
diff --git a/tools/ishard/ishard.go b/tools/ishard/ishard.go
index 7dcf8caf29..50ec4afb6b 100644
--- a/tools/ishard/ishard.go
+++ b/tools/ishard/ishard.go
@@ -12,6 +12,7 @@ import (
 	"github.com/NVIDIA/aistore/api"
 	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cmn"
+	"github.com/NVIDIA/aistore/cmn/cos"
 	"github.com/NVIDIA/aistore/cmn/nlog"
 	"github.com/NVIDIA/aistore/ext/dsort/shard"
 	"github.com/NVIDIA/aistore/tools/ishard/config"
@@ -23,7 +24,8 @@ var (
 )
 
 var (
-	shardIdx = 0
+	shardIter  cos.ParsedTemplate
+	shardCount int64
 )
 
 // Represents the hierarchical structure of virtual directories within a bucket
@@ -86,24 +88,17 @@ func (n *Node) Print(prefix string) {
 }
 
 // Archive objects from this node into shards according to subdirectories structure
 func (n *Node) Archive() error {
 	paths := []string{}
-	_, err := archiveNode(n, "", &paths)
+	if _, err := archiveNode(n, "", &paths); err != nil {
+		return err
+	}
 
 	if cfg.Collapse && len(paths) != 0 {
-		msg := cmn.ArchiveBckMsg{
-			ToBck: cfg.DstBck,
-			ArchiveMsg: apc.ArchiveMsg{
-				ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
-				ListRange: apc.ListRange{
-					ObjNames: paths,
-				},
-			},
-		}
-
-		if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
-			return fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
+		if err := generateShard(paths); err != nil {
+			return err
 		}
 	}
-	return err
+
+	return nil
 }
 
 func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error) {
@@ -129,21 +124,9 @@ func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error)
 			paths = append(paths, record.MakeUniqueName(obj))
 		}
 		if totalSize > cfg.MaxShardSize {
-			msg := cmn.ArchiveBckMsg{
-				ToBck: cfg.DstBck,
-				ArchiveMsg: apc.ArchiveMsg{
-					ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
-					ListRange: apc.ListRange{
-						ObjNames: paths,
-					},
-				},
-			}
-
-			if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
-				return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
+			if err := generateShard(paths); err != nil {
+				return 0, err
 			}
-
-			shardIdx++
 			totalSize = 0
 			paths = []string{}
 		}
@@ -162,10 +145,24 @@
 	}
 	// Otherwise, archive all remaining objects regardless the current total size
+	if err := generateShard(paths); err != nil {
+		return 0, err
+	}
+
+	return totalSize, nil
+}
+
+// generateShard archives the given objects from the source bucket into one output shard, named by the next template iteration
+func generateShard(paths []string) error {
+	name, hasNext := shardIter.Next()
+	if !hasNext {
+		return fmt.Errorf("number of shards to be created exceeds expected number of shards (%d)", shardCount)
+	}
+
 	msg := cmn.ArchiveBckMsg{
 		ToBck: cfg.DstBck,
 		ArchiveMsg: apc.ArchiveMsg{
-			ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
+			ArchName: name + cfg.Ext,
 			ListRange: apc.ListRange{
 				ObjNames: paths,
 			},
 		},
@@ -173,10 +170,11 @@
 	}
 
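+	// note: api.ArchiveMultiObj is asynchronous; it dispatches the archiving job without waiting for completion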
fmt.Errorf("failed to archive shard %d: %w", shardIdx, err) + return fmt.Errorf("failed to archive shard %s: %w", name, err) } - shardIdx++ - return totalSize, nil + + return nil } // Init sets the configuration for ishard. If a config is provided, it uses that; @@ -198,6 +194,13 @@ func Init(cfgArg *config.Config) error { baseParams = api.BaseParams{URL: cfg.URL} baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}) + + if shardIter, err = cos.NewParsedTemplate(strings.TrimSpace(cfg.ShardTemplate)); err != nil { + return err + } + shardIter.InitIter() + shardCount = shardIter.Count() + return err } diff --git a/tools/ishard/ishard_test.go b/tools/ishard/ishard_test.go index e9516dde9e..9e007fe56b 100644 --- a/tools/ishard/ishard_test.go +++ b/tools/ishard/ishard_test.go @@ -9,6 +9,7 @@ import ( "fmt" "math/rand/v2" "path/filepath" + "regexp" "strings" "testing" "time" @@ -52,12 +53,10 @@ func TestNoRecordsSplit(t *testing.T) { Provider: apc.AIS, }, IshardConfig: config.IshardConfig{ - MaxShardSize: 102400, - Collapse: tc.collapse, - StartIdx: 0, - IdxDigits: 4, - Ext: ".tar", - Prefix: "shard-", + MaxShardSize: 102400, + Collapse: tc.collapse, + ShardTemplate: "shard-%d", + Ext: ".tar", }, ClusterConfig: config.DefaultConfig.ClusterConfig, } @@ -71,11 +70,12 @@ func TestNoRecordsSplit(t *testing.T) { tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/) t.Logf("using %s as output bucket\n", cfg.DstBck) tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/) - tassert.CheckError(t, generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, tc.numExtensions, tc.fileSize)) + + err := generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, tc.numExtensions, tc.fileSize) + tassert.CheckError(t, err) tassert.CheckError(t, ishard.Init(cfg)) tassert.CheckError(t, ishard.Start()) - time.Sleep(time.Second * 3) // wait for api.ArchiveMultiObj to complete shardContents, err := getShardContents(baseParams, cfg.DstBck) @@ -123,12 +123,10 @@ func TestMaxShardSize(t *testing.T) { Provider: apc.AIS, }, IshardConfig: config.IshardConfig{ - MaxShardSize: tc.maxShardSize, - Collapse: true, - StartIdx: 0, - IdxDigits: 4, - Ext: ".tar", - Prefix: "shard-", + MaxShardSize: tc.maxShardSize, + Collapse: true, + ShardTemplate: "shard-%d", + Ext: ".tar", }, ClusterConfig: config.DefaultConfig.ClusterConfig, } @@ -143,7 +141,9 @@ func TestMaxShardSize(t *testing.T) { tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/) t.Logf("using %s as output bucket\n", cfg.DstBck) tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/) - tassert.CheckError(t, generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize)) + + err := generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize) + tassert.CheckError(t, err) tassert.CheckError(t, ishard.Init(cfg)) tassert.CheckError(t, ishard.Start()) @@ -163,6 +163,80 @@ func TestMaxShardSize(t *testing.T) { } } +func TestShardTemplate(t *testing.T) { + testCases := []struct { + numRecords int + fileSize int64 + maxShardSize int64 + shardTemplate string + }{ + {numRecords: 50, fileSize: 32 * cos.KiB, maxShardSize: 128 * cos.KiB, shardTemplate: "prefix{0000..9999}-suffix"}, + {numRecords: 100, fileSize: 96 * cos.KiB, maxShardSize: 256 * cos.KiB, shardTemplate: "prefix-%06d-suffix"}, + {numRecords: 200, fileSize: 24 * cos.KiB, maxShardSize: 16 * cos.KiB, shardTemplate: "prefix-@00001-gap-@100-suffix"}, + } + + for _, tc := range testCases { + 
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("Records:%d/FileSize:%d/MaxShardSize:%d/Template:%s", tc.numRecords, tc.fileSize, tc.maxShardSize, tc.shardTemplate), func(t *testing.T) {
+			var (
+				cfg = &config.Config{
+					SrcBck: cmn.Bck{
+						Name:     trand.String(15),
+						Provider: apc.AIS,
+					},
+					DstBck: cmn.Bck{
+						Name:     trand.String(15),
+						Provider: apc.AIS,
+					},
+					IshardConfig: config.IshardConfig{
+						MaxShardSize:  tc.maxShardSize,
+						Collapse:      true,
+						ShardTemplate: tc.shardTemplate,
+						Ext:           ".tar",
+					},
+					ClusterConfig: config.DefaultConfig.ClusterConfig,
+				}
+				baseParams = api.BaseParams{
+					URL:    cfg.URL,
+					Client: cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}),
+				}
+				numExtensions = 3
+			)
+
+			t.Logf("using %s as input bucket\n", cfg.SrcBck)
+			tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/)
+			t.Logf("using %s as output bucket\n", cfg.DstBck)
+			tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/)
+
+			err := generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize)
+			tassert.CheckError(t, err)
+
+			tassert.CheckError(t, ishard.Init(cfg))
+			tassert.CheckError(t, ishard.Start())
+			time.Sleep(time.Second * 3) // wait for api.ArchiveMultiObj to complete
+
+			tarballs, err := api.ListObjects(baseParams, cfg.DstBck, &apc.LsoMsg{}, api.ListArgs{})
+			tassert.CheckError(t, err)
+
+			for _, en := range tarballs.Entries {
+				var expectedFormat string
+				switch tc.shardTemplate {
+				case "prefix{0000..9999}-suffix":
+					expectedFormat = `^prefix\d{4}-suffix\.tar$`
+				case "prefix-%06d-suffix":
+					expectedFormat = `^prefix-\d{6}-suffix\.tar$`
+				case "prefix-@00001-gap-@100-suffix":
+					expectedFormat = `^prefix-\d+-gap-\d+-suffix\.tar$`
+				default:
+					t.Fatalf("Unsupported shard template: %s", tc.shardTemplate)
+				}
+
+				re := regexp.MustCompile(expectedFormat)
+				tassert.Fatalf(t, re.MatchString(en.Name), "expected %s to match %s", en.Name, expectedFormat)
+			}
+		})
+	}
+}
+
 // Helper function to generate a nested directory structure
 func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecords, numExtensions int, fileSize int64) error {
 	extensions := make([]string, 0, numExtensions)