Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Fix: expose get cluster configuration API (#648)
Browse files Browse the repository at this point in the history
* Fix the `get` and `ssh` CLI calls, and add a `get_configuration` API

* Update CI builds to lint `aztk_cli` in parallel

* Remove unnecessary `get_configuration` calls
  • Loading branch information
jafreck authored Aug 17, 2018
1 parent 9098533 commit 7c14648
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ install:

script:
- yapf --style .style.yapf -dpr aztk/ aztk_cli/
- pylint -E aztk
- pylint -j 2 -E aztk aztk_cli
- pytest --ignore=tests/integration_tests

branches:
Expand Down
2 changes: 1 addition & 1 deletion .vsts-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ phases:
displayName: yapf
- script: |
pylint -E aztk
pylint -j 2 -E aztk aztk_cli
condition: succeeded()
displayName: pylint
Expand Down
2 changes: 1 addition & 1 deletion aztk/client/base/base_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, context):
self.blob_client = context['blob_client']
self.secrets_configuration = context['secrets_configuration']

def get_cluster_config(self, id: str) -> models.ClusterConfiguration:
def get_cluster_configuration(self, id: str) -> models.ClusterConfiguration:
"""Open an ssh tunnel to a node
Args:
Expand Down
11 changes: 11 additions & 0 deletions aztk/spark/client/cluster/helpers/get_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import azure.batch.models.batch_error as batch_error

from aztk import error
from aztk.utils import helpers


def get_configuration(core_cluster_operations, cluster_id: str):
    """Fetch the stored configuration of a cluster.

    Args:
        core_cluster_operations: core cluster operations object used to
            query the cluster's persisted configuration.
        cluster_id (:obj:`str`): the id of the cluster to look up.

    Returns:
        :obj:`aztk.models.ClusterConfiguration`: the cluster's configuration.

    Raises:
        :obj:`aztk.error.AztkError`: wraps any Azure Batch service error.
    """
    try:
        configuration = core_cluster_operations.get_cluster_configuration(cluster_id)
    except batch_error.BatchErrorException as e:
        # Translate the service-level exception into the package's error type
        raise error.AztkError(helpers.format_batch_exception(e))
    return configuration
14 changes: 13 additions & 1 deletion aztk/spark/client/cluster/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from aztk.spark.client.base import SparkBaseOperations

from .helpers import (copy, create, create_user, delete, diagnostics, download, get, get_application_log,
get_application_status, get_remote_login_settings, list, node_run, run, submit, wait)
get_application_status, get_configuration, get_remote_login_settings, list, node_run, run, submit,
wait)


class ClusterOperations(SparkBaseOperations):
Expand Down Expand Up @@ -248,3 +249,14 @@ def wait(self, id: str, application_name: str):
:obj:`None`
"""
return wait.wait_for_application_to_complete(self._core_cluster_operations, id, application_name)

def get_configuration(self, id: str):
    """Retrieve the configuration the cluster was created with.

    Args:
        id (:obj:`str`): the id of the cluster

    Returns:
        :obj:`aztk.spark.models.ClusterConfiguration`
    """
    core_operations = self._core_cluster_operations
    return get_configuration.get_configuration(core_operations, id)
11 changes: 6 additions & 5 deletions aztk_cli/spark/endpoints/cluster/cluster_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def execute(args: typing.NamedTuple):
cluster = spark_client.cluster.get(cluster_id)
utils.print_cluster(spark_client, cluster, args.internal)

configuration = spark_client.cluster.get_cluster_config(cluster_id)
if configuration and args.show_config:
log.info("-------------------------------------------")
log.info("Cluster configuration:")
utils.print_cluster_conf(configuration, False)
if args.show_config:
configuration = spark_client.cluster.get_configuration(cluster_id)
if configuration:
log.info("-------------------------------------------")
log.info("Cluster configuration:")
utils.print_cluster_conf(configuration, False)
21 changes: 11 additions & 10 deletions aztk_cli/spark/endpoints/cluster/cluster_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def setup_parser(parser: argparse.ArgumentParser):
def execute(args: typing.NamedTuple):
spark_client = aztk.spark.Client(config.load_aztk_secrets())
cluster = spark_client.cluster.get(args.cluster_id)
cluster_config = spark_client.cluster.get_cluster_config(args.cluster_id)
cluster_configuration = spark_client.cluster.get_configuration(args.cluster_id)
ssh_conf = SshConfig()

ssh_conf.merge(
Expand All @@ -55,21 +55,21 @@ def execute(args: typing.NamedTuple):
utils.log_property("open webui", "{0}{1}".format(http_prefix, ssh_conf.web_ui_port))
utils.log_property("open jobui", "{0}{1}".format(http_prefix, ssh_conf.job_ui_port))
utils.log_property("open jobhistoryui", "{0}{1}".format(http_prefix, ssh_conf.job_history_ui_port))
print_plugin_ports(cluster_config)
print_plugin_ports(cluster_configuration)
utils.log_property("ssh username", ssh_conf.username)
utils.log_property("connect", ssh_conf.connect)
log.info("-------------------------------------------")

try:
shell_out_ssh(spark_client, ssh_conf)
shell_out_ssh(spark_client, cluster_configuration, ssh_conf)
except OSError:
# no ssh client is found, falling back to pure python
native_python_ssh_into_master(spark_client, cluster, ssh_conf, args.password)
native_python_ssh_into_master(spark_client, cluster, cluster_configuration, ssh_conf, args.password)


def print_plugin_ports(cluster_config: ClusterConfiguration):
if cluster_config and cluster_config.plugins:
plugins = cluster_config.plugins
def print_plugin_ports(cluster_configuration: ClusterConfiguration):
if cluster_configuration and cluster_configuration.plugins:
plugins = cluster_configuration.plugins
has_ports = False
plugin_ports = {}
for plugin in plugins:
Expand All @@ -93,12 +93,12 @@ def print_plugin_ports(cluster_config: ClusterConfiguration):
utils.log_property(label, url)


def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
def native_python_ssh_into_master(spark_client, cluster, cluster_configuration, ssh_conf, password):
if not ssh_conf.connect:
log.warning("No ssh client found, using pure python connection.")
return

configuration = spark_client.cluster.get_cluster_config(cluster.id)
configuration = spark_client.cluster.get_configuration(cluster.id)
plugin_ports = []
if configuration and configuration.plugins:
ports = [
Expand All @@ -124,11 +124,12 @@ def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
internal=ssh_conf.internal)


def shell_out_ssh(spark_client, ssh_conf):
def shell_out_ssh(spark_client, cluster_configuration, ssh_conf):
try:
ssh_cmd = utils.ssh_in_master(
client=spark_client,
cluster_id=ssh_conf.cluster_id,
cluster_configuration=cluster_configuration,
webui=ssh_conf.web_ui_port,
jobui=ssh_conf.job_ui_port,
jobhistoryui=ssh_conf.job_history_ui_port,
Expand Down
6 changes: 3 additions & 3 deletions aztk_cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def stream_logs(client, cluster_id, application_name):

def ssh_in_master(client,
cluster_id: str,
cluster_configuration: models.ClusterConfiguration,
username: str = None,
webui: str = None,
jobui: str = None,
Expand All @@ -152,7 +153,6 @@ def ssh_in_master(client,

# Get master node id from task (job and task are both named pool_id)
cluster = client.cluster.get(cluster_id)
configuration = client.cluster.get_cluster_config(cluster_id)

master_node_id = cluster.master_node_id

Expand Down Expand Up @@ -186,8 +186,8 @@ def ssh_in_master(client,
if ports is not None:
for port in ports:
ssh_command.add_option("-L", "{0}:localhost:{1}".format(port[0], port[1]))
if configuration and configuration.plugins:
for plugin in configuration.plugins:
if cluster_configuration and cluster_configuration.plugins:
for plugin in cluster_configuration.plugins:
for port in plugin.ports:
if port.expose_publicly:
ssh_command.add_option("-L", "{0}:localhost:{1}".format(port.public_port, port.internal))
Expand Down

0 comments on commit 7c14648

Please sign in to comment.