Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Internal: fix pylint warnings (#651)
Browse files Browse the repository at this point in the history
* inital, remove unused imports

* run yapf

* remove unused imports and variables, fix declaration outside init

* fix some pylint warnings, add ssh_into_master

* remove unused imports

* unused variables

* string and function normalization

* stop using list comprehension for side effects, make method function

* stop using protected member

* various pylint fixes

* formatting

* formatting

* add retry decorator with tests

* start adding retry decorator, retry docker compose download

* update pip and tests

* logic fix

* change no delete if

* factor out reused functions

* fix wait_for_all_nodes

* fix download return type bug

* test vsts ci update

* temporarily disable integration tests

* syntax fix

* update vsts build

* add back integration tests, remove debug branch

* remove parallel unit tests

* more verbose clis

* update pylint

* typo

* fix imports

* function returns nothing, don't return

* make iterator list

* change debug value
  • Loading branch information
jafreck authored Aug 25, 2018
1 parent 0a9ce94 commit 828162e
Show file tree
Hide file tree
Showing 142 changed files with 1,980 additions and 1,724 deletions.
15 changes: 10 additions & 5 deletions .vsts-ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
trigger:
- master


phases:
- phase: Test
queue: Hosted Linux Preview
Expand All @@ -24,16 +23,22 @@ phases:
displayName: yapf
- script: |
pylint -j 2 -E aztk aztk_cli
pylint -jobs 2 --errors-only aztk aztk_cli
condition: succeeded()
displayName: pylint
displayName: pylint error check
- script: |
pytest -n 20 --ignore=tests/integration_tests
pytest --ignore=tests/integration_tests
condition: succeeded()
displayName: unit tests
- script: |
pytest -n 75
pytest --numprocesses=75
condition: succeeded()
displayName: integration tests
- script: |
pylint -jobs 2 --disable=fixme aztk aztk_cli
continueOnError: true
condition: succeeded()
displayName: pylint report
48 changes: 30 additions & 18 deletions aztk/client/base/base_operations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from aztk import models
from aztk.internal import cluster_data
from aztk.utils import ssh as ssh_lib

from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
generate_user_on_cluster, generate_user_on_node, get_application_log, get_remote_login_settings,
node_run, run, ssh_into_node)
from .helpers import (
create_user_on_cluster,
create_user_on_node,
delete_user_on_cluster,
delete_user_on_node,
generate_user_on_cluster,
generate_user_on_node,
get_application_log,
get_remote_login_settings,
node_run,
run,
ssh_into_node,
)


class BaseOperations:
Expand All @@ -15,14 +24,14 @@ class BaseOperations:
Azure Batch service.
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
Blob service.
secrets_configuration (:obj:`aztk.models.SecretsConfiguration`): Model that holds AZTK secrets used to authenticate
with Azure and the clusters.
secrets_configuration (:obj:`aztk.models.SecretsConfiguration`):
Model that holds AZTK secrets used to authenticate with Azure and the clusters.
"""

def __init__(self, context):
self.batch_client = context['batch_client']
self.blob_client = context['blob_client']
self.secrets_configuration = context['secrets_configuration']
self.batch_client = context["batch_client"]
self.blob_client = context["blob_client"]
self.secrets_configuration = context["secrets_configuration"]

def get_cluster_configuration(self, id: str) -> models.ClusterConfiguration:
"""Open an ssh tunnel to a node
Expand Down Expand Up @@ -62,7 +71,8 @@ def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node to open the ssh tunnel to
username (:obj:`str`): the username to authenticate the ssh session
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password. Defaults to None.
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
port_forward_list (:obj:`List[PortForwardingSpecification`, optional): list of PortForwardingSpecifications.
The defined ports will be forwarded to the client.
Expand All @@ -89,15 +99,16 @@ def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None
"""
return create_user_on_node.create_user_on_node(self, id, node_id, username, ssh_key, password)

#TODO: remove nodes as param
# TODO: remove nodes as param
def create_user_on_cluster(self, id, nodes, username, ssh_pub_key=None, password=None):
"""Create a user on every node in the cluster
Args:
username (:obj:`str`): name of the user to create.
id (:obj:`str`): id of the cluster to create the user on.
nodes (:obj:`List[ComputeNode]`): list of nodes to create the user on
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password. Defaults to None.
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
Returns:
Expand All @@ -117,7 +128,7 @@ def generate_user_on_node(self, id, node_id):
"""
return generate_user_on_node.generate_user_on_node(self, id, node_id)

#TODO: remove nodes as param
# TODO: remove nodes as param
def generate_user_on_cluster(self, id, nodes):
"""Create a user with an autogenerated username and ssh_key on the cluster
Expand All @@ -143,7 +154,7 @@ def delete_user_on_node(self, id: str, node_id: str, username: str) -> str:
"""
return delete_user_on_node.delete_user(self, id, node_id, username)

#TODO: remove nodes as param
# TODO: remove nodes as param
def delete_user_on_cluster(self, username, id, nodes):
"""Delete a user on every node in the cluster
Expand Down Expand Up @@ -212,10 +223,11 @@ def get_application_log(self, id: str, application_name: str, tail=False, curren
Args:
id (:obj:`str`): the id of the cluster to run the command on.
application_name (:obj:`str`): str
tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes. Otherwise, the whole log will be retrieved.
Only use this if streaming the log as it is being written. Defaults to False.
current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes are retrieved.
Only useful is streaming the log as it is being written. Only used if tail is True.
tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes.
Otherwise, the whole log will be retrieved. Only use this if streaming the log as it is being written.
Defaults to False.
current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes
are retrieved. Only useful is streaming the log as it is being written. Only used if tail is True.
Returns:
:obj:`aztk.models.ApplicationLog`: a model representing the output of the application.
Expand Down
2 changes: 1 addition & 1 deletion aztk/client/base/helpers/create_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import concurrent.futures


#TODO: remove nodes param
# TODO: remove nodes param
def create_user_on_cluster(base_operations, id, nodes, username, ssh_pub_key=None, password=None):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
Expand Down
1 change: 0 additions & 1 deletion aztk/client/base/helpers/create_user_on_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error

from aztk import models
from aztk.utils import get_ssh_key


Expand Down
2 changes: 1 addition & 1 deletion aztk/client/base/helpers/delete_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import concurrent.futures


#TODO: remove nodes param
# TODO: remove nodes param
def delete_user_on_cluster(base_client, id, nodes, username):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(base_client.delete_user_on_node, id, node.id, username) for node in nodes]
Expand Down
4 changes: 2 additions & 2 deletions aztk/client/base/helpers/generate_user_on_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from aztk.utils import secure_utils


#TODO: remove nodes param
# TODO: remove nodes param
def generate_user_on_cluster(base_operations, id, nodes):
generated_username = secure_utils.generate_random_string()
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8')
ssh_pub_key = ssh_key.publickey().exportKey("OpenSSH").decode("utf-8")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(base_operations.create_user_on_node, id, node.id, generated_username, ssh_pub_key): node
Expand Down
2 changes: 1 addition & 1 deletion aztk/client/base/helpers/generate_user_on_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
def generate_user_on_node(base_client, pool_id, node_id):
generated_username = secure_utils.generate_random_string()
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8')
ssh_pub_key = ssh_key.publickey().exportKey("OpenSSH").decode("utf-8")
base_client.create_user_on_node(pool_id, node_id, generated_username, ssh_pub_key)
return generated_username, ssh_key
25 changes: 13 additions & 12 deletions aztk/client/base/helpers/get_application_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error

from aztk import error
from aztk import models
from aztk import error, models
from aztk.utils import constants, helpers

output_file = constants.TASK_WORKING_DIR + \
"/" + constants.SPARK_SUBMIT_LOGS_FILE
output_file = constants.TASK_WORKING_DIR + "/" + constants.SPARK_SUBMIT_LOGS_FILE


def __check_task_node_exist(batch_client, cluster_id: str, task: batch_models.CloudTask) -> bool:
Expand Down Expand Up @@ -50,17 +48,18 @@ def __get_output_file_properties(batch_client, cluster_id: str, application_name

def get_log_from_storage(blob_client, container_name, application_name, task):
try:
blob = blob_client.get_blob_to_text(container_name, application_name + '/' + constants.SPARK_SUBMIT_LOGS_FILE)
blob = blob_client.get_blob_to_text(container_name, application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE)
except azure.common.AzureMissingResourceHttpError:
raise error.AztkError("Logs not found in your storage account. They were either deleted or never existed.")

return models.ApplicationLog(
name=application_name,
cluster_id=container_name,
application_state=task.state._value_,
application_state=task.state.name,
log=blob.content,
total_bytes=blob.properties.content_length,
exit_code=task.execution_info.exit_code)
exit_code=task.execution_info.exit_code,
)


def get_log(batch_client, blob_client, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
Expand Down Expand Up @@ -88,18 +87,20 @@ def get_log(batch_client, blob_client, cluster_id: str, application_name: str, t
return models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state._value_,
application_state=task.state.name,
log=content,
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code)
exit_code=task.execution_info.exit_code,
)
else:
return models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state._value_,
log='',
application_state=task.state.name,
log="",
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code)
exit_code=task.execution_info.exit_code,
)


def get_application_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
Expand Down
5 changes: 3 additions & 2 deletions aztk/client/base/helpers/node_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ def node_run(base_client, cluster_id, node_id, command, internal, container_name
generated_username,
node_rls.ip_address,
node_rls.port,
ssh_key=ssh_key.exportKey().decode('utf-8'),
ssh_key=ssh_key.exportKey().decode("utf-8"),
container_name=container_name,
timeout=timeout)
timeout=timeout,
)
return output
finally:
base_client.delete_user_on_node(cluster_id, node.id, generated_username)
5 changes: 3 additions & 2 deletions aztk/client/base/helpers/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ def cluster_run(base_operations, cluster_id, command, internal, container_name=N
command,
generated_username,
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8'),
ssh_key=ssh_key.exportKey().decode("utf-8"),
container_name=container_name,
timeout=timeout))
timeout=timeout,
))
return output
except OSError as exc:
raise exc
Expand Down
Loading

0 comments on commit 828162e

Please sign in to comment.