Skip to content

Commit

Permalink
overhaul subprocess runner
Browse files Browse the repository at this point in the history
  • Loading branch information
prashantmital committed Apr 10, 2020
1 parent afa8ba4 commit 72f5e74
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 21 deletions.
32 changes: 11 additions & 21 deletions astrolabe/spec_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
import json
import logging
import os
import signal
import subprocess
import sys
from time import sleep
from urllib.parse import urlencode

from pymongo import MongoClient
from tabulate import tabulate
import junitparser
import yaml
Expand All @@ -34,8 +30,8 @@
from astrolabe.poller import BooleanCallablePoller
from astrolabe.utils import (
assert_subset, encode_cdata, get_cluster_name,
get_test_name_from_spec_file, load_test_data, SingleTestXUnitLogger,
Timer)
get_test_name_from_spec_file, load_test_data,
DriverWorkloadSubprocessRunner, SingleTestXUnitLogger, Timer)


LOGGER = logging.getLogger(__name__)
Expand All @@ -55,11 +51,8 @@ def __init__(self, *, client, test_name, cluster_name, specification,
# Initialize attribute used for memoization of connection string.
self.__connection_string = None

# Account for platform-specific interrupt signals.
if sys.platform != 'win32':
self.sigint = signal.SIGINT
else:
self.sigint = signal.CTRL_C_EVENT
# Initialize wrapper class for running workload executor.
self.workload_runner = DriverWorkloadSubprocessRunner()

# Validate and store organization and group.
self.organization = get_one_organization_by_name(
Expand Down Expand Up @@ -166,12 +159,10 @@ def run(self, persist_cluster=False):
LOGGER.info("Starting workload executor")
connection_string = self.get_connection_string()
driver_workload = json.dumps(self.spec.driverWorkload)
worker_subprocess = subprocess.Popen([
self.config.workload_executor, connection_string,
driver_workload], preexec_fn=os.setsid,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.workload_runner.spawn(
self.config.workload_executor, connection_string, driver_workload)
LOGGER.info("Started workload executor [PID: {}]".format(
worker_subprocess.pid))
self.workload_runner.pid))

# Step-3: begin maintenance routine.
final_config = self.spec.maintenancePlan.final
Expand Down Expand Up @@ -204,11 +195,10 @@ def run(self, persist_cluster=False):

# Step-5: interrupt driver workload and capture streams
LOGGER.info("Stopping workload executor [PID: {}]".format(
worker_subprocess.pid))
os.killpg(worker_subprocess.pid, self.sigint)
stdout, stderr = worker_subprocess.communicate(timeout=10)
self.workload_runner.pid))
stdout, stderr = self.workload_runner.terminate()
LOGGER.info("Stopped workload executor [exit code: {}]".format(
worker_subprocess.returncode))
self.workload_runner.returncode))

# Stop the timer
timer.stop()
Expand All @@ -223,7 +213,7 @@ def run(self, persist_cluster=False):
err_info = {'numErrors': -1, 'numFailures': -1}

if err_info['numErrors'] or err_info['numFailures'] \
or worker_subprocess.returncode != 0:
or self.workload_runner.returncode != 0:
LOGGER.info("FAILED: {!r}".format(self.id))
# Write xunit logs for failed tests.
errmsg = ("Number of errors: {numErrors}\n"
Expand Down
42 changes: 42 additions & 0 deletions astrolabe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

import logging
import os
import signal
import subprocess
import sys
from functools import partial
from hashlib import sha256
from time import monotonic

Expand Down Expand Up @@ -135,3 +139,41 @@ def load_test_data(connection_string, driver_workload):
driver_workload.collection)
coll.drop()
coll.insert(driver_workload.testData)


class DriverWorkloadSubprocessRunner:
"""Convenience wrapper to run a workload executor in a subprocess."""
def __init__(self):
self.is_windows = False
if sys.platform in ("win32", "cygwin"):
self.is_windows = True
self.workload_subprocess = None

@property
def pid(self):
return self.workload_subprocess.pid

@property
def returncode(self):
return self.workload_subprocess.returncode

def spawn(self, *, workload_executor, connection_string, driver_workload):
if not self.is_windows:
self.workload_subprocess = subprocess.Popen([
workload_executor, connection_string, driver_workload],
preexec_fn=os.setsid, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
self.workload_subprocess = subprocess.Popen([
workload_executor, connection_string, driver_workload],
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return self.workload_subprocess

def terminate(self):
if not self.is_windows:
os.killpg(self.workload_subprocess.pid, signal.SIGINT)
else:
os.kill(self.workload_subprocess.pid, signal.CTRL_C_EVENT)
stdout, stderr = self.workload_subprocess.communicate(timeout=10)
return stdout, stderr

0 comments on commit 72f5e74

Please sign in to comment.