Commit
* start implementation of cluster debug utility
* update debug program
* update debug
* fix output directory structure
* cleanup output, add error checking
* sort imports
* start untar
* extract tar
* add debug.py to pylintrc ignore, line too long
* crlf -> lf
* add app logs
* call get_spark_app_logs, typos
* add docs
* remove debug.py from pylintrc ignore
* added debug.py back to pylint ignore
* change pylint ignore
* remove commented log
* update cluster_run
* refactor cluster_copy
* update debug, add spinner for run and copy
* make new sdk cluster_download endpoint
Showing 10 changed files with 383 additions and 71 deletions.
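
The commit message above introduces a new cluster_download SDK endpoint next to the existing cluster_copy and cluster_run calls. A minimal sketch of how the three fit together, mirroring only how they are invoked in the diff below; the spark_client object itself is assumed to be an already-authenticated aztk Spark client and is not constructed in this commit:

# Sketch only: spark_client construction is an assumption, not shown in this commit.
# The call signatures mirror the usage in the diagnostic helper below.
spark_client.cluster_copy(cluster_id, "local/script.py", "/tmp/script.py", host=True)
node_output = spark_client.cluster_run(cluster_id, "sudo python3 /tmp/script.py", host=True)
spark_client.cluster_download(cluster_id, "/tmp/result.zip", "local/result.zip", host=True)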
@@ -0,0 +1,26 @@
import os
from aztk.utils import ssh
from aztk.utils.command_builder import CommandBuilder
from aztk import models as aztk_models
import azure.batch.models as batch_models


def run(spark_client, cluster_id, output_directory):
    # copy the diagnostic program to each node in the cluster
    spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
    ssh_cmd = _build_diagnostic_ssh_command()
    run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
    local_path = os.path.join(os.path.abspath(output_directory), "debug", "debug.zip")
    remote_path = "/tmp/debug.zip"
    output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
    # write the output of the remote run to the debug/ directory
    with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
        for node_id, result in run_output:
            for line in result:
                f.write(line + '\n')
    return output


def _build_diagnostic_ssh_command():
    return "sudo rm -rf /tmp/debug.zip; "\
           "sudo apt-get install -y python3-pip; "\
           "sudo -H pip3 install --upgrade pip; "\
           "sudo -H pip3 install docker; "\
           "sudo python3 /tmp/debug.py"
@@ -0,0 +1,160 @@
""" | ||
Diagnostic program that runs on each node in the cluster | ||
This program must be run with sudo | ||
""" | ||
import io | ||
import json | ||
import os | ||
import socket | ||
import tarfile | ||
from subprocess import STDOUT, CalledProcessError, check_output | ||
from zipfile import ZIP_DEFLATED, ZipFile | ||
|
||
import docker # pylint: disable=import-error | ||
|
||
|
||
def main(): | ||
zipf = create_zip_archive() | ||
|
||
# general node diagnostics | ||
zipf.writestr("hostname.txt", data=get_hostname()) | ||
zipf.writestr("df.txt", data=get_disk_free()) | ||
|
||
# docker container diagnostics | ||
docker_client = docker.from_env() | ||
for filename, data in get_docker_diagnostics(docker_client): | ||
zipf.writestr(filename, data=data) | ||
|
||
zipf.close() | ||
|
||
|
||
def create_zip_archive(): | ||
zip_file_path = "/tmp/debug.zip" | ||
return ZipFile(zip_file_path, "w", ZIP_DEFLATED) | ||
|
||
|
||
def get_hostname(): | ||
return socket.gethostname() | ||
|
||
|
||
def cmd_check_output(cmd):
    try:
        output = check_output(cmd, shell=True, stderr=STDOUT)
    except CalledProcessError as e:
        return "CMD: {0}\n"\
               "returncode: {1}\n"\
               "output: {2}".format(e.cmd, e.returncode, e.output)
    else:
        return output


def get_disk_free():
    return cmd_check_output("df -h")
def get_docker_diagnostics(docker_client):
    '''
    returns list of tuples (filename, data) to be written in the zip
    '''
    output = []
    output.append(get_docker_images(docker_client))
    logs = get_docker_containers(docker_client)
    for item in logs:
        output.append(item)

    return output


def get_docker_images(docker_client):
    output = ""
    try:
        images = docker_client.images.list()
        for image in images:
            output += json.dumps(image.attrs, sort_keys=True, indent=4)
        return ("docker-images.txt", output)
    except docker.errors.APIError as e:
        return ("docker-images.err", e.__str__())


def get_docker_containers(docker_client):
    container_attrs = ""
    logs = []
    try:
        containers = docker_client.containers.list()
        for container in containers:
            container_attrs += json.dumps(container.attrs, sort_keys=True, indent=4)
            # get docker container logs
            logs.append((container.name + "/docker.log", container.logs()))
            logs.append(get_docker_process_status(container))
            if container.name == "spark":  # TODO: find a more robust way to get specific info off specific containers
                logs.extend(get_container_aztk_script(container))
                logs.extend(get_spark_logs(container))
                logs.extend(get_spark_app_logs(container))

        logs.append(("docker-containers.txt", container_attrs))
        return logs
    except docker.errors.APIError as e:
        return [("docker-containers.err", e.__str__())]
def get_docker_process_status(container):
    try:
        exit_code, output = container.exec_run("ps -auxw", tty=True, privileged=True)
        out_file_name = container.name + "/ps_aux.txt"
        if exit_code == 0:
            return (out_file_name, output)
        else:
            return (out_file_name, "exit_code: {0}\n{1}".format(exit_code, output))
    except docker.errors.APIError as e:
        return (container.name + "/ps_aux.err", e.__str__())


def get_container_aztk_script(container):
    aztk_path = "/mnt/batch/tasks/startup/wd"
    try:
        stream, _ = container.get_archive(aztk_path)  # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        # return a list so callers can extend() the error entry just like the success case
        return [(container.name + "/" + "aztk-scripts.err", e.__str__())]
def get_spark_logs(container):
    spark_logs_path = "/home/spark-current/logs"
    try:
        stream, _ = container.get_archive(spark_logs_path)  # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-logs.err", e.__str__())]


def get_spark_app_logs(container):
    spark_app_logs_path = "/home/spark-current/work"
    try:
        stream, _ = container.get_archive(spark_app_logs_path)
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-work-logs.err", e.__str__())]


def filter_members(members):
    # skip ssh keys and binary or bulky files that should not land in the diagnostic zip
    skip_files = ["id_rsa", "id_rsa.pub", "docker.log"]
    skip_extensions = [".pyc", ".zip"]
    for tarinfo in members:
        if (os.path.splitext(tarinfo.name)[1] not in skip_extensions and
                os.path.basename(tarinfo.name) not in skip_files):
            yield tarinfo
def extract_tar_in_memory(container, data):
    # the archive stream is an iterable of byte chunks; read it fully into memory
    data = io.BytesIO(b''.join(data))
    tarf = tarfile.open(fileobj=data)
    logs = []
    for member in filter_members(tarf):
        file_bytes = tarf.extractfile(member)
        if file_bytes is not None:
            logs.append((container.name + "/" + member.name, file_bytes.read()))
    return logs


if __name__ == "__main__":
    main()
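
For completeness, a short sketch of inspecting the archive this script produces once it has been downloaded; the entry names come from the writestr calls and the filename tuples above:

# Inspect the downloaded debug.zip locally (standard library only).
from zipfile import ZipFile

with ZipFile("debug.zip") as zf:
    # e.g. hostname.txt, df.txt, docker-images.txt, <container>/docker.log, ...
    print(zf.namelist())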