diff --git a/.travis.yml b/.travis.yml index f7de79f4..9e6c0006 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,7 @@ install: script: - yapf --style .style.yapf -dpr aztk/ aztk_cli/ - - pylint -E aztk + - pylint -j 2 -E aztk aztk_cli - pytest --ignore=tests/integration_tests branches: diff --git a/.vsts-ci.yml b/.vsts-ci.yml index f37bc4a6..a4c9f47a 100644 --- a/.vsts-ci.yml +++ b/.vsts-ci.yml @@ -24,7 +24,7 @@ phases: displayName: yapf - script: | - pylint -E aztk + pylint -j 2 -E aztk aztk_cli condition: succeeded() displayName: pylint diff --git a/aztk/client/base/base_operations.py b/aztk/client/base/base_operations.py index fb02924f..ea2652ef 100644 --- a/aztk/client/base/base_operations.py +++ b/aztk/client/base/base_operations.py @@ -24,7 +24,7 @@ def __init__(self, context): self.blob_client = context['blob_client'] self.secrets_configuration = context['secrets_configuration'] - def get_cluster_config(self, id: str) -> models.ClusterConfiguration: + def get_cluster_configuration(self, id: str) -> models.ClusterConfiguration: """Open an ssh tunnel to a node Args: diff --git a/aztk/spark/client/cluster/helpers/get_configuration.py b/aztk/spark/client/cluster/helpers/get_configuration.py new file mode 100644 index 00000000..fc2d8b46 --- /dev/null +++ b/aztk/spark/client/cluster/helpers/get_configuration.py @@ -0,0 +1,11 @@ +import azure.batch.models.batch_error as batch_error + +from aztk import error +from aztk.utils import helpers + + +def get_configuration(core_cluster_operations, cluster_id: str): + try: + return core_cluster_operations.get_cluster_configuration(cluster_id) + except batch_error.BatchErrorException as e: + raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/operations.py b/aztk/spark/client/cluster/operations.py index edc8fb9c..37e26c22 100644 --- a/aztk/spark/client/cluster/operations.py +++ b/aztk/spark/client/cluster/operations.py @@ -3,7 +3,8 @@ from aztk.spark.client.base import SparkBaseOperations from .helpers import (copy, create, create_user, delete, diagnostics, download, get, get_application_log, - get_application_status, get_remote_login_settings, list, node_run, run, submit, wait) + get_application_status, get_configuration, get_remote_login_settings, list, node_run, run, submit, + wait) class ClusterOperations(SparkBaseOperations): @@ -248,3 +249,14 @@ def wait(self, id: str, application_name: str): :obj:`None` """ return wait.wait_for_application_to_complete(self._core_cluster_operations, id, application_name) + + def get_configuration(self, id: str): + """Get the initial configuration of the cluster + + Args: + id (:obj:`str`): the id of the cluster + + Returns: + :obj:`aztk.spark.models.ClusterConfiguration` + """ + return get_configuration.get_configuration(self._core_cluster_operations, id) diff --git a/aztk_cli/spark/endpoints/cluster/cluster_get.py b/aztk_cli/spark/endpoints/cluster/cluster_get.py index f66cee4d..2bdf5ce4 100644 --- a/aztk_cli/spark/endpoints/cluster/cluster_get.py +++ b/aztk_cli/spark/endpoints/cluster/cluster_get.py @@ -20,8 +20,9 @@ def execute(args: typing.NamedTuple): cluster = spark_client.cluster.get(cluster_id) utils.print_cluster(spark_client, cluster, args.internal) - configuration = spark_client.cluster.get_cluster_config(cluster_id) - if configuration and args.show_config: - log.info("-------------------------------------------") - log.info("Cluster configuration:") - utils.print_cluster_conf(configuration, False) + if args.show_config: + configuration = spark_client.cluster.get_configuration(cluster_id) + if configuration: + log.info("-------------------------------------------") + log.info("Cluster configuration:") + utils.print_cluster_conf(configuration, False) diff --git a/aztk_cli/spark/endpoints/cluster/cluster_ssh.py b/aztk_cli/spark/endpoints/cluster/cluster_ssh.py index ae085809..a0b7aa4a 100644 --- a/aztk_cli/spark/endpoints/cluster/cluster_ssh.py +++ b/aztk_cli/spark/endpoints/cluster/cluster_ssh.py @@ -37,7 +37,7 @@ def setup_parser(parser: argparse.ArgumentParser): def execute(args: typing.NamedTuple): spark_client = aztk.spark.Client(config.load_aztk_secrets()) cluster = spark_client.cluster.get(args.cluster_id) - cluster_config = spark_client.cluster.get_cluster_config(args.cluster_id) + cluster_configuration = spark_client.cluster.get_configuration(args.cluster_id) ssh_conf = SshConfig() ssh_conf.merge( @@ -55,21 +55,21 @@ def execute(args: typing.NamedTuple): utils.log_property("open webui", "{0}{1}".format(http_prefix, ssh_conf.web_ui_port)) utils.log_property("open jobui", "{0}{1}".format(http_prefix, ssh_conf.job_ui_port)) utils.log_property("open jobhistoryui", "{0}{1}".format(http_prefix, ssh_conf.job_history_ui_port)) - print_plugin_ports(cluster_config) + print_plugin_ports(cluster_configuration) utils.log_property("ssh username", ssh_conf.username) utils.log_property("connect", ssh_conf.connect) log.info("-------------------------------------------") try: - shell_out_ssh(spark_client, ssh_conf) + shell_out_ssh(spark_client, cluster_configuration, ssh_conf) except OSError: # no ssh client is found, falling back to pure python - native_python_ssh_into_master(spark_client, cluster, ssh_conf, args.password) + native_python_ssh_into_master(spark_client, cluster, cluster_configuration, ssh_conf, args.password) -def print_plugin_ports(cluster_config: ClusterConfiguration): - if cluster_config and cluster_config.plugins: - plugins = cluster_config.plugins +def print_plugin_ports(cluster_configuration: ClusterConfiguration): + if cluster_configuration and cluster_configuration.plugins: + plugins = cluster_configuration.plugins has_ports = False plugin_ports = {} for plugin in plugins: @@ -93,12 +93,12 @@ def print_plugin_ports(cluster_config: ClusterConfiguration): utils.log_property(label, url) -def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password): +def native_python_ssh_into_master(spark_client, cluster, cluster_configuration, ssh_conf, password): if not ssh_conf.connect: log.warning("No ssh client found, using pure python connection.") return - configuration = spark_client.cluster.get_cluster_config(cluster.id) + configuration = spark_client.cluster.get_configuration(cluster.id) plugin_ports = [] if configuration and configuration.plugins: ports = [ @@ -124,11 +124,12 @@ def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password): internal=ssh_conf.internal) -def shell_out_ssh(spark_client, ssh_conf): +def shell_out_ssh(spark_client, cluster_configuration, ssh_conf): try: ssh_cmd = utils.ssh_in_master( client=spark_client, cluster_id=ssh_conf.cluster_id, + cluster_configuration=cluster_configuration, webui=ssh_conf.web_ui_port, jobui=ssh_conf.job_ui_port, jobhistoryui=ssh_conf.job_history_ui_port, diff --git a/aztk_cli/utils.py b/aztk_cli/utils.py index 363d178a..a3f13080 100644 --- a/aztk_cli/utils.py +++ b/aztk_cli/utils.py @@ -130,6 +130,7 @@ def stream_logs(client, cluster_id, application_name): def ssh_in_master(client, cluster_id: str, + cluster_configuration: models.ClusterConfiguration, username: str = None, webui: str = None, jobui: str = None, @@ -152,7 +153,6 @@ def ssh_in_master(client, # Get master node id from task (job and task are both named pool_id) cluster = client.cluster.get(cluster_id) - configuration = client.cluster.get_cluster_config(cluster_id) master_node_id = cluster.master_node_id @@ -186,8 +186,8 @@ def ssh_in_master(client, if ports is not None: for port in ports: ssh_command.add_option("-L", "{0}:localhost:{1}".format(port[0], port[1])) - if configuration and configuration.plugins: - for plugin in configuration.plugins: + if cluster_configuration and cluster_configuration.plugins: + for plugin in cluster_configuration.plugins: for port in plugin.ports: if port.expose_publicly: ssh_command.add_option("-L", "{0}:localhost:{1}".format(port.public_port, port.internal))