Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Feature: SDK refactor #622

Merged
merged 52 commits into from
Aug 3, 2018
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a88f087
start refactor
jafreck Jun 23, 2018
35da215
continue refactor for cluster and job functions
jafreck Jun 25, 2018
0da3f73
fix imports
jafreck Jun 25, 2018
1a821b7
fixes
jafreck Jun 26, 2018
5f1542d
fixes
jafreck Jun 26, 2018
f1faed9
refactor integration test secrets management
jafreck Jun 27, 2018
202591d
fix cluster create, add new test
jafreck Jun 27, 2018
4fa150e
add tests for new sdk api and fix bugs
jafreck Jun 27, 2018
a47ab07
fix naming and bugs
jafreck Jun 27, 2018
457adfa
update job operations naming, bug fixes
jafreck Jun 27, 2018
880e66d
fix cluster tests
jafreck Jun 27, 2018
1d07be9
fix joboperations and tests
jafreck Jun 28, 2018
8c3b289
update cli and fix some bugs
jafreck Jul 2, 2018
bd16653
start fixes
jafreck Jul 3, 2018
adb0e0a
fix pylint errors, bugs
jafreck Jul 9, 2018
6830614
add deprecated warning checks, rename tests
jafreck Jul 9, 2018
52d27f9
add docstrings for baseoperations
jafreck Jul 10, 2018
581f9c8
add docstrings
jafreck Jul 10, 2018
b159c61
docstrings, add back compat for coreclient, fix init for spark client
jafreck Jul 10, 2018
c6da487
whitespace
jafreck Jul 10, 2018
4aa8774
docstrings, whitespace
jafreck Jul 10, 2018
8cf4667
docstrings, fixes
jafreck Jul 10, 2018
b4c13e1
docstrings, fixes
jafreck Jul 10, 2018
5769e71
fix the sdk documentation, bugs
jafreck Jul 10, 2018
2342326
fix method call
jafreck Jul 10, 2018
b3af9ec
pool_id->id
jafreck Jul 12, 2018
9b18ac5
rename ids
jafreck Jul 12, 2018
be7c408
cluster_id->id
jafreck Jul 12, 2018
935bf71
cluster_id->id
jafreck Jul 12, 2018
15111aa
add todo
jafreck Jul 12, 2018
e585751
fixes
jafreck Jul 12, 2018
dd2b6ac
add some todos
jafreck Jul 12, 2018
a737237
rename pool to cluster, add todo for nodes params
jafreck Jul 13, 2018
7042be9
add todos for nodes param removal
jafreck Jul 13, 2018
32c9d2c
update functions names
jafreck Jul 13, 2018
cfdf132
remove deprecated function calls
jafreck Jul 13, 2018
d48f7e5
update docs and docstrings
jafreck Jul 17, 2018
c2de9b3
update docstrings
jafreck Jul 17, 2018
d6760c1
get rid of TODOs, fix docstrings
jafreck Jul 17, 2018
a418a37
remove unused setting
jafreck Jul 18, 2018
4749017
inheritance -> composition
jafreck Jul 21, 2018
2d0700c
fix models bugs
jafreck Jul 30, 2018
be31eac
fix create_user bug
jafreck Jul 30, 2018
f179b0d
update sdk_example.py
jafreck Jul 30, 2018
19865af
fix create user argument issue
jafreck Jul 30, 2018
96f46ee
update sdk_example.py
jafreck Jul 30, 2018
0a734a5
update doc
jafreck Jul 30, 2018
159566a
use Software model instead of string
jafreck Aug 2, 2018
42fed0f
add job wait flag, add cluster application wait functions
jafreck Aug 2, 2018
ecfda50
add docs for wait, update tests
jafreck Aug 2, 2018
a27e371
fix bug
jafreck Aug 2, 2018
6c6767c
add clientrequesterror catch to fix tests
jafreck Aug 3, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .style.yapf
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ based_on_style=pep8
spaces_before_comment=4
split_before_logical_operator=True
indent_width=4
column_limit=140
column_limit=120
split_arguments_when_comma_terminated=True
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
"python.formatting.provider": "yapf",
"python.venvPath": "${workspaceFolder}/.venv/",
"python.pythonPath": "${workspaceFolder}/.venv/Scripts/python.exe",
"python.unitTest.pyTestEnabled": true
"python.unitTest.pyTestEnabled": true,
}
1 change: 1 addition & 0 deletions aztk/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .client import CoreClient
1 change: 1 addition & 0 deletions aztk/client/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .base_operations import BaseOperations
223 changes: 223 additions & 0 deletions aztk/client/base/base_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from aztk import models
from aztk.internal import cluster_data
from aztk.utils import ssh as ssh_lib

from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate on each line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what VSCode's "sort imports" spits out. Unfortunately, yapf doesn't do import formatting (or comment/docstring formatting).

I think we should align to some standard import formatter, but not sure the vscode python extension one is the right tool.

generate_user_on_cluster, generate_user_on_node, get_application_log, get_remote_login_settings,
node_run, run, ssh_into_node)


class BaseOperations:
    """Base operations that all other operations have as an attribute

    Every method is a thin wrapper that forwards to the helper module of the same name, passing this
    object as the first argument.

    Attributes:
        batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the
            Azure Batch service.
        blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
            Blob service.
        secrets_configuration (:obj:`aztk.models.SecretsConfiguration`): Model that holds AZTK secrets used to
            authenticate with Azure and the clusters.
    """

    def __init__(self, context):
        """Store the clients and secrets found in the context mapping.

        Args:
            context (:obj:`dict`): mapping that must contain the keys 'batch_client', 'blob_client' and
                'secrets_configuration'.
        """
        self.batch_client = context['batch_client']
        self.blob_client = context['blob_client']
        self.secrets_configuration = context['secrets_configuration']

    def get_cluster_config(self, id: str) -> models.ClusterConfiguration:
        """Get the configuration of the given cluster

        Args:
            id (:obj:`str`): the id of the cluster

        Returns:
            :obj:`aztk.models.ClusterConfiguration`: Object representing the cluster's configuration
        """
        return self.get_cluster_data(id).read_cluster_config()

    def get_cluster_data(self, id: str) -> cluster_data.ClusterData:
        """Gets the ClusterData object to manage data related to the given cluster

        Args:
            id (:obj:`str`): the id of the cluster to get

        Returns:
            :obj:`aztk.internal.cluster_data.ClusterData`: Object used to manage the data and storage functions
                for a cluster
        """
        return cluster_data.ClusterData(self.blob_client, id)

    def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
        """Open an ssh tunnel to a node

        Args:
            id (:obj:`str`): the id of the cluster the node is in
            node_id (:obj:`str`): the id of the node to open the ssh tunnel to
            username (:obj:`str`): the username to authenticate the ssh session
            ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
                Defaults to None.
            password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
            port_forward_list (:obj:`List[PortForwardingSpecification]`, optional): list of
                PortForwardingSpecifications. The defined ports will be forwarded to the client.
            internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.

        Returns:
            :obj:`None`
        """
        ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal)

    def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None):
        """Create a user on a node

        Args:
            id (:obj:`str`): id of the cluster to create the user on.
            node_id (:obj:`str`): id of the node in the cluster to create the user on.
            username (:obj:`str`): name of the user to create.
            ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
                Defaults to None.
            password (:obj:`str`, optional): password for the user, must use ssh_key or password.
                Defaults to None.

        Returns:
            :obj:`None`
        """
        return create_user_on_node.create_user_on_node(self, id, node_id, username, ssh_key, password)

    # TODO: remove nodes as param
    def create_user_on_cluster(self, id, nodes, username, ssh_pub_key=None, password=None):
        """Create a user on every node in the cluster

        Args:
            id (:obj:`str`): id of the cluster to create the user on.
            nodes (:obj:`List[ComputeNode]`): list of nodes to create the user on
            username (:obj:`str`): name of the user to create.
            ssh_pub_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_pub_key
                or password. Defaults to None.
            password (:obj:`str`, optional): password for the user, must use ssh_pub_key or password.
                Defaults to None.

        Returns:
            :obj:`None`
        """
        return create_user_on_cluster.create_user_on_cluster(self, id, nodes, username, ssh_pub_key, password)

    def generate_user_on_node(self, id, node_id):
        """Create a user with an autogenerated username and ssh_key on the given node.

        Args:
            id (:obj:`str`): the id of the cluster to generate the user on.
            node_id (:obj:`str`): the id of the node in the cluster to generate the user on.

        Returns:
            :obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
        """
        return generate_user_on_node.generate_user_on_node(self, id, node_id)

    # TODO: remove nodes as param
    def generate_user_on_cluster(self, id, nodes):
        """Create a user with an autogenerated username and ssh_key on the cluster

        Args:
            id (:obj:`str`): the id of the cluster to generate the user on.
            nodes (:obj:`List[ComputeNode]`): list of nodes to generate the user on.

        Returns:
            :obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
        """
        return generate_user_on_cluster.generate_user_on_cluster(self, id, nodes)

    def delete_user_on_node(self, id: str, node_id: str, username: str) -> None:
        """Delete a user on a node

        Args:
            id (:obj:`str`): the id of the cluster to delete the user on.
            node_id (:obj:`str`): the id of the node in the cluster to delete the user on.
            username (:obj:`str`): the name of the user to delete.

        Returns:
            :obj:`None`
        """
        return delete_user_on_node.delete_user(self, id, node_id, username)

    # TODO: remove nodes as param
    def delete_user_on_cluster(self, username, id, nodes):
        """Delete a user on every node in the cluster

        Args:
            username (:obj:`str`): the name of the user to delete.
            id (:obj:`str`): the id of the cluster to delete the user on.
            nodes (:obj:`List[ComputeNode]`): list of nodes to delete the user on.

        Returns:
            :obj:`None`
        """
        # The helper's parameter order is (base_client, id, nodes, username), which differs from this
        # method's own (username, id, nodes); the arguments are reordered here to match the helper.
        return delete_user_on_cluster.delete_user_on_cluster(self, id, nodes, username)

    def node_run(self, id, node_id, command, internal, container_name=None, timeout=None):
        """Run a bash command on the given node

        Args:
            id (:obj:`str`): the id of the cluster to run the command on.
            node_id (:obj:`str`): the id of the node in the cluster to run the command on.
            command (:obj:`str`): the bash command to execute on the node.
            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster.
            container_name (:obj:`str`, optional): the name of the container to run the command in.
                If None, the command will run on the host VM. Defaults to None.
            timeout (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`aztk.models.NodeOutput`: object containing the output of the run command
        """
        return node_run.node_run(self, id, node_id, command, internal, container_name, timeout)

    def get_remote_login_settings(self, id: str, node_id: str):
        """Get the remote login information for a node in a cluster

        Args:
            id (:obj:`str`): the id of the cluster the node is in
            node_id (:obj:`str`): the id of the node in the cluster

        Returns:
            :obj:`aztk.models.RemoteLogin`: Object that contains the ip address and port combination to login to
                a node
        """
        return get_remote_login_settings.get_remote_login_settings(self, id, node_id)

    def run(self, id, command, internal, container_name=None, timeout=None):
        """Run a bash command on every node in the cluster

        Args:
            id (:obj:`str`): the id of the cluster to run the command on.
            command (:obj:`str`): the bash command to execute on the node.
            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster.
            container_name (:obj:`str`, optional): the name of the container to run the command in.
                If None, the command will run on the host VM. Defaults to None.
            timeout (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`List[aztk.models.NodeOutput]`: list of NodeOutput objects containing the output of the run command
        """
        return run.cluster_run(self, id, command, internal, container_name, timeout)

    def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
        """Get the log for a running or completed application

        Args:
            id (:obj:`str`): the id of the cluster the application was submitted to.
            application_name (:obj:`str`): the name of the application to get the log of.
            tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes. Otherwise, the
                whole log will be retrieved. Only use this if streaming the log as it is being written.
                Defaults to False.
            current_bytes (:obj:`int`, optional): Specifies the last seen byte, so only the bytes after
                current_bytes are retrieved. Only useful if streaming the log as it is being written.
                Only used if tail is True. Defaults to 0.

        Returns:
            :obj:`aztk.models.ApplicationLog`: a model representing the output of the application.
        """
        return get_application_log.get_application_log(self, id, application_name, tail, current_bytes)
Empty file.
11 changes: 11 additions & 0 deletions aztk/client/base/helpers/create_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import concurrent.futures


#TODO: remove nodes param
def create_user_on_cluster(base_operations, id, nodes, username, ssh_pub_key=None, password=None):
    """Create the same user on each of the given nodes, one thread-pool task per node.

    Args:
        base_operations: object whose ``create_user_on_node`` method is submitted once per node.
        id (:obj:`str`): id of the cluster the nodes belong to.
        nodes: iterable of node objects; only their ``id`` attribute is read.
        username (:obj:`str`): name of the user to create.
        ssh_pub_key (:obj:`str`, optional): ssh public key forwarded to ``create_user_on_node``.
        password (:obj:`str`, optional): password forwarded to ``create_user_on_node``.

    Returns:
        :obj:`None`

    Note:
        The submitted tasks are only waited on; their results and exceptions are not inspected here.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = {}
        for node in nodes:
            task = pool.submit(base_operations.create_user_on_node, id, node.id, username, ssh_pub_key, password)
            pending[task] = node
        # Block until every per-node task has finished.
        concurrent.futures.wait(pending)
42 changes: 42 additions & 0 deletions aztk/client/base/helpers/create_user_on_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from datetime import datetime, timedelta, timezone

import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error

from aztk import models
from aztk.utils import get_ssh_key


def __create_user(self, id: str, node_id: str, username: str, password: str = None, ssh_key: str = None) -> None:
    """Add an admin user to a single node through the Batch client.

    Args:
        self: object providing ``batch_client`` and ``secrets_configuration``. It is passed explicitly
            because this is a module-level function, not a method.
        id (:obj:`str`): id of the cluster (pool) that contains the node.
        node_id (:obj:`str`): id of the node to add the user to.
        username (:obj:`str`): name of the user to add.
        password (:obj:`str`, optional): password of the user to add. Defaults to None.
        ssh_key (:obj:`str`, optional): ssh key of the user to add. It is resolved through
            ``get_ssh_key.get_user_public_key`` together with the secrets configuration. Defaults to None.

    Returns:
        :obj:`None`
    """
    # Create new ssh user for the given node; the account is an admin and expires 365 days from now (UTC).
    self.batch_client.compute_node.add_user(
        id,
        node_id,
        batch_models.ComputeNodeUser(
            name=username,
            is_admin=True,
            password=password,
            ssh_public_key=get_ssh_key.get_user_public_key(ssh_key, self.secrets_configuration),
            expiry_time=datetime.now(timezone.utc) + timedelta(days=365),
        ),
    )


def create_user_on_node(base_client, id, node_id, username, ssh_key=None, password=None):
    """Create a user on a node, replacing any existing user of the same name.

    The user is created directly first. If the Batch service rejects that call, the user is deleted
    and created once more with the same credentials.

    Args:
        base_client: object providing ``batch_client``, ``secrets_configuration`` and ``delete_user_on_node``.
        id (:obj:`str`): id of the cluster the node is in.
        node_id (:obj:`str`): id of the node to create the user on.
        username (:obj:`str`): name of the user to create.
        ssh_key (:obj:`str`, optional): ssh key to create the user with. Defaults to None.
        password (:obj:`str`, optional): password to create the user with. Defaults to None.

    Returns:
        :obj:`None`

    Raises:
        BatchErrorException: if deleting the existing user or the second creation attempt fails.
    """
    try:
        __create_user(
            base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
    except batch_error.BatchErrorException:
        # Presumably the user already exists: remove it and retry exactly once. The retry forwards
        # *both* credentials (the password used to be dropped) and calls __create_user directly so a
        # second failure propagates instead of re-entering this function.
        base_client.delete_user_on_node(id, node_id, username)
        __create_user(
            base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
7 changes: 7 additions & 0 deletions aztk/client/base/helpers/delete_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import concurrent.futures

#TODO: remove nodes param
def delete_user_on_cluster(base_client, id, nodes, username):
    """Delete the given user from each of the given nodes, one thread-pool task per node.

    Args:
        base_client: object whose ``delete_user_on_node`` method is submitted once per node.
        id (:obj:`str`): id of the cluster the nodes belong to.
        nodes: iterable of node objects; only their ``id`` attribute is read.
        username (:obj:`str`): name of the user to delete.

    Returns:
        :obj:`None`

    Note:
        The submitted tasks are only waited on; their results and exceptions are not inspected here.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = []
        for node in nodes:
            pending.append(pool.submit(base_client.delete_user_on_node, id, node.id, username))
        # Block until every per-node deletion has finished.
        concurrent.futures.wait(pending)
9 changes: 9 additions & 0 deletions aztk/client/base/helpers/delete_user_on_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
def delete_user(self, pool_id: str, node_id: str, username: str) -> None:
    """Delete a user from a single node through the Batch client.

    Args:
        self: object providing ``batch_client``. It is passed explicitly because this is a
            module-level function, not a method.
        pool_id (:obj:`str`): id of the pool (cluster) that contains the node.
        node_id (:obj:`str`): id of the node to delete the user from.
        username (:obj:`str`): name of the user to delete.

    Returns:
        :obj:`None`
    """
    # Delete a user on the given node
    self.batch_client.compute_node.delete_user(pool_id, node_id, username)
20 changes: 20 additions & 0 deletions aztk/client/base/helpers/generate_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import concurrent.futures

from Cryptodome.PublicKey import RSA

from aztk.utils import secure_utils


#TODO: remove nodes param
def generate_user_on_cluster(base_operations, id, nodes):
    """Create one user with a generated name and a fresh RSA key on each of the given nodes.

    Args:
        base_operations: object whose ``create_user_on_node`` method is submitted once per node.
        id (:obj:`str`): id of the cluster the nodes belong to.
        nodes: iterable of node objects; only their ``id`` attribute is read.

    Returns:
        :obj:`tuple`: ``(username, ssh_key)`` where ``ssh_key`` is the generated 2048-bit RSA key object.
    """
    username = secure_utils.generate_random_string()
    private_key = RSA.generate(2048)
    # Only the public half, serialized in OpenSSH text form, is sent to the nodes.
    public_key_text = private_key.publickey().exportKey('OpenSSH').decode('utf-8')

    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = {}
        for node in nodes:
            task = pool.submit(base_operations.create_user_on_node, id, node.id, username, public_key_text)
            pending[task] = node
        # Block until every per-node task has finished; results are not inspected here.
        concurrent.futures.wait(pending)

    return username, private_key
11 changes: 11 additions & 0 deletions aztk/client/base/helpers/generate_user_on_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from Cryptodome.PublicKey import RSA

from aztk.utils import secure_utils


def generate_user_on_node(base_client, pool_id, node_id):
    """Create a user with a generated name and a fresh RSA key on a single node.

    Args:
        base_client: object providing ``create_user_on_node``.
        pool_id (:obj:`str`): id of the pool (cluster) the node is in.
        node_id (:obj:`str`): id of the node to create the user on.

    Returns:
        :obj:`tuple`: ``(username, ssh_key)`` where ``ssh_key`` is the generated 2048-bit RSA key object.
    """
    username = secure_utils.generate_random_string()
    private_key = RSA.generate(2048)
    # Only the public half, serialized in OpenSSH text form, is sent to the node.
    public_key = private_key.publickey()
    public_key_text = public_key.exportKey('OpenSSH').decode('utf-8')
    base_client.create_user_on_node(pool_id, node_id, username, public_key_text)
    return username, private_key
Loading