Implement ishard utility to archive objects according to subdirectory paths

Signed-off-by: Tony Chen <[email protected]>
Nahemah1022 committed Jul 3, 2024
1 parent 3c6a024 commit 252f952
Showing 5 changed files with 591 additions and 0 deletions.
20 changes: 20 additions & 0 deletions cmd/ishard/main.go
@@ -0,0 +1,20 @@
// Package main for the `ishard` executable.
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*/
package main

import (
	"fmt"
	"os"

	"github.com/NVIDIA/aistore/tools/ishard"
)

func main() {
	if err := ishard.Init(nil); err != nil {
		fmt.Fprintf(os.Stderr, "ishard failed to initialize: %v\n", err)
		os.Exit(1)
	}
	if err := ishard.Start(); err != nil {
		fmt.Fprintf(os.Stderr, "ishard exited with error: %v\n", err)
		os.Exit(1)
	}
}
54 changes: 54 additions & 0 deletions tools/ishard/README.md
@@ -0,0 +1,54 @@
# Initial Sharder

This package provides a utility for archiving objects from a bucket into shard files, packing shards according to the bucket's subdirectory structure. It ensures that objects belonging to the same record are never split across different output shards.

## Assumptions

**Do Not Split a Record Across Directories:** The utility assumes that objects belonging to the same record do NOT span multiple subdirectories, which guarantees that all members of a record are archived together.

> This limitation will become configurable at a later point, allowing users to customize the **record key** and the **extensions** associated with a record (feature to be implemented).
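
For example, with these (hypothetical) object names, both files share the record key `dataset/train/sample01` (the object path without its extension) and are therefore always archived into the same output shard:

```
dataset/train/sample01.jpeg
dataset/train/sample01.cls
```
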
## Parameters

- `-max_shard_size`: The desired size of each output shard, in bytes. Default: `1024000`.
- `-src_bck`: The source bucket name or URI. If empty, a bucket with a random name will be created.
- `-dst_bck`: The destination bucket name or URI. If empty, a bucket with a random name will be created.
- `-collapse`: If set, objects in a subdirectory are flattened and merged into their parent directory whenever their combined size doesn't reach the desired shard size (see the illustration below).

A bucket's provider (`ais`, `aws`, `azure`, `gcp`) is given via the URI scheme, e.g. `ais://bucket_name`; plain bucket names default to the `ais` provider.
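
To illustrate `-collapse`, assume a hypothetical layout in which `a/b/` holds only a few small files whose combined size stays below the maximum shard size:

```
a/b/x.bin, a/b/y.bin    (combined size below max_shard_size)

without -collapse: a/b/* archived as its own, undersized shard
with    -collapse: a/b/* folded into the shard(s) produced for a/
```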

## Initial Setup

**Build the Package:**

```sh
go build -o ishard .
```

## Sample Usage

```sh
./ishard -max_shard_size=10240 -src_bck=ais://source_bucket -dst_bck=ais://destination_bucket
```
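
With the default naming settings (prefix `shard-`, 4-digit index, `.tar` extension), this produces shards named `shard-0000.tar`, `shard-0001.tar`, and so on, each roughly 10 KiB, in the destination bucket.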

## Running the Tests

```sh
go test -v
```

## TODO List

### MUST HAVE/DESIRABLE
- [ ] Shard name patterns
- [ ] Utilize existing name template tools
- [ ] goroutines
- [ ] configurable record keys and extensions
- [ ] logging (timestamps, nlog)
- [ ] report missing members in a record
- [ ] allow user to specify target directories to include/exclude
- [ ] E2E testing from CLI

### GOOD TO HAVE
- [ ] progress bar (later)
- [ ] integration into aistore (later)
66 changes: 66 additions & 0 deletions tools/ishard/config/config.go
@@ -0,0 +1,66 @@
// Package config provides types and functions to configure the ishard executable.
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*/
package config

import (
	"flag"

	"github.com/NVIDIA/aistore/api/apc"
	"github.com/NVIDIA/aistore/cmn"
)

type (
	ClusterConfig struct {
		URL string
	}
	IshardConfig struct {
		MaxShardSize int64
		StartIdx     int
		IdxDigits    int
		Ext          string
		Prefix       string
		Collapse     bool
	}
	Config struct {
		ClusterConfig
		IshardConfig
		SrcBck cmn.Bck
		DstBck cmn.Bck
	}
)

const (
	defaultClusterIPv4 = "127.0.0.1"
	defaultProxyPort   = "8080"
)

var DefaultConfig = Config{
	ClusterConfig: ClusterConfig{URL: "http://" + defaultClusterIPv4 + ":" + defaultProxyPort},
	IshardConfig:  IshardConfig{MaxShardSize: 102400, StartIdx: 0, IdxDigits: 4, Ext: ".tar", Prefix: "shard-", Collapse: false},
	SrcBck:        cmn.Bck{Name: "src_bck", Provider: apc.AIS},
	DstBck:        cmn.Bck{Name: "dst_bck", Provider: apc.AIS},
}

// Load reads the configuration for ishard from the CLI, or from spec files (TODO)
func Load() (*Config, error) {
	cfg := DefaultConfig
	parseCliParams(&cfg)
	return &cfg, nil
}

func parseCliParams(cfg *Config) {
	flag.Int64Var(&cfg.MaxShardSize, "max_shard_size", 1024000, "desired size of each output shard")
	flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "the source bucket name or URI; if empty, a bucket with a random name will be created")
	flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "the destination bucket name or URI; if empty, a bucket with a random name will be created")
	flag.BoolVar(&cfg.Collapse, "collapse", false, "if set, objects in a subdirectory are flattened and merged into their parent directory when their combined size doesn't reach the desired shard size")
	flag.Parse()

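	// Infer each bucket's provider from its URI scheme (e.g. "ais://");
	// plain bucket names default to the AIS provider.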
	if cfg.SrcBck.Provider, cfg.SrcBck.Name = cmn.ParseURLScheme(cfg.SrcBck.Name); cfg.SrcBck.Provider == "" {
		cfg.SrcBck.Provider = apc.AIS
	}
	if cfg.DstBck.Provider, cfg.DstBck.Name = cmn.ParseURLScheme(cfg.DstBck.Name); cfg.DstBck.Provider == "" {
		cfg.DstBck.Provider = apc.AIS
	}
}
217 changes: 217 additions & 0 deletions tools/ishard/ishard.go
@@ -0,0 +1,217 @@
// Package ishard provides a utility for sharding the initial dataset
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*/
package ishard

import (
	"fmt"
	"path/filepath"
	"strings"

	"github.com/NVIDIA/aistore/api"
	"github.com/NVIDIA/aistore/api/apc"
	"github.com/NVIDIA/aistore/cmn"
	"github.com/NVIDIA/aistore/cmn/nlog"
	"github.com/NVIDIA/aistore/ext/dsort/shard"
	"github.com/NVIDIA/aistore/tools/ishard/config"
)

var (
	cfg        *config.Config
	baseParams api.BaseParams
)

var shardIdx = 0

// Node represents the hierarchical structure of virtual directories within a bucket
type Node struct {
	children map[string]*Node
	records  *shard.Records
}

func NewNode() *Node {
	return &Node{
		children: make(map[string]*Node),
		records:  shard.NewRecords(16),
	}
}

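// Insert adds the object at the given path to the tree, creating intermediate
// directory nodes as needed. The object is stored at its parent directory's node
// as a single-object record, keyed by the object's path stripped of its extension.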
func (n *Node) Insert(path string, size int64) {
	parts := strings.Split(path, "/")
	current := n

	for i, part := range parts {
		if _, exists := current.children[part]; !exists {
			if i == len(parts)-1 {
				ext := filepath.Ext(path)
				base := strings.TrimSuffix(path, ext)
				current.records.Insert(&shard.Record{
					Key:  base,
					Name: base,
					Objects: []*shard.RecordObj{{
						ContentPath:  path,
						StoreType:    shard.SGLStoreType,
						Offset:       0,
						MetadataSize: 0,
						Size:         size,
						Extension:    ext,
					}},
				})
			} else {
				current.children[part] = NewNode()
			}
		}
		current = current.children[part]
	}
}

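// Print recursively prints each virtual directory and the names of the records it contains (for debugging)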
func (n *Node) Print(prefix string) {
	for name, child := range n.children {
		fmt.Printf("%s%s/", prefix, name)
		names := []string{}
		child.records.Lock()
		for _, r := range child.records.All() {
			names = append(names, r.Name)
		}
		child.records.Unlock()
		fmt.Println(names)

		child.Print(prefix + " ")
	}
}

// Archive archives the objects under this node into shards, following the subdirectory structure
func (n *Node) Archive() error {
	paths := []string{}
	if _, err := archiveNode(n, "", &paths); err != nil {
		return err
	}

	// With collapse enabled, objects propagated up to the root are archived as the final shard
	if cfg.Collapse && len(paths) != 0 {
		msg := cmn.ArchiveBckMsg{
			ToBck: cfg.DstBck,
			ArchiveMsg: apc.ArchiveMsg{
				ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
				ListRange: apc.ListRange{
					ObjNames: paths,
				},
			},
		}

		if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
			return fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
		}
	}
	return nil
}

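// archiveNode recursively walks the subtree rooted at node, accumulating record
// object paths until their total size exceeds the configured maximum shard size,
// at which point they are archived into the next shard. Leftover paths are either
// propagated to the parent (when collapse is enabled) or archived as a final,
// possibly undersized shard.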
func archiveNode(node *Node, path string, parentPaths *[]string) (int64, error) {
	totalSize := int64(0)
	paths := []string{}

	for name, child := range node.children {
		fullPath := path + "/" + name
		if path == "" {
			fullPath = name
		}
		subtreeSize, err := archiveNode(child, fullPath, &paths)
		if err != nil {
			return 0, err
		}
		totalSize += subtreeSize
	}

	node.records.Lock()
	for _, record := range node.records.All() {
		totalSize += record.TotalSize()
		for _, obj := range record.Objects {
			paths = append(paths, record.MakeUniqueName(obj))
		}
		if totalSize > cfg.MaxShardSize {
			msg := cmn.ArchiveBckMsg{
				ToBck: cfg.DstBck,
				ArchiveMsg: apc.ArchiveMsg{
					ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
					ListRange: apc.ListRange{
						ObjNames: paths,
					},
				},
			}

			if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
				node.records.Unlock()
				return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
			}

			shardIdx++
			totalSize = 0
			paths = []string{}
		}
	}
	node.records.Unlock()

	if len(paths) == 0 {
		return 0, nil
	}

	// Allow flattening the remaining objects into the parent directory
	if cfg.Collapse {
		*parentPaths = append(*parentPaths, paths...)
		return totalSize, nil
	}

	// Otherwise, archive all remaining objects regardless of the current total size
	msg := cmn.ArchiveBckMsg{
		ToBck: cfg.DstBck,
		ArchiveMsg: apc.ArchiveMsg{
			ArchName: fmt.Sprintf("%s%0*d%s", cfg.Prefix, cfg.IdxDigits, shardIdx, cfg.Ext),
			ListRange: apc.ListRange{
				ObjNames: paths,
			},
		},
	}

	if _, err := api.ArchiveMultiObj(baseParams, cfg.SrcBck, &msg); err != nil {
		return 0, fmt.Errorf("failed to archive shard %d: %w", shardIdx, err)
	}
	shardIdx++
	return totalSize, nil
}

// Init sets the configuration for ishard. If a config is provided, it uses that;
// otherwise, it loads from CLI or uses the default config.
func Init(cfgArg *config.Config) error {
	var err error

	// Use the provided config if given
	if cfgArg != nil {
		cfg = cfgArg
	} else {
		cfg, err = config.Load()
		if err != nil {
			nlog.Errorf("Error initializing config: %v. Using default config.", err)
			defaultCfg := config.DefaultConfig
			cfg = &defaultCfg
		}
	}

	baseParams = api.BaseParams{URL: cfg.URL}
	baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true})
	return nil
}

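// Start lists all objects in the source bucket, builds the tree of virtual
// directories, and archives its contents into shards in the destination bucket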
func Start() error {
	msg := &apc.LsoMsg{}
	objList, err := api.ListObjects(baseParams, cfg.SrcBck, msg, api.ListArgs{})
	if err != nil {
		return err
	}

	root := NewNode()
	for _, en := range objList.Entries {
		root.Insert(en.Name, en.Size)
	}

	return root.Archive()
}