-
Notifications
You must be signed in to change notification settings - Fork 189
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement
ishard
utility to archive objects according to subdirectory paths

Signed-off-by: Tony Chen <[email protected]>
- Loading branch information
1 parent
3c6a024
commit 252f952
Showing
5 changed files
with
591 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
// Package main for the `ishard` executable. | ||
/* | ||
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
*/ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
|
||
"github.com/NVIDIA/aistore/tools/ishard" | ||
) | ||
|
||
func main() { | ||
ishard.Init(nil) | ||
if err := ishard.Start(); err != nil { | ||
fmt.Fprintf(os.Stderr, "ishard exited with error: %v\n", err) | ||
os.Exit(1) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# Initial Sharder | ||
|
||
This package provides a utility for archiving objects from a bucket into shard files. It ensures that objects from the same directory are not split across different output shards. | ||
|
||
## Assumptions | ||
|
||
**Do Not Split A Record Across Directories:** The utility assumes that objects within the same record should NOT be split across multiple subdirectories. This ensures that members of the same record are archived together. Users will be able to customize the **record key** and the **extensions** associated with the record through configuration (feature to be implemented). | ||
|
||
> This limitation will become a configurable user selection at a later point, with capabilities to customize record keys, etc. | ||
## Parameters | ||
|
||
- `-max_shard_size`: The desired size of each output shard in bytes. Default is `1024000`.
- `-src_bck`: The source bucket name or URI. If empty, a bucket with a random name will be created. | ||
- `-dst_bck`: The destination bucket name or URI. If empty, a bucket with a random name will be created. | ||
- `-src_bck_provider`: The provider for the source bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`. | ||
- `-dst_bck_provider`: The provider for the destination bucket (`ais`, `aws`, `azure`, `gcp`). Default is `ais`. | ||
- `-collapse`: If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size. | ||
|
||
## Initial Setup | ||
|
||
**Build the Package:** | ||
|
||
```sh | ||
go build -o ishard . | ||
``` | ||
|
||
## Sample Usage | ||
|
||
```sh | ||
./ishard -max_shard_size=10240 -src_bck=source_bucket -dst_bck=destination_bucket -src_bck_provider=ais -dst_bck_provider=ais
``` | ||
|
||
## Running the Tests | ||
|
||
```sh | ||
go test -v | ||
``` | ||
|
||
## TODO List | ||
|
||
### MUST HAVE/DESIRABLE | ||
- [ ] Shard name patterns | ||
- [ ] Utilize existing name template tools | ||
- [ ] goroutine | ||
- [ ] configurable record key, extensions | ||
- [ ] logging (timestamp, nlog) | ||
- [ ] reports missing member in a record | ||
- [ ] allow user to specify target directories to include/exclude | ||
- [ ] E2E testing from CLI | ||
|
||
### GOOD TO HAVE | ||
- [ ] progress bar (later) | ||
- [ ] integration into aistore (later) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
// Package config provides types and functions to configure ishard executable. | ||
/* | ||
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
*/ | ||
package config | ||
|
||
import ( | ||
"flag" | ||
|
||
"github.com/NVIDIA/aistore/api/apc" | ||
"github.com/NVIDIA/aistore/cmn" | ||
) | ||
|
||
type (
	// ClusterConfig holds connection settings for the target AIS cluster.
	ClusterConfig struct {
		URL string // proxy (gateway) endpoint, e.g. "http://127.0.0.1:8080"
	}
	// IshardConfig holds the sharding parameters proper.
	IshardConfig struct {
		MaxShardSize int64  // desired upper bound (bytes) on each output shard
		StartIdx     int    // index of the first generated shard
		IdxDigits    int    // zero-padded width of the shard index in shard names
		Ext          string // archive extension of generated shards, e.g. ".tar"
		Prefix       string // name prefix of generated shards, e.g. "shard-"
		Collapse     bool   // flatten undersized subdirectories into their parent
	}
	// Config aggregates cluster and sharding settings together with the
	// source and destination buckets.
	Config struct {
		ClusterConfig
		IshardConfig
		SrcBck cmn.Bck
		DstBck cmn.Bck
	}
)
|
||
// Defaults for reaching a local AIS cluster (single proxy on localhost).
const (
	defaultClusterIPv4 = "127.0.0.1"
	defaultProxyPort   = "8080"
)
|
||
// DefaultConfig is the configuration used when no overrides are supplied.
// NOTE(review): MaxShardSize here (102400) differs from the CLI flag default
// in parseCliParams (1024000) and from the README; when Load() is used the
// flag default wins — confirm which value is intended.
var DefaultConfig = Config{
	ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
	IshardConfig:  IshardConfig{MaxShardSize: 102400, StartIdx: 0, IdxDigits: 4, Ext: ".tar", Prefix: "shard-", Collapse: false},
	SrcBck:        cmn.Bck{Name: "src_bck", Provider: apc.AIS},
	DstBck:        cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
}
|
||
// Load configuration for ishard from cli, or spec files (TODO) | ||
func Load() (*Config, error) { | ||
cfg := DefaultConfig | ||
parseCliParams(&cfg) | ||
return &cfg, nil | ||
} | ||
|
||
func parseCliParams(cfg *Config) { | ||
flag.Int64Var(&cfg.MaxShardSize, "max_shard_size", 1024000, "desired size of each output shard") | ||
flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "the source bucket name or URI. If empty, a bucket with random name will be created") | ||
flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "the destination bucket name or URI. If empty, a bucket with random name will be created") | ||
flag.BoolVar(&cfg.Collapse, "collapse", false, "If true, files in a subdirectory will be flattened and merged into its parent directory if their overall size doesn't reach the desired shard size.") | ||
flag.Parse() | ||
|
||
if cfg.SrcBck.Provider, cfg.SrcBck.Name = cmn.ParseURLScheme(cfg.SrcBck.Name); cfg.SrcBck.Provider == "" { | ||
cfg.SrcBck.Provider = apc.AIS | ||
} | ||
if cfg.DstBck.Provider, cfg.DstBck.Name = cmn.ParseURLScheme(cfg.DstBck.Name); cfg.DstBck.Provider == "" { | ||
cfg.DstBck.Provider = apc.AIS | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
// Package ishard provides utility for shard the initial dataset | ||
/* | ||
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
*/ | ||
package ishard | ||
|
||
import ( | ||
"fmt" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/NVIDIA/aistore/api" | ||
"github.com/NVIDIA/aistore/api/apc" | ||
"github.com/NVIDIA/aistore/cmn" | ||
"github.com/NVIDIA/aistore/cmn/nlog" | ||
"github.com/NVIDIA/aistore/ext/dsort/shard" | ||
"github.com/NVIDIA/aistore/tools/ishard/config" | ||
) | ||
|
||
// Package-level state shared by Init/Start and the archiving helpers.
var (
	cfg        *config.Config // active configuration; set by Init
	baseParams api.BaseParams // AIS API client parameters; set by Init
)

// shardIdx is the running index used to name output shards; it is advanced
// after each successfully archived shard.
// NOTE(review): mutated without synchronization — fine only while archiving
// stays single-goroutine (see the "goroutine" TODO in the README).
var (
	shardIdx = 0
)
|
||
// Node represents one virtual directory within a bucket: its immediate
// subdirectories (children) and the records of the objects located
// directly in it.
type Node struct {
	children map[string]*Node
	records  *shard.Records
}
|
||
func NewNode() *Node { | ||
return &Node{ | ||
children: make(map[string]*Node), | ||
records: shard.NewRecords(16), | ||
} | ||
} | ||
|
||
func (n *Node) Insert(path string, size int64) { | ||
parts := strings.Split(path, "/") | ||
current := n | ||
|
||
for i, part := range parts { | ||
if _, exists := current.children[part]; !exists { | ||
if i == len(parts)-1 { | ||
ext := filepath.Ext(path) | ||
base := strings.TrimSuffix(path, ext) | ||
current.records.Insert(&shard.Record{ | ||
Key: base, | ||
Name: base, | ||
Objects: []*shard.RecordObj{{ | ||
ContentPath: path, | ||
StoreType: shard.SGLStoreType, | ||
Offset: 0, | ||
MetadataSize: 0, | ||
Size: size, | ||
Extension: ext, | ||
}}, | ||
}) | ||
} else { | ||
current.children[part] = NewNode() | ||
} | ||
} | ||
current = current.children[part] | ||
} | ||
} | ||
|
||
func (n *Node) Print(prefix string) { | ||
for name, child := range n.children { | ||
fmt.Printf("%s%s/", prefix, name) | ||
names := []string{} | ||
child.records.Lock() | ||
for _, r := range child.records.All() { | ||
names = append(names, r.Name) | ||
} | ||
child.records.Unlock() | ||
fmt.Println(names) | ||
|
||
child.Print(prefix + " ") | ||
} | ||
} | ||
|
||
// Archive objects from this node into shards according to subdirectories structure | ||
func (n *Node) Archive() error { | ||
paths := []string{} | ||
_, err := archiveNode(n, "", &paths) | ||
|
||
if cfg.Collapse && len(paths) != 0 { | ||
msg := cmn.ArchiveBckMsg{ | ||
ToBck: cfg.DstBck, | ||
ArchiveMsg: apc.ArchiveMsg{ | ||
ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext), | ||
ListRange: apc.ListRange{ | ||
ObjNames: paths, | ||
}, | ||
}, | ||
} | ||
|
||
if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil { | ||
return fmt.Errorf("failed to archive shard %d: %w", shardIdx, err) | ||
} | ||
} | ||
return err | ||
} | ||
|
||
func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error) { | ||
totalSize := int64(0) | ||
paths := []string{} | ||
|
||
for name, child := range node.children { | ||
fullPath := path + "/" + name | ||
if path == "" { | ||
fullPath = name | ||
} | ||
subtreeSize, err := archiveNode(child, fullPath, &paths) | ||
if err != nil { | ||
return 0, err | ||
} | ||
totalSize += subtreeSize | ||
} | ||
|
||
node.records.Lock() | ||
for _, record := range node.records.All() { | ||
totalSize += record.TotalSize() | ||
for _, obj := range record.Objects { | ||
paths = append(paths, record.MakeUniqueName(obj)) | ||
} | ||
if totalSize > cfg.MaxShardSize { | ||
msg := cmn.ArchiveBckMsg{ | ||
ToBck: cfg.DstBck, | ||
ArchiveMsg: apc.ArchiveMsg{ | ||
ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext), | ||
ListRange: apc.ListRange{ | ||
ObjNames: paths, | ||
}, | ||
}, | ||
} | ||
|
||
if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil { | ||
return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err) | ||
} | ||
|
||
shardIdx++ | ||
totalSize = 0 | ||
paths = []string{} | ||
} | ||
} | ||
node.records.Unlock() | ||
|
||
if len(paths) == 0 { | ||
return 0, nil | ||
} | ||
|
||
// Allow to flatten remaining objects into parent directory | ||
if cfg.Collapse { | ||
*parentPaths = append(*parentPaths, paths...) | ||
paths = nil | ||
return totalSize, nil | ||
} | ||
|
||
// Otherwise, archive all remaining objects regardless the current total size | ||
msg := cmn.ArchiveBckMsg{ | ||
ToBck: cfg.DstBck, | ||
ArchiveMsg: apc.ArchiveMsg{ | ||
ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext), | ||
ListRange: apc.ListRange{ | ||
ObjNames: paths, | ||
}, | ||
}, | ||
} | ||
|
||
if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil { | ||
return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err) | ||
} | ||
shardIdx++ | ||
return totalSize, nil | ||
} | ||
|
||
// Init sets the configuration for ishard. If a config is provided, it uses that; | ||
// otherwise, it loads from CLI or uses the default config. | ||
func Init(cfgArg *config.Config) error { | ||
var err error | ||
|
||
// Use provided config if given | ||
if cfgArg != nil { | ||
cfg = cfgArg | ||
} else { | ||
cfg, err = config.Load() | ||
if err != nil { | ||
nlog.Errorf("Error initializing config: %v. Using default config.", err) | ||
defaultCfg := config.DefaultConfig | ||
cfg = &defaultCfg | ||
} | ||
} | ||
|
||
baseParams = api.BaseParams{URL: cfg.URL} | ||
baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}) | ||
return err | ||
} | ||
|
||
func Start() error { | ||
msg := &apc.LsoMsg{} | ||
objList, err := api.ListObjects(baseParams, cfg.SrcBck, msg, api.ListArgs{}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
root := NewNode() | ||
for _, en := range objList.Entries { | ||
root.Insert(en.Name, en.Size) | ||
} | ||
|
||
return root.Archive() | ||
} |
Oops, something went wrong.