From 0677b3d25e0fb3b148bbcdeaee65fa96fe9551f3 Mon Sep 17 00:00:00 2001 From: jiata Date: Thu, 15 Jun 2017 00:05:54 -0700 Subject: [PATCH 1/2] syntax fixes --- bin/spark-cluster-create | 69 +++++----- bin/spark-cluster-create-user | 60 ++++----- bin/spark-cluster-delete | 18 ++- bin/spark-cluster-get | 20 ++- bin/spark-cluster-list | 14 +- bin/spark-cluster-ssh | 103 +++++++-------- bin/spark-submit | 234 ++++++++++++++++++---------------- dtde/clusterlib.py | 98 +++++++------- dtde/util.py | 69 +++++----- 9 files changed, 341 insertions(+), 344 deletions(-) diff --git a/bin/spark-cluster-create b/bin/spark-cluster-create index 37252425..30ea475f 100755 --- a/bin/spark-cluster-create +++ b/bin/spark-cluster-create @@ -10,59 +10,60 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None - _vm_count = None - _vm_size = None - _custom_script = None - - _wait = True + pool_id = None + vm_count = None + vm_size = None + custom_script = None + wait = True # parse arguments - parser = argparse.ArgumentParser(prog="az_spark") - - parser.add_argument("--cluster-id", required=True, - help="the unique name of your spark cluster") - parser.add_argument("--cluster-size", type=int, required=True, - help="number of vms in your cluster") - parser.add_argument("--cluster-vm-size", required=True, - help="size of each vm in your cluster") - parser.add_argument("--custom-script", - help="absolute path of custom bash script (.sh) to run on each node") + parser = argparse.ArgumentParser(prog='az_spark') + + parser.add_argument('--cluster-id', required=True, + help='The unique id of your spark cluster') + parser.add_argument('--cluster-size', type=int, required=True, + help='Number of vms in your cluster') + parser.add_argument('--cluster-vm-size', required=True, + help='VM size for nodes in your cluster') + parser.add_argument('--custom-script', + help='Absolute path of custom bash script (.sh) to run on each node') parser.add_argument('--wait', dest='wait', action='store_true') parser.add_argument('--no-wait', dest='wait', action='store_false') parser.set_defaults(wait=False) args = parser.parse_args() - print() if args.cluster_id is not None: - _pool_id = args.cluster_id - print("spark cluster id: %s" % _pool_id) + pool_id = args.cluster_id if args.cluster_size is not None: - _vm_count = args.cluster_size - print("spark cluster size: %i" % _vm_count) + vm_count = args.cluster_size if args.cluster_vm_size is not None: - _vm_size = args.cluster_vm_size - print("spark cluster vm size: %s" % _vm_size) + vm_size = args.cluster_vm_size if args.custom_script is not None: - _custom_script = args.custom_script - print("path to custom script: %s" % _custom_script) + custom_script = args.custom_script if args.wait is not None: if args.wait == False: - _wait = False - print("wait for cluster: %r" % _wait) + wait = False + + print('-------------------------------------------') + print('spark cluster id: {}'.format(pool_id)) + print('spark cluster size: {}'.format(vm_count)) + print('spark cluster vm size: {}'.format(vm_size)) + print('path to custom script: {}'.format(custom_script)) + print('wait for cluster: {}'.format(wait)) + print('-------------------------------------------') # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + 
global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -90,8 +91,8 @@ if __name__ == '__main__': clusterlib.create_cluster( batch_client, blob_client, - _custom_script, - _pool_id, - _vm_count, - _vm_size, - _wait) + custom_script, + pool_id, + vm_count, + vm_size, + wait) diff --git a/bin/spark-cluster-create-user b/bin/spark-cluster-create-user index cc76eb98..3d5c31ad 100755 --- a/bin/spark-cluster-create-user +++ b/bin/spark-cluster-create-user @@ -10,42 +10,44 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None - _username = 'admin' - _password = 'pass123!' + pool_id = None + username = 'admin' + password = 'pass123!' # parse arguments - parser = argparse.ArgumentParser(prog="az_spark") + parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument("--cluster-id", required=True, - help="the unique name of your spark cluster") - parser.add_argument("-u", "--user", - help="the relative path to your spark app in your directory") - parser.add_argument("-p", "--password", - help="the relative path to your spark app in your directory") + parser.add_argument('--cluster-id', required=True, + help='The unique id of your spark cluster') + parser.add_argument('-u', '--username', + help='The usernameto access your spark cluster\'s head node') + parser.add_argument('-p', '--password', + help='The password to access your spark cluster\'s head node') args = parser.parse_args() - print() if args.cluster_id is not None: - _pool_id = args.cluster_id - print("spark cluster id: %s" % _pool_id) + pool_id = args.cluster_id - if args.user is not None: - _username = args.user - print("az_spark username: %s" % _username) + if args.username is not None: + username = args.username if args.password is not None: - _password = args.password - print("az_spark password: %s" % _password) + password = args.password + + print('-------------------------------------------') + print('spark cluster id: {}'.format(pool_id)) + print('username: {}'.format(username)) + print('password: {}'.format(password)) + print('-------------------------------------------') # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -61,19 +63,7 @@ if __name__ == '__main__': # get ssh command clusterlib.create_user( batch_client, - _pool_id, - _username, - _password) + pool_id, + username, + password) - - - - - - - - - - - - diff --git a/bin/spark-cluster-delete b/bin/spark-cluster-delete index ab458564..d00cef33 100755 --- a/bin/spark-cluster-delete +++ b/bin/spark-cluster-delete @@ -10,28 +10,26 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None + pool_id = None # parse arguments - parser = argparse.ArgumentParser(prog="az_spark") + parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument("--cluster-id", required=True, - help="the unique name of your spark cluster") + parser.add_argument('--cluster-id', required=True, + 
help='The unique id of your spark cluster') args = parser.parse_args() - print() if args.cluster_id is not None: - _pool_id = args.cluster_id - print("delete cluster id: %s" % _pool_id) + pool_id = args.cluster_id # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -47,5 +45,5 @@ if __name__ == '__main__': # Delete specified cluster clusterlib.delete_cluster( batch_client, - _pool_id) + pool_id) diff --git a/bin/spark-cluster-get b/bin/spark-cluster-get index 1f63183b..ba10736a 100755 --- a/bin/spark-cluster-get +++ b/bin/spark-cluster-get @@ -10,30 +10,26 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None - _vm_count = None - _vm_size = None - _wait = True + pool_id = None # parse arguments - parser = argparse.ArgumentParser(prog="az_spark") + parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument("--cluster-id", required=True, - help="the unique name of your spark cluster") + parser.add_argument('--cluster-id', required=True, + help='The unique id of your spark cluster') args = parser.parse_args() - print() if args.cluster_id is not None: - _pool_id = args.cluster_id + pool_id = args.cluster_id # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -49,4 +45,4 @@ if __name__ == '__main__': # create spark cluster clusterlib.get_cluster_details( batch_client, - _pool_id) + pool_id) diff --git a/bin/spark-cluster-list b/bin/spark-cluster-list index 1229e35c..dff9e636 100755 --- a/bin/spark-cluster-list +++ b/bin/spark-cluster-list @@ -10,22 +10,17 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None - _vm_count = None - _vm_size = None - _wait = True - # parse arguments - parser = argparse.ArgumentParser(prog="az_spark") + parser = argparse.ArgumentParser(prog='az_spark') args = parser.parse_args() # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -39,5 +34,4 @@ if __name__ == '__main__': batch_service_url) # create spark cluster - clusterlib.list_clusters( - batch_client) + clusterlib.list_clusters(batch_client) diff --git a/bin/spark-cluster-ssh b/bin/spark-cluster-ssh index f65f53cb..58e35ea5 100755 --- a/bin/spark-cluster-ssh +++ b/bin/spark-cluster-ssh @@ -1,7 +1,8 @@ #!/usr/bin/env python from dtde import clusterlib, util -import argparse import os +import argparse +import os try: import configparser @@ -9,73 +10,75 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None - 
_jupyter = None - _webui = None - _masterui = None - _user = None - _connect = True + pool_id = None + jupyter = None + webui = None + masterui = None + username = None + connect = True # parse arguments - parser = argparse.ArgumentParser(prog="az_spark") + parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument("--cluster-id", required=True, - help="the unique name of your spark cluster") + parser.add_argument('--cluster-id', required=True, + help='The unique id of your spark cluster') parser.add_argument('--masterui', - help="open spark's webui on local port") + help='Local port to port spark\'s master UI to') parser.add_argument('--webui', - help="open spark's webui on local port") + help='Local port to port spark\'s webui to') parser.add_argument('--jupyter', - help="open jupyter on local port") - parser.add_argument('--user', - help="ssh user") + help='Local port to port jupyter to') + parser.add_argument('-u', '--username', + help='Username to spark cluster') parser.add_argument('--no-connect', - dest='noconnect', action='store_false', default=True, - help="Do not create the ssh session. Only print out \ - the command to run.") + help='Do not create the ssh session. Only print out \ + the command to run.') args = parser.parse_args() - - print() + + if (args.no_connect is True and args.username is None): + print('You must specify a username in order to connect automatically.') + exit() + if args.cluster_id is not None: - _pool_id = args.cluster_id - print("spark cluster id: %s" % _pool_id) + pool_id = args.cluster_id if args.masterui is not None: - _masterui = args.masterui - print("open masterui: %s" % _masterui) + masterui = args.masterui if args.webui is not None: - _webui = args.webui - print("open webui: %s" % _webui) + webui = args.webui if args.jupyter is not None: - _jupyter = args.jupyter - print("open jupyter: %s" % _jupyter) - - if args.user is not None: - _user = args.user - print("ssh user: %s" % _user) - - if args.noconnect is not None: - if (args.noconnect is True and _user is None): - _connect = False - elif (args.noconnect is False): - _connect = False - print("connect: %s" % _connect) - - if (args.noconnect is True and _user is None): - print("You must specify a user in order to connect automatically") + jupyter = args.jupyter + + if args.username is not None: + username = args.username + + if args.no_connect is not None: + if (args.no_connect is True and username is None): + connect = False + elif (args.no_connect is False): + connect = False + + print('-------------------------------------------') + print('spark cluster id: {}'.format(pool_id)) + print('open masterui: {}'.format(masterui)) + print('open webui: {}'.format(webui)) + print('open jupyter: {}'.format(jupyter)) + print('ssh username: {}'.format(username)) + print('connect: {}'.format(connect)) + print('-------------------------------------------') # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -91,9 +94,9 @@ if __name__ == '__main__': # get ssh command clusterlib.ssh( batch_client, - pool_id = _pool_id, - masterui = _masterui, - webui = _webui, - jupyter = _jupyter, - username = _user, - connect = _connect) + pool_id = pool_id, + masterui = masterui, + webui = webui, + jupyter = jupyter, + username = username, + connect = connect) diff --git a/bin/spark-submit b/bin/spark-submit index 22bdefd4..5b98e669 
100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -10,170 +10,180 @@ except ImportError: import ConfigParser as configparser # Path to config -_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') +CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': - _pool_id = None - _wait = True + pool_id = None + wait = True # spark submit options we're passing through - _name = None - _app = None - _app_args = [] - _main_class = None - _jars = [] - _py_files = [] - _files = [] - _driver_java_options = None - _driver_library_path = None - _driver_class_path = None - _driver_memory = None - _executor_memory = None - _driver_cores = None - _executor_cores = None + name = None + app = None + app_args = [] + main_class = None + jars = [] + py_files = [] + files = [] + driver_java_options = None + driver_library_path = None + driver_class_path = None + driver_memory = None + executor_memory = None + driver_cores = None + executor_cores = None # parse arguments parser = argparse.ArgumentParser( - prog="spark-app-submit", - usage="spark-app-submit \n \ - --cluster-id CLUSTER_ID \n \ - --name APP_NAME \n \ - [options] \n \ - \n \ - [app arguments]") + prog='az-spark', + usage='spark-submit \n \ + --cluster-id CLUSTER_ID \n \ + --name APP_NAME \n \ + [options] \n \ + \n \ + [app arguments]') - parser.add_argument("--cluster-id", required=True, - help="the unique name of your spark cluster") + parser.add_argument('--cluster-id', required=True, + help='The unique id of your spark cluster') - parser.add_argument("--name", required=True, - help="a name for your application") + parser.add_argument('--name', required=True, + help='a name for your application') parser.add_argument('--wait', dest='wait', action='store_true', - help="Wait for app to complete") + help='Wait for app to complete') parser.add_argument('--no-wait', dest='wait', action='store_false', - help="Do not wait for app to complete") + help='Do not wait for app to complete') parser.set_defaults(wait=True) - parser.add_argument("--class", dest="main_class", - help="Your application's main class (for Java only).") + parser.add_argument('--class', dest='main_class', + help='Your application\'s main class (for Java only).') - parser.add_argument("--jars", - help="Comma-separated list of local jars to include \ + parser.add_argument('--jars', + help='Comma-separated list of local jars to include \ on the driver and executor classpaths. Use \ - absolute path to reference files.") + absolute path to reference files.') - parser.add_argument("--py-files", - help="Comma-separated list of .zip, .egg, or .py files \ + parser.add_argument('--py-files', + help='Comma-separated list of .zip, .egg, or .py files \ to place on the PYTHONPATH for Python apps. Use \ - absolute path to reference files.") + absolute path to reference files.') - parser.add_argument("--files", - help="Comma-separated list of .zip, .egg, or .py files \ + parser.add_argument('--files', + help='Comma-separated list of .zip, .egg, or .py files \ to place on the PYTHONPATH for Python apps. 
Use \ - absolute path ot reference files.") + absolute path ot reference files.') - parser.add_argument("--driver-java-options", - help="Extra Java options to pass to the driver.") + parser.add_argument('--driver-java-options', + help='Extra Java options to pass to the driver.') - parser.add_argument("--driver-library-path", - help="Extra library path entries to pass to the driver.") + parser.add_argument('--driver-library-path', + help='Extra library path entries to pass to the driver.') - parser.add_argument("--driver-class-path", - help="Extra class path entries to pass to the driver. \ + parser.add_argument('--driver-class-path', + help='Extra class path entries to pass to the driver. \ Note that jars added with --jars are automatically \ - included in the classpath.") + included in the classpath.') - parser.add_argument("--driver-memory", + parser.add_argument('--driver-memory', help="Memory for driver (e.g. 1000M, 2G) (Default: 1024M).") - parser.add_argument("--executor-memory", - help="Memory per executor (e.g. 1000M, 2G) (Default: 1G).") + parser.add_argument('--executor-memory', + help='Memory per executor (e.g. 1000M, 2G) (Default: 1G).') - parser.add_argument("--driver-cores", - help="Cores for driver (Default: 1).") + parser.add_argument('--driver-cores', + help='Cores for driver (Default: 1).') - parser.add_argument("--executor-cores", - help="Number of cores per executor. (Default: All \ - available cores on the worker") + parser.add_argument('--executor-cores', + help='Number of cores per executor. (Default: All \ + available cores on the worker') - parser.add_argument("application", nargs='*', - help="App jar OR python file to execute. Use absolute \ - path to reference file.") + parser.add_argument('application', nargs='*', + help='App jar OR python file to execute. 
Use absolute \ + path to reference file.') args = parser.parse_args() - print() - if args.cluster_id is not None: - _pool_id = args.cluster_id - print("Spark cluster id: %s" % _pool_id) + pool_id = args.cluster_id if args.name is not None: - _name = args.name - print("Spark app name: %s" % _name) + name = args.name if args.wait is not None: if args.wait == False: - _wait = False - print("Wait for app completion: %r" % _wait) + wait = False if args.main_class is not None: - _main_class = args.main_class - print("Entry point class: %s" % _main_class) + main_class = args.main_class if args.jars is not None: - _jars = args.jars.replace(' ','').split(',') - print("JARS: %s" % _jars) + jars = args.jars.replace(' ','').split(',') if args.py_files is not None: - _py_files = args.py_files.replace(' ','').split(',') - print("PY_Files: %s" % _py_files) + py_files = args.py_files.replace(' ','').split(',') if args.files is not None: - _files = args.py_files.replace(' ','').split(',') - print("Files: %s" % _files) + files = args.py_files.replace(' ','').split(',') if args.driver_java_options is not None: - _driver_java_options = args.driver_java_options - print("Driver java options: %s" % _driver_java_options) + driver_java_options = args.driver_java_options if args.driver_library_path is not None: - _driver_library_path = args.driver_library_path - print("Driver library path: %s" % _driver_library_path) + driver_library_path = args.driver_library_path if args.driver_class_path is not None: - _driver_class_path = args.driver_class_path - print("Driver class path: %s" % _driver_class_path) + driver_class_path = args.driver_class_path if args.driver_memory is not None: - _driver_memory = args.driver_memory - print("Driver memory: %s" % _driver_memory) + driver_memory = args.driver_memory if args.executor_memory is not None: - _executor_memory = args.executor_memory - print("Executor memory: %s" % _executor_memory) + executor_memory = args.executor_memory if args.driver_cores is not None: - _driver_cores = args.driver_cores - print("Driver cores: %s" % _driver_cores) + driver_cores = args.driver_cores if args.executor_cores is not None: - _executor_cores = args.executor_cores - print("Executor cores: %s" % _executor_cores) + executor_cores = args.executor_cores if args.application is not None: - _app = args.application[0] + app = args.application[0] if len(args.application) > 1: - _app_args = args.application[1:] - print("Application: %s" % _app) - print("Application arguments: %s" % _app_args) - - print() + app_args = args.application[1:] + + print('-------------------------------------------') + print('Spark cluster id: {}'.format(pool_id)) + print('Spark app name: {}'.format(name)) + print('Wait for app completion: {}'.format(wait)) + if main_class is not None: + print('Entry point class: {}'.format(main_class)) + if jars: + print('JARS: {}'.format(jars)) + if py_files: + print('PY_Files: {}'.format(py_files)) + if files: + print('Files: {}'.format(files)) + if driver_java_options is not None: + print('Driver java options: {}'.format(driver_java_options)) + if driver_library_path is not None: + print('Driver library path: {}'.format(driver_library_path)) + if driver_class_path is not None: + print('Driver class path: {}'.format(driver_class_path)) + if driver_memory is not None: + print('Driver memory: {}'.format(driver_memory)) + if executor_memory is not None: + print('Executor memory: {}'.format(executor_memory)) + if driver_cores is not None: + print('Driver cores: {}'.format(driver_cores)) + if 
executor_cores is not None: + print('Executor cores: {}'.format(executor_cores)) + print('Application: {}'.format(app)) + print('Application arguments: {}'.format(app_args)) + print('-------------------------------------------') # Read config file global_config = configparser.ConfigParser() - global_config.read(_config_path) + global_config.read(CONFIG_PATH) # Set up batch configuration batch_account_key = global_config.get('Batch', 'batchaccountkey') @@ -201,19 +211,19 @@ if __name__ == '__main__': joblib.submit_app( batch_client, blob_client, - pool_id = _pool_id, - name = _name, - app = _app, - app_args = _app_args, - wait = _wait, - main_class = _main_class, - jars = _jars, - py_files = _py_files, - files = _files, - driver_java_options = _driver_java_options, - driver_library_path = _driver_library_path, - driver_class_path = _driver_class_path, - driver_memory = _driver_memory, - executor_memory = _executor_memory, - driver_cores = _driver_cores, - executor_cores = _executor_cores) + pool_id = pool_id, + name = name, + app = app, + app_args = app_args, + wait = wait, + main_class = main_class, + jars = jars, + py_files = py_files, + files = files, + driver_java_options = driver_java_options, + driver_library_path = driver_library_path, + driver_class_path = driver_class_path, + driver_memory = driver_memory, + executor_memory = executor_memory, + driver_cores = driver_cores, + executor_cores = executor_cores) diff --git a/dtde/clusterlib.py b/dtde/clusterlib.py index 9a38dc9b..4d95806f 100644 --- a/dtde/clusterlib.py +++ b/dtde/clusterlib.py @@ -6,39 +6,27 @@ from subprocess import call def cluster_install_cmd(custom_script_file): - - run_custom_script = '' - if custom_script_file is not None: - run_custom_script = '/bin/sh -c ' + custom_script_file - - return [ + ret = [ # setup spark home and permissions for spark folder 'export SPARK_HOME=/dsvm/tools/spark/current', 'export PATH=$PATH:$SPARK_HOME/bin', 'chmod -R 777 $SPARK_HOME', - 'chmod -R 777 /usr/local/share/jupyter/kernels', + 'chmod -R 777 /usr/local/share/jupyter/kernels' + ] + + if custom_script_file is not None: + ret.extend([ + # To avoid error: "sudo: sorry, you must have a tty to run sudo" + 'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers', + '/bin/sh -c {}'.format(custom_script_file) + ]) - # To avoid error: "sudo: sorry, you must have a tty to run sudo" - 'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers', - run_custom_script, + ret.extend(['exit 0']) - 'exit 0' - ] + return ret def cluster_connect_cmd(): return [ - # print env vars for debug - 'echo CCP_NODES:', - 'echo $CCP_NODES', - 'echo AZ_BATCH_NODE_LIST:', - 'echo $AZ_BATCH_NODE_LIST', - 'echo AZ_BATCH_HOST_LIST:', - 'echo $AZ_BATCH_HOST_LIST', - 'echo AZ1_BATCH_MASTER_NODE:', - 'echo $AZ_BATCH_MASTER_NODE', - 'echo AZ_BATCH_IS_CURRENT_NODE_MASTER:', - 'echo $AZ_BATCH_IS_CURRENT_NODE_MASTER', - # set SPARK_HOME environment vars 'export SPARK_HOME=/dsvm/tools/spark/current', 'export PATH=$PATH:$SPARK_HOME/bin', @@ -128,9 +116,9 @@ def create_cluster( """ # vm image - _publisher = 'microsoft-ads' - _offer = 'linux-data-science-vm' - _sku = 'linuxdsvm' + publisher = 'microsoft-ads' + offer = 'linux-data-science-vm' + sku = 'linuxdsvm' # reuse pool_id as job_id job_id = pool_id @@ -152,7 +140,7 @@ def create_cluster( # Get a verified node agent sku sku_to_use, image_ref_to_use = \ util.select_latest_verified_vm_image_with_node_agent_sku( - batch_client, _publisher, _offer, _sku) + batch_client, publisher, offer, 
sku) # Confiure the pool pool = batch_models.PoolAddParameter( @@ -209,10 +197,10 @@ def create_cluster( batch_client.task.add(job_id = job_id, task = task) except batch_models.batch_error.BatchErrorException as err: util.print_batch_exception(err) - if err.error.code != "JobExists": + if err.error.code != 'JobExists': raise else: - print("Job {!r} already exists".format(job_id)) + print('Job {!r} already exists'.format(job_id)) # Wait for the app to finish if wait == True: @@ -249,19 +237,23 @@ def create_user( def get_cluster_details( batch_client, pool_id): + """ + print out specified cluster info + """ pool = batch_client.pool.get(pool_id) if (pool.state == batch_models.PoolState.deleting): print nodes = batch_client.compute_node.list(pool_id=pool_id) - visible_state = pool.allocation_state.value if pool.state.value is "active" else pool.state.value - node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated) if pool.state.value is "resizing" or (pool.state.value is "deleting" and pool.allocation_state.value is "resizing") else '{}'.format(pool.current_dedicated) + visible_state = pool.allocation_state.value if pool.state.value is 'active' else pool.state.value + node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated) if pool.state.value is 'resizing' or (pool.state.value is 'deleting' and pool.allocation_state.value is 'resizing') else '{}'.format(pool.current_dedicated) - print("State: {}".format(visible_state)) - print("Node Size: {}".format(pool.vm_size)) - print("Nodes: {}".format(node_count)) + print() + print('State: {}'.format(visible_state)) + print('Node Size: {}'.format(pool.vm_size)) + print('Nodes: {}'.format(node_count)) print() - node_label = "Nodes" + node_label = 'Nodes' print_format = '{:<34}| {:<15} | {:<21}| {:<8}' print_format_underline = '{:-<34}|{:-<17}|{:-<22}|{:-<8}' print(print_format.format(node_label, 'State', 'IP:Port', 'Master')) @@ -271,12 +263,15 @@ def get_cluster_details( for node in nodes: ip, port = util.get_connection_info(batch_client, pool_id, node.id) - print (print_format.format(node.id, node.state.value, "{}:{}".format(ip, port), - "*" if node.id == master_node else "")) + print (print_format.format(node.id, node.state.value, '{}:{}'.format(ip, port), + '*' if node.id == master_node else '')) print() def list_clusters( batch_client): + """ + print out all clusters + """ print_format = '{:<34}| {:<10}| {:<20}| {:<7}' print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}' @@ -284,10 +279,10 @@ def list_clusters( print(print_format.format('Cluster', 'State', 'VM Size', 'Nodes')) print(print_format_underline.format('','','','')) for pool in pools: - pool_state = pool.allocation_state.value if pool.state.value is "active" else pool.state.value + pool_state = pool.allocation_state.value if pool.state.value is 'active' else pool.state.value node_count = pool.current_dedicated - if pool_state is "resizing" or (pool_state is "deleting" and pool.allocation_state.value is "resizing"): + if pool_state is 'resizing' or (pool_state is 'deleting' and pool.allocation_state.value is 'resizing'): node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated) print(print_format.format(pool.id, @@ -310,9 +305,9 @@ def delete_cluster( if batch_client.pool.exists(pool_id) == True: batch_client.pool.delete(pool_id) batch_client.job.delete(job_id) - print("\nThe pool, '%s', is being deleted" % pool_id) + print('The pool, \'{}\', is being deleted'.format(pool_id)) else: - print("\nThe pool, '%s', does not 
exist" % pool_id) + print('The pool, \'{}\', does not exist'.format(pool_id)) def ssh( batch_client, @@ -343,25 +338,24 @@ def ssh( master_node_port = remote_login_settings.remote_login_port # build ssh tunnel command - ssh_command = "ssh " + ssh_command = 'ssh ' if masterui is not None: - ssh_command += "-L " + str(masterui) + ":localhost:" + str(constants._MASTER_UI_PORT) + " " + ssh_command += '-L ' + str(masterui) + ':localhost:' + str(constants._MASTER_UI_PORT) + ' ' if webui is not None: - ssh_command += "-L " + str(webui) + ":localhost:" + str(constants._WEBUI_PORT) + " " + ssh_command += '-L ' + str(webui) + ':localhost:' + str(constants._WEBUI_PORT) + ' ' if jupyter is not None: - ssh_command += "-L " + str(jupyter) + ":localhost:" + str(constants._JUPYTER_PORT) + " " + ssh_command += '-L ' + str(jupyter) + ':localhost:' + str(constants._JUPYTER_PORT) + ' ' if ports is not None: for port in ports: - ssh_command += "-L " + str(port[0]) + ":localhost:" + str(port[1]) + " " + ssh_command += '-L ' + str(port[0]) + ':localhost:' + str(port[1]) + ' ' - user = username if username is not None else ""; - ssh_command += user + "@" + str(master_node_ip) + " -p " + str(master_node_port) + user = username if username is not None else ''; + ssh_command += user + '@' + str(master_node_ip) + ' -p ' + str(master_node_port) ssh_command_array = ssh_command.split() if (not connect): print('\nuse the following command to connect to your spark head node:') - print() - print('\t%s' % ssh_command) - print() + print('\n\t{}\n'.format(ssh_command)) else: call(ssh_command_array) + diff --git a/dtde/util.py b/dtde/util.py index 675c06a8..949c86dc 100644 --- a/dtde/util.py +++ b/dtde/util.py @@ -12,7 +12,6 @@ _STANDARD_OUT_FILE_NAME = 'stdout.txt' _STANDARD_ERROR_FILE_NAME = 'stderr.txt' -# _SAMPLES_CONFIG_FILE_NAME = 'configuration.cfg' def create_batch_client( batch_account_key, @@ -20,8 +19,10 @@ def create_batch_client( batch_service_url): """ Creates a batch client object + :param str batch_account_key: batch account key + :param str batch_account_name: batch account name + :param str batch_service_url: batch service url """ - # Set up SharedKeyCredentials credentials = batch_auth.SharedKeyCredentials( batch_account_name, @@ -44,8 +45,10 @@ def create_blob_client( storage_account_suffix): """ Creates a blob client object + :param str storage_account_key: storage account key + :param str storage_account_name: storage account name + :param str storage_account_suffix: storage account suffix """ - # Set up BlockBlobStorage blob_client = blob.BlockBlobService( account_name = storage_account_name, @@ -55,7 +58,8 @@ def create_blob_client( return blob_client def wait_for_tasks_to_complete(batch_client, job_id, timeout): - """Waits for all the tasks in a particular job to complete. + """ + Waits for all the tasks in a particular job to complete. :param batch_client: The batch client to use. :type batch_client: `batchserviceclient.BatchServiceClient` :param str job_id: The id of the job to monitor. @@ -78,7 +82,6 @@ def wait_for_tasks_to_complete(batch_client, job_id, timeout): def upload_file_to_container(block_blob_client, container_name, file_path, use_full_path): """ Uploads a local file to an Azure Blob storage container. - :param block_blob_client: A blob service client. :type block_blob_client: `azure.storage.blob.BlockBlobService` :param str container_name: The name of the Azure Blob storage container. 
@@ -87,18 +90,12 @@ def upload_file_to_container(block_blob_client, container_name, file_path, use_f :return: A ResourceFile initialized with a SAS URL appropriate for Batch tasks. """ - blob_name = None if (use_full_path): blob_name = file_path else: blob_name = os.path.basename(file_path) - ''' - print('\nUploading file {} to container [{}]...'.format(file_path, - container_name)) - ''' - block_blob_client.create_container(container_name, fail_on_exist=False) @@ -120,7 +117,8 @@ def upload_file_to_container(block_blob_client, container_name, file_path, use_f blob_source=sas_url) def print_configuration(config): - """Prints the configuration being used as a dictionary + """ + Prints the configuration being used as a dictionary :param config: The configuration. :type config: `configparser.ConfigParser` """ @@ -131,6 +129,11 @@ def print_configuration(config): print(configuration_dict) def get_master_node_id(batch_client, pool_id): + """ + Uploads a local file to an Azure Blob storage container. + :param block_blob_client: A blob service client. + :type block_blob_client: `azure.storage.blob.BlockBlobService` + """ # Currently, the jobId == poolId so this is safe to assume job_id = pool_id tasks = batch_client.task.list(job_id=job_id) @@ -148,14 +151,14 @@ def get_master_node_id(batch_client, pool_id): return "" def create_pool_if_not_exist(batch_client, pool, wait=True): - """Creates the specified pool if it doesn't already exist + """ + Creates the specified pool if it doesn't already exist :param batch_client: The batch client to use. :type batch_client: `batchserviceclient.BatchServiceClient` :param pool: The pool to create. :type pool: `batchserviceclient.models.PoolAddParameter` """ try: - # print("\nAttempting to create pool:", pool.id) batch_client.pool.add(pool) if wait: wait_for_all_nodes_state(batch_client, pool, frozenset( @@ -163,17 +166,18 @@ def create_pool_if_not_exist(batch_client, pool, wait=True): batch_models.ComputeNodeState.unusable, batch_models.ComputeNodeState.idle) )) - print("\nCreated pool:", pool.id) + print("Created pool: {}".format(pool.id)) else: - print("\nCreating pool:", pool.id) + print("Creating pool: {}".format(pool.id)) except batch_models.BatchErrorException as e: if e.error.code != "PoolExists": raise else: - print("\nPool {!r} already exists".format(pool.id)) + print("Pool {!r} already exists.".format(pool.id)) def wait_for_all_nodes_state(batch_client, pool, node_state): - """Waits for all nodes in pool to reach any specified state in set + """ + Waits for all nodes in pool to reach any specified state in set :param batch_client: The batch client to use. :type batch_client: `batchserviceclient.BatchServiceClient` :param pool: The pool containing the node. 
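[The wait_for_all_nodes_state hunk below drops the iteration counter and polls every second instead of every ten. The core loop is just a poll-until-ready pattern; in this sketch get_node_states is a hypothetical stand-in for the batch_client.compute_node.list call, and the real function also re-reads the pool each pass to surface resize errors:

import time

def wait_for_nodes(get_node_states, target_count, desired_states, interval=1):
    # poll until the pool has reached its target size and every node is in
    # one of the desired states
    while True:
        states = get_node_states()
        if len(states) >= target_count and all(s in desired_states for s in states):
            return states
        time.sleep(interval)

# e.g. wait_for_nodes(lambda: ['idle', 'idle'], 2, {'idle', 'unusable'})]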
@@ -183,7 +187,6 @@ def wait_for_all_nodes_state(batch_client, pool, node_state): :return: list of `batchserviceclient.models.ComputeNode` """ print('Waiting for all nodes in pool {} to reach desired state...'.format(pool.id)) - i = 0 while True: # refresh pool to ensure that there is no resize error pool = batch_client.pool.get(pool.id) @@ -196,17 +199,16 @@ def wait_for_all_nodes_state(batch_client, pool, node_state): if (len(nodes) >= pool.target_dedicated and all(node.state in node_state for node in nodes)): return nodes - i += 1 ''' - if i % 3 == 0: - print('waiting for {} nodes to reach desired state...'.format( - pool.target_dedicated)) + print('waiting for {} nodes to reach desired state...'.format( + pool.target_dedicated)) ''' - time.sleep(10) + time.sleep(1) def select_latest_verified_vm_image_with_node_agent_sku( batch_client, publisher, offer, sku_starts_with): - """Select the latest verified image that Azure Batch supports given + """ + Select the latest verified image that Azure Batch supports given a publisher, offer and sku (starts with filter). :param batch_client: The batch client to use. :type batch_client: `batchserviceclient.BatchServiceClient` @@ -235,7 +237,8 @@ def select_latest_verified_vm_image_with_node_agent_sku( def create_sas_token( block_blob_client, container_name, blob_name, permission, expiry=None, timeout=None): - """Create a blob sas token + """ + Create a blob sas token :param block_blob_client: The storage block blob client to use. :type block_blob_client: `azure.storage.blob.BlockBlobService` :param str container_name: The name of the container to upload the blob to. @@ -259,8 +262,8 @@ def create_sas_token( def upload_blob_and_create_sas( block_blob_client, container_name, blob_name, file_name, expiry, timeout=None): - """Uploads a file from local disk to Azure Storage and creates - a SAS for it. + """ + Uploads a file from local disk to Azure Storage and creates a SAS for it. :param block_blob_client: The storage block blob client to use. :type block_blob_client: `azure.storage.blob.BlockBlobService` :param str container_name: The name of the container to upload the blob to. @@ -299,7 +302,8 @@ def upload_blob_and_create_sas( def wrap_commands_in_shell(commands): - """Wrap commands in a shell + """ + Wrap commands in a shell :param list commands: list of commands to wrap :param str ostype: OS type, linux or windows :rtype: str @@ -309,6 +313,13 @@ def wrap_commands_in_shell(commands): ';'.join(commands)) def get_connection_info(batch_client, pool_id, node_id): + """ + Get connection info of specified node in pool + :param batch_client: The batch client to use. 
+ :type batch_client: `batchserviceclient.BatchServiceClient` + :param str pool_id: The pool id to look up + :param str node_id: The node id to look up + """ rls = batch_client.compute_node.get_remote_login_settings( pool_id, node_id) remote_ip = rls.remote_login_ip_address From 6bf092a4fe14745ff0b4b4769290e41abdc53a12 Mon Sep 17 00:00:00 2001 From: jiata Date: Thu, 15 Jun 2017 01:11:03 -0700 Subject: [PATCH 2/2] streamlined command params --- bin/spark-cluster-create | 14 +++++++------- bin/spark-cluster-create-user | 2 +- bin/spark-cluster-delete | 4 ++-- bin/spark-cluster-get | 2 +- bin/spark-cluster-ssh | 2 +- bin/spark-submit | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/bin/spark-cluster-create b/bin/spark-cluster-create index 30ea475f..eb1b4011 100755 --- a/bin/spark-cluster-create +++ b/bin/spark-cluster-create @@ -23,11 +23,11 @@ if __name__ == '__main__': # parse arguments parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument('--cluster-id', required=True, + parser.add_argument('--id', dest='cluster_id', required=True, help='The unique id of your spark cluster') - parser.add_argument('--cluster-size', type=int, required=True, + parser.add_argument('--size', type=int, required=True, help='Number of vms in your cluster') - parser.add_argument('--cluster-vm-size', required=True, + parser.add_argument('--vm-size', required=True, help='VM size for nodes in your cluster') parser.add_argument('--custom-script', help='Absolute path of custom bash script (.sh) to run on each node') @@ -40,11 +40,11 @@ if __name__ == '__main__': if args.cluster_id is not None: pool_id = args.cluster_id - if args.cluster_size is not None: - vm_count = args.cluster_size + if args.size is not None: + vm_count = args.size - if args.cluster_vm_size is not None: - vm_size = args.cluster_vm_size + if args.vm_size is not None: + vm_size = args.vm_size if args.custom_script is not None: custom_script = args.custom_script diff --git a/bin/spark-cluster-create-user b/bin/spark-cluster-create-user index 3d5c31ad..8c684247 100755 --- a/bin/spark-cluster-create-user +++ b/bin/spark-cluster-create-user @@ -21,7 +21,7 @@ if __name__ == '__main__': # parse arguments parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument('--cluster-id', required=True, + parser.add_argument('--id', dest='cluster_id', required=True, help='The unique id of your spark cluster') parser.add_argument('-u', '--username', help='The usernameto access your spark cluster\'s head node') diff --git a/bin/spark-cluster-delete b/bin/spark-cluster-delete index d00cef33..51affcea 100755 --- a/bin/spark-cluster-delete +++ b/bin/spark-cluster-delete @@ -19,9 +19,9 @@ if __name__ == '__main__': # parse arguments parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument('--cluster-id', required=True, + parser.add_argument(dest='cluster_id', help='The unique id of your spark cluster') - + args = parser.parse_args() if args.cluster_id is not None: diff --git a/bin/spark-cluster-get b/bin/spark-cluster-get index ba10736a..0ddbbd36 100755 --- a/bin/spark-cluster-get +++ b/bin/spark-cluster-get @@ -19,7 +19,7 @@ if __name__ == '__main__': # parse arguments parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument('--cluster-id', required=True, + parser.add_argument(dest='cluster_id', help='The unique id of your spark cluster') args = parser.parse_args() diff --git a/bin/spark-cluster-ssh b/bin/spark-cluster-ssh index 58e35ea5..906ebc60 100755 --- a/bin/spark-cluster-ssh +++ 
b/bin/spark-cluster-ssh @@ -24,7 +24,7 @@ if __name__ == '__main__': # parse arguments parser = argparse.ArgumentParser(prog='az_spark') - parser.add_argument('--cluster-id', required=True, + parser.add_argument('--id', dest="cluster_id", required=True, help='The unique id of your spark cluster') parser.add_argument('--masterui', help='Local port to port spark\'s master UI to') diff --git a/bin/spark-submit b/bin/spark-submit index 5b98e669..9dfe45c5 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -37,13 +37,13 @@ if __name__ == '__main__': parser = argparse.ArgumentParser( prog='az-spark', usage='spark-submit \n \ - --cluster-id CLUSTER_ID \n \ + --id CLUSTER_ID \n \ --name APP_NAME \n \ [options] \n \ \n \ [app arguments]') - parser.add_argument('--cluster-id', required=True, + parser.add_argument('--id', dest='cluster_id', required=True, help='The unique id of your spark cluster') parser.add_argument('--name', required=True,
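[The second commit applies the same argparse pattern across the scripts: the flag gets a shorter spelling (--id) or becomes positional, while dest='cluster_id' keeps args.cluster_id stable so nothing downstream has to change. A small self-contained illustration, with made-up sample argv values:

import argparse

parser = argparse.ArgumentParser(prog='az_spark')
# '--id' replaces '--cluster-id'; dest keeps the attribute name unchanged
parser.add_argument('--id', dest='cluster_id', required=True,
                    help='The unique id of your spark cluster')
parser.add_argument('--wait', dest='wait', action='store_true')
parser.add_argument('--no-wait', dest='wait', action='store_false')
parser.set_defaults(wait=False)

args = parser.parse_args(['--id', 'my-cluster', '--no-wait'])
print(args.cluster_id, args.wait)   # -> my-cluster False]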