Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: spark submit scheduling internal (#674)
Browse files Browse the repository at this point in the history
* add internal support for scheduling_target cluster submit

* add internal support for scheduling target job submission

* add cli flag
  • Loading branch information
jafreck authored Oct 26, 2018
1 parent 18b74e4 commit 8c2bf0c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
2 changes: 1 addition & 1 deletion aztk/node_scripts/scheduling/job_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def schedule_with_target(scheduling_target, task_sas_urls):
format(task_working_dir, aztk_cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE))
node_id = select_scheduling_target_node(config.spark_client.cluster, config.pool_id, scheduling_target)
node_run_output = config.spark_client.cluster.node_run(
config.pool_id, node_id, task_cmd, timeout=120, block=False)
config.pool_id, node_id, task_cmd, timeout=120, block=False, internal=True)
# block job_manager_task until scheduling_target task completion
wait_until_tasks_complete(aztk_cluster_id)

Expand Down
21 changes: 16 additions & 5 deletions aztk/spark/client/cluster/helpers/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ def select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduli
return cluster.master_node_id


def schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, wait):
def schedule_with_target(
core_cluster_operations,
spark_cluster_operations,
cluster_id,
scheduling_target,
task,
wait,
internal,
):
# upload "real" task definition to storage
serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.blob_client, cluster_id,
task)
Expand All @@ -65,7 +73,8 @@ def schedule_with_target(core_cluster_operations, spark_cluster_operations, clus
format(task_working_dir, cluster_id, serialized_task_resource_file.blob_source,
constants.SPARK_SUBMIT_LOGS_FILE))
node_id = select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target)
node_run_output = spark_cluster_operations.node_run(cluster_id, node_id, task_cmd, timeout=120, block=wait)
node_run_output = spark_cluster_operations.node_run(
cluster_id, node_id, task_cmd, timeout=120, block=wait, internal=internal)


def get_cluster_scheduling_target(core_cluster_operations, cluster_id):
Expand All @@ -80,6 +89,7 @@ def submit_application(
application,
remote: bool = False,
wait: bool = False,
internal: bool = False,
):
"""
Submit a spark app
Expand All @@ -90,7 +100,7 @@ def submit_application(
scheduling_target = get_cluster_scheduling_target(core_cluster_operations, cluster_id)
if scheduling_target is not models.SchedulingTarget.Any:
schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task,
wait)
wait, internal)
else:
# Add task to batch job (which has the same name as cluster_id)
core_cluster_operations.batch_client.task.add(job_id=cluster_id, task=task)
Expand All @@ -107,9 +117,10 @@ def submit(
application: models.ApplicationConfiguration,
remote: bool = False,
wait: bool = False,
scheduling_target: str = None,
internal: bool = False,
):
try:
submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait)
submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait,
internal)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
14 changes: 12 additions & 2 deletions aztk/spark/client/cluster/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ def list(self):
"""
return list.list_clusters(self._core_cluster_operations)

def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
def submit(
self,
id: str,
application: models.ApplicationConfiguration,
remote: bool = False,
wait: bool = False,
internal: bool = False,
):
"""Submit an application to a cluster.
Args:
Expand All @@ -72,13 +79,16 @@ def submit(self, id: str, application: models.ApplicationConfiguration, remote:
remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable
by the cluster already. This is useful when your application is stored in a mounted Azure File Share
and not the client. Defaults to False.
internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. This only applies if the cluster's
SchedulingTarget is not set to SchedulingTarget.Any. Defaults to False.
wait (:obj:`bool`, optional): If True, this function blocks until the application has completed.
Defaults to False.
Returns:
:obj:`None`
"""
return submit.submit(self._core_cluster_operations, self, id, application, remote, wait)
return submit.submit(self._core_cluster_operations, self, id, application, remote, wait, internal)

def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None):
"""Create a user on every node in the cluster
Expand Down
7 changes: 7 additions & 0 deletions aztk_cli/spark/endpoints/cluster/cluster_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ def setup_parser(parser: argparse.ArgumentParser):
already accessible at the given path",
)

parser.add_argument(
"--internal",
action="store_true",
help="Connect using the local IP of the master node. Only use if using a VPN.",
)

parser.add_argument(
"app",
help="App jar OR python file to execute. A path to a local "
Expand Down Expand Up @@ -133,6 +139,7 @@ def execute(args: typing.NamedTuple):
max_retry_count=args.max_retry_count,
),
remote=args.remote,
internal=args.internal,
wait=False,
)

Expand Down

0 comments on commit 8c2bf0c

Please sign in to comment.