-
Notifications
You must be signed in to change notification settings - Fork 66
Feature: SDK refactor #622
Changes from 47 commits
a88f087
35da215
0da3f73
1a821b7
5f1542d
f1faed9
202591d
4fa150e
a47ab07
457adfa
880e66d
1d07be9
8c3b289
bd16653
adb0e0a
6830614
52d27f9
581f9c8
b159c61
c6da487
4aa8774
8cf4667
b4c13e1
5769e71
2342326
b3af9ec
9b18ac5
be7c408
935bf71
15111aa
e585751
dd2b6ac
a737237
7042be9
32c9d2c
cfdf132
d48f7e5
c2de9b3
d6760c1
a418a37
4749017
2d0700c
be31eac
f179b0d
19865af
96f46ee
0a734a5
159566a
42fed0f
ecfda50
a27e371
6c6767c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .client import CoreClient |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .base_operations import BaseOperations |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
from aztk import models | ||
from aztk.internal import cluster_data | ||
from aztk.utils import ssh as ssh_lib | ||
|
||
from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node, | ||
generate_user_on_cluster, generate_user_on_node, get_application_log, get_remote_login_settings, | ||
node_run, run, ssh_into_node) | ||
|
||
|
||
class BaseOperations: | ||
"""Base operations that all other operations have as an attribute | ||
|
||
Attributes: | ||
batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the | ||
Azure Batch service. | ||
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage | ||
Blob service. | ||
secrets_configuration (:obj:`aztk.models.SecretsConfiguration`): Model that holds AZTK secrets used to authenticate | ||
with Azure and the clusters. | ||
""" | ||
|
||
def __init__(self, context): | ||
self.batch_client = context['batch_client'] | ||
self.blob_client = context['blob_client'] | ||
self.secrets_configuration = context['secrets_configuration'] | ||
|
||
def get_cluster_config(self, id: str) -> models.ClusterConfiguration: | ||
"""Open an ssh tunnel to a node | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster the node is in | ||
node_id (:obj:`str`): the id of the node to open the ssh tunnel to | ||
username (:obj:`str`): the username to authenticate the ssh session | ||
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key | ||
or password. Defaults to None. | ||
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None. | ||
port_forward_list (:obj:`List[PortForwardingSpecification`, optional): list of PortForwardingSpecifications. | ||
The defined ports will be forwarded to the client. | ||
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP. | ||
Only use this if running within the same VNET as the cluster. Defaults to False. | ||
|
||
Returns: | ||
:obj:`aztk.models.ClusterConfiguration`: Object representing the cluster's configuration | ||
""" | ||
return self.get_cluster_data(id).read_cluster_config() | ||
|
||
def get_cluster_data(self, id: str) -> cluster_data.ClusterData: | ||
"""Gets the ClusterData object to manage data related to the given cluster | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to get | ||
|
||
Returns: | ||
:obj:`aztk.models.ClusterData`: Object used to manage the data and storage functions for a cluster | ||
""" | ||
return cluster_data.ClusterData(self.blob_client, id) | ||
|
||
def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False): | ||
"""Open an ssh tunnel to a node | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster the node is in | ||
node_id (:obj:`str`): the id of the node to open the ssh tunnel to | ||
username (:obj:`str`): the username to authenticate the ssh session | ||
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password. Defaults to None. | ||
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None. | ||
port_forward_list (:obj:`List[PortForwardingSpecification`, optional): list of PortForwardingSpecifications. | ||
The defined ports will be forwarded to the client. | ||
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP. | ||
Only use this if running within the same VNET as the cluster. Defaults to False. | ||
|
||
Returns: | ||
:obj:`None` | ||
""" | ||
ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal) | ||
|
||
def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None): | ||
"""Create a user on a node | ||
|
||
Args: | ||
id (:obj:`str`): id of the cluster to create the user on. | ||
node_id (:obj:`str`): id of the node in the cluster to create the user on. | ||
username (:obj:`str`): name of the user to create. | ||
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password. | ||
password (:obj:`str`, optional): password for the user, must use ssh_key or password. | ||
|
||
Returns: | ||
:obj:`None` | ||
""" | ||
return create_user_on_node.create_user_on_node(self, id, node_id, username, ssh_key, password) | ||
|
||
#TODO: remove nodes as param | ||
def create_user_on_cluster(self, id, nodes, username, ssh_pub_key=None, password=None): | ||
"""Create a user on every node in the cluster | ||
|
||
Args: | ||
username (:obj:`str`): name of the user to create. | ||
id (:obj:`str`): id of the cluster to create the user on. | ||
nodes (:obj:`List[ComputeNode]`): list of nodes to create the user on | ||
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password. Defaults to None. | ||
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None. | ||
|
||
Returns: | ||
:obj:`None` | ||
""" | ||
return create_user_on_cluster.create_user_on_cluster(self, id, nodes, username, ssh_pub_key, password) | ||
|
||
def generate_user_on_node(self, id, node_id): | ||
"""Create a user with an autogenerated username and ssh_key on the given node. | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to generate the user on. | ||
node_id (:obj:`str`): the id of the node in the cluster to generate the user on. | ||
|
||
Returns: | ||
:obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`) | ||
""" | ||
return generate_user_on_node.generate_user_on_node(self, id, node_id) | ||
|
||
#TODO: remove nodes as param | ||
def generate_user_on_cluster(self, id, nodes): | ||
"""Create a user with an autogenerated username and ssh_key on the cluster | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to generate the user on. | ||
node_id (:obj:`str`): the id of the node in the cluster to generate the user on. | ||
|
||
Returns: | ||
:obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`) | ||
""" | ||
return generate_user_on_cluster.generate_user_on_cluster(self, id, nodes) | ||
|
||
def delete_user_on_node(self, id: str, node_id: str, username: str) -> str: | ||
"""Delete a user on a node | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to delete the user on. | ||
node_id (:obj:`str`): the id of the node in the cluster to delete the user on. | ||
username (:obj:`str`): the name of the user to delete. | ||
|
||
Returns: | ||
:obj:`None` | ||
""" | ||
return delete_user_on_node.delete_user(self, id, node_id, username) | ||
|
||
#TODO: remove nodes as param | ||
def delete_user_on_cluster(self, username, id, nodes): | ||
"""Delete a user on every node in the cluster | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to delete the user on. | ||
node_id (:obj:`str`): the id of the node in the cluster to delete the user on. | ||
username (:obj:`str`): the name of the user to delete. | ||
|
||
Returns: | ||
:obj:`None` | ||
""" | ||
return delete_user_on_cluster.delete_user_on_cluster(self, username, id, nodes) | ||
|
||
def node_run(self, id, node_id, command, internal, container_name=None, timeout=None): | ||
"""Run a bash command on the given node | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to run the command on. | ||
node_id (:obj:`str`): the id of the node in the cluster to run the command on. | ||
command (:obj:`str`): the bash command to execute on the node. | ||
internal (:obj:`bool`): if True, this will connect to the node using its internal IP. | ||
Only use this if running within the same VNET as the cluster. Defaults to False. | ||
container_name=None (:obj:`str`, optional): the name of the container to run the command in. | ||
If None, the command will run on the host VM. Defaults to None. | ||
timeout=None (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node. | ||
Defaults to None. | ||
|
||
Returns: | ||
:obj:`aztk.models.NodeOutput`: object containing the output of the run command | ||
""" | ||
return node_run.node_run(self, id, node_id, command, internal, container_name, timeout) | ||
|
||
def get_remote_login_settings(self, id: str, node_id: str): | ||
"""Get the remote login information for a node in a cluster | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster the node is in | ||
node_id (:obj:`str`): the id of the node in the cluster | ||
|
||
Returns: | ||
:obj:`aztk.models.RemoteLogin`: Object that contains the ip address and port combination to login to a node | ||
""" | ||
return get_remote_login_settings.get_remote_login_settings(self, id, node_id) | ||
|
||
def run(self, id, command, internal, container_name=None, timeout=None): | ||
"""Run a bash command on every node in the cluster | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to run the command on. | ||
command (:obj:`str`): the bash command to execute on the node. | ||
internal (:obj:`bool`): if true, this will connect to the node using its internal IP. | ||
Only use this if running within the same VNET as the cluster. Defaults to False. | ||
container_name=None (:obj:`str`, optional): the name of the container to run the command in. | ||
If None, the command will run on the host VM. Defaults to None. | ||
timeout=None (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node. | ||
Defaults to None. | ||
|
||
Returns: | ||
:obj:`List[azkt.models.NodeOutput]`: list of NodeOutput objects containing the output of the run command | ||
""" | ||
return run.cluster_run(self, id, command, internal, container_name, timeout) | ||
|
||
def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0): | ||
"""Get the log for a running or completed application | ||
|
||
Args: | ||
id (:obj:`str`): the id of the cluster to run the command on. | ||
application_name (:obj:`str`): str | ||
tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes. Otherwise, the whole log will be retrieved. | ||
Only use this if streaming the log as it is being written. Defaults to False. | ||
current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes are retrieved. | ||
Only useful is streaming the log as it is being written. Only used if tail is True. | ||
|
||
Returns: | ||
:obj:`aztk.models.ApplicationLog`: a model representing the output of the application. | ||
""" | ||
return get_application_log.get_application_log(self, id, application_name, tail, current_bytes) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
import concurrent.futures | ||
|
||
|
||
#TODO: remove nodes param | ||
def create_user_on_cluster(base_operations, id, nodes, username, ssh_pub_key=None, password=None): | ||
with concurrent.futures.ThreadPoolExecutor() as executor: | ||
futures = { | ||
executor.submit(base_operations.create_user_on_node, id, node.id, username, ssh_pub_key, password): node | ||
for node in nodes | ||
} | ||
concurrent.futures.wait(futures) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from datetime import datetime, timedelta, timezone | ||
|
||
import azure.batch.models as batch_models | ||
import azure.batch.models.batch_error as batch_error | ||
|
||
from aztk import models | ||
from aztk.utils import get_ssh_key | ||
|
||
|
||
def __create_user(self, id: str, node_id: str, username: str, password: str = None, ssh_key: str = None) -> str: | ||
""" | ||
Create a pool user | ||
:param pool: the pool to add the user to | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrong format of args There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these are the old deprecated methods, I'm going to leave the docstrings as is (since they will be removed soon). All of the new user-facing functions have docstrings in the proper format. |
||
:param node: the node to add the user to | ||
:param username: username of the user to add | ||
:param password: password of the user to add | ||
:param ssh_key: ssh_key of the user to add | ||
""" | ||
# Create new ssh user for the given node | ||
self.batch_client.compute_node.add_user( | ||
id, | ||
node_id, | ||
batch_models.ComputeNodeUser( | ||
name=username, | ||
is_admin=True, | ||
password=password, | ||
ssh_public_key=get_ssh_key.get_user_public_key(ssh_key, self.secrets_configuration), | ||
expiry_time=datetime.now(timezone.utc) + timedelta(days=365), | ||
), | ||
) | ||
|
||
|
||
def create_user_on_node(base_client, id, node_id, username, ssh_key=None, password=None): | ||
try: | ||
__create_user( | ||
base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password) | ||
except batch_error.BatchErrorException as error: | ||
try: | ||
base_client.delete_user_on_node(id, node_id, username) | ||
base_client.create_user_on_node(id=id, node_id=node_id, username=username, ssh_key=ssh_key) | ||
except batch_error.BatchErrorException as error: | ||
raise error |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
import concurrent.futures | ||
|
||
#TODO: remove nodes param | ||
def delete_user_on_cluster(base_client, id, nodes, username): | ||
with concurrent.futures.ThreadPoolExecutor() as executor: | ||
futures = [executor.submit(base_client.delete_user_on_node, id, node.id, username) for node in nodes] | ||
concurrent.futures.wait(futures) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
def delete_user(self, pool_id: str, node_id: str, username: str) -> str: | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesn't looks like to be too much of those left. Could change them all now to the new format There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This docstring is not published. I can update it, but it will only be discoverable in the source. |
||
Create a pool user | ||
:param pool: the pool to add the user to | ||
:param node: the node to add the user to | ||
:param username: username of the user to add | ||
""" | ||
# Delete a user on the given node | ||
self.batch_client.compute_node.delete_user(pool_id, node_id, username) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import concurrent.futures | ||
|
||
from Cryptodome.PublicKey import RSA | ||
|
||
from aztk.utils import secure_utils | ||
|
||
|
||
#TODO: remove nodes param | ||
def generate_user_on_cluster(base_operations, id, nodes): | ||
generated_username = secure_utils.generate_random_string() | ||
ssh_key = RSA.generate(2048) | ||
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8') | ||
with concurrent.futures.ThreadPoolExecutor() as executor: | ||
futures = { | ||
executor.submit(base_operations.create_user_on_node, id, node.id, generated_username, ssh_pub_key): node | ||
for node in nodes | ||
} | ||
concurrent.futures.wait(futures) | ||
|
||
return generated_username, ssh_key |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from Cryptodome.PublicKey import RSA | ||
|
||
from aztk.utils import secure_utils | ||
|
||
|
||
def generate_user_on_node(base_client, pool_id, node_id): | ||
generated_username = secure_utils.generate_random_string() | ||
ssh_key = RSA.generate(2048) | ||
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8') | ||
base_client.create_user_on_node(pool_id, node_id, generated_username, ssh_pub_key) | ||
return generated_username, ssh_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate on each line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what VSCode's "sort imports" spits out. Unfortunately, yapf doesn't do import formatting (or comment/docstring formatting).
I think we should align to some standard import formatter, but not sure the vscode python extension one is the right tool.