Skip to content

Commit

Permalink
feat: used cos.ParsedTemplate to parse and generate shard's name
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Jul 8, 2024
1 parent c0db9fa commit f0d3b6f
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 61 deletions.
41 changes: 35 additions & 6 deletions tools/ishard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,51 @@ This package provides a utility for archiving objects from a bucket into shard f
- `-max_shard_size`: The desired size of each output shard in bytes. Default is `1024000`.
- `-src_bck`: The source bucket name or URI. If empty, a bucket with a random name will be created.
- `-dst_bck`: The destination bucket name or URI. If empty, a bucket with a random name will be created.
- `-src_bck_provider`: The provider for the source bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`.
- `-dst_bck_provider`: The provider for the destination bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`.
- `-shard_template`: The template used for generating output shards. Accepts Bash (`prefix{0001..0010}suffix`), Fmt (`prefix-%06d-suffix`), or At (`prefix-@00001-gap-@100-suffix`) formats.
- `-ext`: The extension used for generating output shards (e.g. `.tar`). Default is `.tar`.
- `-collapse`: If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.

## Initial Setup

**Build the Package:**

```sh
go build -o ishard .
$ cd cmd/ishard
$ go build -o ishard .
```

## Sample Usage

Correct Usage

```sh
./ishard -shard_size=10240 -src_bck=source_bucket -dst_bck=destination_bucket -src_bck_provider=ais -dst_bck_provider=ais
$ ./ishard -max_shard_size=1024000 -src_bck=source_bucket -dst_bck=destination_bucket -collapse -shard_template="prefix-{0000..1023..8}-suffix"
$ ais archive ls ais://destination_bucket

NAME SIZE
prefix-0000-suffix.tar 1.01MiB
prefix-0008-suffix.tar 1.41MiB
prefix-0016-suffix.tar 1.00MiB
prefix-0024-suffix.tar 1.00MiB
prefix-0032-suffix.tar 1.05MiB
prefix-0040-suffix.tar 1.09MiB
prefix-0048-suffix.tar 1.04MiB
prefix-0056-suffix.tar 1.02MiB
prefix-0064-suffix.tar 1.02MiB
prefix-0072-suffix.tar 1.04MiB
prefix-0080-suffix.tar 1.03MiB
prefix-0088-suffix.tar 1.01MiB
prefix-0096-suffix.tar 1.02MiB
prefix-0104-suffix.tar 1.06MiB
...
```

Incorrect Usage
```
$ ./ishard -max_shard_size=1024000 -src_bck=source_bucket -dst_bck=destination_bucket -collapse -shard_template="prefix-{0000..0050..8}-suffix"
Error: number of shards to be created exceeds expected number of shards (7)
```

## Running the Tests
Expand All @@ -40,8 +69,8 @@ go test -v
## TODO List

### MUST HAVE/DESIRABLE
- [ ] Shard name patterns
- [ ] Utilize existing name template tools
- [X] Shard name patterns
- [X] Utilize existing name template tools
- [ ] goroutine
- [ ] configurable record key, extensions
- [ ] logging (timestamp, nlog)
Expand Down
14 changes: 7 additions & 7 deletions tools/ishard/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ type (
URL string
}
IshardConfig struct {
MaxShardSize int64
StartIdx int
IdxDigits int
Ext string
Prefix string
Collapse bool
MaxShardSize int64
Ext string
ShardTemplate string
Collapse bool
}
Config struct {
ClusterConfig
Expand All @@ -38,7 +36,7 @@ const (

var DefaultConfig = Config{
ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
IshardConfig: IshardConfig{MaxShardSize: 102400, StartIdx: 0, IdxDigits: 4, Ext: ".tar", Prefix: "shard-", Collapse: false},
IshardConfig: IshardConfig{MaxShardSize: 102400, Ext: ".tar", ShardTemplate: "shard-%d", Collapse: false},
SrcBck: cmn.Bck{Name: "src_bck", Provider: apc.AIS},
DstBck: cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
}
Expand All @@ -54,6 +52,8 @@ func parseCliParams(cfg *Config) {
flag.Int64Var(&cfg.MaxShardSize, "max_shard_size", 1024000, "desired size of each output shard")
flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "the source bucket name or URI. If empty, a bucket with random name will be created")
flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "the destination bucket name or URI. If empty, a bucket with random name will be created")
flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%d", "the template used for generating output shards. Accepts Bash (prefix{0001..0010}suffix), Fmt (prefix-%06d-suffix), or At (prefix-@00001-gap-@100-suffix) templates")
flag.StringVar(&cfg.Ext, "ext", ".tar", "the extension used for generating output shards.")
flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.")
flag.Parse()

Expand Down
69 changes: 36 additions & 33 deletions tools/ishard/ishard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ext/dsort/shard"
"github.com/NVIDIA/aistore/tools/ishard/config"
Expand All @@ -23,7 +24,8 @@ var (
)

var (
shardIdx = 0
shardIter cos.ParsedTemplate
shardCount int64
)

// Represents the hierarchical structure of virtual directories within a bucket
Expand Down Expand Up @@ -86,24 +88,17 @@ func (n *Node) Print(prefix string) {
// Archive objects from this node into shards according to subdirectories structure
func (n *Node) Archive() error {
paths := []string{}
_, err := archiveNode(n, "", &paths)
if _, err := archiveNode(n, "", &paths); err != nil {
return err
}

if cfg.Collapse && len(paths) != 0 {
msg := cmn.ArchiveBckMsg{
ToBck: cfg.DstBck,
ArchiveMsg: apc.ArchiveMsg{
ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
ListRange: apc.ListRange{
ObjNames: paths,
},
},
}

if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
return fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
if err := generateShard(paths); err != nil {
return err
}
}
return err

return nil
}

func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error) {
Expand All @@ -129,21 +124,9 @@ func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error)
paths = append(paths, record.MakeUniqueName(obj))
}
if totalSize > cfg.MaxShardSize {
msg := cmn.ArchiveBckMsg{
ToBck: cfg.DstBck,
ArchiveMsg: apc.ArchiveMsg{
ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
ListRange: apc.ListRange{
ObjNames: paths,
},
},
}

if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
if err := generateShard(paths); err != nil {
return 0, err
}

shardIdx++
totalSize = 0
paths = []string{}
}
Expand All @@ -162,21 +145,34 @@ func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error)
}

// Otherwise, archive all remaining objects regardless the current total size
if err := generateShard(paths); err != nil {
return 0, err
}

return totalSize, nil
}

func generateShard(paths []string) error {
name, hasNext := shardIter.Next()
if !hasNext {
return fmt.Errorf("number of shards to be created exceeds expected number of shards (%d)", shardCount)
}

msg := cmn.ArchiveBckMsg{
ToBck: cfg.DstBck,
ArchiveMsg: apc.ArchiveMsg{
ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
ArchName: name + cfg.Ext,
ListRange: apc.ListRange{
ObjNames: paths,
},
},
}

if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
return fmt.Errorf("failed to archive shard %s: %w", name, err)
}
shardIdx++
return totalSize, nil

return nil
}

// Init sets the configuration for ishard. If a config is provided, it uses that;
Expand All @@ -198,6 +194,13 @@ func Init(cfgArg *config.Config) error {

baseParams = api.BaseParams{URL: cfg.URL}
baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true})

if shardIter, err = cos.NewParsedTemplate(strings.TrimSpace(cfg.ShardTemplate)); err != nil {
return err
}
shardIter.InitIter()
shardCount = shardIter.Count()

return err
}

Expand Down
104 changes: 89 additions & 15 deletions tools/ishard/ishard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math/rand/v2"
"path/filepath"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -52,12 +53,10 @@ func TestNoRecordsSplit(t *testing.T) {
Provider: apc.AIS,
},
IshardConfig: config.IshardConfig{
MaxShardSize: 102400,
Collapse: tc.collapse,
StartIdx: 0,
IdxDigits: 4,
Ext: ".tar",
Prefix: "shard-",
MaxShardSize: 102400,
Collapse: tc.collapse,
ShardTemplate: "shard-%d",
Ext: ".tar",
},
ClusterConfig: config.DefaultConfig.ClusterConfig,
}
Expand All @@ -71,11 +70,12 @@ func TestNoRecordsSplit(t *testing.T) {
tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/)
t.Logf("using %s as output bucket\n", cfg.DstBck)
tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/)
tassert.CheckError(t, generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, tc.numExtensions, tc.fileSize))

err := generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, tc.numExtensions, tc.fileSize)
tassert.CheckError(t, err)

tassert.CheckError(t, ishard.Init(cfg))
tassert.CheckError(t, ishard.Start())

time.Sleep(time.Second * 3) // wait for api.ArchiveMultiObj to complete

shardContents, err := getShardContents(baseParams, cfg.DstBck)
Expand Down Expand Up @@ -123,12 +123,10 @@ func TestMaxShardSize(t *testing.T) {
Provider: apc.AIS,
},
IshardConfig: config.IshardConfig{
MaxShardSize: tc.maxShardSize,
Collapse: true,
StartIdx: 0,
IdxDigits: 4,
Ext: ".tar",
Prefix: "shard-",
MaxShardSize: tc.maxShardSize,
Collapse: true,
ShardTemplate: "shard-%d",
Ext: ".tar",
},
ClusterConfig: config.DefaultConfig.ClusterConfig,
}
Expand All @@ -143,7 +141,9 @@ func TestMaxShardSize(t *testing.T) {
tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/)
t.Logf("using %s as output bucket\n", cfg.DstBck)
tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/)
tassert.CheckError(t, generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize))

err := generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize)
tassert.CheckError(t, err)

tassert.CheckError(t, ishard.Init(cfg))
tassert.CheckError(t, ishard.Start())
Expand All @@ -163,6 +163,80 @@ func TestMaxShardSize(t *testing.T) {
}
}

// TestShardTemplate verifies that ishard names its output shards according to
// the configured shard template. For each of the three supported template
// syntaxes — Bash-style brace range, Fmt-style %d, and At-style @-range — it
// creates source/destination buckets, populates the source with a generated
// nested structure, runs ishard, and checks that every resulting shard name
// matches a regexp derived from the template (including the ".tar" extension).
func TestShardTemplate(t *testing.T) {
	testCases := []struct {
		numRecords    int
		fileSize      int64
		maxShardSize  int64
		shardTemplate string
	}{
		{numRecords: 50, fileSize: 32 * cos.KiB, maxShardSize: 128 * cos.KiB, shardTemplate: "prefix{0000..9999}-suffix"},
		{numRecords: 100, fileSize: 96 * cos.KiB, maxShardSize: 256 * cos.KiB, shardTemplate: "prefix-%06d-suffix"},
		{numRecords: 200, fileSize: 24 * cos.KiB, maxShardSize: 16 * cos.KiB, shardTemplate: "prefix-@00001-gap-@100-suffix"},
	}

	for _, tc := range testCases {
		t.Run(fmt.Sprintf("Records:%d/FileSize:%d/MaxShardSize:%d/Template:%s", tc.numRecords, tc.fileSize, tc.maxShardSize, tc.shardTemplate), func(t *testing.T) {
			var (
				// Fresh, randomly named buckets per sub-test; Collapse is on so
				// undersized subdirectories are merged into parent shards.
				cfg = &config.Config{
					SrcBck: cmn.Bck{
						Name:     trand.String(15),
						Provider: apc.AIS,
					},
					DstBck: cmn.Bck{
						Name:     trand.String(15),
						Provider: apc.AIS,
					},
					IshardConfig: config.IshardConfig{
						MaxShardSize:  tc.maxShardSize,
						Collapse:      true,
						ShardTemplate: tc.shardTemplate,
						Ext:           ".tar",
					},
					ClusterConfig: config.DefaultConfig.ClusterConfig,
				}
				baseParams = api.BaseParams{
					URL:    cfg.URL,
					Client: cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}),
				}
				numExtensions = 3
			)

			t.Logf("using %s as input bucket\n", cfg.SrcBck)
			tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/)
			t.Logf("using %s as output bucket\n", cfg.DstBck)
			tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/)

			err := generateNestedStructure(baseParams, cfg.SrcBck, tc.numRecords, numExtensions, tc.fileSize)
			tassert.CheckError(t, err)

			tassert.CheckError(t, ishard.Init(cfg))
			tassert.CheckError(t, ishard.Start())
			time.Sleep(time.Second * 3) // wait for api.ArchiveMultiObj to complete

			tarballs, err := api.ListObjects(baseParams, cfg.DstBck, &apc.LsoMsg{}, api.ListArgs{})
			tassert.CheckError(t, err)

			for _, en := range tarballs.Entries {
				// Map the template under test to the name pattern it should
				// produce once the ".tar" extension is appended.
				var expectedFormat string
				switch tc.shardTemplate {
				case "prefix{0000..9999}-suffix":
					expectedFormat = `^prefix\d{4}-suffix\.tar$`
				case "prefix-%06d-suffix":
					expectedFormat = `^prefix-\d{6}-suffix\.tar$`
				case "prefix-@00001-gap-@100-suffix":
					expectedFormat = `^prefix-\d+-gap-\d+-suffix\.tar$`
				default:
					t.Fatalf("Unsupported shard template: %s", tc.shardTemplate)
				}

				re := regexp.MustCompile(expectedFormat)
				tassert.Fatalf(t, re.MatchString(en.Name), fmt.Sprintf("expected %s to match %s", en.Name, expectedFormat))
			}
		})
	}
}

// Helper function to generate a nested directory structure
func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecords, numExtensions int, fileSize int64) error {
extensions := make([]string, 0, numExtensions)
Expand Down

0 comments on commit f0d3b6f

Please sign in to comment.