Fix: spark roll back scheduling disable (#653)
* disable offlining on node

* disable scheduling_target in config, cli, and sdk

* remove scheduling target function

* formatting

* remove always-None return value
jafreck authored Aug 29, 2018
1 parent 7c37b06 commit 93615d9
Showing 13 changed files with 69 additions and 68 deletions.
28 changes: 12 additions & 16 deletions aztk/models/__init__.py
@@ -1,22 +1,18 @@
+from .application_log import ApplicationLog
+from .cluster import Cluster
 from .cluster_configuration import ClusterConfiguration
 from .custom_script import CustomScript
-from .file_share import FileShare
-from .toolkit import TOOLKIT_MAP, Toolkit
-from .user_configuration import UserConfiguration
-from .secrets_configuration import (
-    SecretsConfiguration,
-    ServicePrincipalConfiguration,
-    SharedKeyConfiguration,
-    DockerConfiguration,
-)
 from .file import File
+from .file_share import FileShare
+from .node_output import NodeOutput
+from .plugins import *
+# from .scheduling_target import SchedulingTarget
+from .port_forward_specification import PortForwardingSpecification
 from .remote_login import RemoteLogin
+from .secrets_configuration import (DockerConfiguration, SecretsConfiguration, ServicePrincipalConfiguration,
+                                    SharedKeyConfiguration)
+from .software import Software
 from .ssh_log import SSHLog
+from .toolkit import TOOLKIT_MAP, Toolkit
+from .user_configuration import UserConfiguration
 from .vm_image import VmImage
-from .node_output import NodeOutput
-from .software import Software
-from .cluster import Cluster
-from .scheduling_target import SchedulingTarget
-from .port_forward_specification import PortForwardingSpecification
-from .application_log import ApplicationLog
-from .plugins import *
9 changes: 5 additions & 4 deletions aztk/models/cluster_configuration.py
@@ -5,7 +5,7 @@
 from .custom_script import CustomScript
 from .file_share import FileShare
 from .plugins import PluginConfiguration
-from .scheduling_target import SchedulingTarget
+# from .scheduling_target import SchedulingTarget
 from .toolkit import Toolkit
 from .user_configuration import UserConfiguration
 
@@ -37,7 +37,8 @@ class ClusterConfiguration(Model):
     plugins = fields.List(PluginConfiguration)
     file_shares = fields.List(FileShare)
     user_configuration = fields.Model(UserConfiguration, default=None)
-    scheduling_target = fields.Enum(SchedulingTarget, default=None)
+
+    # scheduling_target = fields.Enum(SchedulingTarget, default=None)
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -73,5 +74,5 @@ def __validate__(self) -> bool:
                 "You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes). "
                 "Set the VNET's subnet_id in your cluster.yaml or with a parameter (--subnet-id).")
 
-        if self.scheduling_target == SchedulingTarget.Dedicated and self.size == 0:
-            raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
+        # if self.scheduling_target == SchedulingTarget.Dedicated and self.size == 0:
+        #     raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
4 changes: 2 additions & 2 deletions aztk/node_scripts/install/install.py
@@ -6,7 +6,7 @@
 from core import config
 from install import create_user, pick_master, plugins, spark, spark_container
 
-from .node_scheduling import setup_node_scheduling
+# from .node_scheduling import setup_node_scheduling
 
 
 def read_cluster_config():
@@ -48,7 +48,7 @@ def setup_host(docker_repo: str, docker_run_options: str):
 
     cluster_conf = read_cluster_config()
 
-    setup_node_scheduling(client, cluster_conf, is_master)
+    # setup_node_scheduling(client, cluster_conf, is_master)
 
     # TODO pass azure file shares
     spark_container.start_spark_container(
6 changes: 5 additions & 1 deletion aztk/node_scripts/install/node_scheduling.py
@@ -1,7 +1,11 @@
 from azure import batch
-from aztk.models import ClusterConfiguration, SchedulingTarget
+
+from aztk.models import ClusterConfiguration
 from core import config, log
 
+# from aztk.models import SchedulingTarget
+SchedulingTarget = "SchedulingTarget"  # this code isn't used anywhere until scheduling_target reenabled
+
 
 def disable_scheduling(batch_client: batch.BatchServiceClient):
     """
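The body of `disable_scheduling` is collapsed in the hunk above, and the string placeholder only keeps the module importable while the real `SchedulingTarget` enum import stays commented out. For reference, a minimal sketch of what taking a Batch node out of scheduling can look like with the `azure-batch` SDK; the environment-variable lookups and exact call sequence here are assumptions for illustration, not this repository's code.

```python
import os

from azure import batch
from azure.batch import models as batch_models


def disable_scheduling(batch_client: batch.BatchServiceClient):
    # Azure Batch injects these variables into every compute node's environment.
    pool_id = os.environ["AZ_BATCH_POOL_ID"]
    node_id = os.environ["AZ_BATCH_NODE_ID"]

    # Only issue the call if the node is currently accepting new tasks.
    node = batch_client.compute_node.get(pool_id, node_id)
    if node.scheduling_state == batch_models.SchedulingState.enabled:
        batch_client.compute_node.disable_scheduling(pool_id, node_id)
```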
15 changes: 7 additions & 8 deletions aztk/spark/client/cluster/helpers/create.py
@@ -12,19 +12,18 @@
     auto_user=batch_models.AutoUserSpecification(
         scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))
 
-
-def _default_scheduling_target(vm_count: int):
-    if vm_count == 0:
-        return models.SchedulingTarget.Any
-    else:
-        return models.SchedulingTarget.Dedicated
+# def _default_scheduling_target(vm_count: int):
+#     if vm_count == 0:
+#         return models.SchedulingTarget.Any
+#     else:
+#         return models.SchedulingTarget.Dedicated
 
 
 def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
     cluster_conf = models.ClusterConfiguration()
     cluster_conf.merge(configuration)
-    if cluster_conf.scheduling_target is None:
-        cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
+    # if cluster_conf.scheduling_target is None:
+    #     cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
     return cluster_conf
 
 
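The commented-out helper above encodes the default this commit rolls back: a cluster with no dedicated nodes lets the driver run anywhere, while any other cluster keeps it on dedicated nodes. A self-contained sketch of that rule follows; the enum members are assumed from the `<dedicated/master/any>` values documented in cluster.yaml and job.yaml, not imported from the real `aztk.models`.

```python
from enum import Enum


class SchedulingTarget(Enum):
    # Assumed members, mirroring the <dedicated/master/any> choices
    # that the yaml docs list for scheduling_target.
    Dedicated = "dedicated"
    Master = "master"
    Any = "any"


def default_scheduling_target(dedicated_vm_count: int) -> SchedulingTarget:
    # A Dedicated target is contradictory with zero dedicated nodes,
    # so fall back to allowing any node to run the driver.
    if dedicated_vm_count == 0:
        return SchedulingTarget.Any
    return SchedulingTarget.Dedicated
```

The job-submission helper in the next file applies the same rule, keyed off `max_dedicated_nodes` rather than the cluster's dedicated size.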
14 changes: 7 additions & 7 deletions aztk/spark/client/job/helpers/submit.py
@@ -53,16 +53,16 @@ def generate_job_manager_task(core_job_operations, job, application_tasks):
     return task
 
 
-def _default_scheduling_target(vm_count: int):
-    if vm_count == 0:
-        return models.SchedulingTarget.Any
-    else:
-        return models.SchedulingTarget.Dedicated
+# def _default_scheduling_target(vm_count: int):
+#     if vm_count == 0:
+#         return models.SchedulingTarget.Any
+#     else:
+#         return models.SchedulingTarget.Dedicated
 
 
 def _apply_default_for_job_config(job_conf: models.JobConfiguration):
-    if job_conf.scheduling_target is None:
-        job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)
+    # if job_conf.scheduling_target is None:
+    #     job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)
 
     return job_conf
 
12 changes: 6 additions & 6 deletions aztk/spark/models/models.py
@@ -103,7 +103,7 @@ class PluginConfiguration(aztk.models.PluginConfiguration):
     pass
 
 
-SchedulingTarget = aztk.models.SchedulingTarget
+# SchedulingTarget = aztk.models.SchedulingTarget
 
 
 class ClusterConfiguration(aztk.models.ClusterConfiguration):
@@ -198,7 +198,7 @@ def __init__(
             max_dedicated_nodes=0,
             max_low_pri_nodes=0,
             subnet_id=None,
-            scheduling_target: SchedulingTarget = None,
+            # scheduling_target: SchedulingTarget = None,
             worker_on_master=None,
     ):
 
@@ -214,7 +214,7 @@ def __init__(
         self.max_low_pri_nodes = max_low_pri_nodes
         self.subnet_id = subnet_id
         self.worker_on_master = worker_on_master
-        self.scheduling_target = scheduling_target
+        # self.scheduling_target = scheduling_target
 
     def to_cluster_config(self):
         return ClusterConfiguration(
@@ -226,7 +226,7 @@ def to_cluster_config(self):
             subnet_id=self.subnet_id,
             worker_on_master=self.worker_on_master,
             spark_configuration=self.spark_configuration,
-            scheduling_target=self.scheduling_target,
+            # scheduling_target=self.scheduling_target,
         )
 
     def mixed_mode(self) -> bool:
@@ -263,8 +263,8 @@ def validate(self) -> bool:
                 "You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes) "
                 "and pass the subnet_id in your configuration..")
 
-        if self.scheduling_target == SchedulingTarget.Dedicated and self.max_dedicated_nodes == 0:
-            raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
+        # if self.scheduling_target == SchedulingTarget.Dedicated and self.max_dedicated_nodes == 0:
+        #     raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
 
 
 class JobState:
12 changes: 7 additions & 5 deletions aztk_cli/config.py
@@ -5,7 +5,9 @@
 import aztk.spark
 from aztk.models import Toolkit
 from aztk.models.plugins.internal import PluginReference
-from aztk.spark.models import (ClusterConfiguration, SchedulingTarget, SecretsConfiguration)
+from aztk.spark.models import ClusterConfiguration, SecretsConfiguration
+
+# from aztk.spark.models import SchedulingTarget
 
 
 def load_aztk_secrets() -> SecretsConfiguration:
@@ -179,7 +181,7 @@ def __init__(self):
         self.core_site_xml = None
         self.subnet_id = None
         self.worker_on_master = None
-        self.scheduling_target = None
+        # self.scheduling_target = None
         self.jars = []
 
     def _merge_dict(self, config):
@@ -198,9 +200,9 @@ def _merge_dict(self, config):
         self.max_low_pri_nodes = cluster_configuration.get("size_low_priority")
         self.subnet_id = cluster_configuration.get("subnet_id")
         self.worker_on_master = cluster_configuration.get("worker_on_master")
-        scheduling_target = cluster_configuration.get("scheduling_target")
-        if scheduling_target:
-            self.scheduling_target = SchedulingTarget(scheduling_target)
+        # scheduling_target = cluster_configuration.get("scheduling_target")
+        # if scheduling_target:
+        #     self.scheduling_target = SchedulingTarget(scheduling_target)
 
         applications = config.get("applications")
         if applications:
2 changes: 0 additions & 2 deletions aztk_cli/config/cluster.yaml
@@ -61,8 +61,6 @@ plugins:
 # Allow master node to also be a worker <true/false> (Default: true)
 # worker_on_master: true
 
-# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
-# scheduling_target: dedicated
 
 # wait: <true/false>
 wait: false
2 changes: 0 additions & 2 deletions aztk_cli/config/job.yaml
@@ -25,8 +25,6 @@ job:
     # Optional command line options to pass to `docker run`
     # docker_run_options: <additional command line options to pass to `docker run` (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>
 
-    # Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
-    # scheduling_target: dedicated
 
   spark_configuration:
     spark_defaults_conf: .aztk/spark-defaults.conf
2 changes: 1 addition & 1 deletion aztk_cli/spark/endpoints/job/submit.py
@@ -48,7 +48,7 @@ def execute(args: typing.NamedTuple):
         max_low_pri_nodes=job_conf.max_low_pri_nodes,
         subnet_id=job_conf.subnet_id,
         worker_on_master=job_conf.worker_on_master,
-        scheduling_target=job_conf.scheduling_target,
+        # scheduling_target=job_conf.scheduling_target,
     )
 
     # TODO: utils.print_job_conf(job_configuration)
6 changes: 4 additions & 2 deletions docs/13-configuration.md
@@ -47,12 +47,14 @@ plugins:
 # Allow master node to also be a worker <true/false> (Default: true)
 # worker_on_master: true
 
-# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
-# scheduling_target: dedicated
 
 # wait: <true/false>
 wait: true
 ```
+<!--- this goes above wait: true
+# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
+# scheduling_target: dedicated
+-->
 Running `aztk spark cluster create` will create a cluster of 4 **Standard\_A2** nodes called 'spark\_cluster' with a linux user named 'spark'. This is equivalent to running the command
 
25 changes: 13 additions & 12 deletions tests/models/test_cluster_configuration.py
@@ -1,7 +1,8 @@
 import pytest
 
 from aztk.error import InvalidModelError
-from aztk.models import (ClusterConfiguration, SchedulingTarget, Toolkit, UserConfiguration)
+from aztk.models import ClusterConfiguration, Toolkit, UserConfiguration
+# from aztk.models import SchedulingTarget
 from aztk.spark.models.plugins import HDFSPlugin, JupyterPlugin
 
 
@@ -60,15 +61,15 @@ def test_cluster_configuration():
     assert config.plugins[1].name == 'hdfs'
 
 
-def test_scheduling_target_dedicated_with_no_dedicated_nodes_raise_error():
-    with pytest.raises(InvalidModelError, match="Scheduling target cannot be Dedicated if dedicated vm size is 0"):
-        conf = ClusterConfiguration(
-            cluster_id="abc",
-            scheduling_target=SchedulingTarget.Dedicated,
-            vm_size="standard_a2",
-            size=0,
-            size_low_priority=2,
-            toolkit=Toolkit(software="spark", version="1.6.3"),
-        )
+# def test_scheduling_target_dedicated_with_no_dedicated_nodes_raise_error():
+#     with pytest.raises(InvalidModelError, match="Scheduling target cannot be Dedicated if dedicated vm size is 0"):
+#         conf = ClusterConfiguration(
+#             cluster_id="abc",
+#             scheduling_target=SchedulingTarget.Dedicated,
+#             vm_size="standard_a2",
+#             size=0,
+#             size_low_priority=2,
+#             toolkit=Toolkit(software="spark", version="1.6.3"),
+#         )
 
-        conf.validate()
+#         conf.validate()
