This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Feature: Plugin V2: Running plugin on host (#461)
timotheeguerin authored Apr 24, 2018
1 parent 12450fb commit de78983
Showing 43 changed files with 481 additions and 230 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
@@ -14,5 +14,7 @@
"python.formatting.provider": "yapf",
"python.formatting.yapfArgs": [
"--style=.style.yapf"
]
],
"python.venvPath": "${workspaceFolder}/ENV",
"python.pythonPath": "${workspaceFolder}\\ENV\\Scripts\\python.exe"
}
2 changes: 1 addition & 1 deletion aztk/error.py
@@ -6,7 +6,7 @@


class AztkError(Exception):
    def __init__(self, message: str = None):
    def __init__(self, message: str=None):
        super().__init__(message)

class ClusterNotReadyError(AztkError):
1 change: 1 addition & 0 deletions aztk/internal/__init__.py
@@ -3,3 +3,4 @@
"""

from .configuration_base import *
from .docker_cmd import *
6 changes: 3 additions & 3 deletions aztk/internal/cluster_data/node_data.py
@@ -1,11 +1,10 @@
import fnmatch
import io
import json
import os
import yaml
import zipfile
from pathlib import Path
from typing import List
import yaml
from aztk.spark import models
from aztk.utils import constants, file_utils, secure_utils
from aztk.error import InvalidCustomScriptError
@@ -147,7 +146,8 @@ def _add_plugins(self):
                execute='{0}/{1}'.format(plugin.name, plugin.execute),
                args=plugin.args,
                env=plugin.env,
                runOn=plugin.run_on.value,
                target=plugin.target.value,
                target_role=plugin.target_role.value,
            ))

        self.zipf.writestr(os.path.join('plugins', 'plugins-manifest.yaml'), yaml.dump(data))
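With runOn split into target and target_role, each manifest entry now records both where a plugin runs and on which nodes. A hedged sketch of the data written above (the plugin name and script path are invented; the real schema is whatever _add_plugins serializes):

import yaml

# Hypothetical plugins-manifest.yaml entry; the target/target_role strings
# mirror the enum values defined in plugin_configuration.py.
data = [dict(
    name='jupyter',
    execute='jupyter/execute.sh',
    args=[],
    env={},
    target='spark-container',   # PluginTarget.SparkContainer.value
    target_role='master',       # PluginTargetRole.Master.value
)]
print(yaml.dump(data))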
38 changes: 38 additions & 0 deletions aztk/internal/docker_cmd.py
@@ -0,0 +1,38 @@
import os
from aztk.utils.command_builder import CommandBuilder

class DockerCmd:
    """
    Helper for building a docker run command
    """

    def __init__(self, name: str, docker_repo: str, cmd: str, gpu_enabled=False):
        if gpu_enabled:
            self.cmd = CommandBuilder('nvidia-docker run')
        else:
            self.cmd = CommandBuilder('docker run')
        self.cmd.add_option('--net', 'host')
        self.cmd.add_option('--name', name)
        self.cmd.add_argument('-d')
        self.cmd.add_argument(docker_repo)
        self.cmd.add_argument(cmd)


    def add_env(self, env: str, value: str):
        self.cmd.add_option('-e', '{0}={1}'.format(env, value))

    def pass_env(self, env: str):
        """
        Forward the value of an environment variable from the host process into the docker container
        """
        self.cmd.add_option('-e', '{0}'.format(env))

    def share_folder(self, folder: str):
        self.cmd.add_option('-v', '{0}:{0}'.format(folder))

    def open_port(self, port: int):
        self.cmd.add_option('-p', '{0}:{0}'.format(port))


    def to_str(self):
        return self.cmd.to_str()
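A hypothetical usage sketch (all values invented; assumes CommandBuilder emits options and arguments in the order they are added):

from aztk.internal.docker_cmd import DockerCmd

# Build the command the host uses to launch the Spark container.
docker_cmd = DockerCmd(
    name='spark',
    docker_repo='aztk/base:latest',
    cmd='/bin/bash /path/to/docker_main.sh',
    gpu_enabled=False)
docker_cmd.add_env('AZTK_WORKING_DIR', '/mnt/batch/tasks')   # inline a value
docker_cmd.pass_env('SP_TENANT_ID')                          # forward from host env
docker_cmd.share_folder('/mnt/batch/tasks')                  # same path inside and out
docker_cmd.open_port(8080)                                   # e.g. Spark master UI
print(docker_cmd.to_str())
# roughly: docker run --net host --name spark ... -d aztk/base:latest /bin/bash ...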
1 change: 1 addition & 0 deletions aztk/models/plugins/internal/plugin_manager.py
@@ -20,6 +20,7 @@ class PluginManager:
        jupyterlab=plugins.JupyterLabPlugin,
        rstudio_server=plugins.RStudioServerPlugin,
        hdfs=plugins.HDFSPlugin,
        simple=plugins.SimplePlugin,
        spark_ui_proxy=plugins.SparkUIProxyPlugin,
    )

52 changes: 35 additions & 17 deletions aztk/models/plugins/plugin_configuration.py
@@ -1,8 +1,23 @@
import inspect
from typing import List, Union
from enum import Enum
from .plugin_file import PluginFile
from typing import List, Union
from aztk.internal import ConfigurationBase
from aztk.error import InvalidPluginConfigurationError
from .plugin_file import PluginFile

class PluginTarget(Enum):
    """
    Where this plugin should run
    """
    SparkContainer = "spark-container"
    Host = "host"


class PluginTargetRole(Enum):
    Master = "master"
    Worker = "worker"
    All = "all-nodes"



class PluginPort:
    """
@@ -12,8 +12,7 @@ class PluginPort:
    :param name: [Optional] name to differentiate ports if you have multiple
    """

    def __init__(self, internal: int, public: Union[int, bool] = False, name=None):

    def __init__(self, internal: int, public: Union[int, bool]=False, name=None):
        self.internal = internal
        self.expose_publicly = bool(public)
        self.public_port = None
@@ -26,11 +40,6 @@ def __init__(self, internal: int, public: Union[int, bool] = False, name=None):
        self.name = name


class PluginRunTarget(Enum):
    Master = "master"
    Worker = "worker"
    All = "all-nodes"



class PluginConfiguration(ConfigurationBase):
@@ -45,15 +54,17 @@ class PluginConfiguration(ConfigurationBase):

    def __init__(self,
                 name: str,
                 ports: List[PluginPort] = None,
                 files: List[PluginFile] = None,
                 execute: str = None,
                 ports: List[PluginPort]=None,
                 files: List[PluginFile]=None,
                 execute: str=None,
                 args=None,
                 env=None,
                 run_on: PluginRunTarget = PluginRunTarget.Master):
                 target_role: PluginTargetRole=PluginTargetRole.Master,
                 target: PluginTarget=PluginTarget.SparkContainer):
        self.name = name
        # self.docker_image = docker_image
        self.run_on = run_on
        self.target = target
        self.target_role = target_role
        self.ports = ports or []
        self.files = files or []
        self.args = args or []
@@ -64,11 +75,18 @@ def has_arg(self, name: str):
        for x in self.args:
            if x.name == name:
                return True
            else:
                return False
        return False

    def validate(self):
        self._validate_required([
            "name",
            "execute",
        ])

        if not isinstance(self.target, PluginTarget):
            raise InvalidPluginConfigurationError(
                "Target must be of type PluginTarget but was {0}".format(self.target))

        if not isinstance(self.target_role, PluginTargetRole):
            raise InvalidPluginConfigurationError(
                "Target role must be of type PluginTargetRole but was {0}".format(self.target_role))
22 changes: 11 additions & 11 deletions aztk/node_scripts/core/config.py
@@ -16,14 +16,14 @@
    '/providers/[^/]+'
    '/[^/]+Accounts/(?P<account>[^/]+)$')

batch_account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
batch_account_key = os.environ["BATCH_ACCOUNT_KEY"]
batch_service_url = os.environ["BATCH_SERVICE_URL"]
tenant_id = os.environ["SP_TENANT_ID"]
client_id = os.environ["SP_CLIENT_ID"]
credential = os.environ["SP_CREDENTIAL"]
batch_resource_id = os.environ["SP_BATCH_RESOURCE_ID"]
storage_resource_id = os.environ["SP_STORAGE_RESOURCE_ID"]
batch_account_name = os.environ.get("AZ_BATCH_ACCOUNT_NAME")
batch_account_key = os.environ.get("BATCH_ACCOUNT_KEY")
batch_service_url = os.environ.get("BATCH_SERVICE_URL")
tenant_id = os.environ.get("SP_TENANT_ID")
client_id = os.environ.get("SP_CLIENT_ID")
credential = os.environ.get("SP_CREDENTIAL")
batch_resource_id = os.environ.get("SP_BATCH_RESOURCE_ID")
storage_resource_id = os.environ.get("SP_STORAGE_RESOURCE_ID")

pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"]
@@ -33,9 +33,9 @@
spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"]
spark_job_ui_port = os.environ["SPARK_JOB_UI_PORT"]

storage_account_name = os.environ["STORAGE_ACCOUNT_NAME"]
storage_account_key = os.environ["STORAGE_ACCOUNT_KEY"]
storage_account_suffix = os.environ["STORAGE_ACCOUNT_SUFFIX"]
storage_account_name = os.environ.get("STORAGE_ACCOUNT_NAME")
storage_account_key = os.environ.get("STORAGE_ACCOUNT_KEY")
storage_account_suffix = os.environ.get("STORAGE_ACCOUNT_SUFFIX")

def get_blob_client() -> blob.BlockBlobService:
    if not storage_resource_id:
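The switch from os.environ[...] to os.environ.get(...) above makes these variables optional: a missing key now yields None instead of raising. A minimal illustration of the difference:

import os

# Indexing aborts node setup when a variable is absent:
#   os.environ["SP_TENANT_ID"]  ->  KeyError if unset
# .get() returns None instead, so config.py can load with either the
# shared-key or the service-principal variable set present and let checks
# like `if not storage_resource_id:` decide at runtime:
storage_resource_id = os.environ.get("SP_STORAGE_RESOURCE_ID")  # None if unset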
11 changes: 6 additions & 5 deletions aztk/node_scripts/docker_main.sh
@@ -3,12 +3,13 @@
# This file is the entry point of the docker container.

set -e
echo "Initializing spark container"

# --------------------
# Setup custom scripts
# --------------------
custom_script_dir=$DOCKER_WORKING_DIR/custom-scripts
aztk_dir=$DOCKER_WORKING_DIR/aztk
custom_script_dir=$AZTK_WORKING_DIR/custom-scripts
aztk_dir=$AZTK_WORKING_DIR/aztk

# -----------------------
# Preload jupyter samples
@@ -28,11 +29,11 @@ done
echo "Starting setup using Docker"

$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/pip install -r $(dirname $0)/requirements.txt
export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR
echo 'export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR' >> ~/.bashrc
export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR
echo 'export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR' >> ~/.bashrc

echo "Running main.py script"
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py install
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py setup-spark-container

# sleep to keep container running
while true; do sleep 1; done
6 changes: 3 additions & 3 deletions aztk/node_scripts/install/create_user.py
@@ -6,11 +6,11 @@
from datetime import datetime, timezone, timedelta
import yaml
'''
Creates a user if the user configuration file at $DOCKER_WORKING_DIR/user.yaml exists
Creates a user if the user configuration file at $AZTK_WORKING_DIR/user.yaml exists
'''

def create_user(batch_client):
    path = os.path.join(os.environ['DOCKER_WORKING_DIR'], "user.yaml")
    path = os.path.join(os.environ['AZTK_WORKING_DIR'], "user.yaml")

    if not os.path.isfile(path):
        print("No user to create.")
@@ -43,7 +43,7 @@ def decrypt_password(user_conf):
    tag = user_conf['tag']

    # Read private key
    with open(os.path.join(os.environ['DOCKER_WORKING_DIR'], 'id_rsa'), encoding='UTF-8') as f:
    with open(os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa'), encoding='UTF-8') as f:
        private_key = RSA.import_key(f.read())
    # Decrypt the session key with the private RSA key
    cipher_rsa = PKCS1_OAEP.new(private_key)
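The rest of decrypt_password is collapsed in this diff. For orientation, a hedged sketch of the standard PyCryptodome hybrid-decryption recipe (RSA-OAEP-wrapped session key plus AES-EAX, which the tag field above suggests); this is the library's documented pattern, not necessarily aztk's exact scheme:

from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA

def decrypt(private_key: RSA.RsaKey, enc_session_key: bytes,
            nonce: bytes, tag: bytes, ciphertext: bytes) -> bytes:
    # Unwrap the AES session key with the RSA private key ...
    session_key = PKCS1_OAEP.new(private_key).decrypt(enc_session_key)
    # ... then decrypt and authenticate the payload with AES-EAX.
    cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
    return cipher_aes.decrypt_and_verify(ciphertext, tag)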
75 changes: 51 additions & 24 deletions aztk/node_scripts/install/install.py
@@ -1,47 +1,74 @@
import os
from core import config
from install import pick_master, spark, scripts, create_user, plugins
from install import pick_master, spark, scripts, create_user, plugins, spark_container
import wait_until_master_selected
from aztk.models.plugins import PluginTarget
from aztk.internal import cluster_data

def read_cluster_config():
    data = cluster_data.ClusterData(config.blob_client, config.pool_id)
    cluster_config = data.read_cluster_config()
    print("Got cluster config", cluster_config)
    return cluster_config

def setup_node():
def setup_host(docker_repo: str):
    """
    Code to be run on the node (NOT in a container)
    """
    client = config.batch_client

    create_user.create_user(batch_client=client)

    spark.setup_conf()

    if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['MIXED_MODE'] == "False":
    if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['AZTK_MIXED_MODE'] == "False":
        is_master = pick_master.find_master(client)
    else:
        is_master = False
        wait_until_master_selected.main()

    is_worker = not is_master or os.environ["AZTK_WORKER_ON_MASTER"]
    master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id))
    master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id)

    os.environ["MASTER_IP"] = master_node.ip_address

    if is_master:
        setup_as_master()
        plugins.setup_plugins(is_master=True, is_worker=True)
        scripts.run_custom_scripts(is_master=True, is_worker=True)
    else:
        setup_as_worker()
        plugins.setup_plugins(is_master=False, is_worker=True)
        scripts.run_custom_scripts(is_master=False, is_worker=True)
        os.environ["AZTK_IS_MASTER"] = "1"
    if is_worker:
        os.environ["AZTK_IS_WORKER"] = "1"

    open("/tmp/setup_complete", 'a').close()
    os.environ["AZTK_MASTER_IP"] = master_node.ip_address

    cluster_conf = read_cluster_config()

    spark_container.start_spark_container(
        docker_repo=docker_repo,
        gpu_enabled=os.environ.get("AZTK_GPU_ENABLED") == "true",
        plugins=cluster_conf.plugins,
    )
    plugins.setup_plugins(target=PluginTarget.Host, is_master=is_master, is_worker=is_worker)


def setup_spark_container():
    """
    Code run in the main spark container
    """
    is_master = os.environ["AZTK_IS_MASTER"]
    is_worker = os.environ["AZTK_IS_WORKER"]
    print("Setting up spark container. Master: ", is_master, ", Worker: ", is_worker)

    print("Copying spark setup config")
    spark.setup_conf()
    print("Done copying spark setup config")

    master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id))
    master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id)

def setup_as_master():
    print("Setting up as master.")
    spark.setup_connection()
    spark.start_spark_master()
    if os.environ["WORKER_ON_MASTER"] == "True":

    if is_master:
        spark.start_spark_master()

    if is_worker:
        spark.start_spark_worker()

def setup_as_worker():
    print("Setting up as worker.")
    spark.setup_connection()
    spark.start_spark_worker()
    plugins.setup_plugins(target=PluginTarget.SparkContainer, is_master=is_master, is_worker=is_worker)
    scripts.run_custom_scripts(is_master=is_master, is_worker=is_worker)

    open("/tmp/setup_complete", 'a').close()
