Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: Spark add output logs flag (#468)
Browse files Browse the repository at this point in the history
* add output flag to cluster submit

* add output flag to cluster app-logs

* add output flag to job get-app-logs

* sort imports

* make spinner context
  • Loading branch information
jafreck authored Apr 5, 2018
1 parent 8889059 commit 32de752
Show file tree
Hide file tree
Showing 22 changed files with 113 additions and 59 deletions.
4 changes: 2 additions & 2 deletions aztk_cli/spark/endpoints/cluster/cluster_add_user.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import argparse
import typing

import aztk.spark
from aztk_cli import log
from aztk_cli import utils, config
from aztk_cli import config, log, utils


def setup_parser(parser: argparse.ArgumentParser):
Expand Down
20 changes: 16 additions & 4 deletions aztk_cli/spark/endpoints/cluster/cluster_app_logs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import argparse
import os
import typing

import aztk
from aztk_cli import config, utils

from aztk_cli import utils, config

def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand All @@ -14,7 +16,12 @@ def setup_parser(parser: argparse.ArgumentParser):
required=True,
help='The unique id of your job name')

parser.add_argument('--tail', dest='tail', action='store_true')
output_group = parser.add_mutually_exclusive_group()

output_group.add_argument('--output',
help='Path to the file you wish to output to. If not \
specified, output is printed to stdout')
output_group.add_argument('--tail', dest='tail', action='store_true')


def execute(args: typing.NamedTuple):
Expand All @@ -23,5 +30,10 @@ def execute(args: typing.NamedTuple):
if args.tail:
utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.app_name)
else:
app_logs = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name)
print(app_logs.log)
app_log = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name)
if args.output:
with utils.Spinner():
with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
f.write(app_log.log)
else:
print(app_log.log)
1 change: 1 addition & 0 deletions aztk_cli/spark/endpoints/cluster/cluster_copy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import typing

import aztk.spark
from aztk_cli import config

Expand Down
23 changes: 9 additions & 14 deletions aztk_cli/spark/endpoints/cluster/cluster_create.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import os
import argparse
import os
import typing

import aztk.spark
from aztk.spark.models import ClusterConfiguration, UserConfiguration
from aztk_cli import log
from aztk_cli import config, log, utils
from aztk_cli.config import load_aztk_spark_config
from aztk_cli import utils, config


def setup_parser(parser: argparse.ArgumentParser):
Expand Down Expand Up @@ -70,19 +70,14 @@ def execute(args: typing.NamedTuple):
cluster_conf.user_configuration = None

utils.print_cluster_conf(cluster_conf, wait)
spinner = utils.Spinner()
spinner.start()

# create spark cluster
cluster = spark_client.create_cluster(
cluster_conf,
wait=wait
)

spinner.stop()
with utils.Spinner():
# create spark cluster
cluster = spark_client.create_cluster(
cluster_conf,
wait=wait
)

if wait:
log.info("Cluster %s created successfully.", cluster.id)
else:
log.info("Cluster %s is being provisioned.", cluster.id)

3 changes: 2 additions & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_delete.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import argparse
import typing

import aztk
from aztk_cli import log, config
from aztk_cli import config, log


def setup_parser(parser: argparse.ArgumentParser):
Expand Down
4 changes: 2 additions & 2 deletions aztk_cli/spark/endpoints/cluster/cluster_get.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import argparse
import typing

import aztk
from aztk_cli import log
from aztk_cli import utils, config
from aztk_cli import config, log, utils


def setup_parser(parser: argparse.ArgumentParser):
Expand Down
3 changes: 2 additions & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_list.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import argparse
import typing

import aztk
from aztk_cli import utils, config
from aztk_cli import config, utils


def setup_parser(_: argparse.ArgumentParser):
Expand Down
4 changes: 3 additions & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import argparse
import typing

import aztk.spark
from aztk_cli import utils, config
from aztk_cli import config, utils


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand Down
10 changes: 5 additions & 5 deletions aztk_cli/spark/endpoints/cluster/cluster_ssh.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import argparse
import typing
from aztk_cli import log
from aztk_cli import utils, config
from aztk_cli.config import SshConfig
import aztk

import azure.batch.models.batch_error as batch_error

import aztk
from aztk.models import ClusterConfiguration
from aztk_cli import config, log, utils
from aztk_cli.config import SshConfig


def setup_parser(parser: argparse.ArgumentParser):
Expand Down Expand Up @@ -106,4 +107,3 @@ def print_plugin_ports(cluster_config: ClusterConfiguration):

url = "{0}{1}".format(http_prefix, port.public_port)
utils.log_property(label, url)

23 changes: 20 additions & 3 deletions aztk_cli/spark/endpoints/cluster/cluster_submit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import argparse
import os
import sys
import typing
from aztk_cli import utils, config, log

import aztk.spark
from aztk_cli import config, log, utils


def setup_parser(parser: argparse.ArgumentParser):
Expand Down Expand Up @@ -64,6 +66,10 @@ def setup_parser(parser: argparse.ArgumentParser):
help='Number of times the Spark job may be retried \
if there is a failure')

parser.add_argument('--output',
help='Path to the file you wish to output to. If not \
specified, output is printed to stdout')

parser.add_argument('app',
help='App jar OR python file to execute. Use absolute \
path to reference file.')
Expand All @@ -73,6 +79,9 @@ def setup_parser(parser: argparse.ArgumentParser):


def execute(args: typing.NamedTuple):
if not args.wait and args.output:
raise aztk.error.AztkError("--output flag requires --wait flag")

spark_client = aztk.spark.Client(config.load_aztk_secrets())
jars = []
py_files = []
Expand Down Expand Up @@ -141,6 +150,14 @@ def execute(args: typing.NamedTuple):
)

if args.wait:
exit_code = utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.name)
sys.exit(exit_code)
if not args.output:
exit_code = utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.name)
else:
with utils.Spinner():
spark_client.wait_until_application_done(cluster_id=args.cluster_id, task_id=args.name)
application_log = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.name)
with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
f.write(application_log.log)
exit_code = application_log.exit_code

sys.exit(exit_code)
3 changes: 2 additions & 1 deletion aztk_cli/spark/endpoints/init.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import argparse
import os
import typing
from distutils.dir_util import copy_tree

import aztk.utils.constants as constants


Expand Down
3 changes: 2 additions & 1 deletion aztk_cli/spark/endpoints/job/delete.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import argparse
import typing

import aztk.spark
from aztk_cli import log, config
from aztk_cli import config, log


def setup_parser(parser: argparse.ArgumentParser):
Expand Down
7 changes: 4 additions & 3 deletions aztk_cli/spark/endpoints/job/get.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import argparse
import typing
import time
import typing

import aztk.spark
from aztk_cli import config
from aztk_cli import utils
from aztk_cli import config, utils


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand Down
7 changes: 4 additions & 3 deletions aztk_cli/spark/endpoints/job/get_app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import argparse
import typing
import time
import typing

import aztk.spark
from aztk_cli import config
from aztk_cli import utils
from aztk_cli import config, utils


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand Down
17 changes: 14 additions & 3 deletions aztk_cli/spark/endpoints/job/get_app_logs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import argparse
import os
import typing

import aztk.spark
from aztk_cli import utils, config
from aztk_cli import config, utils


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand All @@ -12,9 +15,17 @@ def setup_parser(parser: argparse.ArgumentParser):
dest='app_name',
required=True,
help='The unique id of your job name')
parser.add_argument('--output',
help='Path to the file you wish to output to. If not \
specified, output is printed to stdout')


def execute(args: typing.NamedTuple):
spark_client = aztk.spark.Client(config.load_aztk_secrets())
app_logs = spark_client.get_job_application_log(args.job_id, args.app_name)
print(app_logs.log)
app_log = spark_client.get_job_application_log(args.job_id, args.app_name)
if args.output:
with utils.Spinner():
with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
f.write(app_log.log)
else:
print(app_log.log)
7 changes: 4 additions & 3 deletions aztk_cli/spark/endpoints/job/list.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import argparse
import typing
import time
import typing

import aztk.spark
from aztk_cli import config
from aztk_cli import utils
from aztk_cli import config, utils


def setup_parser(_: argparse.ArgumentParser):
# No arguments for list yet
Expand Down
5 changes: 3 additions & 2 deletions aztk_cli/spark/endpoints/job/list_apps.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import argparse
import typing

import aztk.spark
from aztk_cli import config
from aztk_cli import utils
from aztk_cli import config, utils


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand Down
7 changes: 4 additions & 3 deletions aztk_cli/spark/endpoints/job/stop.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import argparse
import typing
import time
import typing

import aztk.spark
from aztk_cli import config
from aztk_cli import utils
from aztk_cli import config, utils


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
Expand Down
7 changes: 3 additions & 4 deletions aztk_cli/spark/endpoints/job/stop_app.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import argparse
import typing
import time
import typing

import aztk.spark
from aztk_cli import config
from aztk_cli import utils
from aztk_cli import log
from aztk_cli import config, log, utils


def setup_parser(parser: argparse.ArgumentParser):
Expand Down
5 changes: 3 additions & 2 deletions aztk_cli/spark/endpoints/job/submit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import argparse
import typing
import time
import typing

import aztk.spark
from aztk_cli import config, utils, log
from aztk_cli import config, log, utils
from aztk_cli.config import JobConfig, load_aztk_spark_config


Expand Down
3 changes: 2 additions & 1 deletion aztk_cli/spark/endpoints/spark.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import argparse
import typing

from . import init
from .cluster import cluster
from .job import job
from . import init


def setup_parser(parser: argparse.ArgumentParser):
subparsers = parser.add_subparsers(
Expand Down
6 changes: 6 additions & 0 deletions aztk_cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ def __init__(self, delay=None):
self.spinner_generator = self.spinning_cursor()
if delay and float(delay): self.delay = delay

def __enter__(self):
return self.start()

def __exit__(self, exc_type, exc_val, exc_tb):
return self.stop()

def spinner_task(self):
while self.busy:
sys.stdout.write(next(self.spinner_generator))
Expand Down

0 comments on commit 32de752

Please sign in to comment.