Skip to content

Commit

Permalink
dsort: supported algorithms (alpha, shuffle, content key) in `DsortFr…
Browse files Browse the repository at this point in the history
…amework`

Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Jun 20, 2024
1 parent 8e961d7 commit 308cd2c
Show file tree
Hide file tree
Showing 5 changed files with 515 additions and 90 deletions.
92 changes: 92 additions & 0 deletions python/aistore/sdk/dsort/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Using the DsortFramework in Python SDK

The `DsortFramework` class in the Python SDK provides a way to define and manage dSort jobs. The instructions below show how to construct a `DsortFramework` instance and start a dSort job directly from the Python SDK.

## Key Fields

- **extension**: The extension of input and output shards (either .tar, .tgz or .zip).
- **input_format.template**: Name template for input shard.
- **output_format**: Name template for output shard.
- **input_bck.name**: Name of the bucket where the input shard objects are stored.
- **input_bck.provider**: Bucket backend provider.
- **output_bck.name**: Bucket name where new output shards will be saved.
- **output_bck.provider**: Bucket backend provider.
- **description**: Description of the dSort job.
- **output_shard_size**: Size of the output shard, given either as a raw number of bytes (e.g., 10240) or as a number with a unit suffix (e.g., 10KB).
- **algorithm.kind**: Determines which sorting algorithm the dSort job uses. Available options are: "alphanumeric", "shuffle", "content".
- **algorithm.decreasing**: Determines whether the records are sorted in decreasing or increasing order; used when kind=alphanumeric or kind=content.
- **algorithm.seed**: Seed provided to the random generator, used when kind=shuffle.
- **algorithm.extension**: The content of the file with this extension is used as the sorting key; used when kind=content.
- **algorithm.content_key_type**: Content key type; may have one of the following values: "int", "float", or "string"; used exclusively with kind=content sorting.

## Example Usage

1. **Creating a DsortFramework from a JSON/YAML Specification File:**

```python
from aistore.sdk.dsort import DsortFramework

# Create a DsortFramework instance from a specification file
dsort_framework = DsortFramework.from_file("path/to/spec.json")

# Start the dSort job
dsort_job_id = client.dsort().start(dsort_framework)
```

2. **Creating a DsortFramework Directly:**

```python
from aistore.sdk.dsort import DsortFramework, DsortShardsGroup, DsortAlgorithm
from aistore.sdk.types import BucketModel

# Define the input and output shard configurations
input_shards = DsortShardsGroup(
bck=BucketModel(name="input_bucket", provider="aws"),
role="input",
format={"template": "input_template"},
extension=".tar"
)

output_shards = DsortShardsGroup(
bck=BucketModel(name="output_bucket", provider="aws"),
role="output",
format="output_template",
extension=".tar"
)

# Define the algorithm configuration
algorithm = DsortAlgorithm(
kind="content",
decreasing=False,
seed="",
extension=".key",
content_key_type="int"
)

# Create a DsortFramework instance
dsort_framework = DsortFramework(
input_shards=input_shards,
output_shards=output_shards,
algorithm=algorithm,
description="Dsort Job Description",
output_shard_size="10MB"
)

# Start the dSort job
dsort_job_id = client.dsort().start(dsort_framework)
```

## Important Notes

- **Input and Output Formats:**
- The `input_format` must be a dictionary containing the key "template".
- The `output_format` must be a string.

- **Algorithm Kind:**
- For the `content` kind, the `extension` and `content_key_type` fields are required.
- For other kinds, these fields should not be provided.

- **Error Handling:**
- Validation errors are raised if the required fields are missing or incorrectly formatted.

For more detailed information, refer to the original [dSort documentation](https://github.com/NVIDIA/aistore/blob/main/docs/dsort.md). Also see the [dSort CLI](https://github.com/NVIDIA/aistore/blob/main/docs/cli/dsort.md).
2 changes: 1 addition & 1 deletion python/aistore/sdk/dsort/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from aistore.sdk.dsort.core import Dsort
from aistore.sdk.dsort.framework import DsortFramework, DsortShardsGroup
from aistore.sdk.dsort.framework import DsortFramework, DsortShardsGroup, DsortAlgorithm
73 changes: 67 additions & 6 deletions python/aistore/sdk/dsort/framework.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Literal, Dict, Union
from typing import Literal, Dict, Union, Optional
from pathlib import Path

import json
Expand Down Expand Up @@ -26,40 +26,99 @@ def check_key(cls, values):
Validates that if the role contains required key fields
"""

if values.get("role") == "input":
format_str = values.get("format")
if "template" not in format_str:
role = values.get("role")
format_value = values.get("format")

if role == "input":
if not isinstance(format_value, dict) or "template" not in format_value:
raise ValueError(
'format of input shards must contain the key "template"'
'For input shards, format must be a dictionary containing the key "template".'
)
elif role == "output":
if not isinstance(format_value, str):
raise ValueError("For output shards, format must be a string.")

return values

def as_dict(self):
    """
    Builds the dictionary representation of this DsortShardsGroup, with
    every key prefixed by the group's role (e.g. "input_bck", "input_format")
    """
    prefix = self.role
    # Insertion order (bck, format, extension) is kept in the returned dict.
    fields = {
        "bck": self.bck.as_dict(),
        "format": self.format,
        "extension": self.extension,
    }
    return {f"{prefix}_{name}": value for name, value in fields.items()}


class DsortAlgorithm(BaseModel):
    """
    Represents the algorithm used in a Dsort job

    Attributes:
        kind: Sorting algorithm to use: "alphanumeric" (default), "shuffle" or "content"
        decreasing: Sort in decreasing order when True (defaults to False)
        seed: Seed string, serialized for every kind (defaults to "")
        extension: Required when kind="content"; rejected for any other kind
        content_key_type: "int", "float" or "string"; required when kind="content",
            rejected for any other kind
    """

    kind: Literal["alphanumeric", "shuffle", "content"] = "alphanumeric"
    decreasing: bool = False
    seed: Optional[str] = ""
    extension: Optional[str] = None
    content_key_type: Optional[Literal["int", "float", "string"]] = None

    # skip_on_failure: when a field (e.g. an unknown `kind`) already failed its own
    # validation it is missing from `values`; running the cross-field check anyway
    # would take the non-content branch and report a misleading second error.
    # pylint: disable=no-self-argument
    @root_validator(skip_on_failure=True)
    def validate_content_fields(cls, values):
        """
        Validates that "extension" and "content_key_type" are both set when
        kind="content" and that neither is set for any other kind

        Raises:
            ValueError: If a content-only field is missing for kind="content",
                or present for another kind
        """
        kind = values.get("kind")
        extension = values.get("extension")
        content_key_type = values.get("content_key_type")

        if kind == "content":
            # Empty strings are treated the same as missing values.
            if not extension:
                raise ValueError(
                    'For kind="content", the "extension" field is required.'
                )
            if not content_key_type:
                raise ValueError(
                    'For kind="content", the "content_key_type" field is required.'
                )
        elif extension or content_key_type:
            raise ValueError(
                'The "extension" and "content_key_type" fields are only allowed for kind="content".'
            )

        return values

    def as_dict(self):
        """
        Converts the DsortAlgorithm instance to a dictionary representation

        Returns:
            Dictionary with "kind", "decreasing" and "seed"; "extension" and
            "content_key_type" are added only when kind="content"
        """
        dict_rep = {"kind": self.kind, "decreasing": self.decreasing, "seed": self.seed}
        if self.kind == "content":
            dict_rep["extension"] = self.extension
            dict_rep["content_key_type"] = self.content_key_type
        return dict_rep


class DsortFramework:
"""
Represents the framework for a dSort job, including input and output shard configurations.
"""

# pylint: disable=too-many-arguments
def __init__(
    self,
    input_shards: DsortShardsGroup,
    output_shards: DsortShardsGroup,
    algorithm: Optional[DsortAlgorithm] = None,
    description=None,
    output_shard_size=None,
) -> None:
    """
    Stores the configuration of a dSort job

    Args:
        input_shards: Shards group describing the input of the job
        output_shards: Shards group describing the output of the job
        algorithm: Sorting algorithm configuration; when omitted, a
            DsortAlgorithm with its default field values is used
        description: Optional description of the job
        output_shard_size: Optional size of the output shards
    """
    self.input_shards = input_shards
    self.output_shards = output_shards
    self.output_shard_size = output_shard_size
    # Every DsortAlgorithm field has a default, so fall back to it rather than
    # storing None, which would break any later call on self.algorithm.
    self.algorithm = algorithm if algorithm is not None else DsortAlgorithm()
    self.description = description

@classmethod
Expand Down Expand Up @@ -95,9 +154,10 @@ def from_file(cls, spec):
output_shards=DsortShardsGroup(
bck=BucketModel(**spec_data.get("output_bck")),
role="output",
format=spec_data.get("output_format", {}),
format=spec_data.get("output_format", ""),
extension=spec_data.get("output_extension", ""),
),
algorithm=DsortAlgorithm(**spec_data.get("algorithm", {})),
output_shard_size=spec_data.get("output_shard_size", ""),
)

Expand All @@ -109,6 +169,7 @@ def to_spec(self):
Dictionary representation of dSort specification
"""
spec = {**self.input_shards.as_dict(), **self.output_shards.as_dict()}
spec["algorithm"] = self.algorithm.as_dict()
spec["description"] = self.description if self.description else ""
if self.output_shard_size:
spec["output_shard_size"] = self.output_shard_size
Expand Down
Loading

0 comments on commit 308cd2c

Please sign in to comment.