sdk/python: Add example object-file stress test
Signed-off-by: Aaron Wilson <[email protected]>
Co-Authored-By: Ryan Koo <[email protected]>
Showing 3 changed files with 396 additions and 0 deletions.
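For orientation, the scripts below exercise AIStore's ObjectFile interface: a file-like handle obtained via Object.as_file() that can resume reads when the underlying object stream is interrupted. A minimal sketch of that pattern, using the same as_file(max_resume=...) and read(size=...) calls the scripts rely on (the endpoint, bucket name, and object name are taken from the example configuration below and are placeholders, not a prescribed setup):

from aistore.sdk.client import Client

client = Client(endpoint="http://localhost:51080")  # AIS_ENDPOINT default used by the scripts
obj = client.bucket("objfile-test-data").object("random_data_0.tar")

# as_file() returns a file-like ObjectFile; if the underlying object stream is
# interrupted mid-read, it resumes the read, up to max_resume times.
with obj.as_file(max_resume=10) as obj_file:
    while True:
        chunk = obj_file.read(size=64 * 1024)  # -1 reads to EOF in a single call
        if not chunk:
            break
        # ... process chunk ...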
115 changes: 115 additions & 0 deletions
python/examples/sdk/object_file/obj-file-stress-test.py
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

# This script tests AIStore's ObjectFile and its ability to resume object
# reading with interruptions to the underlying object stream (e.g.
# simulating intermittent AIStore K8s node failures).

import logging
import os
import shutil
import time
from pathlib import Path

import urllib3
from aistore.sdk.client import Client
from kubernetes import client as k8s_client, config as k8s_config

from utils import generate_data, start_pod_killer, stop_pod_killer, obj_file_read

logging.basicConfig(level=logging.DEBUG)

KB = 1024
MB = 1024 * KB
GB = 1024 * MB

ENDPOINT = os.getenv("AIS_ENDPOINT", "http://localhost:51080")  # AIStore endpoint to communicate with
BUCKET_NAME = "objfile-test-data"  # Name of the AIStore bucket where tar files are stored
GENERATED_DIR = Path("gen-data")  # Directory to generate data in
OUT_DIR = Path("output")  # Directory to store outputs
NUM_TARS = 1  # Number of tar archives to generate
NUM_FILES = 20  # Number of files in each tar archive
FILE_SIZE = 1 * GB  # Size of each file in the tar archive
NAMESPACE = "ais"  # Kubernetes namespace for the pod killer
POD_PREFIX = "ais"  # Prefix to identify pods to be killed during the test
POD_KILL_INTERVAL = (30, 60)  # Interval (in seconds) between pod killings
READ_SIZES = [-1, 16 * KB, 32 * KB, 64 * KB, 128 * KB]  # Read sizes to test


def validate(bucket, outputs):
    """
    Validate the downloaded objects by comparing them to the local files.

    Args:
        bucket (Bucket): The AIStore bucket used for validation.
        outputs (list): List of local file paths to validate.
    """
    for output in outputs:
        logging.info(f"Validating object {output.parts[-1]}...")
        with open(output, 'rb') as result:
            result_bytes = result.read()
        ais_bytes = bucket.object(obj_name=output.parts[-1]).get().read_all()
        assert ais_bytes == result_bytes, f"Validation failed for {output.parts[-1]}"
    logging.info("All objects validated successfully.")
    shutil.rmtree(OUT_DIR, ignore_errors=True)


def test_with_interruptions(bucket, read_size):
    """
    Run the object file read test with pod-killing interruptions.

    Args:
        bucket (Bucket): The AIStore bucket to read from.
        read_size (int): Size of chunks to read in bytes.
    """
    logging.info(f"Starting test with interruptions for read size: {read_size} bytes")

    # Start the pod killer process
    stop_event, pod_killer_process = start_pod_killer(
        namespace=NAMESPACE, k8s_client=v1, pod_prefix=POD_PREFIX, pod_kill_interval=POD_KILL_INTERVAL
    )

    # Call the imported obj_file_read function from utils.py
    result = obj_file_read(bucket, read_size=read_size, out_dir=OUT_DIR)

    # Stop the pod killer process
    stop_pod_killer(stop_event=stop_event, pod_killer_process=pod_killer_process)

    time.sleep(20)  # Wait for any pods to settle

    # Validate the downloaded files
    validate(bucket, result)


def main():
    """
    Main function to run the ObjectFile read tests with interruptions.
    """
    # Initialize AIStore client (with retry on 400 and 404 errors)
    retry = urllib3.Retry(total=10, backoff_factor=0.5, status_forcelist=[400, 404])
    client = Client(endpoint=ENDPOINT, retry=retry)

    # Initialize Kubernetes client
    k8s_config.load_kube_config()
    global v1
    v1 = k8s_client.CoreV1Api()

    # Generate data and populate the bucket
    bucket = client.bucket(BUCKET_NAME).create()
    generate_data(bucket=bucket, num_tars=NUM_TARS, num_files=NUM_FILES, file_size=FILE_SIZE, dest=GENERATED_DIR)

    try:
        # Test various read sizes with pod interruptions
        for read_size in READ_SIZES:
            test_with_interruptions(bucket, read_size=read_size)
    finally:
        # Clean up any leftover data
        shutil.rmtree(GENERATED_DIR, ignore_errors=True)
        shutil.rmtree(OUT_DIR, ignore_errors=True)
        bucket.delete(missing_ok=True)
        logging.info("Cleanup completed.")


if __name__ == "__main__":
    main()
95 changes: 95 additions & 0 deletions
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

# This script benchmarks ObjectFile vs ObjectReader, comparing their performance
# without interruptions.

import logging
import os
import shutil
import time
from pathlib import Path

import urllib3
from aistore.sdk.client import Client

from utils import obj_file_read, generate_data

logging.basicConfig(level=logging.DEBUG)

# Constants
KB = 1024
MB = 1024 * KB
GB = 1024 * MB

ENDPOINT = os.getenv("AIS_ENDPOINT", "http://localhost:51080")  # AIStore endpoint to communicate with
BUCKET_NAME = "benchmark-bucket"  # Name of the AIStore bucket
GENERATED_DIR = Path("gen-data")  # Directory to generate data in
OUT_DIR = Path("output")  # Directory to store outputs
NUM_TARS = 10  # Number of tar archives to generate
NUM_FILES = 10  # Number of files in each tar archive
FILE_SIZE = 1 * GB  # Size of each file in the tar archive
READ_SIZES = [-1, 16 * KB, 32 * KB, 64 * KB, 128 * KB]  # Read sizes for benchmarking


def test_obj_reader_read_all(bucket, out_dir: Path):
    """
    Test reading data from the bucket using ObjectReader's read_all method.

    Args:
        bucket (Bucket): The AIStore bucket to read data from.
        out_dir (Path): Directory to store output files.
    """
    outputs = []
    start_time = time.time()
    out_dir.mkdir(parents=True, exist_ok=True)

    for entry in bucket.list_objects_iter():
        logging.info(f"Starting to read object using ObjectReader: {entry.name}")
        outfile_name = out_dir.joinpath(entry.name)
        outputs.append(outfile_name)

        with open(outfile_name, 'wb') as outfile:
            content = entry.object.get().read_all()  # ObjectReader read_all
            outfile.write(content)

    elapsed_time = time.time() - start_time
    logging.info(f"Completed ObjectReader read_all in {elapsed_time:.2f} seconds")
    return outputs


def cleanup_output_directory():
    if OUT_DIR.exists():
        shutil.rmtree(OUT_DIR)
    OUT_DIR.mkdir(parents=True, exist_ok=True)


def main():
    """
    Main function to benchmark ObjectFile vs ObjectReader.
    """
    # Initialize AIStore client (with retry on 400 and 404 errors)
    retry = urllib3.Retry(total=10, backoff_factor=0.5, status_forcelist=[400, 404])
    client = Client(endpoint=ENDPOINT, retry=retry)

    # Create and populate the bucket with generated data
    bucket = client.bucket(BUCKET_NAME).create()
    generate_data(bucket=bucket, num_tars=NUM_TARS, num_files=NUM_FILES, file_size=FILE_SIZE, dest=GENERATED_DIR)

    try:
        # ObjectFile: test all read sizes, including read_all (-1)
        for read_size in READ_SIZES:
            logging.info(f"Running ObjectFile test with read_size = {read_size} bytes...")
            obj_file_read(bucket, read_size=read_size, out_dir=OUT_DIR)
            cleanup_output_directory()

        # ObjectReader: read_all
        logging.info("Running ObjectReader read_all test...")
        test_obj_reader_read_all(bucket, out_dir=OUT_DIR)
        cleanup_output_directory()

    finally:
        # Clean up any leftover data
        shutil.rmtree(GENERATED_DIR, ignore_errors=True)
        shutil.rmtree(OUT_DIR, ignore_errors=True)
        bucket.delete(missing_ok=True)
        logging.info("Cleanup completed.")


if __name__ == "__main__":
    main()
186 changes: 186 additions & 0 deletions
utils.py
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

import logging
import multiprocessing
import os
import random
import shutil
import subprocess
import tarfile
import time

from pathlib import Path
from kubernetes import client as k8s_client
from aistore.sdk.bucket import Bucket


def generate_data(bucket: Bucket, num_tars: int, num_files: int, file_size: int, dest: Path) -> None:
    """
    Generates random binary data, packages it into tar archives, and uploads it to
    the AIStore bucket. Each tar archive contains 'num_files' files, each of size
    'file_size'. The tar files are uploaded to simulate a realistic data storage scenario.

    Args:
        bucket (Bucket): The AIStore bucket where data will be uploaded.
        num_tars (int): Number of tar archives to create.
        num_files (int): Number of files in each tar archive.
        file_size (int): Size of each file in bytes.
        dest (Path): Path where generated tar files are temporarily stored.
    """
    dest.mkdir(parents=True, exist_ok=True)

    for i in range(num_tars):
        tarfile_path = dest.joinpath(f"random_data_{i}.tar")

        # Create a tarfile and add generated binary files to it
        with tarfile.open(tarfile_path, "w") as tar:
            for j in range(num_files):
                file_name = f"file_{i}_{j}"
                temp_file_path = dest.joinpath(file_name)

                # Write random binary data to each file
                with temp_file_path.open("wb") as file:
                    file.write(os.urandom(file_size))

                tar.add(temp_file_path, arcname=file_name)
                temp_file_path.unlink()
                logging.info(f"Added {file_name} to {tarfile_path.name}")

        # Upload the tar file to the AIStore bucket
        bucket.object(tarfile_path.name).put_file(tarfile_path)
        logging.info(f"Uploaded {tarfile_path.name} to {bucket.provider}://{bucket.name}")
        tarfile_path.unlink()

    shutil.rmtree(dest, ignore_errors=True)


def start_pod_killer(namespace: str, k8s_client: k8s_client.CoreV1Api, pod_prefix: str, pod_kill_interval) -> tuple:
    """
    Starts a separate process that kills random pods matching the given prefix at specified intervals.

    Args:
        namespace (str): Kubernetes namespace where target pods reside.
        k8s_client (k8s_client.CoreV1Api): Kubernetes client for interacting with pods.
        pod_prefix (str): Prefix of the pod name used to filter the pods.
        pod_kill_interval (tuple): Interval range (in seconds) between pod killings.

    Returns:
        stop_event, pod_killer_process: Event and process for managing the pod killer process.
    """
    stop_event = multiprocessing.Event()
    pod_killer_process = multiprocessing.Process(
        target=pod_killer, args=(stop_event, namespace, k8s_client, pod_prefix, pod_kill_interval)
    )
    pod_killer_process.start()
    return stop_event, pod_killer_process


def stop_pod_killer(stop_event: multiprocessing.Event, pod_killer_process: multiprocessing.Process):
    """
    Stops the pod killer process by setting the stop event and joining the process.

    Args:
        stop_event (multiprocessing.Event): Event to signal the process to stop.
        pod_killer_process (multiprocessing.Process): The pod killer process to stop.
    """
    stop_event.set()
    pod_killer_process.join()


def pod_killer(stop_event: multiprocessing.Event, namespace: str, k8s_client: k8s_client.CoreV1Api, pod_prefix: str, pod_kill_interval: tuple, initial_delay: int = 5):
    """
    Simulates pod failure by killing a random pod with the given prefix at random intervals.

    Args:
        stop_event (multiprocessing.Event): Event to signal when to stop killing pods.
        namespace (str): Kubernetes namespace where pods are located.
        k8s_client (k8s_client.CoreV1Api): Kubernetes client to interact with the cluster.
        pod_prefix (str): Prefix to match the pod names.
        pod_kill_interval (tuple): Interval range (in seconds) between pod killings.
        initial_delay (int): Initial delay before the first pod kill (in seconds).
    """
    logging.info("Pod killer process started.")

    # Wait for a small initial delay before the first pod kill
    stop_event.wait(initial_delay)

    while not stop_event.is_set():
        pod_name = random_pod_name_with_prefix(k8s_client, namespace, pod_prefix)
        if pod_name:
            kill_pod(namespace, pod_name)
        else:
            logging.warning(f"No pods found with prefix {pod_prefix}")

        # Wait a random interval before the next pod kill
        stop_event.wait(random.randint(*pod_kill_interval))

    logging.info("Pod killer process stopping as stop_event is set.")


def random_pod_name_with_prefix(k8s_client: k8s_client.CoreV1Api, namespace: str, pod_prefix: str) -> str:
    """
    Selects a random pod that matches the given prefix from the Kubernetes namespace.

    Args:
        k8s_client (k8s_client.CoreV1Api): Kubernetes client instance.
        namespace (str): Kubernetes namespace where target pods are located.
        pod_prefix (str): Prefix to match the pod names.

    Returns:
        str: Name of the randomly selected pod, or None if no matching pods are found.
    """
    pods = k8s_client.list_namespaced_pod(namespace=namespace).items
    matching_pods = [pod for pod in pods if pod.metadata.name.startswith(pod_prefix)]

    if matching_pods:
        return random.choice(matching_pods).metadata.name
    return None


# TODO: Replace w/ native Kubernetes API
def kill_pod(namespace: str, pod_name: str):
    """
    Deletes the specified pod by shelling out to kubectl.

    Args:
        namespace (str): Kubernetes namespace where the pod is located.
        pod_name (str): Name of the pod to delete.
    """
    command = f"kubectl delete pod {pod_name} -n {namespace} --grace-period=0"
    result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    logging.info(f"Pod {pod_name} deleted successfully. Output: {result.stdout.decode().strip()}")


def obj_file_read(bucket, read_size, out_dir: Path, max_resume: int = 10):
    """
    Test reading data from the bucket using ObjectFile with the specified chunk size.

    Args:
        bucket (Bucket): AIStore bucket to read data from.
        read_size (int): The size of data chunks to read in bytes.
        out_dir (Path): Directory to store output files.
        max_resume (int, optional): Maximum number of resume attempts for ObjectFile.
    """
    outputs = []
    resume_total = 0
    out_dir.mkdir(parents=True, exist_ok=True)
    start_time = time.time()

    for entry in bucket.list_objects_iter():
        logging.info(f"Starting to read object: {entry.name}")
        outfile_name = out_dir.joinpath(entry.name)
        outputs.append(outfile_name)

        with open(outfile_name, 'wb') as outfile:
            with entry.object.as_file(max_resume=max_resume) as obj_file:
                while True:
                    content = obj_file.read(size=read_size)
                    if not content:
                        logging.info(f"Finished reading object: {entry.name}")
                        break
                    outfile.write(content)
                resume_total += obj_file._resume_total

    elapsed_time = time.time() - start_time
    logging.info(f"Completed object file read for read size {read_size} with {resume_total} total resumes in {elapsed_time:.2f} seconds")
    return outputs
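On the TODO above kill_pod ("Replace w/ native Kubernetes API"): the forced deletion could likely be done through the kubernetes Python client that utils.py already imports, rather than shelling out to kubectl. A minimal sketch, assuming the standard CoreV1Api.delete_namespaced_pod call; the wrapper name and error handling are illustrative and not part of this commit:

import logging

from kubernetes import client as k8s_client
from kubernetes.client.rest import ApiException


def kill_pod_native(v1: k8s_client.CoreV1Api, namespace: str, pod_name: str) -> None:
    """Force-delete a pod through the Kubernetes API instead of a kubectl subprocess."""
    try:
        # grace_period_seconds=0 mirrors `kubectl delete pod ... --grace-period=0`
        v1.delete_namespaced_pod(name=pod_name, namespace=namespace, grace_period_seconds=0)
        logging.info(f"Pod {pod_name} deleted via the Kubernetes API.")
    except ApiException as err:
        logging.error(f"Failed to delete pod {pod_name}: {err}")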