This repository has been archived by the owner on Feb 3, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feature: Plugin V2: Running plugin on host (#461)
- Loading branch information
1 parent
12450fb
commit de78983
Showing
43 changed files
with
481 additions
and
230 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,4 @@ | |
""" | ||
|
||
from .configuration_base import * | ||
from .docker_cmd import * |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import os | ||
from aztk.utils.command_builder import CommandBuilder | ||
|
||
class DockerCmd:
    """
    Incrementally assembles a ``docker run`` (or ``nvidia-docker run``) command.

    The command under construction is kept in ``self.cmd`` (a CommandBuilder).
    The helper methods append ``-e``, ``-v`` and ``-p`` options to it, and
    ``to_str`` returns whatever the builder renders.
    """

    def __init__(self, name: str, docker_repo: str, cmd: str, gpu_enabled=False):
        """
        Start a command with ``--net host``, ``--name <name>`` and ``-d``.

        :param name: value given to the ``--name`` option
        :param docker_repo: image reference, added as an argument after ``-d``
        :param cmd: command to run, added as the last argument
        :param gpu_enabled: when truthy, ``nvidia-docker run`` is used as the
            executable instead of ``docker run``
        """
        executable = 'nvidia-docker run' if gpu_enabled else 'docker run'
        self.cmd = builder = CommandBuilder(executable)
        builder.add_option('--net', 'host')
        builder.add_option('--name', name)
        # Arguments are registered in this exact order: flag, image, command.
        for argument in ('-d', docker_repo, cmd):
            builder.add_argument(argument)

    def add_env(self, env: str, value: str):
        """
        Add a ``-e <env>=<value>`` option (explicit value for the variable).
        """
        assignment = '{}={}'.format(env, value)
        self.cmd.add_option('-e', assignment)

    def pass_env(self, env: str):
        """
        Give the value of an environment variable in the main process to the docker image.

        Only the variable name is passed (``-e <env>``), no value.
        """
        self.cmd.add_option('-e', '{}'.format(env))

    def share_folder(self, folder: str):
        """
        Add a ``-v <folder>:<folder>`` option: the same path on both sides of the colon.
        """
        self.cmd.add_option('-v', '{path}:{path}'.format(path=folder))

    def open_port(self, port: int):
        """
        Add a ``-p <port>:<port>`` option: the same port number on both sides of the colon.
        """
        self.cmd.add_option('-p', '{number}:{number}'.format(number=port))

    def to_str(self):
        """
        Return the command string produced by the underlying builder.
        """
        rendered = self.cmd.to_str()
        return rendered
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,74 @@ | ||
import os | ||
from core import config | ||
from install import pick_master, spark, scripts, create_user, plugins | ||
from install import pick_master, spark, scripts, create_user, plugins, spark_container | ||
import wait_until_master_selected | ||
from aztk.models.plugins import PluginTarget | ||
from aztk.internal import cluster_data | ||
|
||
def read_cluster_config():
    """
    Read the cluster configuration, print it and return it.

    A ``ClusterData`` is built from ``config.blob_client`` and ``config.pool_id``;
    the result of its ``read_cluster_config()`` call is echoed to stdout and
    handed back to the caller unchanged.
    """
    cluster_config = cluster_data.ClusterData(config.blob_client, config.pool_id).read_cluster_config()
    print("Got cluster config", cluster_config)
    return cluster_config
|
||
def setup_node(): | ||
def setup_host(docker_repo: str): | ||
""" | ||
Code to be run on the node(NOT in a container) | ||
""" | ||
client = config.batch_client | ||
|
||
create_user.create_user(batch_client=client) | ||
|
||
spark.setup_conf() | ||
|
||
if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['MIXED_MODE'] == "False": | ||
if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['AZTK_MIXED_MODE'] == "False": | ||
is_master = pick_master.find_master(client) | ||
else: | ||
is_master = False | ||
wait_until_master_selected.main() | ||
|
||
is_worker = not is_master or os.environ["AZTK_WORKER_ON_MASTER"] | ||
master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id)) | ||
master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id) | ||
|
||
os.environ["MASTER_IP"] = master_node.ip_address | ||
|
||
if is_master: | ||
setup_as_master() | ||
plugins.setup_plugins(is_master=True, is_worker=True) | ||
scripts.run_custom_scripts(is_master=True, is_worker=True) | ||
else: | ||
setup_as_worker() | ||
plugins.setup_plugins(is_master=False, is_worker=True) | ||
scripts.run_custom_scripts(is_master=False, is_worker=True) | ||
os.environ["AZTK_IS_MASTER"] = "1" | ||
if is_worker: | ||
os.environ["AZTK_IS_WORKER"] = "1" | ||
|
||
open("/tmp/setup_complete", 'a').close() | ||
os.environ["AZTK_MASTER_IP"] = master_node.ip_address | ||
|
||
cluster_conf = read_cluster_config() | ||
|
||
spark_container.start_spark_container( | ||
docker_repo=docker_repo, | ||
gpu_enabled=os.environ.get("AZTK_GPU_ENABLED") == "true", | ||
plugins=cluster_conf.plugins, | ||
) | ||
plugins.setup_plugins(target=PluginTarget.Host, is_master=is_master, is_worker=is_worker) | ||
|
||
|
||
def setup_spark_container(): | ||
""" | ||
Code run in the main spark container | ||
""" | ||
is_master = os.environ["AZTK_IS_MASTER"] | ||
is_worker = os.environ["AZTK_IS_WORKER"] | ||
print("Setting spark container. Master: ", is_master, ", Worker: ", is_worker) | ||
|
||
print("Copying spark setup config") | ||
spark.setup_conf() | ||
print("Done copying spark setup config") | ||
|
||
master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id)) | ||
master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id) | ||
|
||
def setup_as_master(): | ||
print("Setting up as master.") | ||
spark.setup_connection() | ||
spark.start_spark_master() | ||
if os.environ["WORKER_ON_MASTER"] == "True": | ||
|
||
if is_master: | ||
spark.start_spark_master() | ||
|
||
if is_worker: | ||
spark.start_spark_worker() | ||
|
||
def setup_as_worker(): | ||
print("Setting up as worker.") | ||
spark.setup_connection() | ||
spark.start_spark_worker() | ||
plugins.setup_plugins(target=PluginTarget.SparkContainer, is_master=is_master, is_worker=is_worker) | ||
scripts.run_custom_scripts(is_master=is_master, is_worker=is_worker) | ||
|
||
open("/tmp/setup_complete", 'a').close() |
Oops, something went wrong.