Feature: spark submit scheduling internal #674

Merged

2 changes: 1 addition & 1 deletion — aztk/node_scripts/scheduling/job_submission.py

@@ -73,7 +73,7 @@ def schedule_with_target(scheduling_target, task_sas_urls):
             format(task_working_dir, aztk_cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE))
         node_id = select_scheduling_target_node(config.spark_client.cluster, config.pool_id, scheduling_target)
         node_run_output = config.spark_client.cluster.node_run(
-            config.pool_id, node_id, task_cmd, timeout=120, block=False)
+            config.pool_id, node_id, task_cmd, timeout=120, block=False, internal=True)
     # block job_manager_task until scheduling_target task completion
     wait_until_tasks_complete(aztk_cluster_id)
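This node script runs as the job-manager task on a node inside the cluster, so it can always reach other nodes over their private addresses; hard-coding `internal=True` here removes the dependency on the pool's public endpoints. Below is a minimal sketch of what such a flag typically selects when opening a connection — the helper and its fields are hypothetical stand-ins, not AZTK's actual connection code:

```python
# Hypothetical sketch: how an `internal` flag usually chooses between a
# node's private VNET address and its public, port-mapped NAT endpoint.
# Illustrative names only; this is not AZTK's implementation.
from collections import namedtuple

RemoteLogin = namedtuple("RemoteLogin", ["public_ip", "public_port", "internal_ip"])


def pick_endpoint(login, internal):
    """Return the (host, port) pair to use for an SSH connection to a node."""
    if internal:
        # Inside the cluster's VNET: connect straight to the node on port 22.
        return login.internal_ip, 22
    # Outside the cluster: go through the pool's public NAT endpoint.
    return login.public_ip, login.public_port


# The job-manager task runs on a cluster node, hence internal=True above.
print(pick_endpoint(RemoteLogin("40.76.0.10", 50000, "10.0.0.4"), internal=True))
```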
21 changes: 16 additions & 5 deletions — aztk/spark/client/cluster/helpers/submit.py

@@ -40,7 +40,15 @@ def select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target):
     return cluster.master_node_id


-def schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, wait):
+def schedule_with_target(
+        core_cluster_operations,
+        spark_cluster_operations,
+        cluster_id,
+        scheduling_target,
+        task,
+        wait,
+        internal,
+):
     # upload "real" task definition to storage
     serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.blob_client, cluster_id,
                                                                       task)
@@ -65,7 +73,8 @@ def schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, wait):
             format(task_working_dir, cluster_id, serialized_task_resource_file.blob_source,
                    constants.SPARK_SUBMIT_LOGS_FILE))
     node_id = select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target)
-    node_run_output = spark_cluster_operations.node_run(cluster_id, node_id, task_cmd, timeout=120, block=wait)
+    node_run_output = spark_cluster_operations.node_run(
+        cluster_id, node_id, task_cmd, timeout=120, block=wait, internal=internal)


 def get_cluster_scheduling_target(core_cluster_operations, cluster_id):
@@ -80,6 +89,7 @@ def submit_application(
         application,
         remote: bool = False,
         wait: bool = False,
+        internal: bool = False,
 ):
     """
     Submit a spark app
@@ -90,7 +100,7 @@ def submit_application(
     scheduling_target = get_cluster_scheduling_target(core_cluster_operations, cluster_id)
     if scheduling_target is not models.SchedulingTarget.Any:
         schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task,
-                             wait)
+                             wait, internal)
     else:
         # Add task to batch job (which has the same name as cluster_id)
         core_cluster_operations.batch_client.task.add(job_id=cluster_id, task=task)
@@ -107,9 +117,10 @@ def submit(
         application: models.ApplicationConfiguration,
         remote: bool = False,
         wait: bool = False,
-        scheduling_target: str = None,
+        internal: bool = False,
 ):
     try:
-        submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait)
+        submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait,
+                           internal)
     except BatchErrorException as e:
         raise error.AztkError(helpers.format_batch_exception(e))
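Here `internal` is threaded through the whole chain — `submit` → `submit_application` → `schedule_with_target` → `node_run` — but it only matters on the scheduling-target branch, which connects to a node directly; when the target is `SchedulingTarget.Any`, the task is handed to the Batch service and no node connection is opened. Note the diff also drops the apparently unused `scheduling_target: str = None` parameter from `submit`: the target is derived from the cluster's configuration via `get_cluster_scheduling_target`. A runnable toy model of that dispatch, with hypothetical stand-ins for the AZTK pieces:

```python
# Toy model (hypothetical, not AZTK code) of the dispatch above: `internal`
# only influences the path that connects to a node directly.
from enum import Enum


class SchedulingTarget(Enum):
    Any = "any"
    Master = "master"


def node_run(node_id, task_cmd, block, internal):
    endpoint = "internal IP" if internal else "public endpoint"
    print(f"run on {node_id} via {endpoint} (block={block}): {task_cmd}")


def schedule_with_target(target, task_cmd, wait, internal):
    # Scheduling-target path: execute the submit command on a chosen node.
    node_run("master-node", task_cmd, block=wait, internal=internal)


def submit_application(task_cmd, target, wait=False, internal=False):
    if target is not SchedulingTarget.Any:
        schedule_with_target(target, task_cmd, wait, internal)
    else:
        # Batch service schedules the task; no direct connection is made.
        print("task added to Batch job; `internal` is irrelevant on this path")


submit_application("spark-submit pi.py", SchedulingTarget.Master, internal=True)
submit_application("spark-submit pi.py", SchedulingTarget.Any, internal=True)
```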
14 changes: 12 additions & 2 deletions — aztk/spark/client/cluster/operations.py

@@ -63,7 +63,14 @@ def list(self):
         """
         return list.list_clusters(self._core_cluster_operations)

-    def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
+    def submit(
+            self,
+            id: str,
+            application: models.ApplicationConfiguration,
+            remote: bool = False,
+            wait: bool = False,
+            internal: bool = False,
+    ):
         """Submit an application to a cluster.

         Args:
@@ -72,13 +79,16 @@ def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
             remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable
                 by the cluster already. This is useful when your application is stored in a mounted Azure File Share
                 and not the client. Defaults to False.
+            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
+                Only use this if running within the same VNET as the cluster. This only applies if the cluster's
+                SchedulingTarget is not set to SchedulingTarget.Any. Defaults to False.
             wait (:obj:`bool`, optional): If True, this function blocks until the application has completed.
                 Defaults to False.

         Returns:
             :obj:`None`
         """
-        return submit.submit(self._core_cluster_operations, self, id, application, remote, wait)
+        return submit.submit(self._core_cluster_operations, self, id, application, remote, wait, internal)

     def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None):
         """Create a user on every node in the cluster
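With the keyword exposed on the public operations object, SDK callers inside the cluster's VNET (for example, connected over VPN) can opt into the internal IP path. A hedged usage sketch — client construction and the `ApplicationConfiguration` fields follow AZTK's usual patterns but are not taken from this diff, so check your version's docs:

```python
# Usage sketch: submit an application over the internal IP path.
# Credentials are elided; fill in a real SecretsConfiguration.
import aztk.spark
from aztk.spark import models

client = aztk.spark.Client(models.SecretsConfiguration())  # assumed: real secrets here

app = models.ApplicationConfiguration(
    name="pi-calculation",
    application="/path/to/pi.py",
)

# internal=True connects to the target node via its internal IP; per the
# docstring it only applies when the cluster's SchedulingTarget is not Any.
client.cluster.submit(id="my-cluster", application=app, internal=True, wait=False)
```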
7 changes: 7 additions & 0 deletions — aztk_cli/spark/endpoints/cluster/cluster_submit.py

@@ -81,6 +81,12 @@ def setup_parser(parser: argparse.ArgumentParser):
             already accessible at the given path",
     )

+    parser.add_argument(
+        "--internal",
+        action="store_true",
+        help="Connect using the local IP of the master node. Only use if using a VPN.",
+    )
+
     parser.add_argument(
         "app",
         help="App jar OR python file to execute. A path to a local "
@@ -133,6 +139,7 @@ def execute(args: typing.NamedTuple):
             max_retry_count=args.max_retry_count,
         ),
         remote=args.remote,
+        internal=args.internal,
         wait=False,
     )
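Finally, the CLI exposes the same switch. With `action="store_true"` the flag defaults to `False` and flips to `True` only when `--internal` is passed, which `execute` then forwards as the `internal=` keyword. A self-contained demonstration of that argparse behavior (the `prog` name is inferred from the file path):

```python
# Demonstrates the store_true wiring used by the --internal flag above.
import argparse

parser = argparse.ArgumentParser(prog="aztk spark cluster submit")
parser.add_argument(
    "--internal",
    action="store_true",
    help="Connect using the local IP of the master node. Only use if using a VPN.",
)

print(parser.parse_args([]).internal)              # False (flag omitted)
print(parser.parse_args(["--internal"]).internal)  # True  (flag present)
```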