pyaisloader: Enabled ETL option for pyaisloader benchmarks
Signed-off-by: Tony Chen <[email protected]>
Nahemah1022 committed Jun 28, 2024
1 parent 779a7b9 commit 0732eb9
Showing 12 changed files with 247 additions and 37 deletions.
30 changes: 27 additions & 3 deletions .gitlab-ci.yml
@@ -352,14 +352,26 @@ test:long:aisloader:
- ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT
- sleep 10 # make sure that cluster properly starts
- FLAGS="--duration=5m" make test-aisloader
- cd ./python
- make PYAISLOADER_TEST_TYPE=short test-pyaisloader
- make PYAISLOADER_TEST_TYPE=long test-pyaisloader
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
when: manual
allow_failure: true

test:long:pyaisloader:
stage: test-long
tags:
- ais
timeout: 15m
variables:
AIS_ENDPOINT: "http://localhost:8080"
script:
- ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT
- sleep 10 # make sure that cluster properly starts
- cd ./python; make PYAISLOADER_TEST_TYPE=long test-pyaisloader
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
when: manual
allow_failure: true

#
# Kubernetes stages
@@ -494,6 +506,18 @@ test:long:k8s:aisloader:
- make aisloader
- BUCKET="ais://test" FLAGS="--duration=2m --etl" make test-aisloader

test:long:k8s:pyaisloader:
extends: .test_k8s_long_template
timeout: 15m
rules:
- if: '$CI_MERGE_REQUEST_LABELS =~ /.*k8s-ci.*/'
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
when: manual
allow_failure: true
script:
- sleep 10 # Give some time for the cluster to stabilize.
- cd ./python; make PYAISLOADER_TEST_TYPE=etl test-pyaisloader

test:long:k8s:all:
extends: .test_k8s_long_template
timeout: 5h
14 changes: 7 additions & 7 deletions docs/etl.md
@@ -351,8 +351,9 @@ This section describes how to interact with ETLs via RESTful API.
| List ETLs | Lists all running ETLs. | GET /v1/etl | `curl -L -X GET 'http://G/v1/etl'` |
| View ETLs Init spec/code | View code/spec of ETL by `ETL_NAME` | GET /v1/etl/ETL_NAME | `curl -L -X GET 'http://G/v1/etl/ETL_NAME'` |
| Transform object | Transforms an object based on ETL with `ETL_NAME`. | GET /v1/objects/<bucket>/<objname>?etl_name=ETL_NAME | `curl -L -X GET 'http://G/v1/objects/shards/shard01.tar?etl_name=ETL_NAME' -o transformed_shard01.tar` |
| Transform bucket | Transforms all objects in a bucket and puts them to destination bucket. | POST {"action": "etl-bck"} /v1/buckets/from-name | `curl -i -X POST -H 'Content-Type: application/json' -d '{"action": "etl-bck", "name": "to-name", "value":{"ext":"destext", "prefix":"prefix", "suffix": "suffix"}}' 'http://G/v1/buckets/from-name'` |
| Dry run transform bucket | Accumulates in xaction stats how many objects and bytes would be created, without actually doing it. | POST {"action": "etl-bck"} /v1/buckets/from-name | `curl -i -X POST -H 'Content-Type: application/json' -d '{"action": "etl-bck", "name": "to-name", "value":{"ext":"destext", "dry_run": true}}' 'http://G/v1/buckets/from-name'` |
| Transform bucket | Transforms all objects in a bucket and puts them to destination bucket. | POST {"action": "etl-bck"} /v1/buckets/SRC_BUCKET | `curl -i -X POST -H 'Content-Type: application/json' -d '{"action": "etl-bck", "name": "to-name", "value":{"id": "ETL_NAME", "ext":{"SRC_EXT": "DEST_EXT"}, "prefix":"PREFIX_FILTER", "prepend":"PREPEND_NAME"}}' 'http://G/v1/buckets/SRC_BUCKET?bck_to=PROVIDER%2FNAMESPACE%2FDEST_BUCKET%2F'` |
| Transform and synchronize bucket | Synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source. | POST {"action": "etl-bck"} /v1/buckets/SRC_BUCKET | `curl -i -X POST -H 'Content-Type: application/json' -d '{"action": "etl-bck", "name": "to-name", "value":{"id": "ETL_NAME", "synchronize": true}}' 'http://G/v1/buckets/SRC_BUCKET?bck_to=PROVIDER%2FNAMESPACE%2FDEST_BUCKET%2F'` |
| Dry run transform bucket | Accumulates in xaction stats how many objects and bytes would be created, without actually doing it. | POST {"action": "etl-bck"} /v1/buckets/SRC_BUCKET | `curl -i -X POST -H 'Content-Type: application/json' -d '{"action": "etl-bck", "name": "to-name", "value":{"id": "ETL_NAME", "dry_run": true}}' 'http://G/v1/buckets/SRC_BUCKET?bck_to=PROVIDER%2FNAMESPACE%2FDEST_BUCKET%2F'` |
| Stop ETL | Stops ETL with given `ETL_NAME`. | POST /v1/etl/ETL_NAME/stop | `curl -X POST 'http://G/v1/etl/ETL_NAME/stop'` |
| Delete ETL | Deletes ETL spec/code with given `ETL_NAME`. | DELETE /v1/etl/ETL_NAME | `curl -X DELETE 'http://G/v1/etl/ETL_NAME'` |
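The bucket-to-bucket rows above pass the destination bucket through the URL-encoded `bck_to` query parameter. As a sketch of how the same "Transform bucket" request can be assembled programmatically (gateway address, bucket names, and `ETL_NAME` are placeholders taken from the table, not real resources), using only the standard library:

```python
import json
import urllib.parse
import urllib.request

AIS_GATEWAY = "http://localhost:8080"  # stands in for "G" in the table

# Build the "Transform bucket" request without sending it; placeholders
# (SRC_BUCKET, ETL_NAME, ...) mirror the curl example above.
query = urllib.parse.urlencode({"bck_to": "PROVIDER/NAMESPACE/DEST_BUCKET/"})
body = json.dumps({
    "action": "etl-bck",
    "name": "to-name",
    "value": {"id": "ETL_NAME", "ext": {"SRC_EXT": "DEST_EXT"}},
}).encode()

req = urllib.request.Request(
    f"{AIS_GATEWAY}/v1/buckets/SRC_BUCKET?{query}",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req) would dispatch it against a live cluster.
print(req.full_url)  # the query string carries the URL-encoded bck_to
```

Note that `urlencode` produces the same `%2F`-escaped destination path as the curl examples.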

@@ -378,11 +379,10 @@ $ ais etl init spec --name=etl-md5 --from-file=spec.yaml --comm-type hpull
```

Below are the requirements for a valid `ETL_NAME`:
1. Starts with a letter 'A' to 'Z' or 'a' to 'z'.
2. Can contain letters, numbers, underscore ('_'), or hyphen ('-').
3. Should have a length greater than 5 and less than 21.
4. Shouldn't contain special characters, except for underscore and hyphen.

1. Must start and end with a lowercase letter ('a' to 'z') or a digit ('0' to '9').
2. Can contain lowercase letters, digits, or hyphen ('-').
3. Should have a length greater than 5 and less than 33.
4. Shouldn't contain special characters except for hyphen (no capitals or underscores).
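The rules above can be checked with a single regular expression plus a length test. A minimal sketch (this is an illustration of the documented rules, not the SDK's own validator):

```python
import re

# Start and end with a lowercase letter or digit; hyphens allowed inside;
# total length strictly between 5 and 33 characters.
ETL_NAME_RE = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")

def is_valid_etl_name(name: str) -> bool:
    return 5 < len(name) < 33 and ETL_NAME_RE.fullmatch(name) is not None

print(is_valid_etl_name("etl-md5"))    # True
print(is_valid_etl_name("ETL_MD5_X"))  # False: capitals and underscores
print(is_valid_etl_name("-etl-md5"))   # False: cannot start with a hyphen
```

This is the same Kubernetes-style DNS-label pattern the SDK enforces, since the ETL name becomes a pod name in the cluster.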

## References

30 changes: 30 additions & 0 deletions python/aistore/sdk/etl.py
@@ -2,6 +2,7 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
import sys
import re

import base64
from typing import Callable, List
@@ -56,6 +57,7 @@ class Etl:
def __init__(self, client: "Client", name: str):
self._client = client
self._name = name
self.validate_etl_name(name)

@property
def name(self) -> str:
@@ -219,3 +221,31 @@ def _encode_dependencies(dependencies: List[str]):
dependencies.append("cloudpickle==2.2.0")
deps = "\n".join(dependencies).encode(UTF_ENCODING)
return base64.b64encode(deps).decode(UTF_ENCODING)

@staticmethod
def validate_etl_name(name: str):
"""
Validate the ETL name based on specific criteria.
Args:
name (str): The name of the ETL to validate.
Raises:
ValueError: If the name is too short (less than 6 characters),
too long (more than 32 characters),
or contains invalid characters (anything other than lowercase letters, digits, or hyphens).
"""
prefix = f"ETL name '{name}' "
short_name_etl = 6
long_name_etl = 32

length = len(name)
if length < short_name_etl:
raise ValueError(f"{prefix}is too short")
if length > long_name_etl:
raise ValueError(f"{prefix}is too long")

if not re.fullmatch(r"[a-z0-9]([-a-z0-9]*[a-z0-9])", name):
raise ValueError(
f"{prefix}is invalid: must start/end with a lowercase letter/number, and can only contain [a-z0-9-]"
)
6 changes: 6 additions & 0 deletions python/pyaisloader/Makefile
@@ -7,9 +7,15 @@ short_put: ## Run a short PUT benchmark
short_get: ## Run a short GET benchmark
pyaisloader GET --bucket ais://abc --duration 30s --workers 16 --totalsize 1GB --minsize 5KB --maxsize 10KB --cleanup

short_get_etl: ## Run a short GET benchmark where all GETs undergo the ECHO ETL transformation
pyaisloader GET --bucket ais://abc --duration 30s --workers 16 --totalsize 1GB --minsize 5KB --maxsize 10KB --etl ECHO --cleanup

short_mixed: ## Run a short MIXED benchmark
pyaisloader MIXED --bucket ais://abc --duration 30s --workers 16 --minsize 5KB --maxsize 10KB --putpct 50 --cleanup

short_mixed_etl: ## Run a short MIXED benchmark where all GETs undergo the ECHO ETL transformation
pyaisloader MIXED --bucket ais://abc --duration 30s --workers 16 --minsize 5KB --maxsize 10KB --putpct 50 --etl ECHO --cleanup

short_list: ## Run a short LIST benchmark
pyaisloader LIST --bucket ais://abc --cleanup true --objects 50000 --workers 16

2 changes: 2 additions & 0 deletions python/pyaisloader/README.md
@@ -55,6 +55,7 @@ Runs a time-based benchmark with 100% GET workload.
| --maxsize | -max | Maximum size of objects to be PUT in bucket (if bucket is smaller than total size) | No | N/A |
| --duration | -d | Duration for which benchmark should be run | Yes | N/A |
| --workers | -w | Number of workers | Yes | N/A |
| --etl | -e | Name of the ETL transformation that all objects fetched by GETs should undergo | No | N/A |

#### Type: MIXED

@@ -71,6 +72,7 @@ Runs a time-based benchmark with a mixed load of GETs and PUTs (based on `putpct`
| --putpct | -p | Percentage for PUT operations in MIXED benchmark | Yes | N/A |
| --duration | -d | Duration for which benchmark should be run | Yes | N/A |
| --workers | -w | Number of workers | Yes | N/A |
| --etl | -e | Name of the ETL transformation that all objects fetched by GETs should undergo | No | N/A |

#### Type: LIST

23 changes: 12 additions & 11 deletions python/pyaisloader/ci-test.sh
@@ -1,31 +1,32 @@
#!/bin/bash

if [ -z "$1" ]
then
echo "No argument supplied. Please provide a test type: 'short' or 'long'."
if [ -z "$1" ]; then
echo "No argument supplied. Please provide a test type: 'short', 'long', or 'etl'."
exit 1
fi

# install aistore python sdk from source
pip install -e .
# Install aistore python SDK from source
pip install -e . --quiet

cd ./pyaisloader
cd ./pyaisloader || exit
make install

export AIS_ENDPOINT="http://localhost:8080"

if [ "$1" == "short" ]
then
if [ "$1" == "short" ]; then
yes "y" | head -n 2 | pyaisloader p -b ais://testpyaisloader -d 15s -min 1mb -max 10mb -s 1gb -w 16
yes "y" | head -n 2 | pyaisloader g -b ais://testpyaisloader -d 15s -min 1mb -max 10mb -s 1gb -w 16
yes "y" | head -n 2 | pyaisloader m -b ais://testpyaisloader -d 15s -min 1mb -max 10mb -w 16 -c
yes "y" | head -n 2 | pyaisloader ais_dataset -b ais://testpyaisloader -d 15s -min 1mb -max 10mb -w 16 -c
yes "y" | head -n 2 | pyaisloader ais_iter_dataset -b ais://testpyaisloader -i 1 -d 15s -min 1mb -max 10mb -w 16 -c
elif [ "$1" == "long" ]
then
elif [ "$1" == "long" ]; then
yes "y" | head -n 2 | pyaisloader p -b ais://testpyaisloader -d 3m -min 1mb -max 10mb -s 10gb -w 16
yes "y" | head -n 2 | pyaisloader g -b ais://testpyaisloader -d 3m -min 1mb -max 10mb -s 10gb -w 16
yes "y" | head -n 2 | pyaisloader m -b ais://testpyaisloader -d 3m -min 1mb -max 10mb -w 16 -c
elif [ "$1" == "etl" ]; then
yes "y" | head -n 2 | pyaisloader m -b ais://testpyaisloader -d 1m -min 1mb -max 10mb -w 16 --etl ECHO
yes "y" | head -n 2 | pyaisloader m -b ais://testpyaisloader -d 1m -min 1mb -max 10mb -w 16 --etl MD5 -c
else
echo "Invalid test type: $1. Please provide a valid test type: 'short' or 'long'."
echo "Invalid test type: $1. Please provide a valid test type: 'short', 'long', or 'etl'."
exit 1
fi
73 changes: 60 additions & 13 deletions python/pyaisloader/pyaisloader/benchmark.py
@@ -67,6 +67,7 @@ def __init__(self, bucket, workers, cleanup):
self.workers = workers
self.cleanup = cleanup
self.objs_created = []
self.etls_created = []
# Track for intelligent clean-up (deletes bucket if bucket was created by benchmark, otherwise only deletes objects in bucket created by benchmark)
self.bck_created = False

@@ -182,6 +183,7 @@ def __init__(
maxsize=None,
duration=None,
totalsize=None,
etl=None,
*args,
**kwargs,
):
@@ -191,6 +193,7 @@
self.totalsize = totalsize
self.minsize = minsize
self.maxsize = maxsize
self.etl = etl

def run(self):
if self.put_pct == 100:
@@ -204,11 +207,17 @@ def run(self):

def __run_put(self):
totalsize = None if self.totalsize is None else (self.totalsize // self.workers)
print_in_progress("Performing PUT benchmark")
print_in_progress(
"Performing PUT benchmark"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
)
results = multiworker_deploy(
self, self.put_benchmark, (self.duration, totalsize)
)
print_success("Completed PUT benchmark")
print_success(
"Completed PUT benchmark"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
)
result = []
for worker_result, worker_objs_created in results:
result.append(worker_result)
@@ -217,27 +226,51 @@ def __run_put(self):
if self.cleanup:
self.clean_up()
print_sep()
print_results(result, title="Benchmark Results (100% PUT):")
print_results(
result,
title=(
"Benchmark Results (100% PUT)"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
),
)

def __run_get(self):
if bucket_obj_count(self.bucket) == 0:
add_one_object(self)
self.get_objs_queue = self.bucket.list_all_objects()
print_in_progress("Performing GET benchmark")
print_in_progress(
"Performing GET benchmark"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
)
result = multiworker_deploy(self, self.get_benchmark, (self.duration,))
print_success("Completed GET benchmark")
print_success(
"Completed GET benchmark"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
)
result = combine_results(result, self.workers)
if self.cleanup:
self.clean_up()
print_sep()
print_results(result, title="Benchmark Results (100% GET):")
print_results(
result,
title=(
"Benchmark Results (100% GET)"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
),
)

def __run_mixed(self):
if bucket_obj_count(self.bucket) == 0:
add_one_object(self)
print_in_progress("Performing MIXED benchmark")
print_in_progress(
"Performing MIXED benchmark"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
)
result = multiworker_deploy(self, self.mixed_benchmark, (self.duration,))
print_success("Completed MIXED benchmark")
print_success(
"Completed MIXED benchmark"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
)
workers_objs_created = [
obj for worker_result in result for obj in worker_result[2]
]
Expand All @@ -249,8 +282,20 @@ def __run_mixed(self):
if self.cleanup:
self.clean_up()
print_sep()
print_results(result_put, title="Benchmark Results for PUT operations:")
print_results(result_get, title="Benchmark Results for GET operations:")
print_results(
result_put,
title=(
"Benchmark Results for PUT operations"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
),
)
print_results(
result_get,
title=(
"Benchmark Results for GET operations"
+ (f" with ETL {self.etl.spec_type}" if self.etl else "")
),
)

def _run_prepopulate(self):
print_in_progress("Starting Pre-Population")
@@ -327,11 +372,13 @@ def get_benchmark(self, duration): # Done

def __get_benchmark_h(self, stats, objs): # Done
op_start = time.time()
content = self.bucket.object(random.choice(objs).name).get()
content.read_all()
content = self.bucket.object(random.choice(objs).name).get(
etl_name=(self.etl.name if self.etl else None)
)
size = len(content.read_all())
op_end = time.time()
latency = op_end - op_start
stats.update(content.attributes.size, latency)
stats.update(size, latency)
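The changed `__get_benchmark_h` now measures size from `read_all()` rather than `content.attributes.size`, since an ETL can change the payload size in flight. A minimal, self-contained sketch of that bookkeeping (the `Stats` class and in-memory payloads here are stand-ins, not pyaisloader's real objects):

```python
import random
import time

class Stats:
    """Stand-in for the per-worker stats accumulator."""
    def __init__(self):
        self.total_bytes = 0
        self.latencies = []

    def update(self, size, latency):
        self.total_bytes += size
        self.latencies.append(latency)

def timed_get(stats, payloads):
    op_start = time.time()
    # Stands in for bucket.object(...).get(etl_name=...); the bytes returned
    # may differ in size from the stored object after transformation.
    content = random.choice(payloads)
    size = len(content)  # measure the (possibly transformed) payload itself
    stats.update(size, time.time() - op_start)

stats = Stats()
timed_get(stats, [b"a" * 10, b"b" * 20])
print(stats.total_bytes in (10, 20))  # True
```

Measuring `len(read_all())` keeps throughput numbers honest when the ETL output size differs from the source object's stored size.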

def mixed_benchmark(self, duration): # Done
prefix = generate_random_str() # Each worker with unique prefix