From a2a0445d47a14992a43f106c7ec1a0406aac5828 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Mon, 15 Jul 2024 09:21:43 -0700 Subject: [PATCH] feat: enable `ishard` prefix option for specifying source files to include Signed-off-by: Tony Chen --- cmd/ishard/README.md | 205 ++++++++++++++++++----------- cmd/ishard/ishard/config/config.go | 70 ++++++++-- cmd/ishard/ishard/ishard.go | 66 +++++++--- cmd/ishard/ishard/ishard_test.go | 108 +++++++++++---- 4 files changed, 315 insertions(+), 134 deletions(-) diff --git a/cmd/ishard/README.md b/cmd/ishard/README.md index 85db348d865..43dcd585720 100644 --- a/cmd/ishard/README.md +++ b/cmd/ishard/README.md @@ -25,7 +25,7 @@ To give a quick example, `a/b/c/toyota.jpeg` and `a/b/c/toyota.json` from an ori ## CLI Parameters - `-sample_key_pattern`: The pattern used to substitute source file names to sample keys. This ensures that objects with the same sample key are always merged into the same output shard. - - `-sample_key_pattern="base_filename"`: Extracts and uses only the base filename as the sample key to merge. Removes all directory paths and extensions. + - `-sample_key_pattern="base_filename"`: The default option. Extracts and uses only the base filename as the sample key to merge. Removes all directory paths and extensions. - `-sample_key_pattern="full_name"`: Performs no substitution, using the entire file name without extension as the sample key. - `-sample_key_pattern="collapse_all_dir"`: Removes all '/' characters from the file name, using the resulting string as the sample key. - `-sample_key_pattern="custom_regex"`: Applies a custom regex pattern to substitute the file names to sample keys for your specific requirements. @@ -41,6 +41,7 @@ To give a quick example, `a/b/c/toyota.jpeg` and `a/b/c/toyota.json` from an ori - `-missing_extension_action`: Action to take when an extension is missing at any sample: `abort` | `warn` | `ignore`, if `sample_exts` is set. 
- `-collapse`: If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size. - `-progress`: If true, display the progress of processing objects in the source bucket. +- `-dry_run`: If set, only shows the layout of resulting output shards without actually executing archive jobs. Use 'show_keys' to include sample keys. ## Initial Setup @@ -135,29 +136,29 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB > Sharding a large dataset can take hours to complete. Therefore, it is highly recommended to first perform a `dry-run` of your `ishard` command to ensure it performs the desired sample key substitution and produces the expected output shard composition. See the Dry Run section below for more details. -1. **Execute `ishard` with `base_file_name` as the sample key**: +1. **Execute `ishard` with default sample key**: - When `sample_key_pattern` is set to `base_file_name`, source files with the same base name (without extensions) will be sharded together. For example, the following three files: + When `sample_key_pattern` is not specified, `ishard` uses `base_file_name` as sample key. This means that source files with the same base name (without extensions) will be sharded together. For example, the following three files: - `ImageNet/Annotations/n00000333/n00000333_02.xml` - `ImageNet/Data/train/n00000333/n00000333_02.JPEG` - `ImageNet/Data/train/n00000333/n00000333_02.loss` They have the same base name `n00000333_02`, and therefore will always present in the same output shard, regardless of `max_shard_size` value. 
```sh - ./ishard -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" + ./ishard -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out $ ais archive ls ais://ImageNet-out | less - NAME SIZE - pre-0000-suf.tar 130.00KiB - pre-0000-suf.tar/ImageNet/Annotations/n00000333/n00000333_01.xml 100B - pre-0000-suf.tar/ImageNet/Annotations/n00000333/n00000333_02.xml 100B + NAME SIZE + shard-0.tar 1.00MiB + shard-0.tar/ImageNet/Annotations/n00000333/n00000333_01.xml 100B + shard-0.tar/ImageNet/Annotations/n00000333/n00000333_02.xml 100B ... - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_02.loss 100B - pre-0000-suf.tar/ImageNet/Data/train/n00000369/n00000369_01.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_02.loss 100B + shard-0.tar/ImageNet/Data/train/n00000369/n00000369_01.JPEG 30.00KiB ... - pre-0008-suf.tar 129.00KiB + shard-1.tar 129.00KiB ... ``` @@ -169,84 +170,119 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB They have the same full name `ImageNet/Data/train/n00005739/n00005739_01` and therefore will always present in the same output shard. But file `ImageNet/Annotations/n00005739/n00005739_01.xml` has different full name `ImageNet/Annotations/n00005739/n00005739_01`, and therefore will be sharded separately. ```sh - $ ./ishard -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="full_name" -shard_template="pre-{0000..8192..8}-suf" + $ ./ishard -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="full_name" - NAME SIZE + NAME SIZE ... 
- pre-0200-suf.tar 4.00KiB - pre-0200-suf.tar/ImageNet/Annotations/n00028550/n00028550_01.xml 100B - pre-0200-suf.tar/ImageNet/Annotations/n00028550/n00028550_02.xml 100B - pre-0200-suf.tar/ImageNet/Annotations/n00028550/n00028550_03.xml 100B - pre-0208-suf.tar 3.00KiB - pre-0208-suf.tar/ImageNet/Annotations/n00005739/n00005739_01.xml 100B - pre-0208-suf.tar/ImageNet/Annotations/n00005739/n00005739_02.xml 100B + shard-059.tar 200B + shard-059/ImageNet/Annotations/n00005739/n00005739_01.xml 100B + shard-059/ImageNet/Annotations/n00005739/n00005739_02.xml 100B ... - pre-0488-suf.tar 94.50KiB - pre-0488-suf.tar/ImageNet/Data/train/n00005739/n00005739_01.JPEG 30.00KiB - pre-0488-suf.tar/ImageNet/Data/train/n00005739/n00005739_01.loss 100B - pre-0488-suf.tar/ImageNet/Data/train/n00005739/n00005739_02.JPEG 30.00KiB - pre-0488-suf.tar/ImageNet/Data/train/n00005739/n00005739_03.JPEG 30.00KiB - pre-0488-suf.tar/ImageNet/Data/train/n00005739/n00005739_03.loss 100B - pre-0496-suf.tar 126.00KiB + shard-097.tar 90.20KiB + shard-097/ImageNet/Data/train/n00005739/n00005739_01.JPEG 30.00KiB + shard-097/ImageNet/Data/train/n00005739/n00005739_01.loss 100B + shard-097/ImageNet/Data/train/n00005739/n00005739_02.JPEG 30.00KiB + shard-097/ImageNet/Data/train/n00005739/n00005739_03.JPEG 30.00KiB + shard-097/ImageNet/Data/train/n00005739/n00005739_03.loss 100B + shard-098.tar 60.00KiB ... ``` - By default, `ishard` ensures that files with different virtual directory structure (after applying `sample_key_pattern`) won't present in the same output shard. In other words, `ishard` clearly cut the boundary between files that belong to different virtual directory, even if some output shard's size hasn't reached the `max_shard_size`. As shown in the example above, there are only three objects in `pre-0200-suf.tar` regardless `max_shard_size` value, since they are the only three objects under their virtual directory structure. 
+ By default, `ishard` ensures that files with different virtual directory structures (after applying `sample_key_pattern`) won't be present in the same output shard. In other words, `ishard` maintains clear boundaries between files that belong to different virtual directories, even if some output shard's size hasn't reached the `max_shard_size`. As shown in the example above, there are only two objects in the `shard-059.tar` output shard regardless of the `max_shard_size` value, since they are the only two files under their virtual directory structure. - To disable this default setting, you can specify `-collapse` flag to flatten samples into its parent virtual directory if their overall size doesn't reach `max_shard_size`. + To disable this default setting and compact each output shard's size closer to `max_shard_size`, regardless of virtual directories, you can specify the `-collapse` flag. This will flatten samples into their parent virtual directory if their overall size doesn't reach `max_shard_size`. ```sh - $ ./ishard -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -progress --sample_key_pattern="full_name" -shard_template="pre-{0000..8192..8}-suf" -collapse - + $ ./ishard -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="full_name" -collapse + + NAME SIZE + shard-0.tar 1.03MiB + shard-0/ImageNet/Data/train/n00003215/n00003215_01.JPEG 30.00KiB + shard-0/ImageNet/Data/train/n00003215/n00003215_02.JPEG 30.00KiB + shard-0/ImageNet/Data/train/n00003215/n00003215_02.loss 100B + shard-0/ImageNet/Data/train/n00003215/n00003215_03.JPEG 30.00KiB ... 
- pre-0464-suf.tar 123.00KiB - pre-0464-suf.tar/ImageNet/Data/val/n00019349/ILSVRC2012_val_00002384.JPEG 30.00KiB - pre-0464-suf.tar/ImageNet/Data/val/n00019349/ILSVRC2012_val_00004277.JPEG 30.00KiB - pre-0464-suf.tar/ImageNet/Data/val/n00019349/ILSVRC2012_val_00007772.JPEG 30.00KiB - pre-0464-suf.tar/ImageNet/Data/val/n00019349/ILSVRC2012_val_00010511.JPEG 30.00KiB - pre-0472-suf.tar 123.00KiB - pre-0472-suf.tar/ImageNet/Data/val/n00026528/ILSVRC2012_val_00001391.JPEG 30.00KiB - pre-0472-suf.tar/ImageNet/Data/val/n00026528/ILSVRC2012_val_00002608.JPEG 30.00KiB - pre-0472-suf.tar/ImageNet/Data/val/n00026528/ILSVRC2012_val_00012843.JPEG 30.00KiB - pre-0472-suf.tar/ImageNet/Data/val/n00026528/ILSVRC2012_val_00016277.JPEG 30.00KiB - pre-0480-suf.tar 6.34MiB - pre-0480-suf.tar/ImageNet/Annotations/n00000333/n00000333_01.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000333/n00000333_02.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000369/n00000369_01.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000369/n00000369_02.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000369/n00000369_03.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000369/n00000369_04.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000565/n00000565_01.xml 100B - pre-0480-suf.tar/ImageNet/Annotations/n00000565/n00000565_02.xml 100B + shard-6.tar 1.03MiB + shard-6/ImageNet/Data/val/n00015250/ILSVRC2012_val_00001158.JPEG 30.00KiB + shard-6/ImageNet/Data/val/n00015250/ILSVRC2012_val_00007151.JPEG 30.00KiB + shard-6/ImageNet/Data/val/n00015250/ILSVRC2012_val_00017846.JPEG 30.00KiB + shard-6/ImageNet/Data/val/n00015250/ILSVRC2012_val_00020293.JPEG 30.00KiB + ... 
+ shard-12.tar 653.05KiB + shard-12/ImageNet/Annotations/n00023258/n00023258_01.xml 100B + shard-12/ImageNet/Annotations/n00023258/n00023258_02.xml 100B + shard-12/ImageNet/Annotations/n00032644/n00032644_01.xml 100B + shard-12/ImageNet/Annotations/n00032644/n00032644_02.xml 100B + shard-12/ImageNet/Annotations/n00032644/n00032644_03.xml 100B ... ``` 3. **Customized regex sample key:** You can also provide your own `sample_key_pattern` as regex for sample key substitution. For example, the following demonstrates how to only extract the last level of virtual directory name `n00000333` as sample key using custom regex `.*/([^/]+)/[^/]+$`. ```sh - $ ./ishard -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -progress --sample_key_pattern=".*/([^/]+)/[^/]+$" -shard_template="pre-{0000..8192..8}-suf" + $ ./ishard -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -progress --sample_key_pattern=".*/([^/]+)/[^/]+$" 2024/07/11 11:34:26 `sample_key_pattern` .*/([^/]+)/[^/]+$ is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex. 
$ ais archive ls ais://ImageNet-out | less + NAME SIZE + shard-0.tar 1.17MiB + shard-0/ImageNet/Annotations/n00000333/n00000333_01.xml 100B + shard-0/ImageNet/Annotations/n00000333/n00000333_02.xml 100B + shard-0/ImageNet/Data/train/n00000333/n00000333_01.JPEG 30.00KiB + shard-0/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB + shard-0/ImageNet/Data/train/n00000333/n00000333_02.loss 100B + shard-0/ImageNet/Data/train/n00000333/n00000333_03.JPEG 30.00KiB + shard-0/ImageNet/Data/train/n00000333/n00000333_03.loss 100B + shard-0/ImageNet/Data/val/n00000333/ILSVRC2012_val_00001851.JPEG 30.00KiB + shard-0/ImageNet/Data/val/n00000333/ILSVRC2012_val_00006595.JPEG 30.00KiB + shard-0/ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB + shard-0/ImageNet/Data/val/n00000333/ILSVRC2012_val_00012920.JPEG 30.00KiB + shard-0/ImageNet/Data/val/n00000333/ILSVRC2012_val_00021981.JPEG 30.00KiB + shard-0/ImageNet/Annotations/n00000369/n00000369_01.xml 100B + ... + ``` + +4. **Filter source files using a prefix:** You can specify a prefix for the files to include in `ishard` by appending it to the bucket URI in the `src_bck` parameter. For example, the following command specifies `ImageNet/Data` as the prefix in the source bucket `ais://ImageNet`, which includes only the files whose names start with `ImageNet/Data`. 
+ + ```sh + $ ./ishard -src_bck=ais://ImageNet/ImageNet/Data -dst_bck=ais://ImageNet-out + + $ ais archive ls ais://ImageNet-out | less + + NAME SIZE + shard-0.tar 1.03MiB + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_01.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_02.loss 100B + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_03.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000333/n00000333_03.loss 100B + shard-0.tar/ImageNet/Data/train/n00000369/n00000369_01.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000369/n00000369_01.loss 100B + shard-0.tar/ImageNet/Data/train/n00000369/n00000369_02.JPEG 30.00KiB + shard-0.tar/ImageNet/Data/train/n00000369/n00000369_02.loss 100B + shard-0.tar/ImageNet/Data/train/n00000369/n00000369_03.JPEG 30.00KiB + ... + ``` + +5. **Generate output shard names using a template:** You can use various templates to name the output shards using `-shard_template`. 
For example: + + ```sh + $ ./ishard-cli -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out -shard_template="pre-{0000..8192..8}-suf" + NAME SIZE - pre-0000-suf.tar 249.00KiB + pre-0000-suf.tar 1.07MiB pre-0000-suf.tar/ImageNet/Annotations/n00000333/n00000333_01.xml 100B pre-0000-suf.tar/ImageNet/Annotations/n00000333/n00000333_02.xml 100B - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_01.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_02.loss 100B - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_03.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/train/n00000333/n00000333_03.loss 100B - pre-0000-suf.tar/ImageNet/Data/val/n00000333/ILSVRC2012_val_00001851.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/val/n00000333/ILSVRC2012_val_00006595.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/val/n00000333/ILSVRC2012_val_00012920.JPEG 30.00KiB - pre-0000-suf.tar/ImageNet/Data/val/n00000333/ILSVRC2012_val_00021981.JPEG 30.00KiB - pre-0008-suf.tar 281.50KiB - pre-0008-suf.tar/ImageNet/Annotations/n00000369/n00000369_01.xml 100B - pre-0008-suf.tar/ImageNet/Annotations/n00000369/n00000369_02.xml 100B + ... + pre-0008-suf.tar 1.07MiB + pre-0008-suf.tar/ImageNet/Annotations/n00005864/n00005864_02.xml 100B + pre-0008-suf.tar/ImageNet/Annotations/n00007702/n00007702_01.xml 100B + ... + pre-0016-suf.tar 1.07MiB + pre-0016-suf.tar/ImageNet/Annotations/n00014536/n00014536_02.xml 100B + pre-0016-suf.tar/ImageNet/Annotations/n00015250/n00015250_01.xml 100B ... ``` @@ -268,32 +304,46 @@ ImageNet/Data/val/n00000333/ILSVRC2012_val_00007175.JPEG 30.00KiB ## Dry Run -The `-dry_run` flag in the CLI parameters allows `ishard` to only print a preview of the output shards composition without performing the actual archiving tasks. 
This is especially useful when working with large datasets, where the full execution of `ishard` can take hours to complete. The string inside `[]` in the output represents the sample key of the sample to which the following files belong. +The `-dry_run` flag in the CLI parameters allows `ishard` to only print a preview of the output shards composition without performing the actual archiving tasks. This is especially useful when working with large datasets, where the full execution of `ishard` can take hours to complete. ```sh $ ./ishard-cli -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run | less -pre-0000-suf.tar 120.68KiB - [n00000333_01] +pre-0000-suf.tar 120.68KiB pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_01.xml 100B pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_01.JPEG 30.00KiB - [n00000333_02] pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_02.xml 100B pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_02.loss 100B - [n00000369_01] pre-0000-suf/ImageNet/Annotations/n00000369/n00000369_01.xml 100B pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_01.JPEG 30.00KiB pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_01.loss 100B - [n00000369_02] pre-0000-suf/ImageNet/Annotations/n00000369/n00000369_02.xml 100B pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_02.JPEG 30.00KiB pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_02.loss 100B -pre-0008-suf.tar 120.59KiB - [n00000369_03] +pre-0008-suf.tar 120.59KiB pre-0008-suf/ImageNet/Annotations/n00000369/n00000369_03.xml 100B - pre-0008-suf/ImageNet/Data/train/n00000369/n00000369_03.JPEG 30.00KiB - [n00000369_04] +... +``` + +You can also apply `-dry_run="show_keys"` to display the key of each group of samples after `sample_key_pattern` substitution. 
The string inside `[]` in the output represents the sample key of the sample to which the following files belong. + +```sh +$ ./ishard-cli -max_shard_size=102400 -src_bck=ais://ImageNet -dst_bck=ais://ImageNet-out --sample_key_pattern="base_file_name" -shard_template="pre-{0000..8192..8}-suf" -dry_run="show_keys" | less + +pre-0000-suf.tar 120.68KiB + [n00000333_01] + pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_01.xml 100B + pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_01.JPEG 30.00KiB + [n00000333_02] + pre-0000-suf/ImageNet/Annotations/n00000333/n00000333_02.xml 100B + pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_02.JPEG 30.00KiB + pre-0000-suf/ImageNet/Data/train/n00000333/n00000333_02.loss 100B + [n00000369_01] + pre-0000-suf/ImageNet/Annotations/n00000369/n00000369_01.xml 100B + pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_01.JPEG 30.00KiB + pre-0000-suf/ImageNet/Data/train/n00000369/n00000369_01.loss 100B + [n00000369_02] ... ``` @@ -335,8 +385,9 @@ go test -v -short -tags=debug -run=TestIshardMaxShardSize - [ ] version 0.9 (github checksum, git cmd) - [ ] go install - [ ] debug build -- [ ] allow user to specify source directories to include/exclude +- [X] allow user to specify source directories to include/exclude (achieved by prefix option) - [ ] logging (timestamp, nlog) +- [ ] Large list of objects, need to swap MEM temporary - [X] Long stress tests ### GOOD TO HAVE diff --git a/cmd/ishard/ishard/config/config.go b/cmd/ishard/ishard/config/config.go index 6d0a98791ce..f7254b94eac 100644 --- a/cmd/ishard/ishard/config/config.go +++ b/cmd/ishard/ishard/config/config.go @@ -7,6 +7,7 @@ package config import ( "flag" "log" + "os" "regexp" "strings" @@ -27,13 +28,18 @@ type ( MissingExtAction string Collapse bool } + DryRunFlag struct { + IsSet bool + Mode string + } Config struct { ClusterConfig IshardConfig - SrcBck cmn.Bck - DstBck cmn.Bck - Progress bool - DryRun bool + DryRunFlag + SrcBck cmn.Bck + SrcPrefix string + 
DstBck cmn.Bck + Progress bool } ) @@ -48,7 +54,26 @@ var DefaultConfig = Config{ SrcBck: cmn.Bck{Name: "src_bck", Provider: apc.AIS}, DstBck: cmn.Bck{Name: "dst_bck", Provider: apc.AIS}, Progress: false, - DryRun: false, + DryRunFlag: DryRunFlag{IsSet: false}, +} + +func (d *DryRunFlag) String() string { + return d.Mode +} + +func (d *DryRunFlag) Set(value string) error { + if value == "true" || value == "false" { + d.IsSet = value == "true" + d.Mode = "" + } else { + d.IsSet = true + d.Mode = value + } + return nil +} + +func (d *DryRunFlag) IsBoolFlag() bool { + return true } // Load configuration for ishard from cli, or spec files (TODO) @@ -60,14 +85,14 @@ func Load() (*Config, error) { func parseCliParams(cfg *Config) { flag.Int64Var(&cfg.MaxShardSize, "max_shard_size", 1024000, "Desired size of each output shard") - flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "Source bucket name or URI. If empty, a bucket with random name will be created") - flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "Destination bucket name or URI. If empty, a bucket with random name will be created") + flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "Source bucket name or URI.") + flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "Destination bucket name or URI.") flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%d", "Template used for generating output shards. 
Accepts Bash (prefix{0001..0010}suffix), Fmt (prefix-%06d-suffix), or At (prefix-@00001-gap-@100-suffix) templates") flag.StringVar(&cfg.Ext, "ext", ".tar", "Extension used for generating output shards.") flag.StringVar(&cfg.MissingExtAction, "missing_extension_action", "ignore", "Action to take when an extension is missing: abort | warn | ignore") flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.") flag.BoolVar(&cfg.Progress, "progress", false, "If true, display the progress of processing objects in the source bucket.") - flag.BoolVar(&cfg.DryRun, "dry_run", false, "If true, only shows the layout of resulting output shards without actually executing archive jobs.") + flag.Var(&cfg.DryRunFlag, "dry_run", "If set, only shows the layout of resulting output shards without actually executing archive jobs. Use 'show_keys' to include sample keys.") var ( sampleExts string @@ -91,20 +116,37 @@ func parseCliParams(cfg *Config) { "full_name": FullNamePattern, "collapse_all_dir": CollapseAllDirPattern, } - if pattern, ok := commonPatterns[sampleKeyPatternStr]; ok { + + if sampleKeyPatternStr == "" { + log.Printf("`sample_key_pattern` is not specified, use `base_file_name` as sample key by default.") + cfg.SampleKeyPattern = BaseFileNamePattern + } else if pattern, ok := commonPatterns[sampleKeyPatternStr]; ok { cfg.SampleKeyPattern = pattern } else { log.Printf("`sample_key_pattern` %s is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex.", sampleKeyPatternStr) if _, err := regexp.Compile(sampleKeyPatternStr); err != nil { - log.Fatalf("Invalid regex pattern: %s. Error: %v", cfg.SampleKeyPattern, err) + log.Printf("Invalid regex pattern: %s. 
Error: %v", cfg.SampleKeyPattern, err) + flag.Usage() + os.Exit(1) } cfg.SampleKeyPattern = SampleKeyPattern{Regex: sampleKeyPatternStr, CaptureGroup: "$1"} } - if cfg.SrcBck.Provider, cfg.SrcBck.Name = cmn.ParseURLScheme(cfg.SrcBck.Name); cfg.SrcBck.Provider == "" { - cfg.SrcBck.Provider = apc.AIS + if cfg.SrcBck.Name == "" || cfg.DstBck.Name == "" { + log.Println("Error: src_bck and dst_bck are required parameters.") + flag.Usage() + os.Exit(1) + } + + var err error + if cfg.SrcBck, cfg.SrcPrefix, err = cmn.ParseBckObjectURI(cfg.SrcBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil { + log.Printf("Error on parsing source bucket: %s. Error: %v", cfg.SrcBck.Name, err) + flag.Usage() + os.Exit(1) } - if cfg.DstBck.Provider, cfg.DstBck.Name = cmn.ParseURLScheme(cfg.DstBck.Name); cfg.DstBck.Provider == "" { - cfg.DstBck.Provider = apc.AIS + if cfg.DstBck, _, err = cmn.ParseBckObjectURI(cfg.DstBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil { + log.Printf("Error on parsing destination bucket: %s. Error: %v", cfg.SrcBck.Name, err) + flag.Usage() + os.Exit(1) } } diff --git a/cmd/ishard/ishard/ishard.go b/cmd/ishard/ishard/ishard.go index 913d8e87b7c..3b7768fb0ee 100644 --- a/cmd/ishard/ishard/ishard.go +++ b/cmd/ishard/ishard/ishard.go @@ -19,7 +19,6 @@ import ( "github.com/NVIDIA/aistore/cmd/ishard/ishard/config" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/ext/dsort/shard" "github.com/NVIDIA/aistore/xact" "github.com/vbauerster/mpb/v4" @@ -110,6 +109,7 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor totalSize int64 recs = shard.NewRecords(16) errCh = make(chan error, 1) + wg = cos.NewLimitedWaitGroup(cmn.MaxParallelism(), 0) ) for name, child := range n.children { @@ -125,9 +125,36 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor recs.Insert(childRecords.All()...) 
} totalSize += subtreeSize + + if totalSize < is.cfg.MaxShardSize { + continue + } + + name, hasNext := is.shardIter.Next() + if !hasNext { + return nil, 0, fmt.Errorf("number of shards to be created exceeds expected number of shards (%d)", is.shardIter.Count()) + } + + sh := &shard.Shard{ + Size: totalSize, + Records: recs, + Name: name, + } + totalSize = 0 + recs = shard.NewRecords(16) + + if is.cfg.DryRunFlag.IsSet { + is.printShard(sh) + continue + } + + wg.Add(1) + go func(sh *shard.Shard) { + is.generateShard(sh, errCh) + wg.Done() + }(sh) } - wg := cos.NewLimitedWaitGroup(cmn.MaxParallelism(), 0) n.records.Lock() for _, record := range n.records.All() { totalSize += record.TotalSize() @@ -150,7 +177,7 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor totalSize = 0 recs = shard.NewRecords(16) - if is.cfg.DryRun { + if is.cfg.DryRunFlag.IsSet { is.printShard(sh) continue } @@ -176,7 +203,7 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor Name: name, } - if is.cfg.DryRun { + if is.cfg.DryRunFlag.IsSet { is.printShard(sh) } else { wg.Add(1) @@ -202,12 +229,11 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor } func (is *ISharder) generateShard(sh *shard.Shard, errCh chan error) { + defer sh.Records.Drain() if sh.Records.Len() == 0 { return } - defer sh.Records.Drain() - paths := []string{} sh.Records.Lock() for _, record := range sh.Records.All() { @@ -275,12 +301,16 @@ func (is *ISharder) findMissingExt(node *dirNode, recursive bool) error { } func (is *ISharder) printShard(sh *shard.Shard) { + defer sh.Records.Drain() + if sh.Records.Len() == 0 { + return + } + w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', tabwriter.AlignRight) if err := is.CLItemplate.Execute(w, sh); err != nil { fmt.Println("error executing template: %w", err) } w.Flush() - sh.Records.Drain() } // NewISharder instantiates an ISharder with the configuration if provided; @@ -294,7 +324,6 
@@ func NewISharder(cfgArg *config.Config) (is *ISharder, err error) { } else { is.cfg, err = config.Load() if err != nil { - nlog.Errorf("Error initializing config: %v. Using default config.", err) defaultCfg := config.DefaultConfig is.cfg = &defaultCfg } @@ -316,24 +345,29 @@ func NewISharder(cfgArg *config.Config) (is *ISharder, err error) { is.missingExtAct = config.MissingExtActMap[is.cfg.MissingExtAction] is.sampleKeyRegex = regexp.MustCompile(is.cfg.SampleKeyPattern.Regex) - if is.cfg.DryRun { - tmplWithSampleKey := `{{$shard := .}}{{appendExt $shard.Name}} {{formatSize $shard.Size}} -{{range $rec := .Records.All}} {{sampleKey $rec.Name}} -{{range $obj := .Objects}} {{contentPath $shard.Name $obj.ContentPath}} {{formatSize .Size}} -{{end}}{{end}}` + if is.cfg.DryRunFlag.IsSet { + var sb strings.Builder + sb.WriteString("{{$shard := .}}{{appendExt $shard.Name}}\t{{formatSize $shard.Size}}\n") + sb.WriteString("{{range $rec := .Records.All}}") + if is.cfg.DryRunFlag.Mode == "show_keys" { + sb.WriteString(" {{sampleKey $rec.Name}}\t\n") + } + sb.WriteString("{{range $obj := .Objects}} {{contentPath $shard.Name $obj.ContentPath}}\t{{formatSize $obj.Size}}\n") + sb.WriteString("{{end}}{{end}}") + is.CLItemplate = template.Must(template.New("shard").Funcs(template.FuncMap{ "formatSize": func(size int64) string { return cos.ToSizeIEC(size, 2) }, "sampleKey": func(sampleKey string) string { return "[" + sampleKey + "]" }, "contentPath": func(shardName, objContentPath string) string { return filepath.Join(shardName, objContentPath) }, "appendExt": func(shardName string) string { return shardName + is.cfg.Ext }, - }).Parse(tmplWithSampleKey)) + }).Parse(sb.String())) } return is, err } func (is *ISharder) Start() error { - msg := &apc.LsoMsg{} + msg := &apc.LsoMsg{Prefix: is.cfg.SrcPrefix, Flags: apc.LsNameSize} objList, err := api.ListObjects(is.baseParams, is.cfg.SrcBck, msg, api.ListArgs{}) if err != nil { return err @@ -377,7 +411,7 @@ func (is *ISharder) Start() 
error { return err } - if is.cfg.DryRun { + if is.cfg.DryRunFlag.IsSet { return nil } diff --git a/cmd/ishard/ishard/ishard_test.go b/cmd/ishard/ishard/ishard_test.go index 165ea1f4778..e0c80ea1090 100644 --- a/cmd/ishard/ishard/ishard_test.go +++ b/cmd/ishard/ishard/ishard_test.go @@ -29,8 +29,8 @@ import ( ) func runIshardTest(t *testing.T, cfg *config.Config, baseParams api.BaseParams, numRecords, numExtensions int, fileSize int64, sampleKeyPattern config.SampleKeyPattern, randomize, dropout bool) { - tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, false /*cleanup*/) - tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, false /*cleanup*/) + tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/) + tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/) extensions := cfg.IshardConfig.SampleExtensions // If sample extensions is not specified in config, randomly generate them @@ -42,7 +42,7 @@ func runIshardTest(t *testing.T, cfg *config.Config, baseParams api.BaseParams, numExtensions = len(extensions) } - totalSize, err := generateNestedStructure(baseParams, cfg.SrcBck, numRecords, extensions, fileSize, randomize, dropout) + totalSize, err := generateNestedStructure(baseParams, cfg.SrcBck, numRecords, "", extensions, fileSize, randomize, dropout) tassert.CheckFatal(t, err) isharder, err := ishard.NewISharder(cfg) @@ -172,6 +172,61 @@ func TestIshardMaxShardSize(t *testing.T) { } } +func TestIshardPrefix(t *testing.T) { + var ( + cfg = &config.Config{ + SrcBck: cmn.Bck{ + Name: trand.String(15), + Provider: apc.AIS, + }, + DstBck: cmn.Bck{ + Name: trand.String(15), + Provider: apc.AIS, + }, + IshardConfig: config.IshardConfig{ + MaxShardSize: 102400, + Collapse: false, + ShardTemplate: "shard-%d", + Ext: ".tar", + SampleKeyPattern: config.BaseFileNamePattern, + }, + ClusterConfig: config.DefaultConfig.ClusterConfig, + SrcPrefix: "matched_prefix_", + } + baseParams = api.BaseParams{ + URL: cfg.URL, + Client: 
cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}), + } + numRecords = 50 + numExtensions = 3 + fileSize = 32 * cos.KiB + ) + + tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/) + tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/) + + extensions := make([]string, numExtensions) + for i := range numExtensions { + extensions[i] = "." + trand.String(3) + } + + totalSize, err := generateNestedStructure(baseParams, cfg.SrcBck, numRecords, "matched_prefix_", extensions, int64(fileSize), false, false) + tassert.CheckFatal(t, err) + _, err = generateNestedStructure(baseParams, cfg.SrcBck, numRecords/2, "unmatched_prefix_", extensions, int64(fileSize), false, false) + tassert.CheckFatal(t, err) + + isharder, err := ishard.NewISharder(cfg) + tassert.CheckFatal(t, err) + + tlog.Logf("starting ishard, from %s to %s\n", cfg.SrcBck, cfg.DstBck) + + err = isharder.Start() + tassert.CheckFatal(t, err) + + // Only files with `matched_prefix_` counts + checkOutputShards(t, baseParams, cfg.DstBck, numRecords*numExtensions, totalSize, config.BaseFileNamePattern, false) +} + func TestIshardTemplate(t *testing.T) { testCases := []struct { numRecords int @@ -180,8 +235,8 @@ func TestIshardTemplate(t *testing.T) { shardTemplate string }{ {numRecords: 50, fileSize: 32 * cos.KiB, maxShardSize: 128 * cos.KiB, shardTemplate: "prefix{0000..9999}-suffix"}, - {numRecords: 100, fileSize: 96 * cos.KiB, maxShardSize: 256 * cos.KiB, shardTemplate: "prefix-%06d-suffix"}, - {numRecords: 200, fileSize: 24 * cos.KiB, maxShardSize: 16 * cos.KiB, shardTemplate: "prefix-@00001-gap-@100-suffix"}, + {numRecords: 50, fileSize: 96 * cos.KiB, maxShardSize: 256 * cos.KiB, shardTemplate: "prefix-%06d-suffix"}, + {numRecords: 50, fileSize: 24 * cos.KiB, maxShardSize: 16 * cos.KiB, shardTemplate: "prefix-@00001-gap-@100-suffix"}, } for _, tc := range testCases { @@ -238,18 +293,25 @@ func TestIshardTemplate(t *testing.T) { } func TestIshardSampleKeyPattern(t *testing.T) { - 
testCases := []struct { + type tc struct { collapse bool pattern config.SampleKeyPattern - }{ - {pattern: config.FullNamePattern, collapse: true}, + } + testCases := []tc{ {pattern: config.FullNamePattern, collapse: false}, - {pattern: config.BaseFileNamePattern, collapse: true}, {pattern: config.BaseFileNamePattern, collapse: false}, - {pattern: config.CollapseAllDirPattern, collapse: true}, {pattern: config.CollapseAllDirPattern, collapse: false}, } + // If long test, extend to test cases + if !testing.Short() { + testCases = append(testCases, + tc{pattern: config.FullNamePattern, collapse: true}, + tc{pattern: config.BaseFileNamePattern, collapse: true}, + tc{pattern: config.CollapseAllDirPattern, collapse: true}, + ) + } + for _, tc := range testCases { t.Run(fmt.Sprintf("Pattern:%v/Collapse:%v", tc.pattern, tc.collapse), func(t *testing.T) { var ( @@ -275,15 +337,11 @@ func TestIshardSampleKeyPattern(t *testing.T) { URL: cfg.URL, Client: cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}), } - numRecords = 500 + numRecords = 50 numExtensions = 5 fileSize = 32 * cos.KiB ) - if testing.Short() { - numRecords /= 10 - } - runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), tc.pattern, true /*randomize*/, false /*dropout*/) }) } @@ -328,6 +386,8 @@ func TestIshardMissingExtension(t *testing.T) { } func TestIshardParallel(t *testing.T) { + tools.CheckSkip(t, &tools.SkipTestArgs{Long: true}) + var ishardsCount = 5 wg := &sync.WaitGroup{} @@ -361,10 +421,6 @@ func TestIshardParallel(t *testing.T) { fileSize = 32 * cos.KiB ) - if testing.Short() { - numRecords /= 1000 - } - wg.Add(1) go func(cfg config.Config, baseParams api.BaseParams) { defer wg.Done() @@ -375,6 +431,8 @@ func TestIshardParallel(t *testing.T) { } func TestIshardChain(t *testing.T) { + tools.CheckSkip(t, &tools.SkipTestArgs{Long: true}) + var ishardsCount = 5 for i := range ishardsCount { @@ -407,10 +465,6 @@ func TestIshardChain(t *testing.T) { fileSize = 32 * cos.KiB ) 
- if testing.Short() { - numRecords /= 1000 - } - runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) } } @@ -484,7 +538,7 @@ func TestIshardLargeFiles(t *testing.T) { } // Helper function to generate a nested directory structure -func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecords int, extensions []string, fileSize int64, randomize, dropout bool) (totalSize int64, _ error) { +func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecords int, prefix string, extensions []string, fileSize int64, randomize, dropout bool) (totalSize int64, _ error) { randomFilePath := func() string { levels := rand.IntN(3) + 1 // Random number of subdirectory levels (1-3) parts := make([]string, levels) @@ -517,7 +571,7 @@ func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecor if _, err := api.PutObject(&api.PutArgs{ BaseParams: baseParams, Bck: bucket, - ObjName: filepath.Join(basePath, baseNameWithExt), + ObjName: filepath.Join(prefix, basePath, baseNameWithExt), Reader: r, Size: uint64(size), }); err != nil { @@ -546,7 +600,7 @@ func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecor if _, err := api.PutObject(&api.PutArgs{ BaseParams: baseParams, Bck: bucket, - ObjName: objectName, + ObjName: filepath.Join(prefix, objectName), Reader: r, Size: uint64(size), }); err != nil { @@ -563,7 +617,7 @@ func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecor if _, err := api.PutObject(&api.PutArgs{ BaseParams: baseParams, Bck: bucket, - ObjName: objectName, + ObjName: filepath.Join(prefix, objectName), Reader: r, Size: uint64(size), }); err != nil {