Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Fix: expose get cluster configuration API #648

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ install:

script:
- yapf --style .style.yapf -dpr aztk/ aztk_cli/
- pylint -E aztk
- pylint -j 2 -E aztk aztk_cli
- pytest --ignore=tests/integration_tests

branches:
Expand Down
2 changes: 1 addition & 1 deletion .vsts-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ phases:
displayName: yapf

- script: |
pylint -E aztk
pylint -j 2 -E aztk aztk_cli
condition: succeeded()
displayName: pylint

Expand Down
2 changes: 1 addition & 1 deletion aztk/client/base/base_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, context):
self.blob_client = context['blob_client']
self.secrets_configuration = context['secrets_configuration']

def get_cluster_config(self, id: str) -> models.ClusterConfiguration:
def get_cluster_configuration(self, id: str) -> models.ClusterConfiguration:
"""Open an ssh tunnel to a node

Args:
Expand Down
11 changes: 11 additions & 0 deletions aztk/spark/client/cluster/helpers/get_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import azure.batch.models.batch_error as batch_error

from aztk import error
from aztk.utils import helpers


def get_configuration(core_cluster_operations, cluster_id: str):
    """Fetch the configuration a cluster was created with.

    Delegates to the core operations layer and translates Azure Batch
    service errors into the package's own exception type so callers only
    have to handle ``AztkError``.

    Args:
        core_cluster_operations: core cluster operations object exposing
            ``get_cluster_configuration``.
        cluster_id (:obj:`str`): the id of the cluster.

    Returns:
        :obj:`aztk.models.ClusterConfiguration`: the cluster's initial configuration.

    Raises:
        :obj:`aztk.error.AztkError`: if the Batch service call fails.
    """
    try:
        configuration = core_cluster_operations.get_cluster_configuration(cluster_id)
    except batch_error.BatchErrorException as e:
        raise error.AztkError(helpers.format_batch_exception(e))
    return configuration
14 changes: 13 additions & 1 deletion aztk/spark/client/cluster/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from aztk.spark.client.base import SparkBaseOperations

from .helpers import (copy, create, create_user, delete, diagnostics, download, get, get_application_log,
get_application_status, get_remote_login_settings, list, node_run, run, submit, wait)
get_application_status, get_configuration, get_remote_login_settings, list, node_run, run, submit,
wait)


class ClusterOperations(SparkBaseOperations):
Expand Down Expand Up @@ -248,3 +249,14 @@ def wait(self, id: str, application_name: str):
:obj:`None`
"""
return wait.wait_for_application_to_complete(self._core_cluster_operations, id, application_name)

def get_configuration(self, id: str):
    """Retrieve the configuration the cluster was originally created with.

    Args:
        id (:obj:`str`): the id of the cluster.

    Returns:
        :obj:`aztk.spark.models.ClusterConfiguration`: the cluster's
        initial configuration.
    """
    cluster_configuration = get_configuration.get_configuration(self._core_cluster_operations, id)
    return cluster_configuration
11 changes: 6 additions & 5 deletions aztk_cli/spark/endpoints/cluster/cluster_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def execute(args: typing.NamedTuple):
cluster = spark_client.cluster.get(cluster_id)
utils.print_cluster(spark_client, cluster, args.internal)

configuration = spark_client.cluster.get_cluster_config(cluster_id)
if configuration and args.show_config:
log.info("-------------------------------------------")
log.info("Cluster configuration:")
utils.print_cluster_conf(configuration, False)
if args.show_config:
configuration = spark_client.cluster.get_configuration(cluster_id)
if configuration:
log.info("-------------------------------------------")
log.info("Cluster configuration:")
utils.print_cluster_conf(configuration, False)
21 changes: 11 additions & 10 deletions aztk_cli/spark/endpoints/cluster/cluster_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def setup_parser(parser: argparse.ArgumentParser):
def execute(args: typing.NamedTuple):
spark_client = aztk.spark.Client(config.load_aztk_secrets())
cluster = spark_client.cluster.get(args.cluster_id)
cluster_config = spark_client.cluster.get_cluster_config(args.cluster_id)
cluster_configuration = spark_client.cluster.get_configuration(args.cluster_id)
ssh_conf = SshConfig()

ssh_conf.merge(
Expand All @@ -55,21 +55,21 @@ def execute(args: typing.NamedTuple):
utils.log_property("open webui", "{0}{1}".format(http_prefix, ssh_conf.web_ui_port))
utils.log_property("open jobui", "{0}{1}".format(http_prefix, ssh_conf.job_ui_port))
utils.log_property("open jobhistoryui", "{0}{1}".format(http_prefix, ssh_conf.job_history_ui_port))
print_plugin_ports(cluster_config)
print_plugin_ports(cluster_configuration)
utils.log_property("ssh username", ssh_conf.username)
utils.log_property("connect", ssh_conf.connect)
log.info("-------------------------------------------")

try:
shell_out_ssh(spark_client, ssh_conf)
shell_out_ssh(spark_client, cluster_configuration, ssh_conf)
except OSError:
# no ssh client is found, falling back to pure python
native_python_ssh_into_master(spark_client, cluster, ssh_conf, args.password)
native_python_ssh_into_master(spark_client, cluster, cluster_configuration, ssh_conf, args.password)


def print_plugin_ports(cluster_config: ClusterConfiguration):
if cluster_config and cluster_config.plugins:
plugins = cluster_config.plugins
def print_plugin_ports(cluster_configuration: ClusterConfiguration):
if cluster_configuration and cluster_configuration.plugins:
plugins = cluster_configuration.plugins
has_ports = False
plugin_ports = {}
for plugin in plugins:
Expand All @@ -93,12 +93,12 @@ def print_plugin_ports(cluster_config: ClusterConfiguration):
utils.log_property(label, url)


def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
def native_python_ssh_into_master(spark_client, cluster, cluster_configuration, ssh_conf, password):
if not ssh_conf.connect:
log.warning("No ssh client found, using pure python connection.")
return

configuration = spark_client.cluster.get_cluster_config(cluster.id)
configuration = spark_client.cluster.get_configuration(cluster.id)
plugin_ports = []
if configuration and configuration.plugins:
ports = [
Expand All @@ -124,11 +124,12 @@ def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
internal=ssh_conf.internal)


def shell_out_ssh(spark_client, ssh_conf):
def shell_out_ssh(spark_client, cluster_configuration, ssh_conf):
try:
ssh_cmd = utils.ssh_in_master(
client=spark_client,
cluster_id=ssh_conf.cluster_id,
cluster_configuration=cluster_configuration,
webui=ssh_conf.web_ui_port,
jobui=ssh_conf.job_ui_port,
jobhistoryui=ssh_conf.job_history_ui_port,
Expand Down
6 changes: 3 additions & 3 deletions aztk_cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def stream_logs(client, cluster_id, application_name):

def ssh_in_master(client,
cluster_id: str,
cluster_configuration: models.ClusterConfiguration,
username: str = None,
webui: str = None,
jobui: str = None,
Expand All @@ -152,7 +153,6 @@ def ssh_in_master(client,

# Get master node id from task (job and task are both named pool_id)
cluster = client.cluster.get(cluster_id)
configuration = client.cluster.get_cluster_config(cluster_id)

master_node_id = cluster.master_node_id

Expand Down Expand Up @@ -186,8 +186,8 @@ def ssh_in_master(client,
if ports is not None:
for port in ports:
ssh_command.add_option("-L", "{0}:localhost:{1}".format(port[0], port[1]))
if configuration and configuration.plugins:
for plugin in configuration.plugins:
if cluster_configuration and cluster_configuration.plugins:
for plugin in cluster_configuration.plugins:
for port in plugin.ports:
if port.expose_publicly:
ssh_command.add_option("-L", "{0}:localhost:{1}".format(port.public_port, port.internal))
Expand Down