feat: add dry-run option for ishard with expected shards layout
Signed-off-by: Tony Chen <[email protected]>
Nahemah1022 committed Jul 15, 2024
1 parent b3e1adf commit d990ada
Showing 4 changed files with 126 additions and 32 deletions.
44 changes: 42 additions & 2 deletions cmd/ishard/README.md
@@ -16,6 +16,12 @@ On top of these basic capabilities, there's the already mentioned [WebDataset](h

To give a quick example, `a/b/c/toyota.jpeg` and `a/b/c/toyota.json` from an original dataset are considered part of the same sample (a.k.a. _record_) and must be stored together in one shard.

## Terms

- **File**: An individual object in the source bucket. File names are mapped to sample keys according to a configurable rule, `sample_key_pattern`.
- **Sample**: A group of files that share the same sample key. Samples are indivisible: after `ishard` execution, all files of a sample always end up together in the same output shard.
- **Shard**: The output unit of `ishard`: a collection of samples archived in `.tar`, `.tgz`/`.tar.gz`, `.zip`, or `.tar.lz4` format.
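
To make these terms concrete, here is a minimal standalone sketch (illustrative only, not ishard's actual implementation) of how a `base_file_name`-style rule maps file names to sample keys and groups files into samples:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// sampleKey mimics a `base_file_name`-style rule: strip the directory
// prefix, then drop the extension so related files share one key.
var baseRe = regexp.MustCompile(`.*/([^/]+)$`)

func sampleKey(objName string) string {
	key := baseRe.ReplaceAllString(objName, "$1")
	if i := strings.LastIndex(key, "."); i != -1 {
		key = key[:i]
	}
	return key
}

func main() {
	files := []string{"a/b/c/toyota.jpeg", "a/b/c/toyota.json"}
	groups := map[string][]string{} // sample key -> files of that sample
	for _, f := range files {
		k := sampleKey(f)
		groups[k] = append(groups[k], f)
	}
	fmt.Println(groups["toyota"]) // both files belong to the "toyota" sample
}
```

Both files map to the key `toyota`, so they form one sample and would land in the same shard.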

## CLI Parameters

- `-sample_key_pattern`: The pattern used to map source file names to sample keys. This ensures that files with the same sample key are always merged into the same output shard.
@@ -42,6 +48,7 @@ To give a quick example, `a/b/c/toyota.jpeg` and `a/b/c/toyota.json` from an ori

```sh
$ cd cmd/ishard
$ go mod tidy
$ go build -o ishard .
```

@@ -126,6 +133,8 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB

### Correct Usages

> Sharding a large dataset can take hours to complete. It is therefore highly recommended to first perform a dry run of your `ishard` command, to verify that it applies the desired sample key substitution and produces the expected output shard composition. See the Dry Run section below for details.

1. **Execute `ishard` with `base_file_name` as the sample key**:

When `sample_key_pattern` is set to `base_file_name`, source files with the same base name (without extensions) will be sharded together. For example, the following three files:
@@ -217,7 +226,7 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB
```sh
$ ./ishard -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -progress --sample_key_pattern=".*/([^/]+)/[^/]+$" -shard_template="pre-{0000..8192..8}-suf"

2024/07/11 11:34:26 `sample_key_pattern` .*/([^/]+)/[^/]+$ is not built-in (`base_file_name` | `no_op` | `collapse_all_dir`), compiled as custom regex.
2024/07/11 11:34:26 `sample_key_pattern` .*/([^/]+)/[^/]+$ is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex.

$ ais archive ls ais://ImageNet-out | less

@@ -257,6 +266,37 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB
Invalid regex pattern: (.*'. Error: error parsing regexp: missing closing ): `(.*'`
```

## Dry Run

The `-dry_run` CLI flag makes `ishard` print only a preview of the output shard composition, without performing the actual archiving. This is especially useful for large datasets, where a full `ishard` run can take hours to complete. In the output, the bracketed string (`[...]`) is the sample key shared by the files listed beneath it.

```sh
$ ./ishard-cli -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run | less
pre-0000-suf.tar 120.68KiB
[n00000333_01]
pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_01.xml 100B
pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_01.JPEG 30.00KiB
[n00000333_02]
pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_02.xml 100B
pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB
pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_02.loss 100B
[n00000369_01]
pre-0000-suf/ImageNet/Annotations/n00000369/n00000369_01.xml 100B
pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_01.JPEG 30.00KiB
pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_01.loss 100B
[n00000369_02]
pre-0000-suf/ImageNet/Annotations/n00000369/n00000369_02.xml 100B
pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_02.JPEG 30.00KiB
pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_02.loss 100B
pre-0008-suf.tar 120.59KiB
[n00000369_03]
pre-0008-suf/ImageNet/Annotations/n00000369/n00000369_03.xml 100B
pre-0008-suf/ImageNet/Data/train/n00000369/n00000369_03.JPEG 30.00KiB
[n00000369_04]
...
```

## Running the Tests

Test in Short Mode
@@ -291,7 +331,7 @@ go test -v -short -tags=debug -run=TestIshardMaxShardSize
- [X] goroutine
- [X] configurable record key, extensions
- [X] upon missing extension in a record: (abort | warn | ignore)
- [ ] dry run
- [X] dry run
- [ ] version 0.9 (github checksum, git cmd)
- [ ] go install
- [ ] debug build
5 changes: 4 additions & 1 deletion cmd/ishard/ishard/config/config.go
@@ -33,6 +33,7 @@ type (
SrcBck cmn.Bck
DstBck cmn.Bck
Progress bool
DryRun bool
}
)

@@ -47,6 +48,7 @@ var DefaultConfig = Config{
SrcBck: cmn.Bck{Name: "src_bck", Provider: apc.AIS},
DstBck: cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
Progress: false,
DryRun: false,
}

// Load configuration for ishard from cli, or spec files (TODO)
@@ -65,6 +67,7 @@ func parseCliParams(cfg *Config) {
flag.StringVar(&cfg.MissingExtAction, "missing_extension_action", "ignore", "Action to take when an extension is missing: abort | warn | ignore")
flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.")
flag.BoolVar(&cfg.Progress, "progress", false, "If true, display the progress of processing objects in the source bucket.")
flag.BoolVar(&cfg.DryRun, "dry_run", false, "If true, only shows the layout of resulting output shards without actually executing archive jobs.")

var (
sampleExts string
@@ -91,7 +94,7 @@ func parseCliParams(cfg *Config) {
if pattern, ok := commonPatterns[sampleKeyPatternStr]; ok {
cfg.SampleKeyPattern = pattern
} else {
log.Printf("`sample_key_pattern` %s is not built-in (`base_file_name` | `no_op` | `collapse_all_dir`), compiled as custom regex.", sampleKeyPatternStr)
log.Printf("`sample_key_pattern` %s is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex.", sampleKeyPatternStr)
if _, err := regexp.Compile(sampleKeyPatternStr); err != nil {
log.Fatalf("Invalid regex pattern: %s. Error: %v", cfg.SampleKeyPattern, err)
}
2 changes: 1 addition & 1 deletion cmd/ishard/ishard/config/sample.go
@@ -16,7 +16,7 @@ type SampleKeyPattern struct {
// Define some commonly used sample key patterns
var (
BaseFileNamePattern = SampleKeyPattern{Regex: `.*/([^/]+)$`, CaptureGroup: "$1"}
FullNamePattern = SampleKeyPattern{Regex: `^.*$`, CaptureGroup: "$0"}
FullNamePattern = SampleKeyPattern{Regex: `^(.*)$`, CaptureGroup: "$1"}
CollapseAllDirPattern = SampleKeyPattern{Regex: `/`, CaptureGroup: ""}
)
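
These built-in patterns can be exercised directly with Go's `regexp` package. The following standalone sketch (re-declaring the patterns rather than importing them) shows the key each pattern yields for a typical object name:

```go
package main

import (
	"fmt"
	"regexp"
)

// Mirrors the SampleKeyPattern definitions above, for illustration.
type SampleKeyPattern struct {
	Regex        string
	CaptureGroup string
}

var (
	BaseFileNamePattern   = SampleKeyPattern{Regex: `.*/([^/]+)$`, CaptureGroup: "$1"}
	FullNamePattern       = SampleKeyPattern{Regex: `^(.*)$`, CaptureGroup: "$1"}
	CollapseAllDirPattern = SampleKeyPattern{Regex: `/`, CaptureGroup: ""}
)

// apply substitutes an object name into a sample key using the pattern.
func apply(p SampleKeyPattern, name string) string {
	return regexp.MustCompile(p.Regex).ReplaceAllString(name, p.CaptureGroup)
}

func main() {
	name := "ImageNet/Data/train/n00000333/n00000333_01.JPEG"
	fmt.Println(apply(BaseFileNamePattern, name))   // base name only
	fmt.Println(apply(FullNamePattern, name))       // full name, unchanged
	fmt.Println(apply(CollapseAllDirPattern, name)) // all '/' removed
}
```

Note how the corrected `FullNamePattern` (`^(.*)$` with `$1`) leaves the name intact, while `CollapseAllDirPattern` flattens the entire path into one token.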

107 changes: 79 additions & 28 deletions cmd/ishard/ishard/ishard.go
@@ -7,9 +7,12 @@ package ishard
import (
"errors"
"fmt"
"html/template"
"os"
"path/filepath"
"regexp"
"strings"
"text/tabwriter"

"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
@@ -97,6 +100,7 @@ type ISharder struct {
progressBar *mpb.Bar
missingExtAct config.MissingExtFunc
sampleKeyRegex *regexp.Regexp
CLItemplate *template.Template
}

// archive traverses through nodes and collects records on the way. Once it reaches
@@ -137,14 +141,25 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor
if !hasNext {
return nil, 0, fmt.Errorf("number of shards to be created exceeds expected number of shards (%d)", is.shardIter.Count())
}
wg.Add(1)
go func(recs *shard.Records, name string) {
is.generateShard(recs, name, errCh)
wg.Done()
}(recs, name)

sh := &shard.Shard{
Size: totalSize,
Records: recs,
Name: name,
}
totalSize = 0
recs = shard.NewRecords(16)

if is.cfg.DryRun {
is.printShard(sh)
continue
}

wg.Add(1)
go func(sh *shard.Shard) {
is.generateShard(sh, errCh)
wg.Done()
}(sh)
}
n.records.Unlock()

@@ -155,44 +170,57 @@
return nil, 0, fmt.Errorf("number of shards to be created exceeds expected number of shards (%d)", is.shardIter.Count())
}

wg.Add(1)
go func(recs *shard.Records, name string) {
is.generateShard(recs, name, errCh)
wg.Done()
}(recs, name)
sh := &shard.Shard{
Size: totalSize,
Records: recs,
Name: name,
}

defer recs.Drain()
if is.cfg.DryRun {
is.printShard(sh)
} else {
wg.Add(1)
go func(sh *shard.Shard) {
is.generateShard(sh, errCh)
wg.Done()
}(sh)
}
}

wg.Wait()
close(errCh)
for err := range errCh {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case err := <-errCh:
return nil, 0, err
case <-done:
return recs, totalSize, nil
}

return recs, totalSize, nil
}
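
The `done`-channel-plus-`select` construction introduced above is a common wait-or-fail idiom: a helper goroutine closes `done` when the WaitGroup drains, and the caller returns either the first reported error or success, whichever arrives first. A standalone sketch under illustrative names (`runAll` is not part of ishard):

```go
package main

import (
	"fmt"
	"sync"
)

// runAll runs every task concurrently and returns the first error
// reported, or nil once all tasks have finished.
func runAll(tasks []func() error) error {
	var wg sync.WaitGroup
	// Buffered to len(tasks) so a failing worker never blocks on send.
	errCh := make(chan error, len(tasks))
	for _, t := range tasks {
		wg.Add(1)
		go func(t func() error) {
			defer wg.Done()
			if err := t(); err != nil {
				errCh <- err
			}
		}(t)
	}

	// Close `done` once the WaitGroup drains, without blocking the caller.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case err := <-errCh:
		return err // fail fast on the first reported error
	case <-done:
		return nil
	}
}

func main() {
	err := runAll([]func() error{
		func() error { return nil },
		func() error { return nil },
	})
	fmt.Println(err == nil) // true: all tasks completed
}
```

Buffering `errCh` matters: with an unbuffered channel, workers that fail after the caller has already returned would leak, blocked on the send.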

func (is *ISharder) generateShard(recs *shard.Records, name string, errCh chan error) {
defer recs.Drain()

if recs.Len() == 0 {
func (is *ISharder) generateShard(sh *shard.Shard, errCh chan error) {
if sh.Records.Len() == 0 {
return
}

defer sh.Records.Drain()

paths := []string{}
recs.Lock()
for _, record := range recs.All() {
sh.Records.Lock()
for _, record := range sh.Records.All() {
for _, obj := range record.Objects {
paths = append(paths, obj.ContentPath)
}
}
recs.Unlock()
sh.Records.Unlock()

msg := cmn.ArchiveBckMsg{
ToBck: is.cfg.DstBck,
ArchiveMsg: apc.ArchiveMsg{
ArchName: name + is.cfg.Ext,
ArchName: sh.Name + is.cfg.Ext,
ListRange: apc.ListRange{
ObjNames: paths,
},
Expand All @@ -201,7 +229,7 @@ func (is *ISharder) generateShard(recs *shard.Records, name string, errCh chan e

_, err := api.ArchiveMultiObj(is.baseParams, is.cfg.SrcBck, &msg)
if err != nil {
errCh <- fmt.Errorf("failed to archive shard %s: %w", name, err)
errCh <- fmt.Errorf("failed to archive shard %s: %w", sh.Name, err)
}

if is.progressBar != nil {
Expand Down Expand Up @@ -246,6 +274,15 @@ func (is *ISharder) findMissingExt(node *dirNode, recursive bool) error {
return nil
}

func (is *ISharder) printShard(sh *shard.Shard) {
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', tabwriter.AlignRight)
if err := is.CLItemplate.Execute(w, sh); err != nil {
fmt.Fprintf(os.Stderr, "error executing template: %v\n", err)
}
w.Flush()
sh.Records.Drain()
}
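
The `printShard` helper pairs a template with `text/tabwriter` so the size column lines up. A simplified, self-contained sketch of the same approach; the `Shard`/`File` types and template text are illustrative stand-ins, and `text/template` is used here (unlike `html/template`, it does not escape output):

```go
package main

import (
	"bytes"
	"fmt"
	"text/tabwriter"
	"text/template"
)

// Illustrative stand-ins for the real shard types.
type File struct{ Path, Size string }
type Shard struct {
	Name  string
	Files []File
}

var tmpl = template.Must(template.New("shard").Parse(
	"{{.Name}}\n{{range .Files}}    {{.Path}}\t{{.Size}}\n{{end}}"))

// render executes the template into a tabwriter so tab-separated
// cells are padded into aligned columns.
func render(sh Shard) string {
	var buf bytes.Buffer
	w := tabwriter.NewWriter(&buf, 0, 8, 1, '\t', tabwriter.AlignRight)
	if err := tmpl.Execute(w, sh); err != nil {
		return "template error: " + err.Error()
	}
	w.Flush() // tabwriter buffers internally; Flush emits the aligned text
	return buf.String()
}

func main() {
	fmt.Print(render(Shard{
		Name: "pre-0000-suf.tar",
		Files: []File{
			{Path: "pre-0000-suf/a.xml", Size: "100B"},
			{Path: "pre-0000-suf/a.JPEG", Size: "30.00KiB"},
		},
	}))
}
```

The `Flush` call is essential: `tabwriter` must see all rows of a column block before it can compute cell widths.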

// NewISharder instantiates an ISharder with the configuration if provided;
// otherwise, it loads from CLI or uses the default config.
func NewISharder(cfgArg *config.Config) (is *ISharder, err error) {
@@ -279,6 +316,19 @@ func NewISharder(cfgArg *config.Config) (is *ISharder, err error) {
is.missingExtAct = config.MissingExtActMap[is.cfg.MissingExtAction]
is.sampleKeyRegex = regexp.MustCompile(is.cfg.SampleKeyPattern.Regex)

if is.cfg.DryRun {
tmplWithSampleKey := `{{$shard := .}}{{appendExt $shard.Name}} {{formatSize $shard.Size}}
{{range $rec := .Records.All}} {{sampleKey $rec.Name}}
{{range $obj := .Objects}} {{contentPath $shard.Name $obj.ContentPath}} {{formatSize .Size}}
{{end}}{{end}}`
is.CLItemplate = template.Must(template.New("shard").Funcs(template.FuncMap{
"formatSize": func(size int64) string { return cos.ToSizeIEC(size, 2) },
"sampleKey": func(sampleKey string) string { return "[" + sampleKey + "]" },
"contentPath": func(shardName, objContentPath string) string { return filepath.Join(shardName, objContentPath) },
"appendExt": func(shardName string) string { return shardName + is.cfg.Ext },
}).Parse(tmplWithSampleKey))
}

return is, err
}

@@ -327,9 +377,10 @@ func (is *ISharder) Start() error {
return err
}

if err := api.WaitForXactionIdle(is.baseParams, &xact.ArgsMsg{Kind: apc.ActArchive, Bck: is.cfg.SrcBck}); err != nil {
return err
if is.cfg.DryRun {
return nil
}

return err
// Wait until all archive xactions reach the idle stage
return api.WaitForXactionIdle(is.baseParams, &xact.ArgsMsg{Kind: apc.ActArchive, Bck: is.cfg.SrcBck})
}
