diff --git a/.evergreen/config.yml b/.evergreen/config.yml index e024b5a5..8f49d51b 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -44,13 +44,22 @@ functions: working_dir: astrolabe-src command: | git clone --recursive --branch ${DRIVER_REVISION} ${DRIVER_REPOSITORY} - # Install driver. + # Install driver on *nix platforms. - command: subprocess.exec params: working_dir: astrolabe-src - continue_on_err: true # Because drivers are not required to provide this script. + continue_on_err: true # Because script may not exist OR platform may not be *nix. add_expansions_to_env: true - binary: .evergreen/${DRIVER_DIRNAME}/install-driver.sh + command: | + .evergreen/${DRIVER_DIRNAME}/install-driver.sh + # Install driver on Windows. + - command: subprocess.exec + params: + working_dir: astrolabe-src + continue_on_err: true # Because script may not exist OR platform may not be Windows. + add_expansions_to_env: true + command: | + C:/cygwin/bin/sh .evergreen/${DRIVER_DIRNAME}/install-driver.sh "run test": # Run the Atlas Planned Maintenance Test. @@ -65,7 +74,7 @@ functions: ATLAS_API_PASSWORD: ${atlas_secret} add_expansions_to_env: true command: | - astrolabevenv/${PYTHON_BIN_DIR}/astrolabe spec-tests run-one tests/${TEST_NAME}.yaml -e .evergreen/${DRIVER_DIRNAME}/workload-executor.sh + astrolabevenv/${PYTHON_BIN_DIR}/astrolabe spec-tests run-one tests/${TEST_NAME}.yaml "delete test cluster": # Delete the cluster that was used to run the test. @@ -175,22 +184,27 @@ axes: display_name: CPython-2.7 variables: PYTHON_BINARY: "/opt/python/2.7/bin/python" + WORKLOAD_EXECUTOR: "$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python .evergreen/$DRIVER_DIRNAME/workload-executor.py" - id: python36 display_name: CPython-3.6 variables: PYTHON_BINARY: "/opt/python/3.6/bin/python3" + WORKLOAD_EXECUTOR: "$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python .evergreen/$DRIVER_DIRNAME/workload-executor.py" - id: python37 display_name: CPython-3.7 variables: PYTHON_BINARY: "/opt/python/3.7/bin/python3" + WORKLOAD_EXECUTOR: "$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python .evergreen/$DRIVER_DIRNAME/workload-executor.py" - id: python38 display_name: CPython-3.8 variables: PYTHON_BINARY: "/opt/python/3.8/bin/python3" + WORKLOAD_EXECUTOR: "$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python .evergreen/$DRIVER_DIRNAME/workload-executor.py" - id: python37-windows display_name: CPython-3.7-Windows variables: PYTHON_BINARY: "C:/python/Python37/python.exe" + WORKLOAD_EXECUTOR: "$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python.exe .evergreen/$DRIVER_DIRNAME/workload-executor.py" buildvariants: - matrix_name: "tests-python" diff --git a/.evergreen/python/pymongo/workload-executor.py b/.evergreen/python/pymongo/workload-executor.py index d01f46d7..b7c4f702 100644 --- a/.evergreen/python/pymongo/workload-executor.py +++ b/.evergreen/python/pymongo/workload-executor.py @@ -3,6 +3,7 @@ import copy import json import re +import signal import sys import traceback @@ -12,6 +13,27 @@ from bson.py3compat import iteritems +NUM_FAILURES = 0 +NUM_ERRORS = 0 +WIN32 = sys.platform == 'win32' + + +def handler(signum, frame): + global NUM_ERRORS, NUM_FAILURES + print("Caught KeyboardInterrupt. Exiting gracefully.") + print( + json.dumps( + {"numErrors": NUM_ERRORS, "numFailures": NUM_FAILURES}), + file=sys.stderr) + exit(0) + + +if WIN32: + signal.signal(signal.SIGBREAK, handler) +else: + signal.signal(signal.SIGINT, handler) + + def camel_to_snake(camel): # Regex to convert CamelCase to snake_case. snake = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel) @@ -47,9 +69,16 @@ def run_operation(objects, prepared_operation): assert result == expected_result +def connect(srv_address): + if WIN32: + import certifi + return MongoClient(srv_address, tlsCAFile=certifi.where()) + return MongoClient(srv_address) + + def workload_runner(srv_address, workload_spec): # Do not modify connection string and do not add any extra options. - client = MongoClient(srv_address) + client = connect(srv_address) # Create test entities. database = client.get_database(workload_spec["database"]) @@ -58,27 +87,19 @@ def workload_runner(srv_address, workload_spec): # Run operations operations = workload_spec["operations"] - num_failures = 0 - num_errors = 0 + global NUM_FAILURES, NUM_ERRORS ops = [prepare_operation(op) for op in operations] - try: - while True: - try: - for op in ops: - run_operation(objects, op) - except AssertionError: - traceback.print_exc(file=sys.stdout) - num_failures += 1 - except Exception: # Don't catch Keyboard Interrupt here or you can never exit - traceback.print_exc(file=sys.stdout) - num_errors += 1 - except KeyboardInterrupt: - print("Caught KeyboardInterrupt. Exiting gracefully.") - print( - json.dumps( - {"numErrors": num_errors, "numFailures": num_failures}), - file=sys.stderr) + while True: + try: + for op in ops: + run_operation(objects, op) + except AssertionError: + traceback.print_exc(file=sys.stdout) + NUM_FAILURES += 1 + except Exception: # Don't catch Keyboard Interrupt here or you can never exit + traceback.print_exc(file=sys.stdout) + NUM_ERRORS += 1 if __name__ == '__main__': diff --git a/.evergreen/python/pymongo/workload-executor.sh b/.evergreen/python/pymongo/workload-executor.sh index f07724b8..ff99f8f7 100755 --- a/.evergreen/python/pymongo/workload-executor.sh +++ b/.evergreen/python/pymongo/workload-executor.sh @@ -3,4 +3,4 @@ set -o errexit # Exit the script with error if any of the commands fail trap "exit 0" INT -"$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python" ".evergreen/$DRIVER_DIRNAME/workload-executor.py" "$1" "$2" +"$PYMONGO_VIRTUALENV_NAME/$PYTHON_BIN_DIR/python.exe" ".evergreen/$DRIVER_DIRNAME/workload-executor.py" "$1" "$2" diff --git a/astrolabe/cli.py b/astrolabe/cli.py index 410fa73d..8ba1bd4d 100644 --- a/astrolabe/cli.py +++ b/astrolabe/cli.py @@ -70,8 +70,8 @@ help='Frequency (in Hz) at which to poll API endpoints.') WORKLOADEXECUTOR_OPTION = click.option( - '-e', '--workload-executor', required=True, type=click.Path( - exists=True, file_okay=True, dir_okay=False, resolve_path=True), + '-e', '--workload-executor', required=True, type=click.STRING, + envvar="WORKLOAD_EXECUTOR", help='Absolute or relative path to the workload-executor') CLUSTERNAMESALT_OPTION = click.option( diff --git a/astrolabe/spec_runner.py b/astrolabe/spec_runner.py index 196351e1..6d790277 100644 --- a/astrolabe/spec_runner.py +++ b/astrolabe/spec_runner.py @@ -15,13 +15,9 @@ import json import logging import os -import signal -import subprocess -import sys from time import sleep from urllib.parse import urlencode -from pymongo import MongoClient from tabulate import tabulate import junitparser import yaml @@ -34,7 +30,8 @@ from astrolabe.poller import BooleanCallablePoller from astrolabe.utils import ( assert_subset, encode_cdata, get_cluster_name, - get_test_name_from_spec_file, SingleTestXUnitLogger, Timer) + get_test_name_from_spec_file, load_test_data, + DriverWorkloadSubprocessRunner, SingleTestXUnitLogger, Timer) LOGGER = logging.getLogger(__name__) @@ -54,11 +51,8 @@ def __init__(self, *, client, test_name, cluster_name, specification, # Initialize attribute used for memoization of connection string. self.__connection_string = None - # Account for platform-specific interrupt signals. - if sys.platform != 'win32': - self.sigint = signal.SIGINT - else: - self.sigint = signal.CTRL_C_EVENT + # Initialize wrapper class for running workload executor. + self.workload_runner = DriverWorkloadSubprocessRunner() # Validate and store organization and group. self.organization = get_one_organization_by_name( @@ -157,25 +151,18 @@ def run(self, persist_cluster=False): LOGGER.info("Loading test data on cluster {!r}".format( self.cluster_name)) connection_string = self.get_connection_string() - client = MongoClient(connection_string, w="majority") - coll = client.get_database( - self.spec.driverWorkload.database).get_collection( - self.spec.driverWorkload.collection) - coll.drop() - coll.insert_many(test_data) + load_test_data(connection_string, self.spec.driverWorkload) LOGGER.info("Successfully loaded test data on cluster {!r}".format( self.cluster_name)) # Step-2: run driver workload. LOGGER.info("Starting workload executor") - connection_string = self.get_connection_string() - driver_workload = json.dumps(self.spec.driverWorkload) - worker_subprocess = subprocess.Popen([ - self.config.workload_executor, connection_string, - driver_workload], preexec_fn=os.setsid, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.workload_runner.spawn( + workload_executor=self.config.workload_executor, + connection_string=self.get_connection_string(), + driver_workload=json.dumps(self.spec.driverWorkload)) LOGGER.info("Started workload executor [PID: {}]".format( - worker_subprocess.pid)) + self.workload_runner.pid)) # Step-3: begin maintenance routine. final_config = self.spec.maintenancePlan.final @@ -208,11 +195,10 @@ def run(self, persist_cluster=False): # Step-5: interrupt driver workload and capture streams LOGGER.info("Stopping workload executor [PID: {}]".format( - worker_subprocess.pid)) - os.killpg(worker_subprocess.pid, self.sigint) - stdout, stderr = worker_subprocess.communicate(timeout=10) + self.workload_runner.pid)) + stdout, stderr = self.workload_runner.terminate() LOGGER.info("Stopped workload executor [exit code: {}]".format( - worker_subprocess.returncode)) + self.workload_runner.returncode)) # Stop the timer timer.stop() @@ -227,7 +213,7 @@ def run(self, persist_cluster=False): err_info = {'numErrors': -1, 'numFailures': -1} if err_info['numErrors'] or err_info['numFailures'] \ - or worker_subprocess.returncode != 0: + or self.workload_runner.returncode != 0: LOGGER.info("FAILED: {!r}".format(self.id)) # Write xunit logs for failed tests. errmsg = ("Number of errors: {numErrors}\n" diff --git a/astrolabe/utils.py b/astrolabe/utils.py index 68984367..8f140ade 100644 --- a/astrolabe/utils.py +++ b/astrolabe/utils.py @@ -14,12 +14,17 @@ import logging import os +import signal +import subprocess +import sys from hashlib import sha256 from time import monotonic import click import junitparser +from pymongo import MongoClient + class ClickLogHandler(logging.Handler): """Handler for print log statements via Click's echo functionality.""" @@ -116,3 +121,57 @@ def get_cluster_name(test_name, name_salt): name_hash = sha256(test_name.encode('utf-8')) name_hash.update(name_salt.encode('utf-8')) return name_hash.hexdigest()[:10] + + +def load_test_data(connection_string, driver_workload): + """Insert the test data into the cluster.""" + kwargs = {'w': "majority"} + try: + import certifi + kwargs['tlsCAFile'] = certifi.where() + except ImportError: + pass + + client = MongoClient(connection_string, **kwargs) + coll = client.get_database( + driver_workload.database).get_collection( + driver_workload.collection) + coll.drop() + coll.insert(driver_workload.testData) + + +class DriverWorkloadSubprocessRunner: + """Convenience wrapper to run a workload executor in a subprocess.""" + def __init__(self): + self.is_windows = False + if sys.platform in ("win32", "cygwin"): + self.is_windows = True + self.workload_subprocess = None + + @property + def pid(self): + return self.workload_subprocess.pid + + @property + def returncode(self): + return self.workload_subprocess.returncode + + def spawn(self, *, workload_executor, connection_string, driver_workload): + args = workload_executor.split() + args.extend([connection_string, driver_workload]) + if not self.is_windows: + self.workload_subprocess = subprocess.Popen( + args, preexec_fn=os.setsid, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + else: + self.workload_subprocess = subprocess.Popen( + args, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return self.workload_subprocess + + def terminate(self): + if not self.is_windows: + os.killpg(self.workload_subprocess.pid, signal.SIGINT) + else: + os.kill(self.workload_subprocess.pid, signal.CTRL_BREAK_EVENT) + return self.workload_subprocess.communicate(timeout=10) diff --git a/setup.py b/setup.py index aed86271..5e4f12d6 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ import io import os +import sys from setuptools import setup @@ -21,6 +22,16 @@ readme_content = '' +# Dynamically generate requirements. +install_requires = [ + 'click>=7,<8', 'requests>=2,<3', + 'pymongo>=3.10,<4', 'dnspython>=1.16,<2', + 'pyyaml>=5,<6', 'tabulate>=0.8,<0.9', + 'junitparser>=1,<2'] +if sys.platform in ("win32", "cygwin"): + install_requires.append("certifi") + + setup( name='astrolabe', version=version['__version__'], @@ -34,14 +45,7 @@ license="Apache License, Version 2.0", python_requires=">=3.5", packages=["atlasclient", "astrolabe"], - install_requires=[ - 'click>=7,<8', - 'requests>=2,<3', - 'pymongo>=3.10,<4', - 'dnspython>=1.16,<2', - 'pyyaml>=5,<6', - 'tabulate>=0.8,<0.9', - 'junitparser>=1,<2'], + install_requires=install_requires, entry_points={ 'console_scripts': ['astrolabe=astrolabe.cli:cli']}, classifiers=[