Skip to content

Commit

Permalink
import studyjob client
Browse files Browse the repository at this point in the history
  • Loading branch information
hougangliu committed Feb 19, 2019
1 parent 2afee3c commit d4e49fc
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 149 deletions.
2 changes: 1 addition & 1 deletion components/kubeflow/katib-launcher/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ RUN apt-get update -y && \
pip install pyyaml==3.12 six==1.11.0 requests==2.18.4 grpcio gcloud google-api-python-client protobuf kubernetes && \
wget https://github.com/kubeflow/katib/archive/master.zip && unzip master.zip

ENV PYTHONPATH $PYTHONPATH:/katib-master/pkg/api/python
ENV PYTHONPATH $PYTHONPATH:/katib-master/pkg/api/python:/katib-master/py

ADD build /ml

Expand Down
152 changes: 4 additions & 148 deletions components/kubeflow/katib-launcher/src/launch_study_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,159 +18,15 @@
import logging
import requests
import subprocess
import six
import time
import yaml
import grpc

import api_pb2
import api_pb2_grpc

from kubernetes import client as k8s_client
from kubernetes.client import rest
from kubernetes import config

# API group of the StudyJob custom resource; passed as the `group` argument
# to the CustomObjectsApi calls below.
STUDY_JOB_GROUP = "kubeflow.org"
# Plural resource name used when addressing StudyJob objects through the
# CustomObjectsApi.
STUDY_JOB_PLURAL = "studyjobs"
# Kind name of the custom resource.
STUDY_JOB_KIND = "StudyJob"

# Timeout handed to `AsyncResult.get()` for every asynchronous Kubernetes API
# request issued in this module (presumably seconds, per multiprocessing's
# AsyncResult.get contract).
TIMEOUT = 120

def wait_for_condition(client,
                       namespace,
                       name,
                       expected_condition,
                       version="v1alpha1",
                       timeout=datetime.timedelta(minutes=10),
                       polling_interval=datetime.timedelta(seconds=30),
                       status_callback=None):
    """Poll a StudyJob until it reaches one of the expected conditions.

    Args:
      client: K8s api client.
      namespace: Namespace of the job.
      name: Name of the job.
      expected_condition: A list of conditions. The function returns as soon
        as the job's ``status.condition`` equals any of them.
      version: API version of the StudyJob custom resource.
      timeout: ``datetime.timedelta``; how long to wait for the job overall.
      polling_interval: ``datetime.timedelta``; pause between two polls.
      status_callback: (Optional) Callable invoked after every successful
        poll with the fetched job object as its single argument.

    Returns:
      The job object (as returned by the API) once it is in one of the
      expected conditions.

    Raises:
      util.JobTimeoutError: If the job did not reach an expected condition
        before ``timeout`` elapsed.
      Exception: Any non-timeout error raised while fetching the job is
        logged and re-raised.
    """
    # NOTE(review): `util` and `multiprocessing` are not imported in the
    # visible part of this file -- confirm they are in scope.
    crd_api = k8s_client.CustomObjectsApi(client)
    end_time = datetime.datetime.now() + timeout
    while True:
        # With async_req=True the ApiClient returns a
        # multiprocessing.pool.AsyncResult; a synchronous call could
        # potentially block forever.
        thread = crd_api.get_namespaced_custom_object(
            STUDY_JOB_GROUP, version, namespace, STUDY_JOB_PLURAL, name,
            async_req=True)

        # Try to get the result, but give up on this poll after TIMEOUT.
        results = None
        try:
            results = thread.get(TIMEOUT)
        except multiprocessing.TimeoutError:
            # A single slow request is not fatal; retry on the next poll.
            logging.error("Timeout trying to get StudyJob %s.%s.",
                          namespace, name)
        except Exception as e:
            logging.error(
                "There was a problem waiting for Job %s.%s; Exception; %s",
                namespace, name, e)
            raise

        if results:
            if status_callback:
                status_callback(results)

            # If we poll the CRD quickly enough the status may be missing
            # or None; treat both as "no condition yet".
            condition = (results.get("status") or {}).get("condition")
            if condition in expected_condition:
                return results

        # Stop if another full polling interval would overshoot the deadline.
        if datetime.datetime.now() + polling_interval > end_time:
            raise util.JobTimeoutError(
                "Timeout waiting for job {0} in namespace {1} to enter one of "
                "the conditions {2}.".format(name, namespace,
                                             expected_condition), results)

        # total_seconds() honours the whole timedelta; `.seconds` would drop
        # the days and microseconds components.
        time.sleep(polling_interval.total_seconds())

def create_study_job(client, spec, version="v1alpha1"):
    """Create a StudyJob custom resource.

    Args:
      client: A K8s api client.
      spec: The full StudyJob object (dict) to create. The namespace is read
        from ``spec["metadata"]["namespace"]`` and defaults to "default".
      version: API version of the StudyJob custom resource.

    Returns:
      The API response describing the created object.

    Raises:
      rest.ApiException: If the API server rejects the request; the error is
        logged and re-raised.
      ValueError: If the error response body is not valid JSON.
    """
    crd_api = k8s_client.CustomObjectsApi(client)
    try:
        # Create the resource asynchronously so the wait can be bounded.
        namespace = spec["metadata"].get("namespace", "default")
        thread = crd_api.create_namespaced_custom_object(
            STUDY_JOB_GROUP, version, namespace, STUDY_JOB_PLURAL, spec,
            async_req=True)
        api_response = thread.get(TIMEOUT)
        logging.info("Created job %s", api_response["metadata"]["name"])
        return api_response
    except rest.ApiException as e:
        # ApiException has no `message` attribute on Python 3; a plain
        # `e.message` would raise AttributeError and mask the API error.
        message = getattr(e, "message", None) or ""
        if e.body:
            try:
                error_body = json.loads(e.body)
            except ValueError:
                # There was a problem parsing the body of the response as
                # json.
                logging.error(
                    ("Exception when calling DefaultApi->"
                     "apis_fqdn_v1_namespaces_namespace_resource_post. "
                     "body: %s"), e.body)
                raise
            # Only a JSON object can carry a "message" field.
            if isinstance(error_body, dict):
                message = error_body.get("message")

        logging.error(("Exception when calling DefaultApi->"
                       "apis_fqdn_v1_namespaces_namespace_resource_post: %s"),
                      message)
        # Bare raise keeps the original traceback intact.
        raise

def delete_study_job(client, name, namespace, version="v1alpha1"):
    """Delete a StudyJob custom resource.

    Args:
      client: A K8s api client.
      name: Name of the job to delete.
      namespace: Namespace of the job.
      version: API version of the StudyJob custom resource.

    Returns:
      The API response of the delete call.

    Raises:
      rest.ApiException: If the API server rejects the request; the error is
        logged and re-raised.
      ValueError: If the error response body is not valid JSON.
    """
    crd_api = k8s_client.CustomObjectsApi(client)
    try:
        delete_options = {
            # Set garbage collection so that job won't be deleted until all
            # owned references are deleted.
            "propagationPolicy": "Foreground",
        }
        logging.info("Deleting job %s.%s", namespace, name)
        thread = crd_api.delete_namespaced_custom_object(
            STUDY_JOB_GROUP,
            version,
            namespace,
            STUDY_JOB_PLURAL,
            name,
            delete_options,
            async_req=True)
        api_response = thread.get(TIMEOUT)
        logging.info("Deleting job %s.%s returned: %s", namespace, name,
                     api_response)
        return api_response
    except rest.ApiException as e:
        # ApiException has no `message` attribute on Python 3; a plain
        # `e.message` would raise AttributeError and mask the API error.
        message = getattr(e, "message", None) or ""
        if e.body:
            try:
                error_body = json.loads(e.body)
            except ValueError:
                # There was a problem parsing the body of the response as
                # json.
                logging.error(
                    ("Exception when calling CustomObjectsApi->"
                     "delete_namespaced_custom_object. body: %s"), e.body)
                raise
            # Only a JSON object can carry a "message" field.
            if isinstance(error_body, dict):
                message = error_body.get("message")

        # The log names the delete operation; the previous text was copied
        # from the create path and referred to a POST.
        logging.error(("Exception when calling CustomObjectsApi->"
                       "delete_namespaced_custom_object: %s"),
                      message)
        # Bare raise keeps the original traceback intact.
        raise

import study_job_client

def yamlOrJsonStr(str):
if str == "" or str == None:
Expand Down Expand Up @@ -279,12 +135,12 @@ def main(argv=None):

config.load_incluster_config()
api_client = k8s_client.ApiClient()
create_response = create_study_job(api_client, content_yaml)
create_response = study_job_client.create_study_job(api_client, content_yaml)
job_name = create_response['metadata']['name']
job_namespace = create_response['metadata']['namespace']

expected_condition = ["Completed", "Failed"]
wait_response = wait_for_condition(
wait_response = study_job_client.wait_for_condition(
api_client, job_namespace, job_name, expected_condition,
timeout=datetime.timedelta(minutes=args.studyjobtimeoutminutes))
succ = False
Expand All @@ -299,7 +155,7 @@ def main(argv=None):
if succ:
logging.info('Study success.')

delete_study_job(api_client, job_name, job_namespace)
study_job_client.delete_study_job(api_client, job_name, job_namespace)

if __name__== "__main__":
main()

0 comments on commit d4e49fc

Please sign in to comment.