feat: support count-based shard_size config in ishard
* Rename all `max_shard_size` to `shard_size`
* Refactor `ISharder` archive logic to keep the current shard size/count as
  internal state

Signed-off-by: Tony Chen <[email protected]>
Nahemah1022 committed Aug 12, 2024
1 parent 061a7c9 commit 767dbb6
Showing 5 changed files with 198 additions and 115 deletions.
52 changes: 17 additions & 35 deletions cmd/ishard/README.md
@@ -29,10 +29,12 @@ To give a quick example, `a/b/c/toyota.jpeg` and `a/b/c/toyota.json` from an ori
- `-sample_key_pattern="full_name"`: Performs no substitution, using the entire file name without extension as the sample key.
- `-sample_key_pattern="collapse_all_dir"`: Removes all '/' characters from the file name, using the resulting string as the sample key.
- `-sample_key_pattern="custom_regex"`: Applies a custom regex pattern to substitute the file names to sample keys for your specific requirements.
- `-max_shard_size`: Maximum size of each output shard. Default is `1MiB`. Accept following _units_ formats:
- IEC format, e.g.: 4KiB, 16MiB, 2GiB
- SI format, e.g.: 4KB, 16MB, 2GB
- raw format (in bytes), e.g.: 1024000
- `-shard_size`: Specifies the approximate size of each output shard. The default value is 1MiB. This option supports both count-based and size-based formats:
- Count-Based Format:
- `-shard_size="10"`: Sets the number of samples contained in each output shard to 10.
- Size-Based Formats:
- `-shard_size="16MiB"`: Sets the size of each output shard to "16MiB" using the IEC format.
- `-shard_size="4KB"`: Sets the size of each output shard to "4KB" using the SI format.
- `-src_bck`: The source bucket name or URI.
- `-dst_bck`: The destination bucket name or URI.
- `-shard_template`: The template used for generating output shards. Accepts Bash, Fmt, or At formats.
@@ -158,7 +160,7 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB
- `ImageNet/Data/train/n00000333/n00000333_02.JPEG`
- `ImageNet/Data/train/n00000333/n00000333_02.loss`

They have the same base name `n00000333_02`, and therefore will always present in the same output shard, regardless of `max_shard_size` value.
They have the same base name `n00000333_02`, and therefore will always be present in the same output shard, regardless of the `shard_size` value.
```sh
./ishard -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out

@@ -203,9 +205,9 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB
...
```

By default, `ishard` ensures that files with different virtual directory structure (after applying `sample_key_pattern`) won't present in the same output shard. In other words, `ishard` maintains clear boundaries between files that belong to different virtual directory, even if some output shard's size doesn't reached the `max_shard_size`. As shown in the example above, there are only two objects in the `shard-059.tar` output shard regardless of the `max_shard_size` value, since they are the only two files under their virtual directory structure.
By default, `ishard` ensures that files with different virtual directories (after applying `sample_key_pattern`) won't be present in the same output shard. In other words, `ishard` maintains clear boundaries between files that belong to different virtual directories, even if an output shard's size doesn't reach the requested `shard_size`. As shown in the example above, there are only two objects in the `shard-059.tar` output shard regardless of the `shard_size` value, since they are the only two files under their virtual directories.

To disable this default setting and compact each output shard's size closer to `max_shard_size`, regardless of virtual directories, you can specify `-collapse` flag. This will to flatten samples into its parent virtual directory if their overall size doesn't reach `max_shard_size`.
To disable this default setting and pack each output shard's size closer to `shard_size`, regardless of virtual directories, you can specify the `-collapse` flag. This flattens samples into their parent virtual directory if their overall size doesn't reach `shard_size`.

```sh
$ ./ishard -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -sample_key_pattern="full_name" -collapse
@@ -359,14 +361,14 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB

1. The number of generated output shards can't fit into the specified `shard_template`.
```sh
$ ./ishard -max_shard_size=256000 -src_bck=ais://sample -dst_bck=ais://sample-out -collapse -sample_key_pattern="base_filename" -shard_template="pre-{0000..50..8}-suf"
$ ./ishard -shard_size=256KiB -src_bck=ais://sample -dst_bck=ais://sample-out -collapse -sample_key_pattern="base_filename" -shard_template="pre-{0000..50..8}-suf"

Error: number of shards to be created exceeds expected number of shards (7)
```

2. Providing an invalid regex as `sample_key_pattern`.
```sh
$ ./ishard -max_shard_size=256000 -src_bck=ais://sample -dst_bck=ais://sample-out -collapse -sample_key_pattern="(.*'" -shard_template="pre-{0000..8192..8}-suf"
$ ./ishard -shard_size=256KiB -src_bck=ais://sample -dst_bck=ais://sample-out -collapse -sample_key_pattern="(.*'" -shard_template="pre-{0000..8192..8}-suf"

Invalid regex pattern: (.*'. Error: error parsing regexp: missing closing ): `(.*'`
```
@@ -376,7 +378,7 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB
The `-dry_run` flag in the CLI parameters allows `ishard` to print only a preview of the output shards' composition, without performing the actual archiving tasks. This is especially useful when working with large datasets, where a full execution of `ishard` can take hours to complete.

```sh
$ ./ishard-cli -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run | less
$ ./ishard-cli -shard_size=120KiB -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run | less
pre-0000-suf.tar 120.68KiB
pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_01.xml 100B
@@ -398,7 +400,7 @@ pre-0008-suf.tar 120.59Ki
You can also apply `-dry_run="show_keys"` to display the key of each group of samples after `sample_key_pattern` substitution. The string inside `[]` in the output is the sample key to which the following files belong.

```sh
$ ./ishard-cli -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run="show_keys" | less
$ ./ishard-cli -shard_size=120KiB -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run="show_keys" | less
pre-0000-suf.tar 120.68KiB
[n00000333_01]
@@ -439,31 +441,11 @@ go test -v -short -tags=debug
Test for a Specific Case

```sh
go test -v -short -tags=debug -run=TestIshardMaxShardSize
go test -v -short -tags=debug -run=TestIshardShardSize
```

## TODO List

### MUST HAVE/DESIRABLE
- [X] Shard name patterns
- [X] Utilize existing name template tools
- [X] goroutine
- [X] configurable record key, extensions
- [X] upon missing extension in a record: (abort | warn | ignore)
- [X] dry run
- [ ] debug build
- [X] allow user to specify source directories to include/exclude (achieved by prefix option)
- [X] logging (timestamp, nlog)
- [ ] Large list of objects, need to swap MEM temporary
- [ ] Dry run with Dsort
- [X] Long stress tests
- [X] Dsort integration
- [ ] Dry run with Dsort

### GOOD TO HAVE
- [X] progress bar (later)
- [X] polling for completion of archive xactions (necessary to show the progress)
- [ ] substitute the original file name
- [X] multi-worker archive xact
- [ ] integration into aistore (later)
- [ ] E2E testing from CLI
- [ ] Naming Template Placeholder
- [ ] ETL Integration
- [ ] On-the-fly ishard
55 changes: 42 additions & 13 deletions cmd/ishard/ishard/config/config.go
@@ -25,7 +25,7 @@ type (
URL string
}
IshardConfig struct {
MaxShardSize int64
ShardSize ShardSize
Ext string
ShardTemplate string
SampleKeyPattern SampleKeyPattern
@@ -53,7 +53,7 @@ const (
var DefaultConfig = Config{
ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
IshardConfig: IshardConfig{
MaxShardSize: 102400,
ShardSize: ShardSize{Size: 102400},
Ext: ".tar",
ShardTemplate: "shard-%d",
Collapse: false,
@@ -67,6 +67,42 @@ var DefaultConfig = Config{
SortFlag: SortFlag{IsSet: false},
}

//////////////////////////////
// Parse `-shard_size` flag //
//////////////////////////////

type ShardSize struct {
IsCount bool
Size int64
Count int
}

func (s *ShardSize) Set(value string) error {
var (
err error
cnt int64
)
if cnt, err = strconv.ParseInt(value, 10, 32); err == nil {
s.Count = int(cnt)
s.IsCount = true
return nil
}

if s.Size, err = cos.ParseSize(value, cos.UnitsIEC); err != nil {
return fmt.Errorf("error parsing shard_size (accepts IEC, SI, or count formats): %w", err)
}

s.IsCount = false
return nil
}

func (s *ShardSize) String() string {
if s.IsCount {
return fmt.Sprintf("%d files", s.Count)
}
return fmt.Sprintf("%d bytes", s.Size)
}

////////////////////////
// Parse `-sort` flag //
////////////////////////
@@ -230,16 +266,15 @@ func parseCliParams(cfg *Config) {

var (
err error
maxShardSizeStr string
sampleExts string
sampleKeyPatternStr string
missingExtActStr string
)

flag.StringVar(&maxShardSizeStr, "max_shard_size", "1MiB", "Maximum size of each output shard. Default is `\"1MiB\"`. Accepts the following units formats:\n"+
" - IEC format, e.g.: 4KiB, 16MiB, 2GiB\n"+
" - SI format, e.g.: 4KB, 16MB, 2GB\n"+
" - raw format (in bytes), e.g.: 1024000")
flag.Var(&cfg.ShardSize, "shard_size", "Approximate size of each output shard. Supports both count-based and size-based formats. Default is `\"1MiB\"`.\n"+
" -shard_size=\"10\": Sets the number of samples contained in each output shard to 10.\n"+
" -shard_size=\"16MiB\": Sets the size of each output shard to \"16MiB\" using the IEC format.\n"+
" -shard_size=\"4KB\": Sets the size of each output shard to \"4KB\" using the SI format.")
flag.StringVar(&sampleExts, "sample_exts", "", "Comma-separated list of extensions that should exist in the dataset. e.g. -sample=\".JPEG,.xml,.json\". See -missing_extension_action for handling missing extensions")
flag.StringVar(&sampleKeyPatternStr, "sample_key_pattern", "", "The pattern used to substitute source file names to sample keys. Default is `\"base_filename\"`. Options are \"base_file_name\" | \"full_name\" | \"collapse_all_dir\" | \"any other valid regex\" \n"+
"This ensures that files with the same sample key are always sharded into the same output shard.\n"+
@@ -256,12 +291,6 @@ func parseCliParams(cfg *Config) {

flag.Parse()

if cfg.MaxShardSize, err = cos.ParseSize(maxShardSizeStr, cos.UnitsIEC); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
}

var reactions = []string{"ignore", "warn", "abort", "exclude"}
if !cos.StringInSlice(missingExtActStr, reactions) {
fmt.Printf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr)
80 changes: 49 additions & 31 deletions cmd/ishard/ishard/ishard.go
@@ -105,17 +105,20 @@ type ISharder struct {
baseParams api.BaseParams
sampleKeyRegex *regexp.Regexp

// used for tracking size while archiving
currentShardSize int64
currentFileCount int

shardFactory *factory.ShardFactory
}

// archive traverses through nodes and collects records on the way. Once it reaches
// the desired amount, it runs a goroutine to archive those collected records
func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Records, _ int64, _ error) {
var (
totalSize int64
recs = shard.NewRecords(16)
errCh = make(chan error, 1)
wg = cos.NewLimitedWaitGroup(cmn.MaxParallelism(), 0)
recs = shard.NewRecords(16)
errCh = make(chan error, 1)
wg = cos.NewLimitedWaitGroup(cmn.MaxParallelism(), 0)
)

for name, child := range n.children {
@@ -136,20 +139,17 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor
if childRecords != nil && childRecords.Len() != 0 {
recs.Insert(childRecords.All()...)
}
totalSize += subtreeSize

if totalSize < is.cfg.MaxShardSize {
continue
}

wg.Add(1)
go func(recs *shard.Records, size int64) {
is.shardFactory.Create(recs, size, errCh)
wg.Done()
}(recs, totalSize)
// Create a new shard and reset once exceeding the configured size
if totalSize := is.incAndCheck(subtreeSize); totalSize != 0 {
wg.Add(1)
go func(recs *shard.Records, size int64) {
is.shardFactory.Create(recs, size, errCh)
wg.Done()
}(recs, totalSize)

totalSize = 0
recs = shard.NewRecords(16)
recs = shard.NewRecords(16)
}
}

n.records.Lock()
@@ -160,31 +160,28 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor
default:
}

totalSize += record.TotalSize()
recs.Insert(record)

if totalSize < is.cfg.MaxShardSize {
continue
}
// Create a new shard and reset once exceeding the configured size
if totalSize := is.incAndCheck(record.TotalSize()); totalSize != 0 {
wg.Add(1)
go func(recs *shard.Records, size int64) {
is.shardFactory.Create(recs, size, errCh)
wg.Done()
}(recs, totalSize)

wg.Add(1)
go func(recs *shard.Records, size int64) {
is.shardFactory.Create(recs, size, errCh)
wg.Done()
}(recs, totalSize)

totalSize = 0
recs = shard.NewRecords(16)
recs = shard.NewRecords(16)
}
}
n.records.Unlock()

// if cfg.Collapse is not set, or no parent to collapse to (root level), archive all remaining objects regardless the current total size
	// if cfg.Collapse is not set, or there is no parent to collapse to (at root level), archive all remaining objects regardless of the current total size
if !is.cfg.Collapse || path == "" {
wg.Add(1)
go func(recs *shard.Records, size int64) {
is.shardFactory.Create(recs, size, errCh)
wg.Done()
}(recs, totalSize)
}(recs, is.currentShardSize)
}

done := make(chan struct{})
@@ -197,10 +194,31 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor
case err := <-errCh:
return nil, 0, err
case <-done:
return recs, totalSize, nil
return recs, is.currentShardSize, nil
}
}

// incAndCheck adds the provided size to the current shard and checks whether it exceeds the configured limit.
// If the limit is exceeded, it resets the counters and returns the accumulated size. Otherwise, it returns 0.
func (is *ISharder) incAndCheck(size int64) (accumulatedSize int64) {
is.currentShardSize += size
is.currentFileCount++

accumulatedSize = is.currentShardSize

if is.cfg.ShardSize.IsCount && is.currentFileCount < is.cfg.ShardSize.Count {
return 0
}

if accumulatedSize < is.cfg.ShardSize.Size {
return 0
}

is.currentFileCount = 0
is.currentShardSize = 0
return
}
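
The accumulate-and-reset behavior of `incAndCheck` can be illustrated standalone. The `accumulator` and `shardLimit` names below are hypothetical, and the limit check is slightly simplified (count and size limits are tested exclusively rather than falling through as above); a non-zero return signals "cut a shard here":

```go
package main

import "fmt"

// shardLimit mirrors the ShardSize config: either a maximum sample
// count or a maximum byte size per output shard.
type shardLimit struct {
	isCount bool
	size    int64
	count   int
}

// accumulator tracks the in-progress shard, like ISharder's
// currentShardSize/currentFileCount internal state.
type accumulator struct {
	limit    shardLimit
	curSize  int64
	curCount int
}

// incAndCheck adds one record of the given size. When the configured
// limit is reached it resets the counters and returns the accumulated
// size; otherwise it returns 0.
func (a *accumulator) incAndCheck(size int64) int64 {
	a.curSize += size
	a.curCount++
	accumulated := a.curSize
	if a.limit.isCount && a.curCount < a.limit.count {
		return 0
	}
	if !a.limit.isCount && accumulated < a.limit.size {
		return 0
	}
	a.curCount, a.curSize = 0, 0
	return accumulated
}

func main() {
	// Count-based: cut a shard every 3 samples.
	a := accumulator{limit: shardLimit{isCount: true, count: 3}}
	for i, sz := range []int64{10, 20, 30, 40} {
		if cut := a.incAndCheck(sz); cut != 0 {
			fmt.Printf("sample %d: cut shard of %d bytes\n", i, cut)
		}
	}
	// prints: sample 2: cut shard of 60 bytes
}
```

Keeping the counters as internal state (rather than a local `totalSize` in `archive`) is what lets both traversal loops share one threshold check for either limit mode.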

// NewISharder instantiates an ISharder with the configuration if provided;
// otherwise, it loads from CLI or uses the default config.
func NewISharder(cfgArg *config.Config) (is *ISharder, err error) {