Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: Spark scheduling target (#661)
Browse files Browse the repository at this point in the history
* initial

* update pipfile and pipfile.lock

* uncomment scheduling target, start ssh_submit impl

* get rid of debug code

* finish ssh_submit implementation

* serialize object instead of properties

* fix upload log bug, temp workaround for get logs

* remove unused function

* clean up node_scripts submit, remove debug code

* ensure warns on deprecated test

* remove commented timeout

* start scheduling_target for job_submission

* continue job scheduling target implementation

* update Pipfile.lock

* update Pipfile deps, pin pynacl to fix build failure

* fix syntax

* fix pipfile with latest azure-nspkg

* update path for scheduling scripts

* update config.py import

* add nohup dependency

* use nohup and exit immediately

* remove bad dep

* remove nohup

* remove commented code

* add block to ssh, get retcode from node_exec

* fix typo

* fix some imports, add test stubs

* fixes

* start implementation of task table service

* add scheduling_target support for get_application_log

* todos

* remove useless statement

* move get_application_status to core, add scheduling_target support

* update deps in requirements.txt

* fix false positive pylint import error

* remove bad import

* bad local variable

* add batch task abstraction, add datetime field

* mediate table insertion with task abstraction

* fix issues with task abstraction usage

* fix pylint import error

* fix update task on run

* update job submission test

* make test package, update pylint

* update job submission with scheduling_target

* add job support for scheduling_target

* fix taskstate serialization to storage

* fix job submission job manager task, catch table storage errors

* fix import

* fix imports for batch sdk 5.0+

* fix test model module

* fix node election exception catch

* start fix job tests

* move get_task_status to base

* fix job tests

* fix get_application, add abstraction to batch task gets

* fix some bugs, remove some debug statements

* fix test

* use jobstate and application state

* add start_task retries

* make jobstate an enum

* fix import

* fixes

* fixes

* revert settings.json

* fixes for application state in cli

* conditionally create storage table

* remove commented code

* conditionally create storage table

* remove commented code

* fix test

* respond to comments

* fix debug statement, fix starttask issue

* remove debug test print

* formatting

* update doc string with correct return value

* revert settings.json

* more robust starget test, fix get_application for starget

* whitespace
  • Loading branch information
jafreck authored Oct 23, 2018
1 parent b7da355 commit 4408c4f
Show file tree
Hide file tree
Showing 117 changed files with 1,708 additions and 877 deletions.
141 changes: 126 additions & 15 deletions aztk/client/base/base_operations.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
from aztk import models
from aztk.internal import cluster_data

from .helpers import (
create_user_on_cluster,
create_user_on_node,
delete_user_on_cluster,
delete_user_on_node,
generate_user_on_cluster,
generate_user_on_node,
get_application_log,
get_remote_login_settings,
node_run,
run,
ssh_into_node,
)
from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
generate_user_on_cluster, generate_user_on_node, get_application_log, get_recent_job,
get_remote_login_settings, get_task_state, list_tasks, node_run, run, ssh_into_node, task_table)


class BaseOperations:
Expand All @@ -31,6 +21,7 @@ class BaseOperations:
def __init__(self, context):
self.batch_client = context["batch_client"]
self.blob_client = context["blob_client"]
self.table_service = context["table_service"]
self.secrets_configuration = context["secrets_configuration"]

def get_cluster_configuration(self, id: str) -> models.ClusterConfiguration:
Expand Down Expand Up @@ -168,7 +159,7 @@ def delete_user_on_cluster(self, username, id, nodes):
"""
return delete_user_on_cluster.delete_user_on_cluster(self, username, id, nodes)

def node_run(self, id, node_id, command, internal, container_name=None, timeout=None):
def node_run(self, id, node_id, command, internal, container_name=None, timeout=None, block=True):
"""Run a bash command on the given node
Args:
Expand All @@ -181,11 +172,12 @@ def node_run(self, id, node_id, command, internal, container_name=None, timeout=
If None, the command will run on the host VM. Defaults to None.
timeout=None (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
block=True (:obj:`bool`, optional): If True, the command blocks until execution is complete.
Returns:
:obj:`aztk.models.NodeOutput`: object containing the output of the run command
"""
return node_run.node_run(self, id, node_id, command, internal, container_name, timeout)
return node_run.node_run(self, id, node_id, command, internal, container_name, timeout, block)

def get_remote_login_settings(self, id: str, node_id: str):
"""Get the remote login information for a node in a cluster
Expand Down Expand Up @@ -233,3 +225,122 @@ def get_application_log(self, id: str, application_name: str, tail=False, curren
:obj:`aztk.models.ApplicationLog`: a model representing the output of the application.
"""
return get_application_log.get_application_log(self, id, application_name, tail, current_bytes)

def create_task_table(self, id: str):
    """Create an Azure Table Storage table to track this cluster's tasks

    Args:
        id (:obj:`str`): the id of the cluster (used to derive the table name)

    Returns:
        the result of the underlying table-creation call
        (presumably a :obj:`bool` — True if the table was newly created; confirm in helpers.task_table)
    """
    return task_table.create_task_table(self.table_service, id)

def list_task_table_entries(self, id):
    """List all task entries stored in the cluster's storage table

    Args:
        id (:obj:`str`): the id of the cluster

    Returns:
        :obj:`[aztk.models.Task]`: a list of models representing all entries in the Task table
    """
    return task_table.list_task_table_entries(self.table_service, id)

def get_task_from_table(self, id, task_id):
    """Get a single task entry from the cluster's storage table

    Args:
        id (:obj:`str`): the id of the cluster
        task_id (:obj:`str`): the id of the task to retrieve

    Returns:
        :obj:`aztk.models.Task`: the task with id task_id from the cluster's storage table
    """
    return task_table.get_task_from_table(self.table_service, id, task_id)

def insert_task_into_task_table(self, id, task):
    """Insert a task entry into the cluster's storage table

    Args:
        id (:obj:`str`): the id of the cluster
        task (:obj:`aztk.models.Task`): the task to insert into the table

    Returns:
        :obj:`aztk.models.Task`: a model representing an entry in the Task table
    """
    return task_table.insert_task_into_task_table(self.table_service, id, task)

def update_task_in_task_table(self, id, task):
    """Update an existing task entry in the cluster's storage table

    Args:
        id (:obj:`str`): the id of the cluster
        task (:obj:`aztk.models.Task`): the task whose table entry should be updated

    Returns:
        :obj:`aztk.models.Task`: a model representing an entry in the Task table
    """
    return task_table.update_task_in_task_table(self.table_service, id, task)

def delete_task_table(self, id):
    """Delete the storage table that tracks the cluster's tasks

    Args:
        id (:obj:`str`): the id of the cluster

    Returns:
        :obj:`bool`: if True, the deletion was successful
    """
    return task_table.delete_task_table(self.table_service, id)

def list_tasks(self, id):
    """List the tasks that have been submitted to this cluster

    Unlike :meth:`list_task_table_entries`, this goes through the
    `list_tasks` helper — presumably so it can dispatch between the task
    table and the Batch service depending on the scheduling target
    (confirm in helpers.list_tasks).

    Args:
        id (:obj:`str`): the id of the cluster

    Returns:
        :obj:`[aztk.models.Task]`: a list of models representing all entries in the Task table
    """
    return list_tasks.list_tasks(self, id)

def get_recent_job(self, id):
    """Get the most recently run job in an Azure Batch job schedule

    Args:
        id (:obj:`str`): the id of the job schedule

    Returns:
        :obj:`azure.batch.models.Job`: the most recently run job on the job schedule
    """
    return get_recent_job.get_recent_job(self, id)

def get_task_state(self, id: str, task_name: str):
    """Get the current state of a submitted task

    Args:
        id (:obj:`str`): the name of the cluster the task was submitted to
        task_name (:obj:`str`): the name of the task to get

    Returns:
        :obj:`str`: the state of the task — callers wrap it in
        aztk.models.TaskState, so it must be a valid TaskState value
    """
    return get_task_state.get_task_state(self, id, task_name)

def list_batch_tasks(self, id: str):
    """List every task in the cluster's Azure Batch job

    Queries the Batch service directly (not the storage table).

    Args:
        id (:obj:`str`): the name of the cluster (used as the Batch job id)

    Returns:
        the tasks of the cluster's Batch job — presumably a list of
        :obj:`aztk.models.Task`; confirm in helpers.task_table.list_batch_tasks
    """
    return task_table.list_batch_tasks(self.batch_client, id)

def get_batch_task(self, id: str, task_id: str):
    """Get a single task from the cluster's Azure Batch job

    Queries the Batch service directly (not the storage table).

    Args:
        id (:obj:`str`): the name of the cluster (used as the Batch job id)
        task_id (:obj:`str`): the name of the task to get

    Returns:
        the task with id task_id — presumably an :obj:`aztk.models.Task`;
        confirm in helpers.task_table.get_batch_task
    """
    return task_table.get_batch_task(self.batch_client, id, task_id)
6 changes: 3 additions & 3 deletions aztk/client/base/helpers/create_user_on_node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime, timedelta, timezone

import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from azure.batch.models import BatchErrorException

from aztk.utils import get_ssh_key

Expand Down Expand Up @@ -32,9 +32,9 @@ def __create_user(self, id: str, node_id: str, username: str, password: str = No
def create_user_on_node(base_client, id, node_id, username, ssh_key=None, password=None):
try:
__create_user(base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
except batch_error.BatchErrorException as error:
except BatchErrorException as error:
try:
base_client.delete_user_on_node(id, node_id, username)
base_client.create_user_on_node(id=id, node_id=node_id, username=username, ssh_key=ssh_key)
except batch_error.BatchErrorException as error:
except BatchErrorException as error:
raise error
74 changes: 49 additions & 25 deletions aztk/client/base/helpers/get_application_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,44 @@

import azure
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from azure.batch.models import BatchErrorException

from aztk import error, models
from aztk.models import Task, TaskState
from aztk.utils import constants, helpers

output_file = constants.TASK_WORKING_DIR + "/" + constants.SPARK_SUBMIT_LOGS_FILE


def __check_task_node_exist(batch_client, cluster_id: str, task: batch_models.CloudTask) -> bool:
def __check_task_node_exist(batch_client, cluster_id: str, task: Task) -> bool:
try:
batch_client.compute_node.get(cluster_id, task.node_info.node_id)
batch_client.compute_node.get(cluster_id, task.node_id)
return True
except batch_error.BatchErrorException:
except BatchErrorException:
return False


def __wait_for_app_to_be_running(batch_client, cluster_id: str, application_name: str) -> batch_models.CloudTask:
def __wait_for_app_to_be_running(base_operations, cluster_id: str, application_name: str) -> Task:
"""
Wait for the batch task to leave the waiting state into running(or completed if it was fast enough)
"""

while True:
task = batch_client.task.get(cluster_id, application_name)
task_state = base_operations.get_task_state(cluster_id, application_name)

if task.state is batch_models.TaskState.active or task.state is batch_models.TaskState.preparing:
if task_state in [batch_models.TaskState.active, batch_models.TaskState.preparing]:
# TODO: log
time.sleep(5)
else:
return task
return base_operations.get_batch_task(id=cluster_id, task_id=application_name)


def __get_output_file_properties(batch_client, cluster_id: str, application_name: str):
while True:
try:
file = helpers.get_file_properties(cluster_id, application_name, output_file, batch_client)
return file
except batch_error.BatchErrorException as e:
except BatchErrorException as e:
if e.response.status_code == 404:
# TODO: log
time.sleep(5)
Expand All @@ -47,6 +49,15 @@ def __get_output_file_properties(batch_client, cluster_id: str, application_name


def get_log_from_storage(blob_client, container_name, application_name, task):
"""
Args:
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
Blob service.
container_name (:obj:`str`): the name of the Azure Blob storage container to get data from
application_name (:obj:`str`): the name of the application to get logs for
task (:obj:`aztk.models.Task`): the aztk task for this application
"""
try:
blob = blob_client.get_blob_to_text(container_name, application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE)
except azure.common.AzureMissingResourceHttpError:
Expand All @@ -55,23 +66,37 @@ def get_log_from_storage(blob_client, container_name, application_name, task):
return models.ApplicationLog(
name=application_name,
cluster_id=container_name,
application_state=task.state.name,
application_state=task.state,
log=blob.content,
total_bytes=blob.properties.content_length,
exit_code=task.execution_info.exit_code,
exit_code=task.exit_code,
)


def get_log(batch_client, blob_client, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
def wait_for_scheduling_target_task(base_operations, cluster_id, application_name):
    """Block until the scheduling-target task reaches a terminal state.

    Polls the task's state every 3 seconds until it is Completed or Failed,
    then returns the task's entry from the cluster's storage table.

    Args:
        base_operations: operations object exposing get_task_state and get_task_from_table
        cluster_id (:obj:`str`): the id of the cluster the application was submitted to
        application_name (:obj:`str`): the name of the application (task id)

    Returns:
        :obj:`aztk.models.Task`: the finished task's entry from the storage table
    """
    terminal_states = [TaskState.Completed, TaskState.Failed]
    while True:
        current_state = base_operations.get_task_state(cluster_id, application_name)
        if TaskState(current_state) in terminal_states:
            break
        # TODO: enable logger
        # log.debug("{} {}: application not yet complete".format(cluster_id, application_name))
        time.sleep(3)
    return base_operations.get_task_from_table(cluster_id, application_name)


def get_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
job_id = cluster_id
task_id = application_name
cluster_configuration = base_operations.get_cluster_configuration(cluster_id)

task = __wait_for_app_to_be_running(batch_client, cluster_id, application_name)

if not __check_task_node_exist(batch_client, cluster_id, task):
return get_log_from_storage(blob_client, cluster_id, application_name, task)
if cluster_configuration.scheduling_target is not models.SchedulingTarget.Any:
task = wait_for_scheduling_target_task(base_operations, cluster_id, application_name)
return get_log_from_storage(base_operations.blob_client, cluster_id, application_name, task)
else:
task = __wait_for_app_to_be_running(base_operations, cluster_id, application_name)
if not __check_task_node_exist(base_operations.batch_client, cluster_id, task):
return get_log_from_storage(base_operations.blob_client, cluster_id, application_name, task)

file = __get_output_file_properties(batch_client, cluster_id, application_name)
file = __get_output_file_properties(base_operations.batch_client, cluster_id, application_name)
target_bytes = file.content_length

if target_bytes != current_bytes:
Expand All @@ -80,32 +105,31 @@ def get_log(batch_client, blob_client, cluster_id: str, application_name: str, t
if tail:
ocp_range = "bytes={0}-{1}".format(current_bytes, target_bytes - 1)

stream = batch_client.file.get_from_task(
stream = base_operations.batch_client.file.get_from_task(
job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))
content = helpers.read_stream_as_string(stream)

return models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state.name,
application_state=task.state,
log=content,
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code,
exit_code=task.exit_code,
)
else:
return models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state.name,
application_state=task.state,
log="",
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code,
exit_code=task.exit_code,
)


def get_application_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
try:
return get_log(base_operations.batch_client, base_operations.blob_client, cluster_id, application_name, tail,
current_bytes)
except batch_error.BatchErrorException as e:
return get_log(base_operations, cluster_id, application_name, tail, current_bytes)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
14 changes: 14 additions & 0 deletions aztk/client/base/helpers/get_recent_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from azure.batch.models import BatchErrorException

from aztk import error
from aztk.utils import helpers


# Note: this only works with jobs, not clusters
# cluster impl is planned to change to job schedule
def get_recent_job(core_job_operations, id):
    """Get the most recently run job in an Azure Batch job schedule

    Args:
        core_job_operations: operations object exposing a `batch_client`
        id (:obj:`str`): the id of the job schedule

    Returns:
        :obj:`azure.batch.models.Job`: the most recently run job on the job schedule

    Raises:
        :obj:`aztk.error.AztkError`: if the Batch service rejects either request
    """
    try:
        job_schedule = core_job_operations.batch_client.job_schedule.get(id)
        # NOTE(review): if the schedule has never run, execution_info.recent_job is
        # presumably None and this raises AttributeError (not caught below) — confirm
        return core_job_operations.batch_client.job.get(job_schedule.execution_info.recent_job.id)
    except BatchErrorException as e:
        raise error.AztkError(helpers.format_batch_exception(e))
4 changes: 2 additions & 2 deletions aztk/client/base/helpers/get_remote_login_settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import azure.batch.models.batch_error as batch_error
from azure.batch.models import BatchErrorException

from aztk import error, models
from aztk.utils import helpers
Expand All @@ -18,5 +18,5 @@ def _get_remote_login_settings(base_client, pool_id: str, node_id: str):
def get_remote_login_settings(base_client, cluster_id: str, node_id: str):
try:
return _get_remote_login_settings(base_client, cluster_id, node_id)
except batch_error.BatchErrorException as e:
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
Loading

0 comments on commit 4408c4f

Please sign in to comment.