From 4ef0e4e109903859394d688278fea3e6b354996a Mon Sep 17 00:00:00 2001 From: Jake Freck Date: Thu, 15 Mar 2018 17:04:18 -0700 Subject: [PATCH 1/4] enable mixed mode for jobs --- aztk/client.py | 2 +- aztk/spark/client.py | 16 +++++----------- aztk/spark/models/models.py | 3 +++ 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/aztk/client.py b/aztk/client.py index 0c70b2bc..be3b810c 100644 --- a/aztk/client.py +++ b/aztk/client.py @@ -300,7 +300,7 @@ def __submit_job(self, auto_scale_formula=autoscale_formula, auto_scale_evaluation_interval=timedelta(minutes=5), start_task=start_task, - enable_inter_node_communication=True, + enable_inter_node_communication=False if job_configuration.mixed_mode() else True, network_configuration=network_conf, max_tasks_per_node=1, metadata=[ diff --git a/aztk/spark/client.py b/aztk/spark/client.py index d6f83bc1..51825513 100644 --- a/aztk/spark/client.py +++ b/aztk/spark/client.py @@ -171,6 +171,7 @@ def submit_job(self, job_configuration): zip_resource_files, job_configuration.gpu_enabled, job_configuration.docker_repo, + mixed_mode=job_configuration.mixed_mode(), worker_on_master=job_configuration.worker_on_master) application_tasks = [] @@ -189,17 +190,10 @@ def submit_job(self, job_configuration): offer='UbuntuServer', sku='16.04') - if job_configuration.max_dedicated_nodes and not job_configuration.max_low_pri_nodes: - autoscale_formula = "maxNumberofVMs = {0}; targetNumberofVMs = {1};" \ - " $TargetDedicatedNodes=min(maxNumberofVMs, targetNumberofVMs)".format( - job_configuration.max_dedicated_nodes, job_configuration.max_dedicated_nodes) - elif job_configuration.max_low_pri_nodes and not job_configuration.max_dedicated_nodes: - autoscale_formula = "maxNumberofVMs = {0}; targetNumberofVMs = {1};" \ - " $TargetLowPriorityNodes=min(maxNumberofVMs, targetNumberofVMs)".format( - job_configuration.max_low_pri_nodes, job_configuration.max_low_pri_nodes) - else: - raise error.AztkError("Jobs do not support both dedicated and low priority nodes." \ - " JobConfiguration fields max_dedicated_nodes and max_low_pri_nodes are mutually exclusive values.") + autoscale_formula = "$TargetDedicatedNodes = {0}; " \ + "$TargetLowPriorityNodes = {1}".format( + job_configuration.max_dedicated_nodes, + job_configuration.max_low_pri_nodes) job = self.__submit_job( job_configuration=job_configuration, diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py index 64345f7f..b21433bf 100644 --- a/aztk/spark/models/models.py +++ b/aztk/spark/models/models.py @@ -230,6 +230,9 @@ def as_cluster_config(self): spark_configuration=self.spark_configuration, ) + def mixed_mode(self) -> bool: + return self.max_dedicated_nodes > 0 and self.max_low_pri_nodes > 0 + class JobState(): complete = 'completed' From a41ff11b01eb85206526f26767cf56bcd11d084c Mon Sep 17 00:00:00 2001 From: Jake Freck Date: Thu, 15 Mar 2018 17:10:21 -0700 Subject: [PATCH 2/4] simplify --- aztk/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aztk/client.py b/aztk/client.py index be3b810c..aabf4cae 100644 --- a/aztk/client.py +++ b/aztk/client.py @@ -300,7 +300,7 @@ def __submit_job(self, auto_scale_formula=autoscale_formula, auto_scale_evaluation_interval=timedelta(minutes=5), start_task=start_task, - enable_inter_node_communication=False if job_configuration.mixed_mode() else True, + enable_inter_node_communication=not job_configuration.mixed_mode(), network_configuration=network_conf, max_tasks_per_node=1, metadata=[ From 846a83b56f127f506dfbec55ca039210a4894de9 Mon Sep 17 00:00:00 2001 From: Jake Freck Date: Thu, 15 Mar 2018 17:26:58 -0700 Subject: [PATCH 3/4] add job configuration validation --- aztk/spark/client.py | 3 ++- aztk/spark/models/models.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/aztk/spark/client.py b/aztk/spark/client.py index 51825513..ecab07e6 100644 --- a/aztk/spark/client.py +++ b/aztk/spark/client.py @@ -163,8 +163,9 @@ def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str) ''' def submit_job(self, job_configuration): try: + job_configuration.validate() cluster_data = self._get_cluster_data(job_configuration.id) - node_data = NodeData(job_configuration.as_cluster_config()).add_core().done() + node_data = NodeData(job_configuration.to_cluster_config()).add_core().done() zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file() start_task = create_cluster_helper.generate_cluster_start_task(self, diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py index b21433bf..01fde293 100644 --- a/aztk/spark/models/models.py +++ b/aztk/spark/models/models.py @@ -2,6 +2,7 @@ from Crypto.PublicKey import RSA from typing import List import aztk.models +from aztk import error from aztk.utils import constants, helpers import azure.batch.models as batch_models @@ -222,17 +223,45 @@ def __init__( self.subnet_id = subnet_id self.worker_on_master = worker_on_master - def as_cluster_config(self): + def to_cluster_config(self): return ClusterConfiguration( cluster_id = self.id, custom_scripts = self.custom_scripts, + docker_repo=self.docker_repo, vm_size=self.vm_size, + vm_count=self.max_dedicated_nodes, + vm_low_pri_count=self.max_low_pri_nodes, + subnet_id=self.subnet_id, + worker_on_master=self.worker_on_master, spark_configuration=self.spark_configuration, ) def mixed_mode(self) -> bool: return self.max_dedicated_nodes > 0 and self.max_low_pri_nodes > 0 + def validate(self) -> bool: + """ + Validate the config at its current state. + Raises: Error if invalid + """ + if self.id is None: + raise error.AztkError("Please supply an ID for the Job in your configuration.") + + if self.max_dedicated_nodes == 0 and self.max_low_pri_nodes == 0: + raise error.AztkError( + "Please supply a valid (greater than 0) value for either max_dedicated_nodes or max_low_pri_nodes in your configuration." + ) + + if self.vm_size is None: + raise error.AztkError( + "Please supply a vm_size in your configuration." + ) + + if self.mixed_mode() and not self.subnet_id: + raise error.AztkError( + "You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes) and pass the subnet_id in your configuration.." + ) + class JobState(): complete = 'completed' From 5c37b2c0504cefa80112036f31ba6047def97fdc Mon Sep 17 00:00:00 2001 From: Jake Freck Date: Thu, 15 Mar 2018 17:35:41 -0700 Subject: [PATCH 4/4] whitespace --- aztk/models/models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aztk/models/models.py b/aztk/models/models.py index 30d12064..50552958 100644 --- a/aztk/models/models.py +++ b/aztk/models/models.py @@ -50,7 +50,6 @@ def merge(self, other): ]) - class ClusterConfiguration(ConfigurationBase): """ Cluster configuration model