[issue #69] TrainingClient for Kubeflow updated #71

Closed · wants to merge 1 commit
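This PR tracks the Kubeflow Training SDK's move from per-framework clients to a unified API: `PyTorchJobClient` becomes `TrainingClient`, and the generated models gain a `KubeflowOrg` prefix (`V1PyTorchJob` becomes `KubeflowOrgV1PyTorchJob`). A minimal sketch of the new client in use, assuming training-operator SDK v1.6+; the job name and namespace are placeholders:

from kubeflow.training import TrainingClient
from kubeflow.training.constants import constants

# One client now serves every job kind (TFJob, PyTorchJob, and so on);
# kind-specific behavior is selected per call via job_kind.
client = TrainingClient()
print(client.is_job_succeeded("my-pytorch-job", "kubeflow-user",
                              job_kind=constants.PYTORCHJOB_KIND))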
@@ -67,9 +67,9 @@
 "from kubeflow.training import constants\n",
 "from kubeflow.training.utils import utils\n",
 "from kubeflow.training import V1ReplicaSpec\n",
-"from kubeflow.training import V1PyTorchJob\n",
-"from kubeflow.training import V1PyTorchJobSpec\n",
-"from kubeflow.training import PyTorchJobClient\n",
+"from kubeflow.training import KubeflowOrgV1PyTorchJob\n",
+"from kubeflow.training import KubeflowOrgV1PyTorchJobSpec\n",
+"from kubeflow.training import TrainingClient\n",
 "from kubeflow.training import V1RunPolicy\n",
 "\n",
 "import kfp\n",
@@ -248,11 +248,11 @@
 "outputs": [],
 "source": [
 "# Define PyTorchJob custom resource manifest \n",
-"pytorchjob = V1PyTorchJob(\n",
+"pytorchjob = KubeflowOrgV1PyTorchJob(\n",
 "    api_version=\"kubeflow.org/v1\",\n",
 "    kind=\"PyTorchJob\",\n",
 "    metadata=V1ObjectMeta(name=pytorch_distributed_jobname,namespace=user_namespace),\n",
-"    spec=V1PyTorchJobSpec(\n",
+"    spec=KubeflowOrgV1PyTorchJobSpec(\n",
 "        run_policy=V1RunPolicy(clean_pod_policy=\"None\"),\n",
 "        pytorch_replica_specs={\"Master\": master,\n",
 "                               \"Worker\": worker}\n",
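The `master` and `worker` replica specs referenced in the manifest above are defined earlier in the notebook. For context, a minimal sketch of what they look like; the container image, args, and replica counts here are placeholders:

from kubernetes.client import V1Container, V1PodSpec, V1PodTemplateSpec
from kubeflow.training import V1ReplicaSpec

# Placeholder training container; swap in the image the notebook builds.
container = V1Container(
    name="pytorch",
    image="<your-training-image>",
    args=["--backend", "gloo"],
)

master = V1ReplicaSpec(
    replicas=1,
    restart_policy="OnFailure",
    template=V1PodTemplateSpec(spec=V1PodSpec(containers=[container])),
)

worker = V1ReplicaSpec(
    replicas=2,
    restart_policy="OnFailure",
    template=V1PodTemplateSpec(spec=V1PodSpec(containers=[container])),
)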
@@ -269,7 +269,7 @@
 },
 "outputs": [],
 "source": [
-"pytorchjob_client = PyTorchJobClient()\n",
+"pytorchjob_client = TrainingClient()\n",
 "\n",
 "try:\n",
 "    if(pytorchjob_client.get(pytorch_distributed_jobname, namespace=user_namespace)):\n",
@@ -287,7 +287,7 @@
 "outputs": [],
 "source": [
 "# Creates and Submits PyTorchJob custom resource file to Kubernetes\n",
-"pytorch_job_manifest=pytorchjob_client.create(pytorchjob)"
+"pytorch_job_manifest=pytorchjob_client.create_pytorchjob(pytorchjob)"
 ]
 },
 {
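The surrounding cells still call `pytorchjob_client.get(...)` from the old `PyTorchJobClient` API; with `TrainingClient` the kind-specific lookups are `get_pytorchjob` and `delete_pytorchjob`. A hedged sketch of the delete-if-exists-then-submit flow these cells implement (treating a lookup failure as "job absent" is an assumption):

pytorchjob_client = TrainingClient()

# Delete any stale job with the same name before re-submitting.
try:
    if pytorchjob_client.get_pytorchjob(pytorch_distributed_jobname,
                                        namespace=user_namespace):
        pytorchjob_client.delete_pytorchjob(pytorch_distributed_jobname,
                                            namespace=user_namespace)
except Exception:
    pass  # no existing job; nothing to clean up

# Submit the manifest defined above.
pytorchjob_client.create_pytorchjob(pytorchjob, namespace=user_namespace)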
@@ -347,7 +347,7 @@
 "source": [
 "# Check if the job succeeded\n",
 "\n",
-"pytorchjob_client.is_job_succeeded(pytorch_distributed_jobname, user_namespace)"
+"pytorchjob_client.is_job_succeeded(pytorch_distributed_jobname, user_namespace, job_kind=constants.PYTORCHJOB_KIND)"
 ]
 },
 {
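`is_job_succeeded` reports the state at a single instant. To block until training finishes, `wait_for_job_conditions` (also used in the utility module below) is the natural companion; a sketch, assuming waiting on the Succeeded condition with the default timeout is acceptable:

# Block until the PyTorchJob reaches the Succeeded condition.
pytorchjob_client.wait_for_job_conditions(
    pytorch_distributed_jobname,
    namespace=user_namespace,
    job_kind=constants.PYTORCHJOB_KIND,
    expected_conditions=set(["Succeeded"]),
)

pytorchjob_client.is_job_succeeded(
    pytorch_distributed_jobname, user_namespace,
    job_kind=constants.PYTORCHJOB_KIND)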
@@ -395,7 +395,7 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.8.13"
+"version": "3.10.8"
 }
 },
 "nbformat": 4,
labs/kubeflow/image-classify/eks/pytorch_dist_utility.py (new file: 93 additions, 0 deletions)
@@ -0,0 +1,93 @@
import os

import yaml

from kubeflow.training import TrainingClient
from kubeflow.training.constants import constants

def save_master_worker_spec(pytorch_client: TrainingClient, pytorch_jobname: str) -> str:
    """Save the Master and Worker replica specs of a PyTorchJob to the
    pipeline_yaml_specifications folder."""
    if pytorch_client is None:
        raise ValueError("Please pass a valid pytorch client")
    if not pytorch_jobname:
        raise ValueError("Please pass a valid job name")

    pytorchjob = pytorch_client.get_pytorchjob(pytorch_jobname)

    master_spec = pytorchjob.spec.pytorch_replica_specs['Master']
    worker_spec = pytorchjob.spec.pytorch_replica_specs['Worker']

    # Persist each replica spec as YAML so a pipeline step can reuse it later.
    os.makedirs("pipeline_yaml_specifications", exist_ok=True)

    with open('pipeline_yaml_specifications/pipeline_master_spec.yml', 'w') as yaml_outfile:
        yaml.dump(master_spec, yaml_outfile, default_flow_style=False)

    with open('pipeline_yaml_specifications/pipeline_worker_spec.yml', 'w') as yaml_outfile:
        yaml.dump(worker_spec, yaml_outfile, default_flow_style=False)

    return "specs saved in ./pipeline_yaml_specifications folder"

def read_logs(pytorch_client: TrainingClient, jobname: str, namespace: str, log_type: str) -> None:
    """Fetch logs from the master and worker pods of a distributed training
    job run by the PyTorch training operator.

    log_type: all, worker:all, master:all, worker:0, worker:1
    """
    # Validate inputs before touching the cluster.
    if pytorch_client is None:
        raise ValueError("Please pass a valid pytorch client")
    if not jobname:
        raise ValueError("Please pass a valid job name")
    if not namespace:
        raise ValueError("Please pass a valid namespace")
    if not log_type or (":" not in log_type and log_type != "all"):
        raise ValueError(
            "Please pass a valid log_type, e.g. all, worker:all, master:all, worker:0, worker:1"
        )

    print("Waiting for Pod condition to be Running")

    # Block until the job reports a Running condition.
    pytorch_client.wait_for_job_conditions(
        jobname,
        expected_conditions=set(["Running"]),
        job_kind=constants.PYTORCHJOB_KIND,
        namespace=namespace
    )

    print("Master and Worker Pods are Running now")

    print("**** PyTorchJob status ****")
    print(pytorch_client.get_job_conditions(jobname, namespace, job_kind=constants.PYTORCHJOB_KIND))
    print("***************************\n")

    print("\n**** Pod names of the PyTorchJob ****")
    print(pytorch_client.get_job_pod_names(jobname, namespace))
    print("***************************\n")

    log_type_list = log_type.split(":")

    if log_type_list[0] in ['worker', 'master']:
        if log_type_list[1] == 'all':
            # Stream logs from every replica of the requested type.
            pytorch_client.get_job_logs(
                jobname,
                namespace=namespace,
                replica_type=log_type_list[0],
                container=constants.PYTORCHJOB_CONTAINER,
                is_master=False,
                follow=True
            )
        else:
            # Fetch logs from a single replica index, e.g. worker:1.
            pytorch_client.get_job_logs(
                jobname,
                namespace=namespace,
                replica_type=log_type_list[0],
                replica_index=log_type_list[1],
                container=constants.PYTORCHJOB_CONTAINER,
                is_master=False,
                follow=False
            )
    else:
        # log_type == "all": fetch logs across all replicas of the job.
        pytorch_client.get_job_logs(
            jobname,
            namespace=namespace,
            container=constants.PYTORCHJOB_CONTAINER,
            is_master=False,
            follow=False
        )
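A hedged usage sketch of the two helpers above, as a notebook cell might call them; the job name and namespace are placeholders, and note that `save_master_worker_spec` looks the job up in the SDK's default namespace:

from kubeflow.training import TrainingClient
from pytorch_dist_utility import save_master_worker_spec, read_logs

client = TrainingClient()

# Snapshot the Master/Worker specs for reuse in a pipeline definition.
print(save_master_worker_spec(client, "pytorch-dist-mnist-gloo"))

# Follow logs from all worker replicas of the running job.
read_logs(client, "pytorch-dist-mnist-gloo", "kubeflow-user", "worker:all")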