Feature: add internal flag to node commands (#482)
* add internal ssh flag

* add --internal flag to cluster get

* cluster run internal flag

* fix add command back

* cluster copy internal

* fix method params

* fix method params

* add debug statement

* fix params

* remove debug statement

* fixes

* add debug statement

* remove debug statement

* add hostname to /etc/hosts

* remove hostname from /etc/hosts

* add sdk docs for internal switch in cluster run and copy
jafreck authored Apr 6, 2018
1 parent be8cd2a commit 1eaa1b6
Showing 10 changed files with 92 additions and 29 deletions.
14 changes: 10 additions & 4 deletions aztk/client.py
@@ -229,10 +229,13 @@ def __delete_user_on_pool(self, username, pool_id, nodes):
         concurrent.futures.wait(futures)
 
 
-    def __cluster_run(self, cluster_id, container_name, command):
+    def __cluster_run(self, cluster_id, container_name, command, internal):
         pool, nodes = self.__get_pool_details(cluster_id)
         nodes = [node for node in nodes]
-        cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
+        if internal:
+            cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
+        else:
+            cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
         try:
             ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
             asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
@@ -245,10 +248,13 @@ def __cluster_run(self, cluster_id, container_name, command):
         finally:
             self.__delete_user_on_pool('aztk', pool.id, nodes)
 
-    def __cluster_copy(self, cluster_id, container_name, source_path, destination_path):
+    def __cluster_copy(self, cluster_id, container_name, source_path, destination_path, internal):
         pool, nodes = self.__get_pool_details(cluster_id)
         nodes = [node for node in nodes]
-        cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
+        if internal:
+            cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
+        else:
+            cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
         try:
             ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
             asyncio.get_event_loop().run_until_complete(ssh_lib.clus_copy(container_name=container_name,
8 changes: 4 additions & 4 deletions aztk/spark/client.py
@@ -146,15 +146,15 @@ def get_application_status(self, cluster_id: str, app_name: str):
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
 
-    def cluster_run(self, cluster_id: str, command: str):
+    def cluster_run(self, cluster_id: str, command: str, internal: bool = False):
         try:
-            return self.__cluster_run(cluster_id, 'spark', command)
+            return self.__cluster_run(cluster_id, 'spark', command, internal)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
 
-    def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str):
+    def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, internal: bool = False):
         try:
-            return self.__cluster_copy(cluster_id, 'spark', source_path, destination_path)
+            return self.__cluster_copy(cluster_id, 'spark', source_path, destination_path, internal)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
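For context, an illustrative sketch (not part of this commit): with the new keyword argument, SDK callers can opt into internal connectivity per call. The client construction mirrors the CLI endpoints in this change; the import path, cluster id, and command are assumptions for the example.

    import aztk.spark
    from aztk_cli import config

    # Build a Spark client the same way the CLI endpoints in this commit do
    client = aztk.spark.Client(config.load_aztk_secrets())

    # internal=True addresses each node by its internal IP on port 22 --
    # only useful when those IPs are routable from this machine (e.g. over a VPN)
    client.cluster_run('my-cluster', 'echo hello', internal=True)  # 'my-cluster' is hypothetical

    # Default behavior is unchanged: nodes are reached via their public remote login settings
    client.cluster_run('my-cluster', 'echo hello')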
9 changes: 7 additions & 2 deletions aztk_cli/config.py
@@ -210,6 +210,7 @@ def __init__(self):
         self.cluster_id = None
         self.host = False
         self.connect = True
+        self.internal = False
 
         # Set up ports with default values
         self.job_ui_port = '4040'
@@ -258,8 +259,11 @@ def _merge_dict(self, config):
         if config.get('connect') is not None:
             self.connect = config['connect']
 
+        if config.get('internal') is not None:
+            self.internal = config['internal']
+
     def merge(self, cluster_id, username, job_ui_port, job_history_ui_port,
-              web_ui_port, host, connect):
+              web_ui_port, host, connect, internal):
         """
         Merges fields with args object
         """
@@ -275,7 +279,8 @@ def merge(self, cluster_id, username, job_ui_port, job_history_ui_port,
                 job_history_ui_port=job_history_ui_port,
                 web_ui_port=web_ui_port,
                 host=host,
-                connect=connect))
+                connect=connect,
+                internal=internal))
 
         if self.cluster_id is None:
             raise aztk.error.AztkError(
3 changes: 3 additions & 0 deletions aztk_cli/config/ssh.yaml
@@ -16,3 +16,6 @@ web_ui_port: 8080
 
 # connect: <true/false, connect to spark master or print connection string (--no-connect)>
 connect: true
+
+# internal: <true/false, connect to the spark master using the internal IP address. Only use if using a VPN>
+internal: false
6 changes: 5 additions & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_copy.py
@@ -15,6 +15,9 @@ def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--dest-path', required=True,
                         help='the path the file will be copied to on each node in the cluster. '\
                              'Note that this must include the file name.')
+    parser.add_argument('--internal', action='store_true',
+                        help='Connect using the local IP of the master node. Only use if using a VPN.')
+    parser.set_defaults(internal=False)
 
 
 def execute(args: typing.NamedTuple):
@@ -23,5 +26,6 @@ def execute(args: typing.NamedTuple):
     spark_client.cluster_copy(
         cluster_id=args.cluster_id,
         source_path=args.source_path,
-        destination_path=args.dest_path
+        destination_path=args.dest_path,
+        internal=args.internal
     )
6 changes: 5 additions & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_get.py
@@ -14,13 +14,17 @@ def setup_parser(parser: argparse.ArgumentParser):
                         dest='show_config',
                         action='store_true',
                         help='Show the cluster configuration')
+    parser.add_argument('--internal', action='store_true',
+                        help="Show the local IP of the nodes. "\
+                             "Only use if connecting with a VPN.")
+    parser.set_defaults(internal=False)
 
 
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
     cluster_id = args.cluster_id
     cluster = spark_client.get_cluster(cluster_id)
-    utils.print_cluster(spark_client, cluster)
+    utils.print_cluster(spark_client, cluster, args.internal)
 
     configuration = spark_client.get_cluster_config(cluster_id)
     if configuration and args.show_config:
6 changes: 5 additions & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_run.py
@@ -12,8 +12,12 @@ def setup_parser(parser: argparse.ArgumentParser):
                         help='The unique id of your spark cluster')
     parser.add_argument('command',
                         help='The command to run on your spark cluster')
+    parser.add_argument('--internal', action='store_true',
+                        help='Connect using the local IP of the master node. Only use if using a VPN.')
+    parser.set_defaults(internal=False)
 
 
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    result = spark_client.cluster_run(args.cluster_id, args.command)
+    result = spark_client.cluster_run(args.cluster_id, args.command, args.internal)
     #TODO: pretty print result
19 changes: 9 additions & 10 deletions aztk_cli/spark/endpoints/cluster/cluster_ssh.py
@@ -16,14 +16,11 @@ def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--jobhistoryui', help='Local port to port spark\'s job history UI to')
     parser.add_argument('-u', '--username', help='Username to spark cluster')
    parser.add_argument('--host', dest="host", action='store_true', help='Connect to the host of the Spark container')
-    parser.add_argument(
-        '--no-connect',
-        dest="connect",
-        action='store_false',
-        help='Do not create the ssh session. Only print out \
-            the command to run.')
-
-    parser.set_defaults(connect=True)
+    parser.add_argument('--no-connect', dest="connect", action='store_false',
+                        help='Do not create the ssh session. Only print out the command to run.')
+    parser.add_argument('--internal', action='store_true',
+                        help='Connect using the local IP of the master node. Only use if using a VPN.')
+    parser.set_defaults(connect=True, internal=False)
 
 
 http_prefix = 'http://localhost:'
@@ -42,7 +39,8 @@ def execute(args: typing.NamedTuple):
         job_history_ui_port=args.jobhistoryui,
         web_ui_port=args.webui,
         host=args.host,
-        connect=args.connect)
+        connect=args.connect,
+        internal=args.internal)
 
     log.info("-------------------------------------------")
     utils.log_property("spark cluster id", ssh_conf.cluster_id)
@@ -64,7 +62,8 @@ def execute(args: typing.NamedTuple):
         jobhistoryui=ssh_conf.job_history_ui_port,
         username=ssh_conf.username,
         host=ssh_conf.host,
-        connect=ssh_conf.connect)
+        connect=ssh_conf.connect,
+        internal=ssh_conf.internal)
 
     if not ssh_conf.connect:
         log.info("")
24 changes: 18 additions & 6 deletions aztk_cli/utils.py
@@ -33,7 +33,7 @@ def get_ssh_key_or_prompt(ssh_key, username, password, secrets_config):
             raise error.AztkError("Failed to get valid password, cannot add user to cluster. It is recommended that you provide a ssh public key in .aztk/secrets.yaml. Or provide an ssh-key or password with command line parameters (--ssh-key or --password). You may also run the 'aztk spark cluster add-user' command to add a user to this cluster.")
     return ssh_key, password
 
-def print_cluster(client, cluster: models.Cluster):
+def print_cluster(client, cluster: models.Cluster, internal: bool = False):
     node_count = __pretty_node_count(cluster)
 
     log.info("")
@@ -48,18 +48,25 @@ def print_cluster(client, cluster: models.Cluster):
 
     print_format = '|{:^36}| {:^19} | {:^21}| {:^10} | {:^8} |'
     print_format_underline = '|{:-^36}|{:-^21}|{:-^22}|{:-^12}|{:-^10}|'
-    log.info(print_format.format("Nodes", "State", "IP:Port", "Dedicated", "Master"))
+    if internal:
+        log.info(print_format.format("Nodes", "State", "IP", "Dedicated", "Master"))
+    else:
+        log.info(print_format.format("Nodes", "State", "IP:Port", "Dedicated", "Master"))
     log.info(print_format_underline.format('', '', '', '', ''))
 
     if not cluster.nodes:
         return
     for node in cluster.nodes:
         remote_login_settings = client.get_remote_login_settings(cluster.id, node.id)
+        if internal:
+            ip = node.ip_address
+        else:
+            ip = '{}:{}'.format(remote_login_settings.ip_address, remote_login_settings.port)
         log.info(
             print_format.format(
                 node.id,
                 node.state.value,
-                '{}:{}'.format(remote_login_settings.ip_address, remote_login_settings.port),
+                ip,
                 "*" if node.is_dedicated else '',
                 '*' if node.id == cluster.master_node_id else '')
         )
@@ -134,7 +141,8 @@ def ssh_in_master(
         jobhistoryui: str = None,
         ports=None,
         host: bool = False,
-        connect: bool = True):
+        connect: bool = True,
+        internal: bool = False):
     """
     SSH into head node of spark-app
     :param cluster_id: Id of the cluster to ssh in
@@ -156,6 +164,7 @@
 
     # get remote login settings for the user
     remote_login_settings = client.get_remote_login_settings(cluster.id, master_node_id)
+    master_internal_node_ip = [node.ip_address for node in cluster.nodes if node.id == master_node_id][0]
     master_node_ip = remote_login_settings.ip_address
     master_node_port = remote_login_settings.port
 
@@ -190,8 +199,11 @@
             ssh_command.add_option("-L", "{0}:localhost:{1}".format(port.public_port, port.internal))
 
     user = username if username is not None else '<username>'
-    ssh_command.add_argument(
-        "{0}@{1} -p {2}".format(user, master_node_ip, master_node_port))
+    if internal:
+        ssh_command.add_argument("{0}@{1}".format(user, master_internal_node_ip))
+    else:
+        ssh_command.add_argument(
+            "{0}@{1} -p {2}".format(user, master_node_ip, master_node_port))
 
     if host is False:
         ssh_command.add_argument("\'sudo docker exec -it spark /bin/bash\'")
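As an illustration (not part of the commit), the two branches above generate ssh command strings of the following shape, for a hypothetical user 'spark' with public endpoint 40.76.0.10:50000 and internal IP 10.0.0.4:

    ssh spark@40.76.0.10 -p 50000 'sudo docker exec -it spark /bin/bash'   # internal=False: public IP and remote-login port
    ssh spark@10.0.0.4 'sudo docker exec -it spark /bin/bash'              # internal=True: internal IP, default SSH port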
26 changes: 26 additions & 0 deletions docs/50-sdk.md
@@ -9,6 +9,32 @@ Find some samples and getting started tutorials in the `examples/sdk/` directory o
 
 ### Client
 
+- `cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, internal: bool = False)`
+
+    Copy a file to every node in the given cluster
+
+    Parameters:
+    - cluster_id: str
+        - the id of the cluster
+    - source_path: str
+        - the local path to the file to be copied
+    - destination_path: str
+        - the path (including the file name) at which the file should be placed on each node
+    - internal: bool
+        - if True, connects to the cluster using the local IP. Only set to True if the node's internal IP address is resolvable by the client.
+
+- `cluster_run(self, cluster_id: str, command: str, internal: bool = False)`
+
+    Run a command on every node in the given cluster
+
+    Parameters:
+    - cluster_id: str
+        - the id of the cluster
+    - command: str
+        - the command to run on each node
+    - internal: bool
+        - if True, connects to the cluster using the local IP. Only set to True if the node's internal IP address is resolvable by the client.
+
 - `create_cluster(self, cluster_conf: aztk.spark.models.ClusterConfiguration, wait=False)`
 
     Create an AZTK cluster with the given cluster configuration
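To accompany the new docs, a minimal usage sketch (illustrative, not part of this commit; the cluster id and paths are hypothetical, and the import path for the CLI's secrets loader is an assumption):

    import aztk.spark
    from aztk_cli import config

    client = aztk.spark.Client(config.load_aztk_secrets())

    # Distribute a local file to the same location on every node, addressing
    # nodes by their internal IPs; requires those IPs to be resolvable from
    # this machine (e.g. over a VPN)
    client.cluster_copy(
        cluster_id='my-cluster',
        source_path='/tmp/example.conf',
        destination_path='/mnt/example.conf',  # must include the file name
        internal=True)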
