Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Feature: enable mixed mode for jobs #442

Merged
merged 4 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aztk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def __submit_job(self,
auto_scale_formula=autoscale_formula,
auto_scale_evaluation_interval=timedelta(minutes=5),
start_task=start_task,
enable_inter_node_communication=True,
enable_inter_node_communication=not job_configuration.mixed_mode(),
network_configuration=network_conf,
max_tasks_per_node=1,
metadata=[
Expand Down
1 change: 0 additions & 1 deletion aztk/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def merge(self, other):
])



class ClusterConfiguration(ConfigurationBase):
"""
Cluster configuration model
Expand Down
19 changes: 7 additions & 12 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,16 @@ def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str)
'''
def submit_job(self, job_configuration):
try:
job_configuration.validate()
cluster_data = self._get_cluster_data(job_configuration.id)
node_data = NodeData(job_configuration.as_cluster_config()).add_core().done()
node_data = NodeData(job_configuration.to_cluster_config()).add_core().done()
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()

start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
job_configuration.gpu_enabled,
job_configuration.docker_repo,
mixed_mode=job_configuration.mixed_mode(),
worker_on_master=job_configuration.worker_on_master)

application_tasks = []
Expand All @@ -189,17 +191,10 @@ def submit_job(self, job_configuration):
offer='UbuntuServer',
sku='16.04')

if job_configuration.max_dedicated_nodes and not job_configuration.max_low_pri_nodes:
autoscale_formula = "maxNumberofVMs = {0}; targetNumberofVMs = {1};" \
" $TargetDedicatedNodes=min(maxNumberofVMs, targetNumberofVMs)".format(
job_configuration.max_dedicated_nodes, job_configuration.max_dedicated_nodes)
elif job_configuration.max_low_pri_nodes and not job_configuration.max_dedicated_nodes:
autoscale_formula = "maxNumberofVMs = {0}; targetNumberofVMs = {1};" \
" $TargetLowPriorityNodes=min(maxNumberofVMs, targetNumberofVMs)".format(
job_configuration.max_low_pri_nodes, job_configuration.max_low_pri_nodes)
else:
raise error.AztkError("Jobs do not support both dedicated and low priority nodes." \
" JobConfiguration fields max_dedicated_nodes and max_low_pri_nodes are mutually exclusive values.")
autoscale_formula = "$TargetDedicatedNodes = {0}; " \
"$TargetLowPriorityNodes = {1}".format(
job_configuration.max_dedicated_nodes,
job_configuration.max_low_pri_nodes)

job = self.__submit_job(
job_configuration=job_configuration,
Expand Down
34 changes: 33 additions & 1 deletion aztk/spark/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from Crypto.PublicKey import RSA
from typing import List
import aztk.models
from aztk import error
from aztk.utils import constants, helpers
import azure.batch.models as batch_models

Expand Down Expand Up @@ -222,14 +223,45 @@ def __init__(
self.subnet_id = subnet_id
self.worker_on_master = worker_on_master

def as_cluster_config(self):
def to_cluster_config(self):
    """
    Build a ClusterConfiguration from this job configuration.

    The job's id becomes the cluster_id, max_dedicated_nodes becomes
    vm_count and max_low_pri_nodes becomes vm_low_pri_count; every other
    field is passed through under the same name.
    """
    cluster_settings = dict(
        cluster_id=self.id,
        custom_scripts=self.custom_scripts,
        docker_repo=self.docker_repo,
        vm_size=self.vm_size,
        # Job-level node limits map onto the cluster's node counts.
        vm_count=self.max_dedicated_nodes,
        vm_low_pri_count=self.max_low_pri_nodes,
        subnet_id=self.subnet_id,
        worker_on_master=self.worker_on_master,
        spark_configuration=self.spark_configuration,
    )
    return ClusterConfiguration(**cluster_settings)

def mixed_mode(self) -> bool:
    """
    Report whether the job uses both dedicated and low priority nodes.

    Returns: True only when max_dedicated_nodes and max_low_pri_nodes are
    both greater than 0; False otherwise.
    """
    # An unset (None) count is treated as 0 so the comparison below cannot
    # raise TypeError on `None > 0`.
    dedicated = self.max_dedicated_nodes or 0
    low_pri = self.max_low_pri_nodes or 0
    return dedicated > 0 and low_pri > 0

def validate(self) -> None:
    """
    Validate the config at its current state.

    Checks, in order: an ID is set, at least one of max_dedicated_nodes /
    max_low_pri_nodes is greater than 0, a vm_size is set, and a subnet_id
    is supplied when the job runs in mixed mode.

    Returns: None when the configuration is valid
    Raises: error.AztkError if invalid
    """
    if self.id is None:
        raise error.AztkError("Please supply an ID for the Job in your configuration.")

    # The message asks for a value "greater than 0", so an unset (None) or
    # negative count must be rejected the same way as an explicit 0.
    dedicated = self.max_dedicated_nodes or 0
    low_pri = self.max_low_pri_nodes or 0
    if dedicated <= 0 and low_pri <= 0:
        raise error.AztkError(
            "Please supply a valid (greater than 0) value for either max_dedicated_nodes or max_low_pri_nodes in your configuration."
        )

    if self.vm_size is None:
        raise error.AztkError(
            "Please supply a vm_size in your configuration."
        )

    # Only consult mixed_mode() once both counts are actually set: a job with
    # one count left unset cannot be in mixed mode, and skipping the call keeps
    # a None count away from any numeric comparison.
    counts_set = self.max_dedicated_nodes is not None and self.max_low_pri_nodes is not None
    if counts_set and self.mixed_mode() and not self.subnet_id:
        raise error.AztkError(
            "You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes) and pass the subnet_id in your configuration.."
        )


class JobState():
complete = 'completed'
Expand Down